In [2]:
%pip install influxdb_client

Collecting influxdb_client
  Downloading influxdb_client-1.42.0-py3-none-any.whl.metadata (64 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.8/64.8 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting reactivex>=4.0.4 (from influxdb_client)
  Downloading reactivex-4.0.4-py3-none-any.whl.metadata (5.5 kB)
Downloading influxdb_client-1.42.0-py3-none-any.whl (744 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m744.6/744.6 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading reactivex-4.0.4-py3-none-any.whl (217 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m217.8/217.8 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: reactivex, influxdb_client
Successfully installed influxdb_client-1.42.0 reactivex-4.0.4
Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import pytz
import pandas as pd
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient

# Lookup table: sensor device ID -> where that device is installed.
location_mapping = dict([
    ("24e124126d152919", "indoor"),
    ("24e124126d152969", "bottom_right_corner"),
    ("24e124128c067999", "indoor"),
    ("24e124785c389818", "bottom_left_corner"),
    ("24e124785c421885", "top_right_corner"),
])

# InfluxDB connection settings.
#
# SECURITY: credentials should not live in the notebook. Each value is read
# from the environment first (same variable names the influxdb_client library
# uses for its own env-based configuration); the literals below are kept only
# as fallbacks so existing runs keep working.
# NOTE(review): the fallback token has been stored in plain text in this
# notebook -- rotate it and then delete the fallback literal.
url = os.environ.get("INFLUXDB_V2_URL", "http://133.186.144.22:8086")
token = os.environ.get(
    "INFLUXDB_V2_TOKEN",
    "r3Ecro-rJQ82UpyNScnHXYDZ3KaE45AzweCXz6QIv2jeo7eOP4hL4-A9uKvkAVQDg_xavWorGUGZn7MI_sPCwg==",
)
org = os.environ.get("INFLUXDB_V2_ORG", "smoothing")

def create_client(url, token, org):
    """
    Build an InfluxDB client for the given server and credentials.

    :param url: InfluxDB server address
    :param token: InfluxDB API token
    :param org: InfluxDB organization
    :return: InfluxDBClient
    """
    connection_settings = {"url": url, "token": token, "org": org}
    return InfluxDBClient(**connection_settings)

def query_to_dataframe(client, query, field = "location"):
    """
    Run a Flux query and collect the returned records into a DataFrame.

    The resulting frame always has the columns
    ``time``, ``value``, ``place``, ``location`` and ``device`` -- including
    when the query matches no records (an empty frame is returned then).

    :param client: InfluxDBClient
    :param query: Flux query to execute
    :param field: key in each record's values that is copied into the
                  ``location`` column (default ``"location"``)
    :return: DataFrame; ``time`` is a string column with a trailing
             ``+00:00`` offset removed
    """
    columns = ["time", "value", "place", "location", "device"]
    tables = client.query_api().query(query=query)

    rows = [
        {
            "time": record.get_time(),
            "value": record.get_value(),
            "place": record.values.get("place"),
            "location": record.values.get(field),
            "device": record.values.get("device"),
        }
        for table in tables
        for record in table.records
    ]

    # Explicit columns keep the frame well-formed when `rows` is empty;
    # without them the 'time' lookup below raises KeyError on an empty result.
    df = pd.DataFrame(rows, columns=columns)
    # Render timestamps as text and drop the UTC offset suffix.
    df['time'] = df['time'].astype(str).str.replace(r'\+00:00$', '', regex=True)
    return df

def save_csv(df, file_pattern, directory):
    """
    Write a DataFrame to a CSV file (without the index column).

    The file name is produced by formatting *yesterday's* local date with
    ``file_pattern`` via ``strftime``; a pattern that contains no ``%``
    directives is therefore used unchanged as the file name.

    :param df: DataFrame to save
    :param file_pattern: file name, optionally containing strftime directives
    :param directory: target directory (with or without a trailing slash);
                      created if missing. An empty string means the current
                      working directory.
    """
    # exist_ok avoids the check-then-create race; skip for an empty path,
    # which os.makedirs rejects.
    if directory:
        os.makedirs(directory, exist_ok=True)

    previous_date = datetime.now() - timedelta(days=1)
    # os.path.join yields the same path as before when `directory` ends with
    # a separator, and a correct one when it does not.
    filename = os.path.join(directory, previous_date.strftime(file_pattern))
    df.to_csv(filename, index=False)

def update_location(df, mapping=None):
    """
    Update the 'location' column of environmental-sensor data from device IDs.

    Each row's ``device`` value is looked up in ``mapping``. Devices that are
    not in the mapping keep their existing ``location`` value when the frame
    already has that column; otherwise they get NaN.

    The frame is modified in place and also returned.

    :param df: environmental-sensor DataFrame with a 'device' column
    :param mapping: optional dict of device ID -> location; defaults to the
                    notebook-level ``location_mapping``
    :return: the same DataFrame with its 'location' column updated
    """
    if mapping is None:
        mapping = location_mapping

    mapped = df['device'].map(mapping)
    if 'location' in df.columns:
        # Do not wipe a known location just because the device is unmapped.
        mapped = mapped.where(mapped.notna(), df['location'])

    df['location'] = mapped
    return df

### 쿼리 사용하여 조회

In [15]:
# Export one KST day of hourly-mean environmental sensor readings to CSV.

# Korean time zone used to interpret the requested day
korea_tz = pytz.timezone('Asia/Seoul')

# Requested day (KST midnight) and the sensor measurement to export
input_date_str = "2024-05-12 00:00:00"
input_format = "%Y-%m-%d %H:%M:%S"
sensor_name = "illumination"

# Parse the input, attach the KST zone, then convert to UTC for the query range
input_datetime = datetime.strptime(input_date_str, input_format)
localized_kst = korea_tz.localize(input_datetime)
start_time_utc = localized_kst.astimezone(pytz.utc)
end_time_utc = start_time_utc + timedelta(days=1)  # 24 hours later

range_start = start_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
range_stop = end_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

# The map() step adds 9h to each timestamp so the exported times read as KST.
query_data = f'''
import "experimental"
from(bucket: "environmentalsensors_data")
  |> range(start: {range_start}, stop: {range_stop})
  |> filter(fn: (r) => r["place"] == "class_a")
  |> filter(fn: (r) => r["measurement"] == "{sensor_name}")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> map(fn: (r) => ({{r with _time: experimental.addDuration(d: 9h, to: r._time)}}))
  |> keep(columns: ["_time", "_value", "place", "location", "device"])
'''

# Create the client and run the query; always close the client, even if the
# query fails, so the connection is not leaked.
client = create_client(url, token, org)
try:
    df_sensor_data = query_to_dataframe(client, query_data)
finally:
    client.close()

# Fill in locations from device IDs and write the CSV
df_sensor_data_fix = update_location(df_sensor_data)
print(df_sensor_data_fix.head())
save_csv(
    df_sensor_data_fix,
    f"{input_datetime.strftime('%m_%d')}_{sensor_name}_data.csv",
    f"all_data/{sensor_name}/",
)

                  time      value    place location            device
0  2024-05-12 01:00:00  69.000000  class_a   indoor  24e124128c067999
1  2024-05-12 02:00:00  25.596774  class_a   indoor  24e124128c067999
2  2024-05-12 03:00:00   0.000000  class_a   indoor  24e124128c067999
3  2024-05-12 04:00:00   0.000000  class_a   indoor  24e124128c067999
4  2024-05-12 05:00:00   0.000000  class_a   indoor  24e124128c067999


In [40]:
# Export one KST day of per-minute socket power (W) and energy usage (kWh) to CSV.

# Korean time zone used to interpret the requested day
korea_tz = pytz.timezone('Asia/Seoul')

# Requested day (KST midnight)
input_date_str = "2024-05-12 00:00:00"
input_format = "%Y-%m-%d %H:%M:%S"

# Parse the input, attach the KST zone, then convert to UTC for the query range
input_datetime = datetime.strptime(input_date_str, input_format)
localized_kst = korea_tz.localize(input_datetime)
start_time_utc = localized_kst.astimezone(pytz.utc)
end_time_utc = start_time_utc + timedelta(days=1)  # 24 hours later

range_start = start_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
range_stop = end_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
file_prefix = input_datetime.strftime("%m_%d")

# Power (W) query (class a: sockets).
# The map() step adds 9h to each timestamp so the exported times read as KST.
query_power_socket_data = f'''
import "experimental"
from(bucket: "powermetrics_data")
  |> range(start: {range_start}, stop: {range_stop})
  |> filter(fn: (r) => r["phase"] == "total")
  |> filter(fn: (r) => r["description"] == "w")
  |> filter(fn: (r) => r["place"] == "office")
  |> filter(fn: (r) => r["location"] == "class_a_floor_heating_1" or r["location"] == "class_a_floor_heating_2")
  |> aggregateWindow(every: 1m, fn: last, createEmpty: false)
  |> map(fn: (r) => ({{r with _time: experimental.addDuration(d: 9h, to: r._time)}}))
  |> keep(columns: ["_time", "_value", "place", "location", "device"])
'''

# Energy usage query (class a: sockets)
query_power_usage_socket_data = f'''
import "experimental"
from(bucket: "powermetrics_data")
  |> range(start: {range_start}, stop: {range_stop})
 |> filter(fn: (r) => r["phase"] == "kwh")
  |> filter(fn: (r) => r["place"] == "office")
  |> filter(fn: (r) => r["location"] == "class_a_floor_heating_1" or r["location"] == "class_a_floor_heating_2")
  |> filter(fn: (r) => r["description"] == "sum")
  |> aggregateWindow(every: 1m, fn: last, createEmpty: false)
  |> map(fn: (r) => ({{r with _time: experimental.addDuration(d: 9h, to: r._time)}}))
  |> keep(columns: ["_time", "_value", "place", "location", "device"])
'''

# Create the client and run both queries; always close the client, even if a
# query or a CSV write fails, so the connection is not leaked.
client = create_client(url, token, org)
try:
    # Power CSV (sockets)
    df_power_socket = query_to_dataframe(client, query_power_socket_data)
    print(df_power_socket.head(2))
    save_csv(df_power_socket, file_prefix + "_power_socket_data.csv", "all_data/power/socket/")

    # Energy usage CSV (sockets)
    df_power_usage_socket = query_to_dataframe(client, query_power_usage_socket_data)
    print(df_power_usage_socket.head(2))
    save_csv(df_power_usage_socket, file_prefix + "_power_usage_socket_data.csv", "all_data/power_usage/socket/")
finally:
    client.close()

                  time  value   place                 location     device
0  2024-05-12 00:01:00  141.0  office  class_a_floor_heating_1  gems-3500
1  2024-05-12 00:02:00  141.0  office  class_a_floor_heating_1  gems-3500
                  time  value   place                 location     device
0  2024-05-12 00:01:00  164.6  office  class_a_floor_heating_1  gems-3500
1  2024-05-12 00:02:00  164.6  office  class_a_floor_heating_1  gems-3500


In [18]:
# Export one KST day of hourly people-counter (line crossing in/out) values to CSV.

# Korean time zone used to interpret the requested day
korea_tz = pytz.timezone('Asia/Seoul')

# Requested day (KST midnight)
input_date_str = "2024-05-12 00:00:00"
input_format = "%Y-%m-%d %H:%M:%S"

# Parse the input, attach the KST zone, then convert to UTC for the query range
input_datetime = datetime.strptime(input_date_str, input_format)
localized_kst = korea_tz.localize(input_datetime)
start_time_utc = localized_kst.astimezone(pytz.utc)
end_time_utc = start_time_utc + timedelta(days=1)  # 24 hours later

range_start = start_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
range_stop = end_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

# Flux query for the movement-detection counters.
# The map() step adds 9h to each timestamp so the exported times read as KST.
query_counter_data = f'''
import "experimental"
from(bucket: "milesight")
  |> range(start: {range_start}, stop: {range_stop})
  |> filter(fn: (r) => r["place"] == "class_a")
  |> filter(fn: (r) => r["_field"] == "line_periodic_data_1_out" or r["_field"] == "line_periodic_data_1_in")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
  |> map(fn: (r) => ({{r with _time: experimental.addDuration(d: 9h, to: r._time)}}))
  |> keep(columns: ["_time", "_value", "place", "_field", "device"])
'''

# Create the client and run the query. The client was previously never closed
# in this cell; close it in all cases so the connection is not leaked.
client = create_client(url, token, org)
try:
    # The "_field" value is stored in the frame's "location" column.
    df_counter = query_to_dataframe(client, query_counter_data, "_field")
finally:
    client.close()

# Write the movement-detection counter CSV
print(df_counter.head(2))
save_csv(df_counter, input_datetime.strftime("%m_%d")+"_counter_data.csv", "all_data/counter/total/")

                  time  value    place                 location device
0  2024-05-12 01:00:00    0.0  class_a  line_periodic_data_1_in  vs133
1  2024-05-12 02:00:00    0.0  class_a  line_periodic_data_1_in  vs133
