- Name: Isaac Ndirangu Muturi
- Email: ndirangumuturi749@gmail.com

## Module 6 Homework

In this homework, we're going to extend Module 5 Homework and learn about streaming with PySpark.

Instead of Kafka, we will use Redpanda, which is a drop-in
replacement for Kafka.

Ensure you have the following set up (you should already have them if you completed the previous homework and module):

- Docker (see [module 1](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/01-docker-terraform))
- PySpark (see [module 5](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/05-batch/setup))

For this homework we will be using the files from Module 5 homework:

- Green 2019-10 data from [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz)



## Start Redpanda

Let's start Redpanda in a Docker container.

There's a `docker-compose.yml` file in the homework folder (taken from [here](https://github.com/redpanda-data-blog/2023-python-gsg/blob/main/docker-compose.yml)).

Copy this file to your homework directory and run

```bash
docker-compose up
```

(Add `-d` if you want to run in detached mode)




## Question 1: Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.

What's the version, based on the output of the command you executed? (copy the entire version)




```
!!docker exec -i redpanda-1 rpk help
```

```
['rpk is the Redpanda CLI & toolbox',
 '',
 'Usage:',
 '  rpk [flags]',
 '  rpk [command]',
 '',
 'Available Commands:',
 '  acl         Manage ACLs and SASL users',
 '  cloud       Interact with Redpanda cloud',
 '  cluster     Interact with a Redpanda cluster',
 '  container   Manage a local container cluster',
 '  debug       Debug the local Redpanda process',
 '  generate    Generate a configuration template for related services',
 '  group       Describe, list, and delete consumer groups and manage their offsets',
 '  help        Help about any command',
 '  iotune      Measure filesystem performance and create IO configuration file',
 '  plugin      List, download, update, and remove rpk plugins',
 '  profile     Manage rpk profiles',
 '  redpanda    Interact with a local Redpanda process',
 '  topic       Create, delete, produce to and consume from Redpanda topics',
 '  version     Check the current version',
 '',
 'Flags:',
 '      --config string            Redpanda or rpk con
```

```
!docker exec -ti redpanda-1 rpk --version
```

```
rpk version v23.2.26 (rev 328d83a06e)
```


## Question 2. Creating a topic

Before we can send data to the Redpanda server, we
need to create a topic. We also do this with the `rpk`
command we used previously to find out the Redpanda
version.

Read the output of `help` and, based on it, create a topic named `test-topic`.

What's the output of the command for creating a topic? Include the entire output in your answer.




```
!docker exec -ti redpanda-1 rpk topic create test-topic
```

```
TOPIC       STATUS
test-topic  OK
```


## Question 3. Connecting to the Kafka server

We need to make sure we can connect to the server so that
we can later send some data to its topics.

First, let's install the Kafka client library (whether you use
a separate virtual environment for it is up to you):

```bash
pip install kafka-python
```

You can start a Jupyter notebook in your solution folder or
create a script.

Let's try to connect to our server:

```python
import json
import time

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()
```

Provided that you can connect to the server, what's the output
of the last command?




```python
# !pip install kafka-python
```

```
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     |████████████████████████████████| 246 kB 1.6 MB/s eta 0:00:01
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
```


```python
import json
import time

from kafka import KafkaProducer

# This function will be used to serialize the message value before sending it to Kafka.
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

# The address and port of the Kafka server (localhost:9092 in this case).
server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# Checks if the producer is successfully connected to the Kafka server specified in the bootstrap servers.
producer.bootstrap_connected()
```

```
True
```
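The `value_serializer` passed to `KafkaProducer` above is what turns each Python dict into bytes before it is sent. The sketch below runs that step on its own with only the standard library, so no broker is needed; `json_deserializer` is a hypothetical helper showing the inverse step a consumer would apply. The resulting bytes are the JSON string that `rpk topic consume` later shows in the `value` field.

```python
import json

def json_serializer(data):
    """Same serializer as above: dict -> JSON text -> UTF-8 bytes."""
    return json.dumps(data).encode('utf-8')

def json_deserializer(raw):
    """Hypothetical inverse step a consumer could apply: UTF-8 bytes -> JSON text -> dict."""
    return json.loads(raw.decode('utf-8'))

message = {'number': 0}
payload = json_serializer(message)
print(payload)                     # b'{"number": 0}'
print(json_deserializer(payload))  # {'number': 0}
```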

## Question 4. Sending data to the stream

Now we're ready to send some test data:

```python
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')
```

How much time did it take? Where did it spend most of the time?

* Sending the messages
* Flushing
* Both took approximately the same amount of time

(Don't remove `time.sleep` when answering this question)


```python
# Start Time Measurement
t0 = time.time()

# Kafka Topic Name
topic_name = 'test-topic'

# Sending Messages to Kafka Topic
for i in range(10):
    message = {'number': i}                # Message content (dictionary)
    producer.send(topic_name, value=message)  # Send message to Kafka topic
    print(f"Sent: {message}")              # Print sent message
    time.sleep(0.05)                        # Delay between sending messages

# Flush Producer Buffer - ensure that all messages in the producer's buffer are
# sent to the Kafka broker before the program exits.
producer.flush()

# End Time Measurement
t1 = time.time()

# Print Time Taken
print(f'Took {(t1 - t0):.2f} seconds')
```

```
Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
Took 0.78 seconds
```




The total time taken was approximately 0.78 seconds.

Based on the code and the output above, most of the time was spent **sending the messages**:

- Sending the messages: the loop calls `time.sleep(0.05)` after each of the 10 sends, so at least 10 × 0.05 = 0.5 seconds are spent inside the send loop alone.
- Flushing: by the time `producer.flush()` is called, the producer's background thread has had those pauses to deliver the buffered messages, so flushing is expected to take a negligible amount of time in comparison.

Therefore, sending the messages (including the deliberate `time.sleep` delays) was the main time-consuming step in this scenario.
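To see where the time goes, the loop can be timed in two phases. The sketch below uses a hypothetical `FakeProducer` (not part of kafka-python) so it runs without a broker; with a real `KafkaProducer` the send phase would also include serialization and network I/O, and the flush phase would include waiting for any still-buffered messages. `time_send_and_flush` is likewise a hypothetical helper.

```python
import time

class FakeProducer:
    """Hypothetical stand-in for KafkaProducer: send() buffers, flush() 'delivers'."""

    def __init__(self):
        self.buffer = []
        self.delivered = []

    def send(self, topic, value):
        self.buffer.append((topic, value))

    def flush(self):
        self.delivered.extend(self.buffer)
        self.buffer.clear()

def time_send_and_flush(producer, topic_name='test-topic', n=10, delay=0.05):
    """Return (seconds spent in the send loop, seconds spent in flush())."""
    t0 = time.perf_counter()
    for i in range(n):
        producer.send(topic_name, value={'number': i})
        time.sleep(delay)  # the deliberate pause from the homework loop
    t1 = time.perf_counter()
    producer.flush()
    t2 = time.perf_counter()
    return t1 - t0, t2 - t1

fake_producer = FakeProducer()
send_time, flush_time = time_send_and_flush(fake_producer)
print(f'sending: {send_time:.2f} s, flushing: {flush_time:.4f} s')
```

Passing the real `producer` from Question 3 to `time_send_and_flush` should show how the measured total splits between the two phases.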

## Reading data with `rpk`

You can see the messages that you send to the topic
with `rpk`:

```bash
rpk topic consume test-topic
```

Run the command above and send the messages one more time to
see them.




```
!docker exec -it redpanda-1 rpk topic consume test-topic
```

```
{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1711050709429,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1711050709479,
  "partition": 0,
  "offset": 1
}
{
  "topic": "test-topic",
  "value": "{\"number\": 2}",
  "timestamp": 1711050709532,
  "partition": 0,
  "offset": 2
}
{
  "topic": "test-topic",
  "value": "{\"number\": 3}",
  "timestamp": 1711050709583,
  "partition": 0,
  "offset": 3
}
{
  "topic": "test-topic",
  "value": "{\"number\": 4}",
  "timestamp": 1711050709635,
  "partition": 0,
  "offset": 4
}
{
  "topic": "test-topic",
  "value": "{\"number\": 5}",
  "timestamp": 1711050709688,
  "partition": 0,
  "offset": 5
}
{
  "topic": "test-topic",
  "value": "{\"number\": 6}",
  "timestamp": 1711050709739,
  "partition": 0,
  "offset": 6
}
{
  "topic": "test-topic",
  "value": "{\"number\": 7}",
  "timestamp": 1711050709791,
  "partition": 0,
  "offset": 7
}
{
  "topic": "test-topic",
  "va
```

```python
topic_name = 'test-topic'
for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
producer.flush()
```

```
Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
```


```
!docker exec -it redpanda-1 rpk topic consume test-topic
```

```
{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1711050709429,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1711050709479,
  "partition": 0,
  "offset": 1
}
{
  "topic": "test-topic",
  "value": "{\"number\": 2}",
  "timestamp": 1711050709532,
  "partition": 0,
  "offset": 2
}
{
  "topic": "test-topic",
  "value": "{\"number\": 3}",
  "timestamp": 1711050709583,
  "partition": 0,
  "offset": 3
}
{
  "topic": "test-topic",
  "value": "{\"number\": 4}",
  "timestamp": 1711050709635,
  "partition": 0,
  "offset": 4
}
{
  "topic": "test-topic",
  "value": "{\"number\": 5}",
  "timestamp": 1711050709688,
  "partition": 0,
  "offset": 5
}
{
  "topic": "test-topic",
  "value": "{\"number\": 6}",
  "timestamp": 1711050709739,
  "partition": 0,
  "offset": 6
}
{
  "topic": "test-topic",
  "value": "{\"number\": 7}",
  "timestamp": 1711050709791,
  "partition": 0,
  "offset": 7
}
{
  "topic": "test-topic",
  "va
```

## Sending the taxi data

Now let's send our actual data:

* Read the green csv.gz file
* We will only need these columns:
  * `lpep_pickup_datetime`
  * `lpep_dropoff_datetime`
  * `PULocationID`
  * `DOLocationID`
  * `passenger_count`
  * `trip_distance`
  * `tip_amount`

Iterate over the records in the dataframe

```python
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

    # TODO implement sending the data here
```

Note: this way of iterating over the records is more efficient than
`iterrows`, because `itertuples` yields lightweight namedtuples instead of building a pandas `Series` for every row.


```
!ls ./resources/
```

```
green_tripdata_2019-10.csv.gz
```


```python
import pandas as pd

file_path = "./resources/green_tripdata_2019-10.csv.gz"
# Filter the DataFrame to include only the required columns
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]
# Read the Green 2019-10 CSV file
df_green_filtered = pd.read_csv(file_path, compression='gzip', header=0, usecols=columns)
df_green_filtered.head()
```

| | lpep_pickup_datetime | lpep_dropoff_datetime | PULocationID | DOLocationID | passenger_count | trip_distance | tip_amount |
|---|---|---|---|---|---|---|---|
| 0 | 2019-10-01 00:26:02 | 2019-10-01 00:39:58 | 112 | 196 | 1.0 | 5.88 | 0.0 |
| 1 | 2019-10-01 00:18:11 | 2019-10-01 00:22:38 | 43 | 263 | 1.0 | 0.8 | 0.0 |
| 2 | 2019-10-01 00:09:31 | 2019-10-01 00:24:47 | 255 | 228 | 2.0 | 7.5 | 0.0 |
| 3 | 2019-10-01 00:37:40 | 2019-10-01 00:41:49 | 181 | 181 | 1.0 | 0.9 | 0.0 |
| 4 | 2019-10-01 00:08:13 | 2019-10-01 00:17:56 | 97 | 188 | 1.0 | 2.52 | 2.26 |


```python
# row._fields returns a tuple of column names, and getattr(row, col) fetches the value of the column for the current row.
# So, it creates a dictionary where the keys are column names and the values are the corresponding row values.

# Iterate over the records in the DataFrame
for row in df_green_filtered.itertuples(index=False):
    # Convert the row to a dictionary
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break
```

```
{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}
```


## Question 5: Sending the Trip Data

* Create a topic `green-trips` and send the data there
* How much time in seconds did it take? (You can round it to a whole number)
* Make sure you don't include sleeps in your code


```python
df_green_filtered.shape
```

```
(476386, 7)
```

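```python
count = 0
t0 = time.time()

topic_name = 'green-trips'

# Sending Trip Data to Kafka Topic
for row in df_green_filtered.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}  # Convert the row to a dictionary
    producer.send(topic_name, value=message)  # Send message to Kafka topic
    count += 1

t1 = time.time()

# Calculate time taken in seconds
time_taken = round(t1 - t0)
print(f'Time taken: {time_taken} seconds')
print('total records published', count)
```

```
Time taken: 61 seconds
total records published 476386
```

Note that this loop never calls `producer.flush()` before stopping the clock, so the measured 61 seconds covers handing all 476,386 messages to the producer but may not include delivering the ones still sitting in its buffer. Calling `producer.flush()` right before `t1 = time.time()` would make the measurement cover delivery as well.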
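A timing helper that flushes before stopping the clock could look like the sketch below. `publish_rows` and `FakeProducer` are hypothetical names; the fake producer only exists so the sketch runs without a broker, and any object with kafka-python's `send(topic, value=...)` and `flush()` methods could be passed in instead.

```python
import time

class FakeProducer:
    """Hypothetical stand-in for KafkaProducer so the sketch runs without a broker."""

    def __init__(self):
        self.buffer, self.delivered = [], []

    def send(self, topic, value):
        self.buffer.append((topic, value))

    def flush(self):
        self.delivered.extend(self.buffer)
        self.buffer.clear()

def publish_rows(producer, topic_name, rows):
    """Send every row and flush before stopping the clock; returns (count, seconds)."""
    t0 = time.time()
    count = 0
    for row in rows:
        producer.send(topic_name, value=row)
        count += 1
    producer.flush()  # wait for buffered messages before measuring the end time
    return count, time.time() - t0

rows = [{'PULocationID': 112, 'DOLocationID': 196}, {'PULocationID': 43, 'DOLocationID': 263}]
fake_producer = FakeProducer()
count, seconds = publish_rows(fake_producer, 'green-trips', rows)
print(f'published {count} records in {seconds:.2f} seconds')
```

With the real producer, `rows` could be the generator `({col: getattr(r, col) for col in r._fields} for r in df_green_filtered.itertuples(index=False))`.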


```
!docker exec -ti redpanda-1 rpk topic list
```

```
NAME         PARTITIONS  REPLICAS
green-trips  1           1
test-topic   1           1
```


```
!docker exec -ti redpanda-1 rpk topic describe green-trips
```

```
SUMMARY
NAME        green-trips
PARTITIONS  1
REPLICAS    1

CONFIGS
KEY                           VALUE       SOURCE
cleanup.policy                delete      DYNAMIC_TOPIC_CONFIG
compression.type              producer    DEFAULT_CONFIG
max.message.bytes             1048576     DEFAULT_CONFIG
message.timestamp.type        CreateTime  DEFAULT_CONFIG
redpanda.remote.delete        true        DEFAULT_CONFIG
redpanda.remote.read          false       DEFAULT_CONFIG
redpanda.remote.write         false       DEFAULT_CONFIG
retention.bytes               -1          DEFAULT_CONFIG
retention.local.target.bytes  -1          DEFAULT_CONFIG
retention.local.target.ms     86400000    DEFAULT_CONFIG
retention.ms                  604800000   DEFAULT_CONFIG
segment.bytes                 134217728   DEFAULT_CONFIG
segment.ms                    1209600000  DEFAULT_CONFIG
```



## Creating the PySpark consumer

Now let's read the data with PySpark.

Spark needs a library (jar) to be able to connect to Kafka,
so we need to tell PySpark that it needs to use it:

```python
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```

Now we can connect to the stream:

```python
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```

In order to test that we can consume from the stream,
let's see what will be the first record there.

In Spark streaming, the stream is represented as a sequence of
small batches, each batch being a small RDD (or a small dataframe).

So we can execute a function over each mini-batch.
Let's run `take(1)` there to see what we have in the stream:

```python
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()
```

You should see a record like this:

```
Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 12, 22, 42, 9, 411000), timestampType=0)
```

Now let's stop the query so it doesn't keep consuming messages
from the stream:

```python
query.stop()
```
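The micro-batch model described above can be imitated in plain Python to make the `foreachBatch` contract concrete: a callback receives each batch together with its `batch_id`. This is a simplification under stated assumptions: `run_micro_batches` is a hypothetical helper that cuts batches by a fixed size, whereas Spark decides batch boundaries from its trigger and the data available, and passes a DataFrame rather than a list.

```python
from typing import Callable, Iterable, List

def run_micro_batches(records: Iterable, batch_size: int,
                      fn: Callable[[List, int], None]) -> None:
    """Hypothetical helper: cut a record stream into micro-batches and call fn(batch, batch_id)."""
    batch, batch_id = [], 0
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            fn(batch, batch_id)
            batch, batch_id = [], batch_id + 1
    if batch:  # final, partially filled batch
        fn(batch, batch_id)

seen = []

def peek(mini_batch, batch_id):
    """Same idea as the Spark peek(): look only at the first element of each batch."""
    first_row = mini_batch[:1]  # plain-list analogue of mini_batch.take(1)
    if first_row:
        seen.append((batch_id, first_row[0]))
        print(batch_id, first_row[0])

run_micro_batches(({'number': i} for i in range(10)), batch_size=4, fn=peek)
```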



```
!echo $JAVA_HOME
!echo $SPARK_HOME
!echo $PYTHONPATH
```

```
/home/ndirangu749/spark/jdk-11.0.2
/home/ndirangu749/spark/spark-3.3.2-bin-hadoop3
/home/ndirangu749/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip:/home/ndirangu749/spark/spark-3.3.2-bin-hadoop3/python/:
```


```python
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"
print(kafka_jar_package)
```

```
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2
```


```python
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```

The code below sets up a streaming DataFrame (`green_stream`) with Spark Structured Streaming to consume data from the Kafka topic `green-trips`. Here's a breakdown of each part:

1. **`spark.readStream`**: Returns a `DataStreamReader`, which is used to configure a streaming source.

2. **`.format("kafka")`**: Specifies the source format. `"kafka"` tells Spark to read from a Kafka-compatible broker (here, Redpanda).

3. **`.option("kafka.bootstrap.servers", "localhost:9092")`**: Sets the bootstrap servers Spark uses to contact the cluster. Here it is `localhost` on port 9092, assuming the broker is reachable on the same machine.

4. **`.option("subscribe", "green-trips")`**: Specifies the topic to subscribe to.

5. **`.option("startingOffsets", "earliest")`**: Sets where a new query starts reading. `"earliest"` starts from the earliest available offset, so all data already in the topic is consumed; the streaming default, `"latest"`, would only pick up messages produced after the query starts. This option only applies when a query starts without a checkpoint; a restarted query resumes from its checkpointed offsets.

6. **`.load()`**: Returns the streaming DataFrame. This call is lazy: no data is read from Kafka until a streaming query is started with `writeStream ... start()`.

After this code runs, `green_stream` is a streaming DataFrame with the Kafka source columns (`key`, `value`, `topic`, `partition`, `offset`, `timestamp`, `timestampType`), to which further transformations can be applied.

```python
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```
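The `startingOffsets` option is why the first row that `peek` prints has `offset=0`. The sketch below models a single partition as a Python list; `starting_offset` is a hypothetical helper, and it assumes offsets start at 0, nothing has been removed by retention, and the query has no checkpoint.

```python
def starting_offset(log_length: int, starting_offsets: str) -> int:
    """Hypothetical helper: first offset a new query reads from one partition.

    Assumes offsets start at 0 and nothing has been removed by retention.
    """
    if starting_offsets == "earliest":
        return 0           # replay everything already in the topic
    if starting_offsets == "latest":
        return log_length  # the next offset to be written: only new messages
    raise ValueError(f"unsupported startingOffsets: {starting_offsets!r}")

log = [{'number': i} for i in range(10)]  # e.g. the ten test-topic messages
print(log[starting_offset(len(log), "earliest"):][:1])  # [{'number': 0}]
print(log[starting_offset(len(log), "latest"):])        # []
```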

```python
def peek(mini_batch, batch_id):
    """
    Function to print the first row of each micro-batch.

    Parameters:
        mini_batch (DataFrame): A DataFrame containing a micro-batch of data.
        batch_id (int): The ID of the current batch.
    """
    # Take the first row from the micro-batch DataFrame
    first_row = mini_batch.take(1)

    # Check if the micro-batch contains at least one row
    if first_row:
        # Print the first row of the micro-batch
        print(first_row[0])

# Set up the streaming query to apply the peek function to each micro-batch
query = green_stream.writeStream.foreachBatch(peek).start()
query
```

```
<pyspark.sql.streaming.StreamingQuery at 0x7f91b275f760>

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 22, 7, 25, 51, 890000), timestampType=0)
```


```python
query.stop()
```

```python
if query.isActive:
    print("Streaming query is active")
else:
    print("Streaming query has stopped")
```

```
Streaming query has stopped
```


## Question 6. Parsing the data

The data is JSON, but currently it's in binary format. We need
to parse it and turn it into a streaming dataframe with proper
columns.

Similar to batch processing with PySpark, we define the schema:

```python
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())
```

And apply this schema:

```python
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")
```
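Outside Spark, the same two steps (casting the binary `value` to a string, then parsing it against the schema with `from_json`) can be approximated in plain Python for intuition. `SCHEMA` and `parse_value` are hypothetical names, and the type handling is simplified: missing or mismatched fields become `None`, roughly like nulls in the nullable Spark columns; Spark's exact handling of malformed records is not reproduced here.

```python
import json

# Hypothetical Python mirror of the Spark schema above: (column name, expected Python type)
SCHEMA = [
    ("lpep_pickup_datetime", str),
    ("lpep_dropoff_datetime", str),
    ("PULocationID", int),
    ("DOLocationID", int),
    ("passenger_count", float),
    ("trip_distance", float),
    ("tip_amount", float),
]

def parse_value(value) -> dict:
    """Decode a Kafka message value (bytes/bytearray) and keep only the schema columns."""
    data = json.loads(bytes(value).decode("utf-8"))  # cast('STRING') + JSON parsing
    row = {}
    for name, py_type in SCHEMA:
        raw = data.get(name)
        if isinstance(raw, bool):
            raw = None  # JSON true/false is not accepted as a number here
        elif py_type is float and isinstance(raw, int):
            raw = float(raw)  # a JSON integer is acceptable where a double is expected
        row[name] = raw if isinstance(raw, py_type) else None  # missing/mismatched -> None
    return row

value = bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", '
                  b'"lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, '
                  b'"DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, '
                  b'"tip_amount": 0.0}')
print(parse_value(value))
```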

How does the record look after parsing? Copy the output.




```python
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())
```

```python
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")
```

```python
green_stream.printSchema()
```

```
root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
```



```python
query = green_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
```

```
24/03/22 07:33:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query
            didn't fail: /tmp/temporary-f22476b2-c089-458e-bc2b-035700528cb9. If it's required to delete it under any
            circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
            know deleting temp checkpoint folder is best effort.
24/03/22 07:33:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets
            and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:26:02|  2019-10-01 00:39:58|         112|         196|            1.0|         5.88|       0.0|
| 2019-10-01 00:18:11|  2019-10-01 00:22:38|          43|         263|            1.0|          0.8|       0.0|
| 2019-10-01 00:09:31|  2019-10-01 00:24:47|         255|         228|            2.0|          7.5|       0.0|
| 2019-10-01 00:37:40|  2019-10-01 00:41:49|         181|         181|            1.0|          0.9|       0.0|
| 2019-10-01 00:08:13|  2019-10-01 00:17:56|          97|         188|            1.0|         2.52|      2.26|
| 2019-10-01 00:35:01|  2019-10-01 00:43:40|          65|          49|            1.0|         1.47|      1.86|
| 2019-10-01 00:28:09|  2019-10-01 00:30:49|           7|         179|            1.0|          0.6|       1.0|
| 2019-10-01 00:28:26|  2019-10-01 00:32:01|          41|          74|            1.0|         0.56|       0.0|
| 2019-10-01 00:14:01|  2019-10-01 00:26:16|         255|          49|            1.0|         2.42|       0.0|
| 2019-10-01 00:03:03|  2019-10-01 00:17:13|         130|         131|            1.0|          3.4|      2.85|
| 2019-10-01 00:07:10|  2019-10-01 00:23:38|          24|          74|            3.0|         3.18|       0.0|
| 2019-10-01 00:25:48|  2019-10-01 00:49:52|         255|         188|            1.0|          4.7|       1.0|
| 2019-10-01 00:03:12|  2019-10-01 00:14:43|         129|         160|            1.0|          3.1|       0.0|
| 2019-10-01 00:44:56|  2019-10-01 00:51:06|          18|         169|            1.0|         1.19|      0.25|
| 2019-10-01 00:55:14|  2019-10-01 01:00:49|         223|           7|            1.0|         1.09|      1.46|
| 2019-10-01 00:06:06|  2019-10-01 00:11:05|          75|         262|            1.0|         1.24|      2.01|
| 2019-10-01 00:00:19|  2019-10-01 00:14:32|          97|         228|            1.0|         3.03|      3.58|
| 2019-10-01 00:09:31|  2019-10-01 00:20:41|          41|          74|            1.0|         2.03|      2.16|
| 2019-10-01 00:30:36|  2019-10-01 00:34:30|          41|          42|            1.0|         0.73|      1.26|
| 2019-10-01 00:58:32|  2019-10-01 01:05:08|          41|         116|            1.0|         1.48|       0.0|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
only showing top 20 rows
```

```python
query.stop()
```


## Question 7: Most popular destination

Now let's finally do some streaming analytics. We will
see what the most popular destination currently is,
based on our stream of data (which ideally we should
have sent with delays, like we did in workshop 2).


This is how you can do it:

* Add a column "timestamp" using the `current_timestamp` function
* Group by:
  * 5 minutes window based on the timestamp column (`F.window(col("timestamp"), "5 minutes")`)
  * `"DOLocationID"`
* Order by count
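Conceptually, this grouping assigns each event to a 5-minute tumbling window and counts `(window, DOLocationID)` pairs. The sketch below does the same in plain Python; `window_start` and `popular_destinations` are hypothetical names, timestamps are naive, and windows are aligned to the Unix epoch, which should match Spark's default alignment when no `startTime` is passed to `F.window`. Because `current_timestamp()` stamps rows at processing time, data sent in one burst lands in a single window.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window (aligned to the epoch)."""
    return ts - (ts - EPOCH) % WINDOW

def popular_destinations(events):
    """(timestamp, DOLocationID) pairs -> [((start, end), DOLocationID, count)], most frequent first."""
    counts = Counter((window_start(ts), do_id) for ts, do_id in events)
    rows = [((start, start + WINDOW), do_id, n) for (start, do_id), n in counts.items()]
    return sorted(rows, key=lambda r: r[2], reverse=True)

events = [
    (datetime(2024, 3, 22, 7, 41, 36), 74),
    (datetime(2024, 3, 22, 7, 42, 5), 74),
    (datetime(2024, 3, 22, 7, 43, 0), 42),
    (datetime(2024, 3, 22, 7, 46, 0), 42),  # falls into the next window
]
for result_row in popular_destinations(events):
    print(result_row)
```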

You can print the output to the console using this
code:

```python
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```

Write the most popular destination, your answer should be *either* the zone ID or the zone name of this destination. (You will need to re-send the data for this to work)


```python
from pyspark.sql import functions as F

# Add a timestamp column
stream_with_timestamp = green_stream.withColumn("timestamp", F.current_timestamp())

# Group by 5-minute window and DOLocationID, count occurrences, and order by count
popular_destinations = stream_with_timestamp \
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID") \
    .count() \
    .orderBy(F.desc("count"))

# Start the streaming query and write the output to the console
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```

```
24/03/22 07:41:36 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query
            didn't fail: /tmp/temporary-87f64411-070e-45a0-b5e9-42f4bca67f45. If it's required to delete it under any
            circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know
            deleting temp checkpoint folder is best effort.
24/03/22 07:41:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets
            and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|74          |17741|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|42          |15942|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|41          |14061|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|75          |12840|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|129         |11930|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|7           |11533|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|166         |10845|
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|236         |7913 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|223         |7542 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|238         |7318 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|82          |7292 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|181         |7282 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|95          |7244 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|244         |6733 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|61          |6606 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|116         |6339 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|138         |6144 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|97          |6050 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|49          |5221 |
|{2024-03-22 07:40:00, 2024-03-22 07:45:00}|151         |5153 |
+------------------------------------------+------------+-----+
only showing top 20 rows
```

The most popular destination in this window is `DOLocationID` **74** (East Harlem North in the TLC taxi zone lookup), with 17,741 trips.

```python
query.stop()
```
