In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, to_timestamp, col, collect_list
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml import PipelineModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT


In [None]:

# Reuse an existing Spark session if one is already running in this kernel,
# otherwise start a new one named after the project.
spark = (
    SparkSession.builder
    .appName("BigData_Heartrate_Predict")
    .getOrCreate()
)


In [None]:

def pad_with_mean(heartrate_list, max_length=34701):
    """Right-pad a heart-rate list with its truncated integer mean up to ``max_length``.

    The downstream classifier is fed fixed-length vectors, so shorter lists are
    extended with ``int(mean)`` until they reach ``max_length``.

    Args:
        heartrate_list: sequence of numeric heart-rate readings, or None.
        max_length: target length. Defaults to 34701, which is presumably the
            feature length the saved model was trained with (TODO confirm).

    Returns:
        A new padded list when the input is shorter than ``max_length``.
        The input unchanged when it is None, empty (no mean can be computed),
        or already at least ``max_length`` long. NOTE(review): longer inputs
        are not truncated, so they will not match the model's feature length.
    """
    if heartrate_list is None:
        return None

    current_length = len(heartrate_list)
    # An empty list has no mean; dividing by zero here would crash the Spark task.
    if current_length == 0 or current_length >= max_length:
        return heartrate_list

    # int() truncates toward zero, matching the original behavior.
    mean_value = int(sum(heartrate_list) / current_length)
    return list(heartrate_list) + [mean_value] * (max_length - current_length)


In [None]:
def preprocess_data(df):
    """Group raw heart-rate rows into fixed-length lists per (Id, calendar day).

    Args:
        df: Spark DataFrame with columns ``Id``, ``Time`` (a string parsed with
            the pattern "MM/dd/yyyy hh:mm:ss a") and ``Value`` (the heart-rate reading).

    Returns:
        A DataFrame with a single column ``normalized_heartrate_list``. It has one row
        per (Id, date) group, and each list is padded by ``pad_with_mean``.
    """
    parsed_df = (
        df.withColumn("datetime_column", to_timestamp(df["Time"], "MM/dd/yyyy hh:mm:ss a"))
        .select('Id', 'datetime_column', col('Value').alias('Heartrate'))
        # Keep only the calendar day so readings are grouped per user per day.
        .withColumn('date', col('datetime_column').cast('date'))
    )

    # Group on the frame that actually carries the 'date' column.
    grouped_df = parsed_df.groupBy('Id', 'date').agg(
        collect_list("Heartrate").alias("heartrate_list")
    )

    # Pad every daily list to the fixed length the model expects.
    pad_with_mean_udf = udf(pad_with_mean, ArrayType(IntegerType()))
    return grouped_df.select(
        pad_with_mean_udf(col("heartrate_list")).alias("normalized_heartrate_list")
    )


In [None]:
def vectorize_data(df):
    """Attach the model's ``features`` column to a preprocessed DataFrame.

    Each ``normalized_heartrate_list`` array becomes a dense ML vector stored in
    ``heartrate_vector``. That column is then passed through a single-input
    ``VectorAssembler``, which writes ``features``.
    """
    def _as_dense_vector(values):
        # ML vectors hold doubles, so cast every reading explicitly.
        return Vectors.dense(list(map(float, values)))

    to_vector_udf = F.udf(_as_dense_vector, VectorUDT())
    with_vector_df = df.withColumn('heartrate_vector', to_vector_udf('normalized_heartrate_list'))

    assembler = VectorAssembler(
        inputCols=["heartrate_vector"],
        outputCol="features",
    )
    return assembler.transform(with_vector_df)


In [None]:
# The pipeline stages are plain DataFrame -> DataFrame functions. Do not wrap them in
# F.udf: a UDF is applied to column values row by row, and calling one with a
# whole DataFrame raises a TypeError.
preprocess_stage = preprocess_data
vectorize_stage = vectorize_data

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel

# Directory of the saved RandomForest classifier, resolved relative to the
# notebook's working directory. An older layout kept it at "DoAn\\results\\RandomForest_model".
heartrate_model_path = "RandomForest_model"
heartrate_model = RandomForestClassificationModel.load(path=heartrate_model_path)


In [None]:
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads


kafka_topic = 'BigData_Heartrate_Predict'
kafka_server = 'localhost:9092'
mongo_uri = 'mongodb://localhost:27017'
mongo_db_name = 'BigData_Heartrate'
mongo_collection_name = 'Predict'

# Display labels for the classifier's numeric classes. Any other class is treated as normal.
PREDICTION_LABELS = {
    0: "Nhịp tim quá cao/quá thấp",
    1: "Nhịp tim không ổn định",
}
DEFAULT_PREDICTION_LABEL = "Nhịp tim bình thường"


def _label_prediction(predict):
    """Return the display label for a numeric prediction.

    Float predictions such as 0.0 match the int keys because 0.0 == 0 and both hash equally.
    """
    return PREDICTION_LABELS.get(predict, DEFAULT_PREDICTION_LABEL)


def _predict_record(record):
    """Run one decoded Kafka record through preprocessing, vectorization and the model.

    Assumes ``record`` is a single dict with ``Id``, ``Time`` and ``Value`` keys
    (TODO confirm against the producer). Returns the prediction from the first
    output row, or None if the pipeline produced no rows.
    """
    # createDataFrame needs a list of rows; a bare dict would be iterated as its keys.
    raw_df = spark.createDataFrame([record])
    features_df = vectorize_data(preprocess_data(raw_df))
    # Read the prediction from the *transformed* frame; only the first row is used.
    first_row = heartrate_model.transform(features_df).select('prediction').first()
    return None if first_row is None else first_row[0]


mongo_client = MongoClient(mongo_uri)
mongo_db = mongo_client[mongo_db_name]
mongo_collection = mongo_db[mongo_collection_name]

consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=kafka_server,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='do_an',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
# NOTE(review): this keeps every message for the lifetime of the kernel, so it grows without bound.
data_list = []
try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        predict = _predict_record(data)
        if predict is None:
            print(f"No prediction produced for: {data}")
            continue
        # Spark has already copied the record, so mutating it for storage is safe here.
        data["Heartrate"] = data.pop("Value")
        data["Prediction"] = _label_prediction(predict)
        print(f"Result row: {data}")
        mongo_collection.insert_one(data)
except KeyboardInterrupt:
    print("\nStopped receive data.")
finally:
    # Always release the consumer, even if processing a message raised.
    consumer.close()