In [1]:
import json
import os

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSink,
    KafkaSource)

In [2]:

def print_features(record):
    """
    Merged feature columns into one single data column
    and keep other columns unchanged.
    """
    # Convert Row to dict
    record = json.loads(record)
    data = record["payload"]["after"]
    print(data)
    return json.dumps(data)




In [3]:
def filter_features(record):
    """
    Remove unnecessary columns
    """
    record = json.loads(record)
    data = {}
    for key in record:
        if key != "created" and key != "content":
            data[key] = record[key]

    return json.dumps({"created": record["created"], "data": data})




In [4]:
def check_record_keys(record):
    """
    Check messages have "payload" key or not
    """
    # Convert Row to dict
    record = json.loads(record)
    keys = list(record.keys())
    if "payload" in keys:
        return True

    print("record: ", list(record.keys()))
    return True





In [5]:
env = StreamExecutionEnvironment.get_execution_environment()

In [6]:
# define lại jar files path
JARS_PATH = "/Users/dangthiphuongthao/Documents/Apps/jars"

In [7]:

# The other commented lines are for Avro format
env.add_jars(
    f"file://{JARS_PATH}/flink-connector-kafka-4.0.0-2.0.jar",
    # f"file://{JARS_PATH}/flink-avro-1.17.1.jar",
    # f"file://{JARS_PATH}/flink-avro-confluent-registry-1.17.1.jar",
    # f"file://{JARS_PATH}/avro-1.11.1.jar",
    # f"file://{JARS_PATH}/jackson-databind-2.14.2.jar",
    # f"file://{JARS_PATH}/jackson-core-2.14.2.jar",
    # f"file://{JARS_PATH}/jackson-annotations-2.14.2.jar",
    f"file://{JARS_PATH}/kafka-clients-3.6.0.jar",
    # f"file://{JARS_PATH}/kafka-schema-registry-client-5.3.0.jar",
)

# Avro will need it for validation from the schema registry
# schema_path = "./data_ingestion/kafka_producer/avro_schemas/schema_0.avsc"
# with open(schema_path) as f:
#     schema = f.read()




In [8]:
from pyflink.common import Configuration

config = Configuration()
config.set_string(
    "pipeline.jars",
    ";".join([
        f"file://{JARS_PATH}/flink-connector-kafka-4.0.0-2.0.jar",
        f"file://{JARS_PATH}/kafka-clients-3.6.0.jar"
        # add thêm nếu cần nữa
    ])
)
env = StreamExecutionEnvironment.get_execution_environment(configuration=config)


In [9]:
print(f"file://{JARS_PATH}/flink-connector-kafka-4.0.0-2.0.jar")

file:///Users/dangthiphuongthao/Documents/Apps/jars/flink-connector-kafka-4.0.0-2.0.jar


In [10]:
# Define the source to take data from
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_topics("diabetes_cdc.public.diabetes_new")
    .set_group_id("diabetes-consumer-group")
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)


In [11]:
# Define the sink to save the processed data to
sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("http://localhost:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("diabetes_out.public.sink_diabetes")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

# No sink, just print out to the terminal
# env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source").filter(
#     filter_small_features
# ).map(merge_features).print()




In [14]:
# ==== Pipeline xử lý dữ liệu ====
(
    env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    .filter(check_record_keys)
    .map(print_features, output_type=Types.STRING())
    .map(filter_features, output_type=Types.STRING())
    .sink_to(sink)
)

# ==== Thực thi Flink job ====
env.execute("flink_datastream_demo")
print("Your job has been started successfully!")


2025-05-10T09:58:59.181949Z Source Data Fetcher for Source: Kafka Source -> Filter, Map, Map -> Sink: Writer -> Sink: Committer (8/8)#0 ERROR Unable to write to stream /opt/anaconda3/envs/flink-env/lib/python3.10/site-packages/pyflink/log/flink-nguyenthiphuongthao-python-Thaos-mac.lan.log for appender FileAppender org.apache.logging.log4j.core.appender.AppenderLoggingException: Error writing to stream /opt/anaconda3/envs/flink-env/lib/python3.10/site-packages/pyflink/log/flink-nguyenthiphuongthao-python-Thaos-mac.lan.log
	at org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:265)
	at org.apache.logging.log4j.core.appender.FileManager.writeToDestination(FileManager.java:335)
	at org.apache.logging.log4j.core.appender.OutputStreamManager.flushBuffer(OutputStreamManager.java:296)
	at org.apache.logging.log4j.core.appender.OutputStreamManager.flush(OutputStreamManager.java:307)
	at org.apache.logging.log4j.core.appender.AbstractOutputStre

KeyboardInterrupt: 