In [2]:
%pip install influxdb_client

Note: you may need to restart the kernel to use updated packages.


In [4]:
import os
import pandas as pd
import glob
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

# 디바이스 ID와 위치를 매핑
location_mapping = {
    '24e124126d152919': 'indoor',
    '24e124126d152969': 'bottom_right_corner',
    '24e124128c067999': 'indoor',
    '24e124785c389818': 'bottom_left_corner',
    '24e124785c421885': 'top_right_corner'
}

# InfluxDB 설정 정보
url = "http://133.186.144.22:8086"
token = "BPJ1pnKvoaov4Tte971t0zpRSTUXNZvrshU7u3UPheAIsBeUJEFfbKjfsZjtwZmugkHJEGRW17lH4bR9ybanNQ=="
org = "smoothing"

# 디바이스 ID와 위치를 매핑
location_mapping = {
    '24e124126d152919': 'indoor',
    '24e124126d152969': 'bottom_right_corner',
    '24e124128c067999': 'indoor',
    '24e124785c389818': 'bottom_left_corner',
    '24e124785c421885': 'top_right_corner'
}

# InfluxDB 클라이언트 생성
def create_client(url, token, org):
    return InfluxDBClient(url=url, token=token, org=org)

# 쿼리 실행 및 DataFrame으로 변환
def query_to_dataframe(client, query):
    result = client.query_api().query(query=query)
    results = []
    
    for table in result:
        for record in table.records:
            results.append({
                "time": record.get_time(),
                "value": record.get_value(),
                "place": record.values.get("place"),
                "location": record.values.get("location"),
                "device": record.values.get("device")
            })
    
    df = pd.DataFrame(results)
    df['time'] = df['time'].astype(str).str.replace(r'\+00:00$', '', regex=True)
    return df

# 데이터를 날짜를 지정하여 CSV 파일로 저장
def save_csv(df, file_pattern, directory):
    # 경로가 존재하는지 확인하고, 없다면 생성
    if not os.path.exists(directory):
        os.makedirs(directory)
        
    current_date = datetime.now()
    previous_date = current_date - timedelta(days=1)
    filename = f"{directory}{previous_date.strftime(file_pattern)}"
    df.to_csv(filename, index=False)
    
# 온도 Data에서 'device' 열에 따라 'location' 열을 업데이트    
def update_location(df, location_mapping):
    df['location'] = df['device'].map(location_mapping)
    return df

# 결측치 확인 및 처리
def load_and_clean_data(file_path):
    print("\n--- 데이터 처리 시작 ---\n")
    # 데이터 불러오기
    df = pd.read_csv(file_path)

    # 파일명 출력
    print(f"파일명: {file_path.split('/')[-1]}")

    # 결측치 확인
    print("결측치 수:")
    print(df.isnull().sum())

    # 결측치 제거 전의 데이터 크기 출력
    print(f"결측치 제거 전의 데이터 크기 : {df.shape}")

    # 결측치 제거
    df.dropna(inplace=True)

    # 결측치 제거 후의 데이터 크기 출력
    print(f"결측치 제거 후의 데이터 크기 : {df.shape}")

    print("\n--- 데이터 처리 완료 ---\n")
    return df

# 처리한 DF를 다른 폴더에 저장
def process_csv_files(folder_path, output_folder):
    summary = []

    for filename in os.listdir(folder_path):
        if filename.endswith('.csv'):
            file_path = os.path.join(folder_path, filename)
            df = load_and_clean_data(file_path)

            # 저장할 파일의 경로 설정
            output_file_path = os.path.join(output_folder, filename)

            # CSV 파일로 저장
            df.to_csv(output_file_path, index=False)
            print(f"저장 완료: {output_file_path}")
            
            summary.append({
                'filename': filename,
                'original_rows': df.shape[0] + df.isnull().any().sum(),
                'rows_after_cleaning': df.shape[0]
            })

    # 결과 요약 정보 출력
    for info in summary:
        print(f"{info['filename']}: 원래 행 수 = {info['original_rows']}, 정리 후 행 수 = {info['rows_after_cleaning']}")

### 쿼리 사용하여 조회

In [14]:
# 클라이언트 생성 및 쿼리 실행
client = create_client(url, token, org)

# 전력 조회 Flux 쿼리
query_powermetrics = '''
import "date"

from(bucket: "powermetrics_data")
  |> range(start: 2024-04-22T00:00:00Z, stop: 2024-04-23T00:00:00Z)
  |> filter(fn: (r) => r["phase"] == "total")
  |> filter(fn: (r) => r["description"] == "w")
  |> filter(fn: (r) => r["place"] == "class_a" or r["place"] == "office")
  |> filter(fn: (r) => r["location"] != "main")
  |> aggregateWindow(every: 2m, fn: mean, createEmpty: false)
  |> keep(columns: ["_time", "_value", "place", "location", "device"])
'''

# CSV 변환
df_powermetrics = query_to_dataframe(client, query_powermetrics)
print(df_powermetrics.head())
save_csv(df_powermetrics, "04_22_powermetrics_data.csv", "update_location/power/")

# 온도 조회 Flux 쿼리
query_environmental = '''
import "date"
from(bucket: "environmentalsensors_data")
  |> range(start: 2024-04-22T00:00:00Z, stop: 2024-04-23T00:00:00Z)
  |> filter(fn: (r) => r["measurement"] == "temperature")
  |> filter(fn: (r) => r["place"] == "class_a" or r["place"] == "office")
  |> aggregateWindow(every: 2m, fn: mean, createEmpty: false)
  |> keep(columns: ["_time", "_value", "place", "location", "device"])
'''

# CSV 변환
df_environmental = query_to_dataframe(client, query_environmental)
df_environmental_fix = update_location(df_environmental, location_mapping)
print(df_environmental_fix.head())
save_csv(df_environmental_fix, "04_22_environmentalsensors_data.csv", "update_location/temp/")

# 클라이언트 종료
client.close()


                  time  value   place   location     device
0  2024-04-22 00:02:00   25.0  office  a_project  gems-3500
1  2024-04-22 00:04:00   25.0  office  a_project  gems-3500
2  2024-04-22 00:06:00   26.0  office  a_project  gems-3500
3  2024-04-22 00:08:00   26.0  office  a_project  gems-3500
4  2024-04-22 00:10:00   26.0  office  a_project  gems-3500
                  time  value   place location            device
0  2024-04-22 00:02:00   23.3  office   indoor  24e124126d152919
1  2024-04-22 00:04:00   23.3  office   indoor  24e124126d152919
2  2024-04-22 00:06:00   23.4  office   indoor  24e124126d152919
3  2024-04-22 00:08:00   23.4  office   indoor  24e124126d152919
4  2024-04-22 00:10:00   23.4  office   indoor  24e124126d152919


### 기존 파일 location 업데이트 (구 버전으로 생성된 파일들에 사용)

In [None]:
# 온도 Data에서 'device' 열에 따라 'location' 열을 업데이트    
def update_location(df, location_mapping):
    df['location'] = df['device'].map(location_mapping)
    return df

def process_files(fix_directory, directory, location_mapping):
    # 폴더 내의 모든 파일을 리스트업
    for filename in os.listdir(fix_directory):
        if filename.endswith('.csv'):
            file_path = os.path.join(fix_directory, filename)
            df = pd.read_csv(file_path)
            updated_df = update_location(df, location_mapping)
            
            # 새 파일 경로 생성
            new_file_path = os.path.join(directory, filename)
            
            # 업데이트된 DataFrame을 새 경로에 저장
            updated_df.to_csv(new_file_path, index=False)
            print(f"Processed and saved: {new_file_path}")

# 수정할 파일 경로
fix_directory = 'update_location/'

# 저장할 파일 경로
directory = 'environmentalsensors/'

# 파일 처리 실행
process_files(fix_directory, directory, location_mapping)