In [1]:
import boto3
import numpy as np
import scipy.signal
import cx_Oracle
import tensorflow as tf
from apscheduler.schedulers.blocking import BlockingScheduler
from pywt import wavedec
from datetime import datetime
import pandas as pd
from Pan_Tompkins import Pan_Tompkins_QRS
from Heart_Rate import HeartRate
import os
import re
import tempfile

# AWS S3 설정
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    region_name='ap-northeast-2'
)
bucket_name = 'smhrdpulsepulse'

# Oracle DB 연결 설정
connection = cx_Oracle.connect('CGI_24S_IOT3_1', 'smhrd1', 'project-db-cgi.smhrd.com:1524/xe')
cursor = connection.cursor()

# TensorFlow 모델 로드
model = tf.keras.models.load_model('model0812_6s.keras')

# 이미 처리된 파일 목록
processed_files = set()

# 데이터 처리 함수
def extract_features(X):
    Fil = []
    for signal in X:
        signal = scipy.signal.decimate(signal, 2)
        features = wavedec(signal, 'db6', level=4)
        padded_all = []
        for f in features:
            padded_feature = np.pad(f, (0, len(features[-1])-len(f)), 'constant')
            padded_all.append(padded_feature)
        K = np.swapaxes(np.array(padded_all), 0, 1)
        Fil.append(K)
    F = np.array(Fil)
    return F

# RR 간격 및 심박수 계산 함수
def calculate_rr_intervals(signals, sampling_rate):
    ecg_df = pd.DataFrame({'TimeStamp': np.arange(len(signals)), 'ecg': signals})
    QRS_detector = Pan_Tompkins_QRS(fs=sampling_rate)
    mwin, bpass = QRS_detector.solve(ecg_df)

    hr = HeartRate(signals, sampling_rate, mwin, bpass)
    rpeaks = hr.find_r_peaks()

    if len(rpeaks) < 2:
        print("Insufficient R-peaks detected, unable to calculate intervals.")
        return 0.0, 0.0, 0.0, 0.0, 0.0

    rpeaks = np.array(rpeaks)
    rr_intervals = np.diff(rpeaks) / sampling_rate
    bpm = (60.0 / rr_intervals.mean())
    rr_min = rr_intervals.min() * 1000
    rr_max = rr_intervals.max() * 1000
    rr_avg = rr_intervals.mean() * 1000
    rr_std = rr_intervals.std() * 1000

    return bpm, rr_min, rr_max, rr_avg, rr_std


# 이전 파일의 데이터를 저장할 변수
previous_data = np.array([])

def process_file(file_key):
    global previous_data
    
    safe_filename = re.sub(r'[<>:"/\\|?*]', '_', file_key.split('/')[-1])
    local_filename = os.path.join(tempfile.gettempdir(), safe_filename)
    
    try:
        s3_client.download_file(bucket_name, file_key, local_filename)
    except Exception as e:
        print(f"Error downloading file {file_key}: {e}")
        return

    try:
        data = np.fromfile(local_filename, dtype=np.float32)
        data_rounded = np.round(data, 3)
    except Exception as e:
        print(f"Error reading file {local_filename}: {e}")
        return

    sampling_rate = 360.0
    signals = data.flatten()

    # 이전 데이터를 현재 파일의 데이터와 결합
    if len(previous_data) > 0:
        combined_signals = np.concatenate((previous_data, signals))
    else:
        combined_signals = signals

    # 윈도우를 6초(360 * 6 = 2160 samples) 길이로 자름
    segment_length = int(6 * sampling_rate)

    if len(combined_signals) < segment_length:
        # 데이터가 6초 미만일 경우, 다음 파일을 기다림
        previous_data = combined_signals
        print(f"Combined data length is less than 6 seconds. Waiting for next file...")
        return

    # 첫 번째 6초 길이의 데이터를 추출
    segment = combined_signals[:segment_length]

    # 나머지 데이터를 다음 파일 처리 시 사용할 수 있도록 저장
    previous_data = combined_signals[segment_length:]

    # 모델에 입력하기 위한 데이터 처리 및 예측
    segment = segment.reshape(1, -1)
    test_features = extract_features(segment)
    predictions = model.predict(test_features)

    avg_predictions = predictions[0]  # 하나의 윈도우에 대한 예측이므로 평균화는 불필요
    class_labels = ['/', 'L', 'N', 'R', 'V']
    analysis_result = ", ".join([f"{label}: {prob:.4f}" for label, prob in zip(class_labels, avg_predictions)])

    bpm, rr_min, rr_max, rr_avg, rr_std = calculate_rr_intervals(segment.flatten(), sampling_rate)

    user_id = '202502@jnu.ac.kr'
    ecg_data = data_rounded.tobytes()
    analysis_idx = f'{user_id}_{datetime.now().strftime("%Y%m%d%H%M%S")}'

    print(f"BPM : {bpm}, RR_MIN: {rr_min}, RR_MAX: {rr_max}, RR_AVG: {rr_avg}, RR_STD: {rr_std}")
    print(f"Analysis Result: {analysis_result}")

    bpm = int(bpm)
    rr_min = int(rr_min)
    rr_max = int(rr_max)
    rr_avg = int(rr_avg)
    rr_std = int(rr_std)

    sql = '''
        INSERT INTO TB_ANALYSIS 
        (ANALYSIS_IDX, ID, ECG, BP_AVG, RR_MIN, RR_MAX, RR_AVG, RR_STD, ANALISYS_RESULT, CREATED_AT)
        VALUES (:analysis_idx, :user_id, :ecg_data, :bpm, :rr_min, :rr_max, :rr_avg, :rr_std, :analysis_result, :created_at)
    '''
    try:
        cursor.setinputsizes(ecg_data=cx_Oracle.BLOB)
        cursor.execute(sql, analysis_idx=analysis_idx, user_id=user_id, ecg_data=ecg_data, 
                       bpm=bpm, rr_min=rr_min, rr_max=rr_max, rr_avg=rr_avg, rr_std=rr_std, 
                       analysis_result=analysis_result, created_at=datetime.now())
        
        connection.commit()
        processed_files.add(file_key)

    except cx_Oracle.DatabaseError as e:
        error, = e.args
        print(f"Oracle-Error-Code: {error.code}")
        print(f"Oracle-Error-Message: {error.message}")
        print("Values causing the error:")
        print(f"BPM : {bpm}, RR_MIN: {rr_min}, RR_MAX: {rr_max}, RR_AVG: {rr_avg}, RR_STD: {rr_std}")



# 새 파일을 체크하고 처리하는 함수
def check_new_files():
    response = s3_client.list_objects_v2(Bucket=bucket_name)
    for obj in response.get('Contents', []):
        file_key = obj['Key']
        if file_key.endswith('.bin') and file_key not in processed_files:
            print(f"Processing {file_key}")
            process_file(file_key)

# 스케줄러 설정 및 실행
scheduler = BlockingScheduler()
scheduler.add_job(check_new_files, 'interval', seconds=6)

print("Scheduler started. Monitoring new files...")
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    print("Scheduler stopped.")
finally:
    cursor.close()
    connection.close()


Scheduler started. Monitoring new files...
Processing ecg0_2024-08-14T15:00:36Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 426ms/step
BPM : 92.13270142180095, RR_MIN: 375.0, RR_MAX: 933.3333333333334, RR_AVG: 651.2345679012345, RR_STD: 182.54183386168484
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg0_2024-08-14T15:01:11Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
BPM : 104.28485214242608, RR_MIN: 0.0, RR_MAX: 822.2222222222222, RR_AVG: 575.3472222222223, RR_STD: 281.02421492640093
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg0_2024-08-14T15:01:35Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
BPM : 171.7492984097287, RR_MIN: 0.0, RR_MAX: 788.8888888888889, RR_AVG: 349.3464052287582, RR_STD: 276.6045967655927
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg0_2024-08-14T15:01:58Z.bin
[1m

Execution of job "check_new_files (trigger: interval[0:00:06], next run at: 2024-08-15 20:04:07 KST)" skipped: maximum number of running instances reached (1)


Processing ecg2_2024-08-14T15:00:52Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
BPM : 115.62043795620436, RR_MIN: 0.0, RR_MAX: 1608.3333333333335, RR_AVG: 518.939393939394, RR_STD: 414.31460409230925
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg2_2024-08-14T15:01:27Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
BPM : 90.28213166144201, RR_MIN: 225.0, RR_MAX: 1130.5555555555554, RR_AVG: 664.5833333333333, RR_STD: 286.10352605911163
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg2_2024-08-14T15:01:50Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
BPM : 84.95575221238937, RR_MIN: 297.22222222222223, RR_MAX: 1011.1111111111111, RR_AVG: 706.25, RR_STD: 228.96771667153814
Analysis Result: /: 0.0000, L: 0.0000, N: 1.0000, R: 0.0000, V: 0.0000
Processing ecg2_2024-08-14T15:02:14Z.bin
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m