In [226]:
import os
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

# Kafka source and MySQL (JDBC) sink tables in a StreamTableEnvironment

In [227]:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Streaming execution environment: event-time characteristic, parallelism 1.
# NOTE(review): no rowtime/watermark is declared on the source table below, so
# event time is presumably not used by this pipeline — confirm before relying on it.
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Table environment on top of s_env, using the Blink planner in streaming mode.
blink_streaming_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
st_env = StreamTableEnvironment.create(
    s_env, environment_settings=blink_streaming_settings
)

# MySQL connection settings for the JDBC sink. Each can be overridden through an
# environment variable. The defaults are the values this notebook previously
# hard-coded (presumably a local development MySQL — TODO confirm) and must not
# be used for anything beyond local development.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "mysql")
MYSQL_PORT = os.environ.get("MYSQL_PORT", "3306")
MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE", "as42594n")
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "my-secret-pw")


def sql_string_literal(value):
    """Escape ``value`` for use inside a single-quoted SQL string literal.

    SQL escapes a single quote by doubling it; without this, a password
    containing ``'`` would break the DDL statement.
    """
    return str(value).replace("'", "''")


# JDBC sink table MySinkDbSmsTable (smstext, smstype) backed by the MySQL table
# 'as42594n'. Credentials are passed only via the username/password options and
# are no longer embedded in the JDBC URL.
# NOTE(review): 'connector.write.flush.interval' has no time unit — confirm the
# intended interval (e.g. '10s' vs. the current '10').
sinkdbsms_ddl = f"""CREATE TABLE MySinkDbSmsTable (
    smstext varchar,
    smstype varchar) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://{sql_string_literal(MYSQL_HOST)}:{sql_string_literal(MYSQL_PORT)}/{sql_string_literal(MYSQL_DATABASE)}',
        'connector.table' = 'as42594n',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.write.flush.interval' = '10',
        'connector.username' = '{sql_string_literal(MYSQL_USER)}',
        'connector.password' = '{sql_string_literal(MYSQL_PASSWORD)}')
"""

# Kafka source: table MySourceKafkaTable exposes each record of topic 'test'
# (bootstrap server kafka:9092, consumer group 'test') as a single `word`
# column, starting from the latest offset.
# NOTE(review): with 'format.type' = 'csv' and only one column, a message that
# contains a comma (or quotes) is presumably split into several CSV fields and
# may fail to parse or be truncated — confirm whether the SMS text can contain
# commas and consider a different delimiter or format.
source_kafka_ddl = """CREATE TABLE MySourceKafkaTable (word varchar) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'kafka:9092',
    'connector.properties.group.id' = 'test',
    'format.type' = 'csv'
        )
"""

In [228]:
# Drop a definition left over from an earlier execution of this cell so that
# re-running it does not fail because the table is already registered.
st_env.execute_sql("DROP TABLE IF EXISTS MySourceKafkaTable")
st_env.execute_sql(source_kafka_ddl)

<pyflink.table.table_result.TableResult at 0x7f0da7542550>

In [229]:
# Drop a definition left over from an earlier execution of this cell so that
# re-running it does not fail because the table is already registered.
st_env.execute_sql("DROP TABLE IF EXISTS MySinkDbSmsTable")
st_env.execute_sql(sinkdbsms_ddl)

<pyflink.table.table_result.TableResult at 0x7f0da7512ba8>

# Define a Python UDF with PyFlink

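The UDF loads the spam classifier from MLflow's local tracking directory. Note that `mlflow.sklearn.load_model` is specific to scikit-learn; to get an abstraction over multiple ML frameworks, load the model with the generic `mlflow.pyfunc.load_model` instead.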

In [230]:
from pyflink.table.expressions import call 
from pyflink.table.udf import ScalarFunction, udf

class SpamClassifier(ScalarFunction):
    """Scalar UDF that labels a text message using an MLflow-logged sklearn model.

    The model is loaded in ``__init__``, i.e. in the process that constructs the
    UDF (this notebook). NOTE(review): it is presumably serialized together with
    the UDF instance when the job is submitted — confirm for large models.

    :param model_id: MLflow run id whose ``artifacts/model`` directory is loaded.
    :param experiment_id: MLflow experiment id the run belongs to (defaults to
        "2", the value previously hard-coded).
    :param mlruns_dir: root of the local MLflow ``mlruns`` tracking directory.
    """

    DEFAULT_MLRUNS_DIR = "/opt/flink/notebooks/flink-with-ai/data/mlflow/mlruns"

    def __init__(self, model_id, experiment_id="2", mlruns_dir=DEFAULT_MLRUNS_DIR):
        # Imported here so mlflow is only required where the UDF is constructed.
        import mlflow.sklearn
        self.model = mlflow.sklearn.load_model(
            f"{mlruns_dir}/{experiment_id}/{model_id}/artifacts/model")

    def eval(self, s):
        """Return the model's predicted label for message ``s`` (None for NULL)."""
        # SQL NULL arrives as None; a text pipeline would presumably fail on it
        # and kill the job, so NULL input maps to a NULL label instead.
        if s is None:
            return None
        res = self.model.predict([s])
        return res[0]

# MLflow run id of the trained spam classifier.
SPAM_MODEL_RUN_ID = "64a89b0a6b7346498316bfae4c298535"

# Wrap the classifier as a STRING -> STRING scalar UDF and register it under the
# name SPAM_CLASSIFIER so SQL and Table API expressions can call it.
spam_classifier = udf(
    SpamClassifier(SPAM_MODEL_RUN_ID),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.STRING(),
)
st_env.register_function("SPAM_CLASSIFIER", spam_classifier)

# Define the pipeline using SQL

In [231]:
#st_env.execute_sql("INSERT INTO MySinkDbSmsTable SELECT word as smstext, SPAM_CLASSIFIER(word) as smstype FROM MySourceKafkaTable")

In [232]:
# Classify every Kafka message with the UDF and write (text, label) rows to MySQL.
classify_sms_sql = "INSERT INTO MySinkDbSmsTable SELECT word as smstext, SPAM_CLASSIFIER(word) as smstype FROM MySourceKafkaTable"
st_env.execute_sql(classify_sms_sql)

<pyflink.table.table_result.TableResult at 0x7f0da7512358>

# Define the same pipeline using the PyFlink Table API

In [233]:
# Table API equivalent of the SQL INSERT above.
# The previous version called the deprecated `insert_into`, which only buffers a
# sink operation. The `st_env.execute(...)` that would run it was commented out,
# so the buffered insert lingered and would be picked up by any later execute().
# Submitting it would presumably start a second job writing the same Kafka
# messages to MySQL again (duplicating the SQL pipeline), so submission is opt-in.
RUN_TABLE_API_PIPELINE = False

sms_classified = st_env.from_path('MySourceKafkaTable') \
    .select('word as smstext, SPAM_CLASSIFIER(word) as smstype')

if RUN_TABLE_API_PIPELINE:
    sms_classified.execute_insert('MySinkDbSmsTable')

In [234]:
# Look up the JDBC sink table in the catalog and display its schema.
sink_table_name = 'MySinkDbSmsTable'
table = st_env.from_path(sink_table_name)
table.get_schema()

root
 |-- smstext: STRING
 |-- smstype: STRING

In [235]:
# Read the current rows of the MySQL-backed sink table into a pandas DataFrame.
df = table.to_pandas()

In [236]:
# Display the classified messages read back from the sink table.
# NOTE(review): the sink table grows with every Kafka message; consider
# df.head() once it holds more than a handful of rows.
df

Unnamed: 0,smstext,smstype
0,hello12,ham
1,I will come tomorrow di,ham


# Drop the source and sink table definitions

In [210]:
#st_env.execute_sql("DROP TABLE MySourceKafkaTable")
#st_env.execute_sql("DROP TABLE MySinkDbSmsTable")

<pyflink.table.table_result.TableResult at 0x7f0da75c7a20>