# Homework


In this homework, we're going to learn about streaming with PyFlink.

Instead of Kafka, we will use Red Panda, which is a drop-in
replacement for Kafka. It implements the same interface, 
so we can use the Kafka library for Python for communicating
with it, as well as use the Kafka connector in PyFlink.

For this homework we will be using the Taxi data:
- Green 2019-10 data from [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz)


## Setup

We need:

- Red Panda
- Flink Job Manager
- Flink Task Manager
- Postgres

It's the same setup as in the [pyflink module](../../../06-streaming/pyflink/), so go there and start docker-compose:

```bash
cd ../../../06-streaming/pyflink/
docker-compose up
```

(Add `-d` if you want to run in detached mode)

Visit http://localhost:8081 to see the Flink Job Manager

Connect to Postgres with pgcli, pg-admin, [DBeaver](https://dbeaver.io/) or any other tool.

The connection credentials are:

- Username `postgres`
- Password `postgres`
- Database `postgres`
- Host `localhost`
- Port `5432`

With pgcli, you'll need to run this to connect:

```bash
pgcli -h localhost -p 5432 -u postgres -d postgres
```

Run these query to create the Postgres landing zone for the first events and windows:

```sql 
CREATE TABLE processed_events (
    test_data INTEGER,
    event_timestamp TIMESTAMP
);

CREATE TABLE processed_events_aggregated (
    event_hour TIMESTAMP,
    test_data INTEGER,
    num_hits INTEGER 
);
```

## Question 1: Redpanda version


Now let's find out the version of redpandas.

For that, check the output of the command rpk help inside the container. The name of the container is redpanda-1.

Find out what you need to execute based on the help output.

What's the version, based on the output of the command you executed? (copy the entire version)

In [8]:
!docker ps -a | grep redpanda

c3b3ea31e215   redpandadata/redpanda:v24.2.18        "/entrypoint.sh redp…"   43 minutes ago   Up 43 minutes            0.0.0.0:8082->8082/tcp, [::]:8082->8082/tcp, 8081/tcp, 0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp, 0.0.0.0:28082->28082/tcp, [::]:28082->28082/tcp, 9644/tcp, 0.0.0.0:29092->29092/tcp, [::]:29092->29092/tcp   redpanda-1


In [15]:
CONTAINER_ID="c3b3ea31e215"

In [16]:
!docker exec "{CONTAINER_ID}" rpk help 

rpk is the Redpanda CLI & toolbox

Usage:
  rpk [flags]
  rpk [command]

Available Commands:
  cloud       Interact with Redpanda cloud
  cluster     Interact with a Redpanda cluster
  connect     A stream processor for mundane tasks - https://docs.redpanda.com/redpanda-connect
  container   Manage a local container cluster
  debug       Debug the local Redpanda process
  generate    Generate a configuration template for related services
  group       Describe, list, and delete consumer groups and manage their offsets
  help        Help about any command
  iotune      Measure filesystem performance and create IO configuration file
  plugin      List, download, update, and remove rpk plugins
  profile     Manage rpk profiles
  redpanda    Interact with a local Redpanda process
  registry    Commands to interact with the schema registry
  security    Manage Redpanda security
  topic       Create, delete, produce to and consume from Redpanda topics
  transform   Develop, deploy and manage

In [17]:
!docker exec "{CONTAINER_ID}" rpk --version 

rpk version v24.2.18 (rev f9a22d4430)


## Question 2. Creating a topic


Before we can send data to the redpanda server, we need to create a topic. We do it also with the `rpk` command we used previously for figuring out the version of redpandas.

Read the output of `help` and based on it, create a topic with name `green-trips`

What's the output of the command for creating a topic? Include the entire output in your answer.

In [18]:
!docker exec "{CONTAINER_ID}" rpk topic --help

Create, delete, produce to and consume from Redpanda topics

Usage:
  rpk topic [flags]
  rpk topic [command]

Available Commands:
  add-partitions   Add partitions to existing topics
  alter-config     Set, delete, add, and remove key/value configs for a topic
  consume          Consume records from topics
  create           Create topics
  delete           Delete topics
  describe         Describe topics
  describe-storage Describe the topic storage status
  list             List topics, optionally listing specific topics
  produce          Produce records to a topic
  trim-prefix      Trim records from topics

Flags:
  -h, --help   Help for topic

Global Flags:
      --config string            Redpanda or rpk config file; default
                                 search paths are
                                 "/var/lib/redpanda/.config/rpk/rpk.yaml",
                                 $PWD/redpanda.yaml, and
                                 /etc/redpanda/redpanda.yaml
  -X, --config

In [19]:
!docker exec "{CONTAINER_ID}" rpk topic create --help

Create topics.

All topics created with this command will have the same number of partitions,
replication factor, and key/value configs.

For example,

	create -c cleanup.policy=compact -r 3 -p 20 foo bar

will create two topics, foo and bar, each with 20 partitions, 3 replicas, and
the cleanup.policy=compact config option set.

Usage:
  rpk topic create [TOPICS...] [flags]

Flags:
  -d, --dry                        Dry run: validate the topic creation
                                   request; do not create topics
  -h, --help                       Help for create
  -p, --partitions int32           Number of partitions to create per
                                   topic; -1 defaults to the cluster's
                                   default_topic_partitions (default -1)
  -r, --replicas int16             Replication factor (must be odd); -1
                                   defaults to the cluster's
                                   default_topic_replications (default -1)
  -c,

In [25]:
#!docker exec 32d98be1b7df rpk topic delete green-trips

In [20]:
!docker exec "{CONTAINER_ID}" rpk topic create green-trips

TOPIC        STATUS
green-trips  OK


## Question 3. Connecting to the Kafka server



We need to make sure we can connect to the server, so
later we can send some data to its topics

First, let's install the kafka connector (up to you if you
want to have a separate virtual environment for that)

```bash
pip install kafka-python
```

You can start a jupyter notebook in your solution folder or
create a script

Let's try to connect to our server:

```python
import json

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()
```

Provided that you can connect to the server, what's the output
of the last command?

In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

## Question 4: Sending the Trip Data


Now we need to send the data to the `green-trips` topic

Read the data, and keep only these columns:

* `'lpep_pickup_datetime',`
* `'lpep_dropoff_datetime',`
* `'PULocationID',`
* `'DOLocationID',`
* `'passenger_count',`
* `'trip_distance',`
* `'tip_amount'`

Now send all the data using this code:

```python
producer.send(topic_name, value=message)
```

For each row (`message`) in the dataset. In this case, `message`
is a dictionary.

After sending all the messages, flush the data:

```python
producer.flush()
```

Use `from time import time` to see the total time 

```python
from time import time

t0 = time()

# ... your code

t1 = time()
took = t1 - t0
```

How much time did it take to send the entire dataset and flush? 

In [12]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

--2025-03-18 01:58:25--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250318%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250318T005826Z&X-Amz-Expires=300&X-Amz-Signature=79121dbc9d5f3670ffb269f5d46142905b10af04079cbd9674d4720dbb392776&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-03-18 01:58:26--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-A

In [22]:
!ls -lh

total 8,1M
-rw-rw-r-- 1 aztleclan aztleclan 7,9M jul 14  2022 green_tripdata_2019-10.csv.gz
-rw-rw-r-- 1 aztleclan aztleclan 145K mar 19 02:48 Homework.ipynb
-rw-rw-r-- 1 aztleclan aztleclan 3,5K mar 18 02:40 session_job.py


In [23]:
import pandas as pd

# Leer el archivo CSV comprimido
df = pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip')

  exec(code_obj, self.user_global_ns, self.user_ns)


In [24]:
# Mostrar las primeras filas del DataFrame
df.head(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1.0,112,196,1.0,5.88,18.0,0.5,0.5,0.0,0.0,,0.3,19.3,2.0,1.0,0.0
1,1.0,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1.0,43,263,1.0,0.8,5.0,3.25,0.5,0.0,0.0,,0.3,9.05,2.0,1.0,0.0
2,1.0,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1.0,255,228,2.0,7.5,21.5,0.5,0.5,0.0,0.0,,0.3,22.8,2.0,1.0,0.0
3,1.0,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1.0,181,181,1.0,0.9,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2.0,1.0,0.0
4,2.0,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1.0,97,188,1.0,2.52,10.0,0.5,0.5,2.26,0.0,,0.3,13.56,1.0,1.0,0.0


In [25]:
from time import time

t0 = time()

topic_name = 'green-trips'

for index, row in df.iterrows():
    msg = {
        'lpep_pickup_datetime': row['lpep_pickup_datetime'],
        'lpep_dropoff_datetime': row['lpep_dropoff_datetime'],
        'PULocationID': row['PULocationID'],
        'DOLocationID': row['DOLocationID'],
        'passenger_count': row['passenger_count'],
        'trip_distance': row['trip_distance'],
        'tip_amount': row['tip_amount']
    }

    #print(f"Sending {msg}...")
    producer.send(topic_name, value=msg)
    
producer.flush()
producer.close()  

t1 = time()
took = t1 - t0

In [26]:
print(took)

88.2982075214386


In [27]:
from kafka import KafkaConsumer, TopicPartition

# Configura el consumidor para conectarse al broker Kafka
consumer = KafkaConsumer(
    bootstrap_servers=[server],  # Dirección del broker Kafka
    group_id=None,                 # No necesitamos un grupo de consumidores aquí
    enable_auto_commit=False,      # No necesitamos auto-commit
)

# Especifica el topic del cual deseas obtener el número de mensajes
topic = 'green-trips'

# Obtener las particiones del topic
partitions = consumer.partitions_for_topic(topic)

# Inicializar una variable para contar el número total de mensajes
total_messages = 0


# Iterar sobre las particiones
for partition in partitions:
    # Crear un objeto TopicPartition para cada partición
    tp = TopicPartition(topic, partition)
    
    # Obtener el offset más alto (último mensaje) para esta partición
    end_offset = consumer.end_offsets([tp])[tp]
    
    # Sumar el offset más alto de esta partición al total
    total_messages += end_offset

# Cerrar el consumidor
consumer.close()

# Mostrar el número total de mensajes en el topic
print(f'Número total de mensajes en el topic "{topic}": {total_messages}')

Número total de mensajes en el topic "green-trips": 476386



## Question 5: Build a Sessionization Window (2 points)



Now we have the data in the Kafka stream. It's time to process it.

* Copy `aggregation_job.py` and rename it to `session_job.py`
* Have it read from `green-trips` fixing the schema
* Use a [session window](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) with a gap of 5 minutes
* Use `lpep_dropoff_datetime` time as your watermark with a 5 second tolerance
* Which pickup and drop off locations have the longest unbroken streak of taxi trips?

In [44]:
#!pip install apache-flink==1.16.0

```bash
CREATE TABLE aggregated_trips (
    PULocationID INTEGER,
    DOLocationID INTEGER,
    session_start TIMESTAMP(3),
    session_end TIMESTAMP(3),
    num_trips BIGINT,
    session_duration BIGINT ,
    PRIMARY KEY (PULocationID, DOLocationID, session_start)
);
```


```bash
docker compose exec jobmanager ./bin/flink run -py /opt/src/job/session_job.py --pyFiles /opt/src -d
```


![Screenshot from 2025-03-19 02-55-27.png](attachment:82da9d7c-ba79-4d6c-a4aa-9ebcc7a0fccc.png)

locations 74 (East Harlem North) and 75 (East Harlem South)
