In [1]:
import json
import time

from kafka import KafkaProducer

def json_serializer(data):
    """Turn ``data`` into a JSON document and return it as UTF-8 bytes.

    Used as the ``value_serializer`` of the Kafka producer created below.
    """
    json_text = json.dumps(data)
    return json_text.encode('utf-8')

# Broker address; the later cells in this notebook read this global as well.
server = 'localhost:9092'

# Producer that JSON-encodes every message value before sending it.
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=json_serializer)

# Last expression of the cell, so the notebook displays its result.
producer.bootstrap_connected()

True

In [9]:
# Creating the topic green-trips
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(bootstrap_servers=[server], client_id='test')

topic_name = 'green-trips'

# Create the topic only when the broker does not list it yet.
if topic_name not in admin_client.list_topics():
    # Single partition, single replica.
    topic_list = [NewTopic(name=topic_name, num_partitions=1, replication_factor=1)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    print(f"Topic '{topic_name}' created successfully.")
else:
    print(f"Topic '{topic_name}' already exists.")


Topic 'green-trips' already exists.


In [2]:
from kafka.admin import KafkaAdminClient, NewTopic
# This cell builds its own admin client against the broker in `server`.
admin_client = KafkaAdminClient(bootstrap_servers=[server], client_id='test')

# Show all topics in kafka
topics = admin_client.list_topics()
print("Topics in Kafka:", topics)

Topics in Kafka: ['green-data', '__consumer_offsets', 'green-trips']


In [None]:
import csv
import json
import time
import logging
from kafka import KafkaProducer
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def main(csv_file='green_tripdata_2019-10.csv', topic='green-trips'):
    """Send every data row of a CSV file to a Kafka topic as a JSON message.

    The file is read twice: a first pass counts the data rows so the progress
    bar has a total, a second pass sends them. Each row is sent as a dict
    keyed by the CSV header, with all values left as the strings read from
    the file. The elapsed wall-clock time is logged at INFO level.

    Args:
        csv_file: Path of the CSV file to read. Defaults to the file name the
            notebook originally hard-coded.
        topic: Kafka topic the rows are sent to. Defaults to 'green-trips'.

    Raises:
        OSError: If ``csv_file`` cannot be opened. The producer is closed
            before the exception propagates.

    Relies on the notebook-level global ``server`` for the broker address.
    """
    start_time = time.time()  # Start time

    # Create a Kafka producer
    producer = KafkaProducer(
        bootstrap_servers=server,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    try:
        with open(csv_file, 'r', newline='', encoding='utf-8') as file:
            # First pass: count data rows (header excluded) for the progress bar.
            total_rows = sum(1 for _ in csv.DictReader(file))

            # Second pass: rewind and start a fresh reader, which consumes the
            # header itself. This avoids a manual next() to skip the header,
            # which would raise StopIteration on an empty file.
            file.seek(0)
            reader = csv.DictReader(file)

            for row in tqdm(reader, total=total_rows, desc="Sending data to Kafka"):
                # Each row is a dictionary keyed by the CSV headers.
                producer.send(topic, value=row)

        # Make sure any remaining messages are delivered
        producer.flush()
    finally:
        # Release the producer's connections even if reading or sending failed.
        producer.close()

    end_time = time.time()  # End time
    logging.info(f"Time taken: {round(end_time - start_time)} seconds")


if __name__ == "__main__":
    main()


In [None]:
# from kafka import KafkaConsumer

# # Create a Kafka consumer
# consumer = KafkaConsumer(
#     'green-trips',
#     bootstrap_servers=[server],
#     auto_offset_reset='earliest',
#     enable_auto_commit=True,
#     group_id='my-group',
#     value_deserializer=lambda x: json.loads(x.decode('utf-8'))
# )

# # Read data from the topic
# for message in consumer:
#     print(f"Received message: {message.value}")

In [None]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.time import Duration

def create_events_aggregated_sink(t_env):
    """Register the JDBC sink table that receives the aggregated rows.

    Executes a ``CREATE TABLE`` statement on ``t_env`` for a table named
    ``processed_events_aggregated`` with the columns window_start, window_end,
    PULocationID, DOLocationID and num_hits, using the 'jdbc' connector and a
    non-enforced primary key on (window_start, PULocationID, DOLocationID).

    Connection settings can be overridden through environment variables so
    credentials need not live in the notebook; the defaults are the values
    that were previously hard-coded:

    * ``JDBC_URL`` (default 'jdbc:postgresql://postgres:5432/postgres')
    * ``POSTGRES_USER`` (default 'postgres')
    * ``POSTGRES_PASSWORD`` (default 'postgres')

    Args:
        t_env: Table environment; only its ``execute_sql`` method is used.

    Returns:
        The name of the registered table.
    """
    import os  # function-scope import keeps this definition self-contained

    def _quoted(value):
        # Double embedded single quotes so the value stays inside its SQL '...' literal.
        return value.replace("'", "''")

    table_name = 'processed_events_aggregated'
    jdbc_url = _quoted(os.environ.get('JDBC_URL', 'jdbc:postgresql://postgres:5432/postgres'))
    jdbc_user = _quoted(os.environ.get('POSTGRES_USER', 'postgres'))
    jdbc_password = _quoted(os.environ.get('POSTGRES_PASSWORD', 'postgres'))

    sink_ddl = f"""
        CREATE TABLE {table_name} (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            PULocationID INT,
            DOLocationID INT,
            num_hits BIGINT,
            PRIMARY KEY (window_start, PULocationID, DOLocationID) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{jdbc_url}',
            'table-name' = '{table_name}',
            'username' = '{jdbc_user}',
            'password' = '{jdbc_password}',
            'driver' = 'org.postgresql.Driver'
        );
        """
    t_env.execute_sql(sink_ddl)
    return table_name

def create_events_source_kafka(t_env):
    """Register the Kafka-backed ``green_trips`` source table.

    The table reads JSON messages from topic 'green-trips' starting at the
    earliest offset. ``event_watermark`` is a computed column equal to
    ``lpep_dropoff_datetime``, and the watermark trails it by 5 seconds.

    Args:
        t_env: Table environment; only its ``execute_sql`` method is used.

    Returns:
        The name of the registered table.
    """
    source_name = "green_trips"
    # The DDL is handed straight to the table environment.
    t_env.execute_sql(f"""
        CREATE TABLE {source_name} (
            lpep_pickup_datetime TIMESTAMP(3),
            lpep_dropoff_datetime TIMESTAMP(3),
            PULocationID INT,
            DOLocationID INT,
            passenger_count INT,
            trip_distance FLOAT,
            tip_amount FLOAT,
            event_watermark AS lpep_dropoff_datetime,
            WATERMARK for event_watermark as event_watermark - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = 'redpanda-1:29092',
            'topic' = 'green-trips',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        );
        """)
    return source_name

def log_aggregation():
    """Run the streaming job that aggregates trips from Kafka into the JDBC sink.

    Registers the Kafka source and JDBC sink tables, then submits an
    ``INSERT INTO ... SELECT`` that counts rows per session window (5-minute
    gap on ``event_watermark``) grouped by window_start, window_end,
    PULocationID and DOLocationID, and waits for the job to finish.

    Any exception raised while registering the tables or running the job is
    caught and reported with ``print``; nothing is re-raised.
    """
    # Execution environment: checkpointing enabled with an interval of
    # 10 * 1000 (milliseconds per the PyFlink API), parallelism of 3.
    checkpoint_interval = 10 * 1000
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.enable_checkpointing(checkpoint_interval)
    exec_env.set_parallelism(3)

    # Table environment in streaming mode on top of the execution environment.
    streaming_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    table_env = StreamTableEnvironment.create(exec_env, environment_settings=streaming_settings)

    try:
        # Register the source first, then the sink.
        source_table = create_events_source_kafka(table_env)
        aggregated_table = create_events_aggregated_sink(table_env)

        insert_sql = f"""
        INSERT INTO {aggregated_table}
        SELECT
            window_start,
            window_end,
            PULocationID,
            DOLocationID,
            COUNT(*) AS num_hits
        FROM TABLE(
            SESSION(TABLE {source_table}, DESCRIPTOR(event_watermark), INTERVAL '5' MINUTE)
        )
        GROUP BY window_start, window_end, PULocationID, DOLocationID;
        """
        # Submit the statement and block until the job completes.
        table_env.execute_sql(insert_sql).wait()

    except Exception as e:
        print("Writing records from Kafka to JDBC failed:", str(e))


if __name__ == '__main__':
    log_aggregation()