# 시간적 연속성이 있다고 가정할 경우

> 서버 시스템 로그 파일인 `UserVitalSign`, `UserLocationLog`, `UserGyro` 데이터를 데이터베이스에 저장하고 데이터 동기화 및 예측 시스템 구성을 위해 고려해야 할 요소들을 단계별제시

## 1. 데이터베이스에 저장 시 수정할 열
- **중복되는 시간 정보 제거**: 각 데이터셋에 `REGISTERDATE`, 'MODIFYDATE' 등 저장 시점 관련 열이 있다면, 통합된 하난의 저장 시간 컬럼으로 대체 가능
- **일관된 사용자 코드**: `USERCODE`나 `USERID`가 일관되지 않을 경우, 통합 과정에서 이를 하나의 기준으로 통일하여 각 데이터셋 간 연결성을 강화
- **위치 정보 통합**: `UserLocationLog`의 `LATITUDE`, `LONGITUDE`는 중요한 위치 정보이므로 `UserVitalSign`, `UserGyro`와 결합할 경우 사용자가 위치를 기반으로 행동 예측이나 낙상 예측에 활용 가능

## 2. 측정 주기 차이에 따른 데이터 동기화
- **샘플링 주기 맞추기**: 각 데이터셋의 주기가 다르다면, 가장 짧은 주기를 기준으로 데이터를 리샘플링하거나 보간하여 동기화 가능. 예를 들어, `UserGyro`의 주기가 가장 짧다면 이를 기준으로 다른 데이터셋을 보간
- **타임 스탬프 기반 보간**: `VITALDATE`, `CHECKTIME` 등의 타임 스탬프를 기준으로 보간(interpolation)하여 일정한 주기(예: 초 단위)로 데이터를 맞춤
- **데이터 결합 시의 시간 오차 허용**: 동기화 과정에서 약간의 시간 차이를 허용할 수 있는 범위를 지정하여 각 측정 주기 간 차이를 줄일 수 있음

## 3. 측정 주기 및 레이블 없이 위험 예측 시스템 구성하기
- **주기 맞춤으로 행동 패턴 분석**: 기존 연구와 다른 방식으로 레이블 없는 데이터를 사용하여 행동 패턴을 추론 가능. 예를 들어, `UserGyro`에서 X, Y, Z 축의 변화로 특정 움직임을 파악하고, `UserVitalSign`의 심박수나 체온 변화와 결합해 위험 예측 가능
- **이상치 탐지를 통한 비정상 상황 예측**: 레이블 없이 위험 예측을 할 때 이상치 탐지 모델을 사용하여 비정상적인 행동(예: 급격한 움직임)이나 생체 신호 변화를 탐지하여 위험도를 예측 가능
- **데이터 클러스터링 및 패턴 분석**: 군집화(clustering)를 사용하여 패턴을 분류하고, 이러한 군집의 특성에 따라 위험 수준을 예측 가능. 예를 들어, 낮은 활동 패턴과 높은 심박수 변화 패턴을 가진 클러스터가 위험 가능성을 높게 판단하는 방식

### 각 데이터셋의 작업일자(`WORKDATE`)와 작업자(`USERCODE`)별로 측정된 불연속 구간을 고려하여 평균 주기를 계산하고, 이 주기에 맞추어 리샘플링 함

In [6]:
import pandas as pd
import numpy as np
from tqdm import tqdm

print("데이터 로드 중...")

# UserVitalSign 데이터 로드
vital_sign = pd.read_csv('Cleaned_UserVitalSign.csv', parse_dates=['VITALDATE'])
print("UserVitalSign 데이터 로드 완료.")

# UserLocationLog 데이터 로드
location_log = pd.read_csv('Cleaned_UserLocationLog.csv', parse_dates=['CHECKTIME'])
print("UserLocationLog 데이터 로드 완료.")

# UserGyro 데이터 로드
gyro_data = pd.read_csv('Cleaned_All_UserGyro.csv', parse_dates=['VITALDATE'])
print("UserGyro 데이터 로드 완료.")

# 데이터가 로드되었는지 확인
print("\n데이터 샘플 확인:")
print("UserVitalSign 데이터:", vital_sign.head(), sep="\n")
print("UserLocationLog 데이터:", location_log.head(), sep="\n")
print("UserGyro 데이터:", gyro_data.head(), sep="\n")

# 1. 불연속 구간을 고려한 평균 주기 계산 함수 정의
def calculate_avg_interval_with_discontinuity(df, date_column, threshold=3600):
    print(f"\n[{date_column}] 그룹별 평균 주기 계산 중....")
    
    avg_intervals = []
    # WORKDATE와 USERCODE로 그룹화
    grouped = df.groupby(['WORKDATE', 'USERCODE'])

    for (workdate, usercode), group in tqdm(grouped, desc="불연속 구간 계산 진행"):
        group = group.sort_values(by=date_column)
        
        # VITALDATE 간 시간 차이 계산
        time_diffs = group[date_column].diff().dt.total_seconds().dropna()

        # 특정 시간 간격을 넘는 차이를 불연속 구간으로 간주하고 필터링
        filtered_diffs = time_diffs[time_diffs <= threshold]

        # 불연속 구간 제외 후 평균 주기 계산
        avg_interval = filtered_diffs.mean() if not filtered_diffs.empty else None
        
        # 그룹의 첫 번째 행에서 필요한 열을 가져와 추가
        additional_data = group.iloc[0].to_dict()
        additional_data.update({'WORKDATE': workdate, 'USERCODE': usercode, 'AVG_INTERVAL': avg_interval})
        avg_intervals.append(additional_data)

    # 결과를 데이터프레임으로 반환
    avg_interval_df = pd.DataFrame(avg_intervals)
    print(f"[{date_column}] 평균 주기 계산 완료!\n")
    return avg_interval_df

# 2. 각 데이터셋에 대해 불연속 구간을 고려한 평균 주기 계산
print("1단계: 각 데이터셋의 그룹별 평균 주기 계산")
vital_avg_intervals = calculate_avg_interval_with_discontinuity(vital_sign, 'VITALDATE', threshold=3600)
location_avg_intervals = calculate_avg_interval_with_discontinuity(location_log, 'CHECKTIME', threshold=3600)
gyro_avg_intervals = calculate_avg_interval_with_discontinuity(gyro_data, 'VITALDATE', threshold=3600)

# 결과 확인
print("\nUserVitalSign 평균 주기 데이터:")
print(vital_avg_intervals.head())
print("\nUserLocationLog 평균 주기 데이터:")
print(location_avg_intervals.head())
print("\nUserGyro 평균 주기 데이터:")
print(gyro_avg_intervals.head())


# # 3. 주기를 반영한 리샘플링 수행 함수 정의
# def resample_by_avg_interval(df, date_column, avg_intervals_df):
#     print(f"\n[{date_column}] 주기 기반 리샘플링 시작...")
#     resampled_dfs = []

#     for (workdate, usercode), group in tqdm(df.groupby(['WORKDATE', 'USECODE']), desc="그룹별 리샘플링 진행"):
#         # 평균 주기 가져오기
#         avg_interval = avg_intervals_df.loc[
#             (avg_intervals_df['WORKDATE'] == workdate) &
#             (avg_intervals_df['USERCODE'] == usercode), 'AVG_INTERVAL'
#         ]

#         if not avg_interval.empty and not np.isnan(avg_interval.values[0]):
#             group = group.set_index(date_column)
#             # 주기 평균에 맞추어 리샘플링
#             resample_freq = f"{int(avg_interval.values[0])}S"
#             resampled_group = group.resample(resample_freq).ffill().reset_index()
#             resampled_group['WORKDATE'] = workdate
#             resampled_group['USERCODE'] = usercode
#             resampled_dfs.append(resampled_group)

#     # 모든 리샘플링된 그룹 결합
#     resampled_df = pd.concat(resamped_dfs, ignore_index=True)
#     print(f"[{date_column}] 리샘플링 완료!\n")
#     return resampled_df
    
# # 4. 각 데이터셋 동기화
# print("2단계: 각 데이터셋 주기 기반 리샘플링 수행")
# resampled_vital = resample_by_avg_interval(vital_sign, 'VITALDATE', vital_avg_intervals)
# resampled_location = resample_by_avg_interval(location_log, 'CHECKTIME', location_avg_intervals)
# resampled_gyro = resample_by_avg_interval(gyro, 'VITALDATE', gyro_avg_intervals)

# # 5. 동기화된 데이터 확인
# print("\n동기화된 UserVitalSign 데이터:")
# print(resampled_vital.head())
# print("\n동기화된 UserLocationLog 데이터:")
# print(resampled_location.head())
# print("\n동기화된 UserGyro 데이터:")
# print(resampled_gyro.head())

데이터 로드 중...
UserVitalSign 데이터 로드 완료.
UserLocationLog 데이터 로드 완료.
UserGyro 데이터 로드 완료.

데이터 샘플 확인:
UserVitalSign 데이터:
        NO  WORKDATE  USERCODE  ISWEAR  HEARTBEAT  TEMPERATURE  \
0  1722304  20240801        -1       0          0          0.0   
1  1722305  20240801        -1       0          0          0.0   
2  1722306  20240801        -1       0          0          0.0   
3  1722307  20240801        -1       0          0          0.0   
4  1722308  20240801        -1       0          0          0.0   

   OUTSIDETEMPERATURE  LATITUDE  LONGITUDE  DEVICEBATTERY  \
0                   0       0.0        0.0            100   
1                   0       0.0        0.0            100   
2                   0       0.0        0.0             -1   
3                   0       0.0        0.0            100   
4                   0       0.0        0.0            100   

                VITALDATE             REGISTERDATE               MODIFYDATE  \
0 2024-08-01 01:17:28.823  2024-08-01 01:1

불연속 구간 계산 진행: 100%|████████████████████████████████████████████████████████| 784/784 [00:00<00:00, 918.65it/s]


[VITALDATE] 평균 주기 계산 완료!


[CHECKTIME] 그룹별 평균 주기 계산 중....


불연속 구간 계산 진행: 100%|████████████████████████████████████████████████████████| 949/949 [00:01<00:00, 561.20it/s]


[CHECKTIME] 평균 주기 계산 완료!


[VITALDATE] 그룹별 평균 주기 계산 중....


불연속 구간 계산 진행: 100%|█████████████████████████████████████████████████████████| 472/472 [00:05<00:00, 80.30it/s]


[VITALDATE] 평균 주기 계산 완료!


UserVitalSign 평균 주기 데이터:
        NO  WORKDATE  USERCODE  ISWEAR  HEARTBEAT  TEMPERATURE  \
0  1722304  20240801        -1       0          0     0.000000   
1  1789774  20240801         1       0         -1    -1.000000   
2  1723961  20240801        13       1         79    29.634054   
3  1724316  20240801        14       1         83    31.905823   
4  1725375  20240801        15       1          0     0.000000   

   OUTSIDETEMPERATURE   LATITUDE   LONGITUDE  DEVICEBATTERY  \
0                   0   0.000000    0.000000            100   
1                  -1   0.000000    0.000000             53   
2                  30  35.166053  129.056287            100   
3                  31  35.165800  129.056021            100   
4                  -1  35.166056  129.056285            100   

                VITALDATE             REGISTERDATE               MODIFYDATE  \
0 2024-08-01 01:17:28.823  2024-08-01 01:17:28.579  2024-08-01 01:17:28.579   
1 2024-08-01 1

# 이벤트 발생 기반의 로그 데이터의 특성과 작업일자 및 작업자별 통합을 고려하여, 실시간 위험 예측 웹 서비스를 구현하기 위해서는 각 데이터셋의 발생 시점을 기준으로 동기화해야 함

## 1. 데이터셋 기본 구조 설정
- **데이터셋의 특성 파악**: `UserGyro`, `UserVitalSign`, `UserLocationLog`는 모두 변화가 감지될 때만 기록되는 이벤트 기반 로그
- **통합 기준**: 발생 시점(`VITALDATE`, `CHECKTIME`)을 기준으로 작업자별(`USERCODE`)과 작업일자별(`WORKDATE`)로 그룹화하여 동기화하는 것이 필요

## 2. 데이터 전처리 및 그룹화
1. **불필요한 열 제거**: `NO` 열은 단순히 로그 순서 키이므로 제거
2. **작업일자와 작업자별로 그룹화**: `WORKDATE`와 `USERCODE` 기준으로 그룹화하여 각각의 데이터를 다룸
3. **이벤트 간격 분석**: 연속성 여부를 파악하기 위해 이벤트 간 시간 간격을 계산. 연속적 간격은 그대로 두고, 불연속적인 간격의 경우는 특정 기준으로 간격을 조정하거나 비어있는 데이터를 보완할지 결정

## 3. 데이터 동기화
- **기준 시간 설정**: 위험 예측 모델에서 중요하게 다루는 이벤트 발생 시간을 기준으로 다른 데이터셋의 시계열 데이터를 보간하거나 맞춤
    - 예: UserGyro의 행동 변화 시점을 중심으로, 동일한 `VITALDATE` 또는 근접 시간(`±tolerance`초 이내)에 발생한 심박수, 위치 등의 데이터가 포함되도록 맞춤
- **시간적 불연속 구간 처리**: 불연속적인 구간이 발생하는 경우, 해당 구간에 대한 위험 예측 분석에서의 중요성을 고려하여 간격이 큰 경우는 비어 있는 데이터로 두거나, 중간값 보간으로 데이터를 보완 가능

## 4. 실시간 분석 및 모델 적용
- **이벤트 발생 시점 기준 윈도우 생성**: 이벤트 기반 로그이므로 슬라이딩 윈도우를 설정하여 각 이벤트가 발생한 시점을 중심으로 윈도우를 설정함
- **모델 훈련 및 예측**: 훈련된 위험 예측 모델을 이벤트 발생 시점별로 분석하여, 실시간으로 위험 여부를 판단함. 이때 예측된 행동 변화와 생체 신호, 위치 데이터를 조합하여 위험 수준을 산출

In [18]:
import pandas as pd
from datetime import timedelta
import logging
import sys

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger()

# tolerance 시간 설정 (5초 이내의 시간으로 동기화)
TOLERANCE_SECONDS = 5
tolerance = timedelta(seconds=TOLERANCE_SECONDS)

def load_data(gyro_path, vital_path, location_path):
    """
    데이터 로드 함수.
    """
    try:
        logger.info("데이터 로드 중...")
        gyro_data = pd.read_csv(gyro_path, parse_dates=['VITALDATE'])
        vital_data = pd.read_csv(vital_path, parse_dates=['VITALDATE'])
        location_data = pd.read_csv(location_path, parse_dates=['CHECKTIME'])
        logger.info("데이터 로드 완료!")
        return gyro_data, vital_data, location_data
    except Exception as e:
        logger.error(f"데이터 로드 중 오류 발생: {e}")
        sys.exit(1)

def synchronize_event_based_data(main_df, secondary_df, main_time_col, secondary_time_col, desc=""):
    """
    이벤트 기반 데이터 동기화 함수.
    """
    logger.info(f"{desc} 데이터 동기화 시작...")
    try:
        # 불필요한 열 제거
        for col in ['NO', 'USERCODE', 'WORKDATE']:
            if col in main_df.columns:
                main_df = main_df.drop(columns=[col])
            if col in secondary_df.columns:
                secondary_df = secondary_df.drop(columns=[col])

        # 시간 열을 datetime 타입으로 변환
        secondary_df[secondary_time_col] = pd.to_datetime(secondary_df[secondary_time_col], errors='coerce')
        main_df[main_time_col] = pd.to_datetime(main_df[main_time_col], errors='coerce')

        # 시간 열에 결측치가 있는 행 제거
        main_df = main_df.dropna(subset=[main_time_col])
        secondary_df = secondary_df.dropna(subset=[secondary_time_col])

        # 데이터 타입 다운캐스팅 (메모리 최적화)
        for df in [main_df, secondary_df]:
            for col in df.select_dtypes(include=['float64', 'int64']).columns:
                df[col] = pd.to_numeric(df[col], downcast='float' if df[col].dtype == 'float64' else 'integer')

        # 병합 전에 secondary_df에서 중복 열 제거 (시간 열 제외)
        common_cols = set(main_df.columns).intersection(set(secondary_df.columns))
        cols_to_drop = common_cols - {secondary_time_col}
        if cols_to_drop:
            secondary_df = secondary_df.drop(columns=cols_to_drop)

        # 병합 (USERCODE 및 WORKDATE 없이)
        merged_df = pd.merge_asof(
            main_df.sort_values(main_time_col),
            secondary_df.sort_values(secondary_time_col),
            left_on=main_time_col,
            right_on=secondary_time_col,
            direction="nearest",
            tolerance=tolerance
        )

        # 동기화된 시간 열 이름 변경
        merged_df = merged_df.rename(columns={secondary_time_col: f"{secondary_time_col}_synced"})

        # 병합으로 생성된 _x, _y 접미사 열 제거
        suffixes = ['_x', '_y']
        cols_to_drop = [col for col in merged_df.columns if any(col.endswith(suffix) for suffix in suffixes)]
        if cols_to_drop:
            merged_df = merged_df.drop(columns=cols_to_drop)

        # 누락 데이터 확인
        missing_count = merged_df.isnull().sum().sum()
        logger.info(f"[{desc}] 동기화 후 누락 데이터 개수: {missing_count}")

        # 결측치 처리 (전체 데이터프레임에 대해)
        merged_df = merged_df.ffill()
        missing_count = merged_df.isnull().sum().sum()
        logger.info(f"[{desc}] ffill 처리 후 누락 데이터 개수: {missing_count}")

        if missing_count > 0:
            merged_df = merged_df.bfill()
            missing_count = merged_df.isnull().sum().sum()
            logger.info(f"[{desc}] bfill 처리 후 누락 데이터 개수: {missing_count}")

            if missing_count > 0:
                merged_df = merged_df.fillna('NaN')
                logger.info(f"[{desc}] 남은 누락 데이터 'NaN'으로 대체 완료")

        logger.info(f"{desc} 데이터 동기화 완료.")
        return merged_df

    except Exception as e:
        logger.error(f"{desc} 데이터 동기화 중 오류 발생: {e}")
        sys.exit(1)

def clean_final_columns(df):
    """
    최종 분석에 필요한 열만 유지하고 나머지 삭제하는 함수.
    """
    try:
        # 최종 분석에 필요한 열 리스트
        keep_columns = [
            'VITALDATE', 'VITALDATE_synced', 'CHECKTIME', 'CHECKTIME_synced',
            # 분석에 필요한 열 추가
            'HEARTBEAT', 'TEMPERATURE', 'OUTSIDETEMPERATURE',
            'LATITUDE', 'LONGITUDE', 'SPEED', 'X', 'Y', 'Z'
        ]
        # 존재하지 않는 열은 무시하고 유지
        keep_columns = [col for col in keep_columns if col in df.columns]
        df = df[keep_columns]
        logger.info("불필요한 열 정리 완료. 최종 데이터 구조:")
        logger.info(df.head())
        return df
    except Exception as e:
        logger.error(f"최종 열 정리 중 오류 발생: {e}")
        sys.exit(1)

def save_data(df, output_path):
    """
    최종 데이터를 저장하는 함수.
    """
    try:
        df.to_csv(output_path, index=False)
        logger.info(f"최종 동기화된 데이터를 '{output_path}'에 저장하였습니다.")
    except Exception as e:
        logger.error(f"데이터 저장 중 오류 발생: {e}")
        sys.exit(1)

def main():
    # 데이터 경로 설정
    gyro_path = 'Cleaned_All_UserGyro.csv'
    vital_path = 'Cleaned_UserVitalSign.csv'
    location_path = 'Cleaned_UserLocationLog.csv'
    output_path = 'Synchronized_All_Data.csv'

    # 데이터 로드
    gyro_data, vital_data, location_data = load_data(gyro_path, vital_path, location_path)

    # 1단계: UserVitalSign 데이터 동기화
    synchronized_data = synchronize_event_based_data(
        gyro_data, vital_data, 'VITALDATE', 'VITALDATE', desc="UserVitalSign"
    )

    # 2단계: UserLocationLog 데이터 동기화
    synchronized_data = synchronize_event_based_data(
        synchronized_data, location_data, 'VITALDATE_synced', 'CHECKTIME', desc="UserLocationLog"
    )

    # 3단계: 최종 데이터 정리
    synchronized_data = clean_final_columns(synchronized_data)

    # 최종 데이터 저장
    save_data(synchronized_data, output_path)

if __name__ == "__main__":
    main()

2024-10-26 23:44:25,995 [INFO] 데이터 로드 중...
2024-10-26 23:45:32,189 [INFO] 데이터 로드 완료!
2024-10-26 23:45:32,190 [INFO] UserVitalSign 데이터 동기화 시작...
2024-10-26 23:45:48,492 [INFO] [UserVitalSign] 동기화 후 누락 데이터 개수: 11300440
2024-10-26 23:46:03,258 [INFO] [UserVitalSign] ffill 처리 후 누락 데이터 개수: 14740


  merged_df = merged_df.bfill()


2024-10-26 23:46:14,783 [INFO] [UserVitalSign] bfill 처리 후 누락 데이터 개수: 0
2024-10-26 23:46:14,784 [INFO] UserVitalSign 데이터 동기화 완료.
2024-10-26 23:46:14,889 [INFO] UserLocationLog 데이터 동기화 시작...
2024-10-26 23:46:39,339 [INFO] [UserLocationLog] 동기화 후 누락 데이터 개수: 1864915
2024-10-26 23:46:52,448 [INFO] [UserLocationLog] ffill 처리 후 누락 데이터 개수: 19162
2024-10-26 23:47:05,133 [INFO] [UserLocationLog] bfill 처리 후 누락 데이터 개수: 0
2024-10-26 23:47:05,134 [INFO] UserLocationLog 데이터 동기화 완료.
2024-10-26 23:47:06,205 [INFO] 불필요한 열 정리 완료. 최종 데이터 구조:
2024-10-26 23:47:06,206 [INFO]             VITALDATE_synced    CHECKTIME_synced  HEARTBEAT  TEMPERATURE  \
0 2024-07-30 10:50:00.970684 2024-08-01 08:11:50       95.0    36.015884   
1 2024-07-30 10:50:01.130875 2024-08-01 08:11:50       95.0    36.015884   
2 2024-07-30 10:50:01.289972 2024-08-01 08:11:50       95.0    36.015884   
3 2024-07-30 10:50:01.451762 2024-08-01 08:11:50       95.0    36.015884   
4 2024-07-30 10:50:01.625562 2024-08-01 08:11:50       95.0  