# Notebook cell In [1]:
from typing import Tuple
from pyflink.table import DataTypes, Row
from pyflink.table.types import RowType
from pyflink.table.udf import udf
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.window import Slide
from pyflink.table.window import Tumble
from pyflink.table.udf import ScalarFunction , TableFunction, udf
from pyflink.table import DataTypes
from datetime import datetime, timedelta
import time
import json
import requests
from pyflink.table.types import RowType, DataTypes
from beta_distribution_drift_detector.bdddc import BDDDC
import numpy as np




# Build a single-parallelism streaming table environment on the Blink planner.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
st_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
st_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

# The Kafka SQL connector JAR is expected next to this script. `__file__` is not
# defined when the code runs inside a notebook cell, so fall back to the current
# working directory instead of failing with NameError.
try:
    _script_dir = os.path.abspath(os.path.dirname(__file__))
except NameError:
    _script_dir = os.getcwd()
dir_kafka_sql_connect = os.path.join(_script_dir,
                                     'flink-sql-connector-kafka_2.11-1.11.2.jar')
st_env.get_config().get_configuration().set_string("pipeline.jars", 'file://' + dir_kafka_sql_connect)

# Optional Python dependencies for the UDF workers. The path previously read
# '/requirements. txt' (stray space), which could never match the real file.
dir_requirements = '/requirements.txt'
dir_cache = '/cached_dir/'
if os.path.exists(dir_requirements):
    if os.path.exists(dir_cache):
        # A pre-downloaded package cache is available: pass it along as well.
        st_env.set_python_requirements(dir_requirements, dir_cache)
    else:
        st_env.set_python_requirements(dir_requirements)
        
        
        
class AccuracyPrecisionTuple:
    """Two-field result holder: ``(accuracy, precision)``.

    The attributes stay plain and mutable, exactly as before. In addition the
    object now behaves like a fixed-length sequence (indexable, iterable,
    ``len() == 2``) so it can be unpacked or consumed positionally.

    NOTE(review): the UDF that returns this object declares a two-field ROW
    result type; positional access is presumably what the runtime needs to
    serialize it -- confirm against the PyFlink version in use.

    :param accuracy: accuracy value, stored as-is
    :param precision: precision value, stored as-is
    """

    def __init__(self, accuracy, precision):
        self.accuracy = accuracy
        self.precision = precision

    def _as_tuple(self):
        """Return the current field values as a plain tuple, in field order."""
        return (self.accuracy, self.precision)

    def __iter__(self):
        return iter(self._as_tuple())

    def __len__(self):
        return 2

    def __getitem__(self, index):
        return self._as_tuple()[index]

    def __repr__(self):
        return "AccuracyPrecisionTuple(accuracy={!r}, precision={!r})".format(
            self.accuracy, self.precision)

        
        
        
class Model(ScalarFunction):
    """Scalar UDF that trains a classifier one row at a time and scores that row.

    For every input row the classifier is updated with ``partial_fit``, the same
    row is then predicted, the prediction is fed to a drift detector, and the
    registered metrics are updated. The classifier is loaded from Redis on
    construction (falling back to a fresh ``SGDClassifier``) and written back
    to Redis at most once per ``interval_dump_seconds``.
    """

    def __init__(self):
        super().__init__()

        # load the model
        self.model_name = 'online_ml_model'
        # NOTE(security): credentials are hard-coded here; consider reading
        # them from configuration or the environment instead.
        self.redis_params = dict(host='localhost', password='redis_password', port=6379, db=0)
        self.clf = self.load_model()

        self.interval_dump_seconds = 30  # minimum number of seconds between two model saves
        self.last_dump_time = datetime.now()  # time of the last save attempt

        # domain of y
        self.classes = list(range(2))
        # The metric handles below are created in open(); they are None until then.
        self.metric_counter = None  # counter of trained samples
        self.metric_predict_acc = 0  # right-count / total-count, refreshed on every eval()
        self.metric_distribution_y = None  # distribution of label y
        self.metric_total_60_sec = None  # meter marked once per trained sample
        self.metric_right_60_sec = None  # meter marked once per correctly predicted sample

        self.drift_check_interval = timedelta(seconds=60)
        self.drift_detector = BDDDC()

        # Values produced by the most recent eval() call.
        self.accuracy = 0.0
        self.precision = 0.0

    def open(self, function_context):
        """
        Register the metrics under the "online_ml" metric group so the job can
        be monitored while it runs (e.g. in the Flink web UI).

        :param function_context: context object that provides the metric group
        :return: None
        """
        metric_group = function_context.get_metric_group().add_group("online_ml")

        self.metric_counter = metric_group.counter('sample_count')  # number of trained samples
        # The gauge reports the accuracy as an integer percentage.
        metric_group.gauge("prediction_acc", lambda: int(self.metric_predict_acc * 100))
        self.metric_distribution_y = metric_group.distribution("metric_distribution_y")
        self.metric_total_60_sec = metric_group.meter("total_60_sec", time_span_in_seconds=60)
        self.metric_right_60_sec = metric_group.meter("right_60_sec", time_span_in_seconds=60)

    def eval(self, attrib1, attrib2, attrib3, label):
        """
        Train on one sample, predict that same sample and update the metrics.

        :param attrib1: attribute 1 of the input data, float
        :param attrib2: attribute 2 of the input data, float
        :param attrib3: attribute 3 of the input data, float
        :param label: label of the input data, int
        :return: AccuracyPrecisionTuple holding the current accuracy (ratio of
                 the right/total meter counts) and the precision computed for
                 this single sample (1.0 for a true positive, otherwise 0.0)
        """
        # Online learning, that is, training the model one sample at a time.
        x = [attrib1, attrib2, attrib3]
        # Targets are passed as a 1-D sequence; the previous [[label]] form was
        # a column vector.
        y = [label]
        self.clf.partial_fit([x], y, classes=self.classes)

        # Predict the sample that was just used for training.
        y_pred = self.clf.predict([x])[0]

        # Update drift detector
        self.drift_detector.add_element(np.array(y_pred, ndmin=1), np.array(label, ndmin=1))

        # On concept drift: train once more on this sample and start over with
        # a fresh detector.
        if self.drift_detector.detected_change():
            print("Concept drift detected at time ", time.time())
            self.clf.partial_fit([x], y, classes=self.classes)
            self.drift_detector = BDDDC()

        # update metrics
        self.metric_counter.inc(1)  # number of trained samples + 1
        self.metric_total_60_sec.mark_event(1)
        self.metric_distribution_y.update(label)

        is_correct = y_pred == label
        if is_correct:
            self.metric_right_60_sec.mark_event(1)

        # Refresh the accuracy after every sample. It used to be refreshed only
        # after a correct prediction, which left the gauge stale after misses.
        total_count = self.metric_total_60_sec.get_count()
        accuracy = self.metric_right_60_sec.get_count() / total_count if total_count else 0.0
        self.metric_predict_acc = accuracy

        # Single-sample confusion counts; only TP and FP are needed for precision.
        true_pos = 1 if (is_correct and y_pred == 1) else 0
        false_pos = 1 if (not is_correct and y_pred == 1) else 0
        predicted_pos = true_pos + false_pos
        precision = true_pos / predicted_pos if predicted_pos else 0.0

        self.accuracy = accuracy
        self.precision = precision

        # Persist the model; dump_model() itself enforces the save interval.
        # Without this call the model was never written back to Redis.
        self.dump_model()

        return AccuracyPrecisionTuple(accuracy, precision)

    def load_model(self):
        """
        Load the model. If Redis holds a model under ``self.model_name`` it is
        used; otherwise, or when Redis cannot be used, a new model is created.

        :return: the classifier to train
        """
        import redis
        import pickle
        import logging
        from sklearn.linear_model import SGDClassifier

        r = redis.StrictRedis(**self.redis_params)
        clf = None

        try:
            payload = r.get(self.model_name)
            if payload is None:
                logging.info('There is no model with the specified name in Redis, so a new model is initialized')
            else:
                # NOTE(security): pickle.loads can execute arbitrary code from
                # the payload; this is only safe if the Redis instance is trusted.
                clf = pickle.loads(payload)
        except Exception:
            # Best effort by design: any failure falls back to a fresh model.
            logging.warning('Redis encountered an exception, so initialize a new model',
                            exc_info=True)

        if clf is None:
            # NOTE(review): newer scikit-learn releases renamed loss='log' to
            # 'log_loss' -- confirm against the pinned scikit-learn version.
            clf = SGDClassifier(alpha=0.01, loss='log', penalty='l1')

        return clf

    def dump_model(self):
        """
        Save the model to Redis once at least ``interval_dump_seconds`` have
        elapsed since the last save attempt; otherwise do nothing.

        :return: None
        """
        import pickle
        import redis
        import logging

        # total_seconds() covers the whole elapsed time; `.seconds` is only the
        # seconds component and wraps around after one day.
        elapsed = (datetime.now() - self.last_dump_time).total_seconds()
        if elapsed < self.interval_dump_seconds:
            return

        try:
            r = redis.StrictRedis(**self.redis_params)
            r.set(self.model_name, pickle.dumps(self.clf, protocol=pickle.HIGHEST_PROTOCOL))
        except Exception:
            # Best effort by design: a failed save must not stop the stream.
            logging.warning(' Unable to connect to Redis to store model data', exc_info=True)

        # Whether the save succeeded or not, remember the time of this attempt.
        self.last_dump_time = datetime.now()
  



# Result row produced by the UDF: two FLOAT fields, "accuracy" then "precision".
# (An earlier revision declared a single scalar FLOAT result instead.)
result_type = RowType(
    [DataTypes.FIELD(field_name, DataTypes.FLOAT()) for field_name in ("accuracy", "precision")]
)

# Wrap the Model instance as a Python UDF taking three FLOAT attributes and an
# INT label, then expose it to the Table API under the name 'train_predict'.
# NOTE(review): the source table declares the attributes as DOUBLE while the
# UDF declares FLOAT inputs -- confirm the planner accepts this.
model = udf(
    Model(),
    input_types=[DataTypes.FLOAT() for _ in range(3)] + [DataTypes.INT()],
    result_type=result_type,
)
st_env.register_function('train_predict', model)

# model_udtf = udf(ModelUDTF(), input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.INT()],
#                  result_type=DataTypes.ROW([DataTypes.FIELD("accuracy", DataTypes.FLOAT()), DataTypes.FIELD("precision", DataTypes.FLOAT())]))
# st_env.register_function('result_predict', model_udtf)




# Kafka-backed source table. Connection settings are read from the environment
# first, so a missing variable raises KeyError before any DDL is submitted.
kafka_topic = os.environ["KAFKA_TOPIC"]
kafka_host = os.environ["KAFKA_HOST"]
zookeeper_host = os.environ["ZOOKEEPER_HOST"]
kafka_consumer_group = os.environ["KAFKA_CONSUMER_GROUP"]

# `rowtime` is derived from the `ts` column and carries a 5-second watermark.
source_ddl = f"""
    CREATE TABLE source (
        attrib1 DOUBLE,
        attrib2 DOUBLE,
        attrib3 DOUBLE,
        label INT,
        ts BIGINT,
        rowtime as TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),
        WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
    ) WITH (
        'connector' ='kafka-0.11',
        'topic' = '{kafka_topic}',
        'scan.startup.mode' = 'latest-offset',
        'properties.bootstrap.servers' = '{kafka_host}',
        'properties.zookeeper.connect' = '{zookeeper_host}',
        'properties.group.id' = '{kafka_consumer_group}',
        'format' = 'json'
    )
    """
st_env.execute_sql(source_ddl)


# # Define output sink
# st_env.execute_sql(
#     """
#     CREATE TABLE sink (
#         accuracy FLOAT,
#         precision FLOAT,
#         window_start TIMESTAMP(3),
#         window_end TIMESTAMP(3)
#      ) WITH (
#          'connector' = 'print',
#          'print-identifier' = 'Drift data: '
#      )
#     """
# )

# # Calculate average accuracy per window and insert into sink
# st_env.from_path("source") \
#     .window(Tumble.over("60.seconds").on("rowtime").alias("w")) \
#     .group_by("w") \
#     .select("AVG(train_predict(attrib1, attrib2, attrib3, label)) AS accuracy, w.start AS window_start, w.end AS window_end") \
#     .insert_into("sink")


# Print-connector sink: one row per window with the averaged metrics and the
# window bounds. Column names are back-quoted in the DDL.
sink_ddl = """
    CREATE TABLE sink (
        `accuracy` FLOAT,
        `precision` FLOAT,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3)
     ) WITH (
         'connector' = 'print',
         'print-identifier' = 'Drift data: '
     )
    """
st_env.execute_sql(sink_ddl)

# Average the UDF's accuracy and precision over 60-second tumbling windows on
# `rowtime` and write one row per window into the sink.
# NOTE(review): `train_predict` appears twice in the projection, so it may be
# invoked (and the model trained) twice per row -- confirm with the planner.
# NOTE(review): the result row's fields are declared as "accuracy"/"precision"
# but are accessed here as .f0/.f1 -- confirm this resolves as intended.
source_table = st_env.from_path("source")
window_spec = Tumble.over("60.seconds").on("rowtime").alias("w")
projection = (
    "AVG(train_predict(attrib1, attrib2, attrib3, label).f0) AS accuracy, "
    "AVG(train_predict(attrib1, attrib2, attrib3, label).f1) AS precision, "
    "w.start AS window_start, w.end AS window_end"
)
windowed_metrics = source_table.window(window_spec).group_by("w").select(projection)
windowed_metrics.insert_into("sink")

# Submit the job under the name "PyFlink job".
st_env.execute("PyFlink job")






# Notebook output: ModuleNotFoundError: No module named 'pyflink'