# Q1. Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.

What's the version, based on the output of the command you executed? (copy the entire version)

In [1]:
!docker exec -it redpanda-1 rpk version

Version:     v24.2.18
Git ref:     f9a22d4430
Build date:  2025-02-14T12:59:41Z
OS/Arch:     linux/arm64
Go version:  go1.23.1

Redpanda Cluster
  node-1  v24.2.18 - f9a22d443087b824803638623d6b7492ec8221f9


# Q2. Creating a topic

Before we can send data to the Redpanda server, we
need to create a topic. We do this with the same `rpk`
command we used previously to find the Redpanda version.

Read the output of `help` and, based on it, create a topic named `green-trips`.

What's the output of the command for creating a topic? Include the entire output in your answer.

In [2]:
!docker exec -it redpanda-1 rpk topic create green-trips

TOPIC        STATUS
green-trips  OK


# Q3. Connecting to the Kafka server

We need to make sure we can connect to the server, so
that later we can send some data to its topics.

First, let's install the Kafka connector (it's up to you whether
to use a separate virtual environment for that):

```bash
pip install kafka-python
```

You can start a Jupyter notebook in your solution folder or
create a script.

Let's try to connect to our server:

```python
import json

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()
```

Provided that you can connect to the server, what's the output
of the last command?

In [30]:
import json

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

# Q4. Sending the Trip Data

Now we need to send the data to the `green-trips` topic.

Read the data, and keep only these columns:

* `lpep_pickup_datetime`
* `lpep_dropoff_datetime`
* `PULocationID`
* `DOLocationID`
* `passenger_count`
* `trip_distance`
* `tip_amount`
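
As a minimal sketch of the column selection, assuming the CSV has a header row with these names, the standard library's `csv.DictReader` can drop everything else. The one-row sample and the `KEEP` name are made up for illustration; with pandas, passing the same list as `usecols` to `read_csv` gives the same selection.

```python
import csv
import io

# Columns to keep, as listed above.
KEEP = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

# Made-up sample standing in for the real file; it has one extra column (VendorID).
sample = io.StringIO(
    "VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,"
    "DOLocationID,passenger_count,trip_distance,tip_amount\n"
    "2,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1,5.88,0\n"
)

# DictReader yields one dict per row; keep only the wanted keys.
# Note that csv returns every value as a string.
rows = [{col: record[col] for col in KEEP} for record in csv.DictReader(sample)]
print(rows[0])
```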

Now send all the data using this code:

```python
producer.send(topic_name, value=message)
```

Do this for each row (`message`) in the dataset. In this case, `message`
is a dictionary.
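
To make the shape of a message concrete, here is a small sketch of what the `json_serializer` from Q3 does to one row dictionary before it goes on the wire. The sample values are invented for illustration.

```python
import json

def json_serializer(data):
    # Same serializer as in Q3: dict -> JSON text -> UTF-8 bytes.
    return json.dumps(data).encode('utf-8')

# Hypothetical row; the values are made up.
message = {
    'lpep_pickup_datetime': '2019-10-01 00:26:02',
    'lpep_dropoff_datetime': '2019-10-01 00:39:58',
    'PULocationID': 112,
    'DOLocationID': 196,
    'passenger_count': 1.0,
    'trip_distance': 5.88,
    'tip_amount': 0.0,
}

payload = json_serializer(message)

# A consumer can decode the bytes back into an equal dictionary.
decoded = json.loads(payload.decode('utf-8'))
print(type(payload).__name__, decoded == message)
```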

After sending all the messages, flush the data:

```python
producer.flush()
```

Use `from time import time` to measure the total time:

```python
from time import time

t0 = time()

# ... your code

t1 = time()
took = t1 - t0
```

How much time did it take to send the entire dataset and flush? 

In [31]:
import pandas as pd
from time import time

# Columns to keep from the raw dataset
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

df = pd.read_csv("green_tripdata_2019-10.csv.gz")
df = df[columns]

t0 = time()

for row in df.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send('green-trips', value=row_dict)

producer.flush()

print(f"Took {(time()-t0):.2f} seconds")


Took 25.88 seconds


# Q5. Build a Sessionization Window

Now we have the data in the Kafka stream. It's time to process it.

* Copy `aggregation_job.py` and rename it to `session_job.py`
* Have it read from `green-trips`, fixing the schema to match the trip data
* Use a [session window](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) with a gap of 5 minutes
* Use `lpep_dropoff_datetime` as your watermark with a 5-second tolerance
* Which pickup and dropoff locations have the longest unbroken streak of taxi trips?
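
To illustrate what a session window with a 5-minute gap computes, here is a plain-Python sketch of the grouping logic. It is not Flink code and is not taken from `session_job.py`. The function name `sessionize`, the helper `t`, and the sample trips are hypothetical. It also simplifies two things: a session ends at its last event rather than at last event plus gap, and the exact behavior at a gap of precisely 5 minutes is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta

GAP = timedelta(minutes=5)

def sessionize(trips, gap=GAP):
    """Group trips per (pickup, dropoff) pair into sessions.

    trips: iterable of (pu, do, dropoff_datetime) tuples.
    Returns a list of (pu, do, session_start, session_end, trip_count).
    A new session starts when the time since the previous event in the
    same pair exceeds `gap` (boundary handling is an assumption here).
    """
    by_key = defaultdict(list)
    for pu, do, ts in trips:
        by_key[(pu, do)].append(ts)

    sessions = []
    for (pu, do), stamps in by_key.items():
        stamps.sort()
        start = prev = stamps[0]
        count = 1
        for ts in stamps[1:]:
            if ts - prev > gap:
                # Gap exceeded: close the current session, open a new one.
                sessions.append((pu, do, start, prev, count))
                start, count = ts, 0
            prev = ts
            count += 1
        sessions.append((pu, do, start, prev, count))
    return sessions

def t(s):
    # Hypothetical helper: parse the timestamp format used in the dataset.
    return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')

# Made-up trips: three close together, one much later, one on another pair.
trips = [
    (75, 74, t('2019-10-01 10:00:00')),
    (75, 74, t('2019-10-01 10:03:00')),
    (75, 74, t('2019-10-01 10:07:00')),
    (75, 74, t('2019-10-01 10:30:00')),
    (95, 95, t('2019-10-01 10:01:00')),
]

sessions = sessionize(trips)
# Longest session by duration (end minus start).
longest = max(sessions, key=lambda s: s[3] - s[2])
print(longest)
```

Each event only extends a session if it arrives within the gap of the previous event for the same pickup and dropoff pair, which is why the 10:30 trip opens a new session.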

In [None]:
!pgcli -h localhost -p 5432 -u postgres -d postgres

In [None]:
!docker-compose exec jobmanager ./bin/flink run -py /opt/src/session_job.py

```sql
CREATE TABLE trip_sessions (
    pickup_location_id INT,
    dropoff_location_id INT,
    session_start TIMESTAMP(3),
    session_end TIMESTAMP(3),
    session_duration_seconds BIGINT,
    trip_count BIGINT,
    PRIMARY KEY (pickup_location_id, dropoff_location_id, session_start)
);
```

```sql
SELECT * FROM trip_sessions
ORDER BY session_duration_seconds DESC
LIMIT 5;
```