# In [None]:
import json
import pickle

import numpy as np
import pandas as pd
from pyflink.common import WatermarkStrategy, DeserializationSchema
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, CountWindow
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from scipy import stats

# Load the pickled model once, when this cell runs; the window function below
# calls ``predict`` on it through the module-level name ``xgb``.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file, so
# keep this path pointing at a trusted, locally produced artifact.
with open('../weights/xgb.pickle', mode='rb') as f:
    xgb = pickle.load(f)

# Execution environment on which the Kafka -> window -> predict job is built.
env = StreamExecutionEnvironment.get_execution_environment()

# 定义反序列化器，用于将 Kafka 中的 JSON 字符串转换为字典
class JsonDeserializationSchema(DeserializationSchema):
    def deserialize(self, message):
        return json.loads(message)

# Kafka source over the trajectory topic, starting from the earliest offset.
# PyFlink's DeserializationSchema only wraps a Java schema object, so a
# pure-Python subclass overriding ``deserialize`` cannot be used as the value
# deserializer (it carries no Java schema for the connector to call). Read each
# record value as a UTF-8 string with the built-in SimpleStringSchema and
# parse the JSON on the Python side instead.
source = KafkaSource.builder() \
    .set_bootstrap_servers("127.0.0.1:9092") \
    .set_topics("trajectory-topic") \
    .set_group_id("trajectory-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
# Each element of ``stream`` is the Python object decoded from one JSON record
# (a dict when the record is a JSON object).
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source") \
    .map(json.loads)

class MyWindowFunction(WindowFunction):
    """Score one count window of trajectory samples with the loaded model.

    The window's elements are reduced to a single row of summary features
    (medians of first differences, extrema, median, kurtosis, and log/exp
    transforms of some of them) computed over their ``'y'`` and ``'z'``
    values, and the model prediction for that row is emitted.
    """

    @staticmethod
    def _extract_features(y, z):
        """Return the model's feature row, as an ordered dict, for ``y`` and ``z``.

        ``y`` and ``z`` are the NumPy arrays built from the window's
        ``'y'``/``'z'`` values. The dict's insertion order fixes the DataFrame
        column order handed to the model.

        NOTE(review): the ``log_MeanAbsoluteDiff_*`` features are computed
        from the *median* absolute difference, despite their names. This keeps
        the original computation; confirm it matches how the model was trained.
        """
        median_abs_diff_z = np.median(np.absolute(np.diff(z)))
        median_abs_diff_y = np.median(np.absolute(np.diff(y)))
        return {
            'MedianAbsoluteDiff_z': median_abs_diff_z,
            # np.log(0) gives -inf (with a RuntimeWarning) for a window whose
            # values never change; left as in the original computation.
            'log_MeanAbsoluteDiff_z': np.log(median_abs_diff_z),
            'log_MeanAbsoluteDiff_y': np.log(median_abs_diff_y),
            'MedianAbsoluteDiff_y': median_abs_diff_y,
            'max_y': np.max(y),
            'e_max_z': np.exp(np.max(z)),
            'e_min_y': np.exp(np.min(y)),
            'min_z': np.min(z),
            'MedianDiff_y': np.median(np.diff(y)),
            'MedianDiff_z': np.median(np.diff(z)),
            'e_kurt_y': np.exp(stats.kurtosis(y)),
            'median_z': np.median(z),
        }

    def apply(self, key, window, input, out=None):
        """Predict on the features of one window.

        :param key: window key (unused).
        :param window: the window being evaluated (unused).
        :param input: iterable of window elements; each must support
            ``item['y']`` and ``item['z']``. Assumed to hold one scalar sample
            per element — TODO confirm against the producer's JSON schema.
        :param out: optional collector with a ``collect`` method. When given,
            the prediction is passed to ``out.collect`` and ``None`` is
            returned. When omitted, the prediction is returned inside a list,
            the iterable-of-results form PyFlink's ``WindowFunction.apply``
            expects.
        :returns: ``None`` if ``out`` is given, else ``[prediction]``.
        """
        # Materialize once: ``input`` may be a one-shot iterator and is read
        # twice below.
        items = list(input)
        y = np.array([item['y'] for item in items])
        z = np.array([item['z'] for item in items])

        # One window -> one feature row. Assigning scalars to columns of an
        # empty ``pd.DataFrame()`` only creates zero-row columns, so the model
        # was previously handed an empty frame.
        df_features = pd.DataFrame([self._extract_features(y, z)])
        prediction = xgb.predict(df_features)

        if out is not None:
            out.collect(prediction)
            return None
        return [prediction]

# Score the stream in tumbling windows of 80 events each.
# PyFlink's DataStream has no ``window()`` method, and ``CountWindow`` is a
# window type rather than an assigner (it has no ``of()``). Count windows are
# created on a KeyedStream with ``count_window``; keying every element by the
# same constant routes all events into one window sequence, i.e. a global
# 80-event window, and matches WindowFunction's (key, window, inputs) shape.
result_stream = stream \
    .key_by(lambda _: 0) \
    .count_window(80) \
    .apply(MyWindowFunction())
result_stream.print()
env.execute()