In [1]:
import pandas as pd
from pprint import pprint

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


# Setup

In [2]:
# docker compose up -d

In [3]:
! alias rpk="docker exec -ti redpanda-1 rpk"

In [4]:
# Run `rpk help` inside the redpanda-1 container to list the available commands.
! docker exec -ti redpanda-1 rpk help

rpk is the Redpanda CLI & toolbox

Usage:
  rpk [command]

Available Commands:
  acl         Manage ACLs and SASL users
  cloud       Interact with Redpanda cloud
  cluster     Interact with a Redpanda cluster
  container   Manage a local container cluster
  debug       Debug the local Redpanda process
  generate    Generate a configuration template for related services
  group       Describe, list, and delete consumer groups and manage their offsets
  help        Help about any command
  iotune      Measure filesystem performance and create IO configuration file
  plugin      List, download, update, and remove rpk plugins
  redpanda    Interact with a local Redpanda process
  topic       Create, delete, produce to and consume from Redpanda topics
  version     Check the current version
  wasm        Deploy and remove inline WASM engine scripts

Flags:
  -h, --help      Help for rpk
  -v, --verbose   Enable verbose logging (default: false)

Use "rpk [command] -

# Question 1: Redpanda version
Answer: v22.3.5 (rev 28b2443)

In [6]:
# Ask the rpk binary inside the redpanda-1 container for its version string.
! docker exec -ti redpanda-1 rpk version

v22.3.5 (rev 28b2443)


# Question 2. Creating a topic
Answer: 
```sh
TOPIC       STATUS
test-topic  OK

```

In [10]:
# Remove test-topic; the following cell creates it again.
! docker exec -ti redpanda-1 rpk topic delete test-topic

TOPIC       STATUS
test-topic  OK


In [11]:
# Create test-topic, the topic the first producer loop sends to.
! docker exec -ti redpanda-1 rpk topic create test-topic

TOPIC       STATUS
test-topic  OK


# Question 3. Connecting to the Kafka server
Answer: True

In [12]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to JSON text and return it encoded as UTF-8 bytes."""
    text = json.dumps(data)
    return text.encode('utf-8')

# Broker address the producer bootstraps from.
server = 'localhost:9092'

# json_serializer is registered as the value serializer for this producer.
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=json_serializer)

# Last expression of the cell, so its result is what the notebook displays.
producer.bootstrap_connected()

True

# Question 4. Sending data to the stream
Answer: Sending the messages

In [13]:
# Publish ten small messages to test-topic, pausing 0.05 s after each one,
# and report how long the whole run (including the final flush) took.
topic_name = 'test-topic'
t0 = time.time()

for i in range(10):
    message = dict(number=i)
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

# Wait for the records queued by send() before stopping the clock.
producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.54 seconds


# Reading data with rpk


In [9]:
! docker exec -ti redpanda-1 rpk topic consume test-topic

{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1710978742906,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1710978742960,
  "partition": 0,
  "offset": 1
}
{
  "topic": "test-topic",
  "value": "{\"number\": 2}",
  "timestamp": 1710978743016,
  "partition": 0,
  "offset": 2
}
{
  "topic": "test-topic",
  "value": "{\"number\": 3}",
  "timestamp": 1710978743070,
  "partition": 0,
  "offset": 3
}
{
  "topic": "test-topic",
  "value": "{\"number\": 4}",
  "timestamp": 1710978743124,
  "partition": 0,
  "offset": 4
}
{
  "topic": "test-topic",
  "value": "{\"number\": 5}",
  "timestamp": 1710978743179,
  "partition": 0,
  "offset": 5
}
{
  "topic": "test-topic",
  "value": "{\"number\": 6}",
  "timestamp": 1710978743234,
  "partition": 0,
  "offset": 6
}
{
  "topic": "test-topic",
  "value": "{\"number\": 7}",
  "timestamp": 1710978743287,
  "partition": 0,
  "offset": 7
}
{
  "topic": "test-topic",
  "va

# Sending the taxi data


In [14]:
# Only these columns are loaded from the CSV.
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

df_green = pd.read_csv(
    'green_tripdata_2019-10.csv.gz',
    usecols=columns,
    compression='gzip',
)

In [15]:
# Pretty-print the first row of df_green as a {column: value} dict.
for row in df_green.head(1).itertuples(index=False):
    row_dict = dict(zip(row._fields, row))
    pprint(row_dict)

{'DOLocationID': 196,
 'PULocationID': 112,
 'lpep_dropoff_datetime': '2019-10-01 00:39:58',
 'lpep_pickup_datetime': '2019-10-01 00:26:02',
 'passenger_count': 1.0,
 'tip_amount': 0.0,
 'trip_distance': 5.88}


In [16]:
# Remove the green-trips topic; the following cell creates it again.
! docker exec -ti redpanda-1 rpk topic delete green-trips

TOPIC        STATUS
green-trips  OK


In [17]:
# Create the green-trips topic that the trip records are sent to.
! docker exec -ti redpanda-1 rpk topic create green-trips

TOPIC        STATUS
green-trips  OK


# Question 5: Sending the Trip Data
Answer: 24.85 seconds

In [18]:
# Send every row of df_green to the green-trips topic as one JSON message
# per row, and time the complete send.
t0 = time.time()

topic_name = 'green-trips'

print(df_green.shape)

for row in df_green.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=message)

# send() only queues a record. Flush before stopping the clock so the timing
# covers actual delivery and nothing is left buffered when the cell ends.
producer.flush()

# Read the clock once, after the loop. Previously this line sat inside the
# loop body: it ran for every row and, for an empty frame, left t1 holding
# the value from the earlier timing cell.
t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

(476386, 7)
took 24.32 seconds


In [15]:
# ! docker exec -ti redpanda-1 rpk topic consume green-trips

# Creating the PySpark consumer

In [19]:
import pyspark
from pyspark.sql import SparkSession

# Request the Kafka connector package with the same version as the installed PySpark.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all cores, with the Kafka connector listed in spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/heanh/.ivy2/cache
The jars for the packages stored in: /Users/heanh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34924bb5-c88f-4b13-aeca-c4ad63f10310;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 280ms :: artifacts dl 7ms
	:: 

In [20]:
# Streaming read of the green-trips topic, starting from the earliest offset.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [21]:
def peek(mini_batch, batch_id):
    """Print the first row of ``mini_batch``; print nothing when it has no rows.

    ``batch_id`` is accepted but not used by this function.
    """
    head = mini_batch.take(1)
    if not head:
        return

    print(head[0])

# Start a streaming query that passes each micro-batch of green_stream to peek().
query = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

24/03/20 20:26:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/qj/3nrl2lg92pv09vj6961hy6zr0000gn/T/temporary-68d87dcf-89c5-4a43-a845-dfe909b7599a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 20:26:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/20 20:26:43 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 20, 20, 24, 44, 685000), timestampType=0)


In [22]:
# Stop the streaming query currently held in `query`.
query.stop()

# Question 6. Parsing the data
Answer:
```sh
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
```

In [23]:
from pyspark.sql import types

# Schema of the JSON payload carried in each message: one field per CSV column.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [24]:
from pyspark.sql import functions as F

# Cast the Kafka `value` column to a string, parse it as JSON using `schema`,
# then expand the parsed struct into top-level columns.
# NOTE: this rebinds green_stream, and the result no longer has a `value`
# column, so re-running this cell on its own output will not work.
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [25]:
# Print the column names and types of green_stream.
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [26]:
# Start a streaming query that passes each micro-batch of green_stream to peek().
query = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

24/03/20 20:26:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/qj/3nrl2lg92pv09vj6961hy6zr0000gn/T/temporary-56c876d7-a8b1-4f5b-b1c8-eb1709f9d1c2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 20:26:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/20 20:26:55 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


# Question 7: Most popular destination
Answer: 74

In [27]:
# Print the column names and types of green_stream.
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [28]:
# Add a `timestamp` column populated by F.current_timestamp().
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

In [29]:
# Print the column names and types of green_stream.
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = false)



In [30]:
# Count rows per (5-minute window over `timestamp`, DOLocationID),
# sorted with the largest count first.
popular_destinations = (
    green_stream
    .groupBy(F.window(F.col("timestamp"), "5 minutes"), F.col("DOLocationID"))
    .count()
    .orderBy(F.desc("count"))
)

In [31]:
# The peek query started earlier is never stopped, and rebinding `query`
# below would drop the only handle to it. Stop every streaming query that is
# still active on this session before starting the aggregation.
for active_query in spark.streams.active:
    active_query.stop()

# Write the complete, untruncated result table to the console.
query = (
    popular_destinations.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# query.awaitTermination()

24/03/20 20:27:45 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/qj/3nrl2lg92pv09vj6961hy6zr0000gn/T/temporary-5ac05f1f-e779-4726-a488-f98513420c9b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 20:27:45 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/20 20:27:45 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|74          |17741|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|42          |15942|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|41          |14061|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|75          |12840|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|129         |11930|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|7           |11533|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|166         |10845|
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|236         |7913 |
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|223         |7542 |
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|238         |7318 |
|{2024-03-20 20:25:00, 2024-03-20 20:30:00}|82          |7292 |
|{2024-

In [32]:
# Stop the streaming query currently held in `query`.
query.stop()