In [5]:
from kafka import KafkaProducer
import pandas as pd
import json
from time import time
from kafka import KafkaConsumer
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, DataTypes, TableEnvironment, StreamTableEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.time import Duration

from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

In [None]:
import socket

# Probe the local Kafka port: binding to 9092 succeeds only if nothing is
# listening there. If a broker is already running on this host, an
# "unavailable" message is the expected outcome.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
    try:
        probe.bind(("0.0.0.0", 9092))
        print("Port 9092 is available.")
    except Exception as e:
        print(f"Port 9092 is unavailable: {e}")


In [None]:
def json_serializer(data):
    """Serialize ``data`` to UTF-8 encoded JSON bytes for Kafka message values.

    Records built with ``df.to_dict(orient='records')`` carry float ``nan`` for
    missing CSV cells (e.g. the empty ``passenger_count`` values). Plain
    ``json.dumps`` would emit the bare token ``NaN``, which is not valid JSON
    and may be rejected by strict consumers. Non-finite floats (NaN, +/-inf)
    are therefore written as ``null``.

    Args:
        data: a JSON-serializable structure (dicts, lists/tuples, scalars).

    Returns:
        bytes: the JSON document encoded as UTF-8.
    """
    import math

    def _clean(value):
        # Recursively map non-finite floats to None so the output is strict JSON.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        if isinstance(value, dict):
            return {key: _clean(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [_clean(item) for item in value]
        return value

    # allow_nan=False guarantees no NaN/Infinity token can slip through.
    return json.dumps(_clean(data), allow_nan=False).encode('utf-8')

# Broker address as reachable from this notebook's host.
server = 'localhost:9092'

# Producer whose message values are JSON-encoded by json_serializer.
producer = KafkaProducer(
    value_serializer=json_serializer,
    bootstrap_servers=[server],
)

In [None]:
# Check that the producer reached the bootstrap broker (expected to display True).
producer.bootstrap_connected()

In [6]:
# Load only the columns needed for the trip stream. No parse_dates is given,
# so the pickup/dropoff timestamps are kept as plain strings.
df = pd.read_csv(
    "green_tripdata_2019-10.csv",
    usecols=[
        'lpep_pickup_datetime',
        'lpep_dropoff_datetime',
        'PULocationID',
        'DOLocationID',
        'passenger_count',
        'trip_distance',
        'tip_amount',
    ],
)
df

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.00
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.80,0.00
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.50,0.00
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.90,0.00
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26
...,...,...,...,...,...,...,...
476381,2019-10-31 23:30:00,2019-11-01 00:00:00,65,102,,7.04,0.00
476382,2019-10-31 23:03:00,2019-10-31 23:24:00,129,136,,0.00,0.00
476383,2019-10-31 23:02:00,2019-10-31 23:23:00,61,222,,3.90,0.00
476384,2019-10-31 23:42:00,2019-10-31 23:56:00,76,39,,3.08,0.00


In [None]:
# Inspect column dtypes; passenger_count has missing values (see the output above),
# so it is presumably loaded as float — confirm here.
df.dtypes

In [None]:
# One dict per DataFrame row, keyed by column name: the payload shape sent to Kafka.
message = df.to_dict('records')
topic_name = 'green-trips'

In [None]:
# Preview a few records only; the full list holds one dict per CSV row.
message[:5]

In [None]:
# Publish one Kafka message per trip record. Sending the whole list as a single
# value would produce one huge message far beyond Kafka's default size limits.
for record in message:
    producer.send(topic_name, value=record)

In [None]:
# Block until every buffered record has been delivered (or failed).
producer.flush()

In [None]:
# Time how long it takes to publish every record and wait for delivery.
# Note: this publishes the full dataset to `topic_name` again.
t0 = time()

# One message per record; a single message holding the whole list would
# exceed Kafka's message-size limits.
for record in message:
    producer.send(topic_name, value=record)
producer.flush()

t1 = time()
took = t1 - t0
print(f'It took: {took}')

## Question 5

In [None]:
def create_events_source_kafka(t_env, topic='green-trips',
                               bootstrap_servers='redpanda-1:29092'):
    """Register the Kafka-backed source table of green-taxi trips.

    The table reads JSON messages from ``topic`` starting at the earliest
    offset. It exposes a computed column ``event_watermark`` (the dropoff
    time) that carries a 5-second bounded-out-of-orderness watermark.

    Args:
        t_env: a table environment exposing ``execute_sql``.
        topic: Kafka topic to read. Defaults to the topic the producer cells
            write to.
        bootstrap_servers: broker address as seen from the Flink cluster.

    Returns:
        str: the registered table name (``"green_trips_rpk"``).
    """
    table_name = "green_trips_rpk"
    # Drop any previous definition so re-running the cell is idempotent;
    # plain CREATE TABLE fails if the name is already registered.
    t_env.execute_sql(f"DROP TABLE IF EXISTS {table_name}")
    # lpep_dropoff_datetime is already TIMESTAMP(3), so it can serve directly
    # as the event-time column (TO_TIMESTAMP_LTZ expects a numeric epoch).
    source_ddl = f"""
        CREATE TABLE {table_name} (
            lpep_pickup_datetime TIMESTAMP(3),
            lpep_dropoff_datetime TIMESTAMP(3),
            PULocationID INT,
            DOLocationID INT,
            passenger_count DOUBLE,
            trip_distance DOUBLE,
            tip_amount DOUBLE,
            event_watermark AS lpep_dropoff_datetime,
            WATERMARK FOR event_watermark AS event_watermark - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{bootstrap_servers}',
            'topic' = '{topic}',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
        """
    t_env.execute_sql(source_ddl)
    return table_name


In [None]:
def create_events_aggregated_sink(t_env):
    """Register the JDBC (Postgres) sink table for processed trip rows.

    The Flink table maps to the Postgres table of the same name; that table
    presumably must already exist in the database (verify against setup).

    Args:
        t_env: a table environment exposing ``execute_sql``.

    Returns:
        str: the registered table name (``"green_trips_agg"``).
    """
    table_name = 'green_trips_agg'
    # Drop any previous Flink definition so re-running the cell is idempotent.
    t_env.execute_sql(f"DROP TABLE IF EXISTS {table_name}")
    # Every column needs a data type; event_watermark holds the event time.
    # NOTE(review): credentials are hardcoded; consider environment variables.
    sink_ddl = f"""
        CREATE TABLE {table_name} (
            lpep_pickup_datetime TIMESTAMP(3),
            lpep_dropoff_datetime TIMESTAMP(3),
            PULocationID INT,
            DOLocationID INT,
            passenger_count DOUBLE,
            trip_distance DOUBLE,
            tip_amount DOUBLE,
            event_watermark TIMESTAMP(3)
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://postgres:5432/postgres',
            'table-name' = '{table_name}',
            'username' = 'postgres',
            'password' = 'postgres',
            'driver' = 'org.postgresql.Driver'
        )
        """
    t_env.execute_sql(sink_ddl)
    return table_name

In [None]:
# Re-check the producer's broker connection before starting the Flink job.
producer.bootstrap_connected()

In [None]:
# Obtain the Flink streaming execution environment (configured in the next cell).
env = StreamExecutionEnvironment.get_execution_environment()

In [None]:
# Notebook-level streaming environment for ad-hoc use. log_aggregation() below
# builds its own environment and does not read these globals.
env.set_parallelism(3)
env.enable_checkpointing(10_000)  # checkpoint interval in milliseconds (10 s)

settings = (
    EnvironmentSettings
    .new_instance()
    .in_streaming_mode()
    .build()
)
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

In [None]:
def log_aggregation():
    """Stream trip records from the Kafka source table into the JDBC sink.

    Builds a fresh streaming table environment (checkpointing every 10 s,
    parallelism 3), registers the source and sink tables, and runs an
    ``INSERT ... SELECT`` over 1-minute tumbling windows on ``event_watermark``.

    The call blocks on ``.wait()``. For an unbounded Kafka source this
    presumably does not return until the job is cancelled. Failures are
    reported with a printed message rather than raised.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(10 * 1000)  # interval in milliseconds
    env.set_parallelism(3)

    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Event time and the watermark are declared in the source table DDL
    # (WATERMARK FOR event_watermark ...). A DataStream-level WatermarkStrategy
    # would be unused by this pure-SQL pipeline.
    try:
        source_table = create_events_source_kafka(t_env)
        aggregated_table = create_events_aggregated_sink(t_env)

        t_env.execute_sql(f"""
        INSERT INTO {aggregated_table}
        SELECT
            lpep_pickup_datetime,
            lpep_dropoff_datetime,
            PULocationID,
            DOLocationID,
            passenger_count,
            trip_distance,
            tip_amount,
            event_watermark
        FROM TABLE(
            TUMBLE(TABLE {source_table}, DESCRIPTOR(event_watermark), INTERVAL '1' MINUTE)
        )
        """).wait()

    except Exception as e:
        print("Writing records from Kafka to JDBC failed:", str(e))

In [None]:
# Launch the Kafka -> Postgres job; this cell blocks while the job runs.
log_aggregation()

In [10]:
import sqlite3

# Copy the trips into a throwaway in-memory SQLite database for ad-hoc SQL.
# The cell displays the row count that to_sql reports.
conn = sqlite3.connect(":memory:")
df.to_sql(name="green_trips", con=conn, if_exists="replace", index=False)

476386

In [None]:

# Query the in-memory database: preview the first 10 trip rows.
# (The previous text `select day( from ...` was not valid SQL.)
query = """
    select * from green_trips limit 10
"""
result = pd.read_sql_query(query, conn)
result

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26
5,2019-10-01 00:35:01,2019-10-01 00:43:40,65,49,1.0,1.47,1.86
6,2019-10-01 00:28:09,2019-10-01 00:30:49,7,179,1.0,0.6,1.0
7,2019-10-01 00:28:26,2019-10-01 00:32:01,41,74,1.0,0.56,0.0
8,2019-10-01 00:14:01,2019-10-01 00:26:16,255,49,1.0,2.42,0.0
9,2019-10-01 00:03:03,2019-10-01 00:17:13,130,131,1.0,3.4,2.85
