In [1]:
# imports
import json
import time
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from kafka import KafkaProducer
from pyspark.sql import types
from pyspark.sql import functions as F
import warnings
warnings.filterwarnings('ignore')

#### q3

In [2]:
# testing connection to redpanda
def json_serializer(data):
    """Turn a JSON-serialisable object into UTF-8 bytes (KafkaProducer value_serializer)."""
    as_text = json.dumps(data)
    return as_text.encode('utf-8')

# Redpanda speaks the Kafka protocol, so the kafka-python client is used directly.
server = 'localhost:9092'
producer_config = dict(bootstrap_servers=[server], value_serializer=json_serializer)
producer = KafkaProducer(**producer_config)

# last expression: whether the bootstrap connection to the broker succeeded
producer.bootstrap_connected()

True

#### q4

In [3]:
# testing sending messages and flushing (with timing)
# perf_counter() is a monotonic clock intended for measuring elapsed time;
# time.time() is wall-clock time and can jump (e.g. NTP adjustments).
topic_name = 'test-topic'

t0 = time.perf_counter()
for i in range(10):
    message = {'number': i}
    # per the kafka-python docs, send() is asynchronous: it buffers the record
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
t1 = time.perf_counter()
print(f'sending messages took {(t1 - t0):.2f} seconds')

# flush() blocks until all buffered records have been sent to the broker
producer.flush()
t2 = time.perf_counter()
print(f'flushing took {(t2 - t1):.2f} seconds')
print(f'entire process took {(t2 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
sending messages took 0.80 seconds
flushing took 0.00 seconds
entire process took 0.80 seconds


In [4]:
# testing consuming messages
!docker exec redpanda-1 rpk topic consume test-topic

{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1711311263729,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1711311263784,
  "partition": 0,
  "offset": 1
}
{
  "topic": "test-topic",
  "value": "{\"number\": 2}",
  "timestamp": 1711311263835,
  "partition": 0,
  "offset": 2
}
{
  "topic": "test-topic",
  "value": "{\"number\": 3}",
  "timestamp": 1711311263887,
  "partition": 0,
  "offset": 3
}
{
  "topic": "test-topic",
  "value": "{\"number\": 4}",
  "timestamp": 1711311263942,
  "partition": 0,
  "offset": 4
}
{
  "topic": "test-topic",
  "value": "{\"number\": 5}",
  "timestamp": 1711311263996,
  "partition": 0,
  "offset": 5
}
{
  "topic": "test-topic",
  "value": "{\"number\": 6}",
  "timestamp": 1711311264046,
  "partition": 0,
  "offset": 6
}
{
  "topic": "test-topic",
  "value": "{\"number\": 7}",
  "timestamp": 1711311264097,
  "partition"

#### q5

In [4]:
# reading the data, keeping only the columns needed downstream
columns_to_keep = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

# usecols skips unused columns at parse time (instead of reading everything and
# dropping in place afterwards) and raises if an expected column is missing.
# Resulting column order is the file's order, same as the previous drop-based approach.
df_green = pd.read_csv(
    'data/green_tripdata_2019-10.csv.gz',
    compression='gzip',
    usecols=columns_to_keep,
    low_memory=False,
)

# checking the result: the first row as the dict that will be sent to Kafka
first_row = next(df_green.itertuples(index=False), None)
if first_row is not None:
    print(first_row._asdict())

df_green.shape[0]

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}


476386

In [5]:
# sending green trips data to redpanda
# Artificial per-record pause to simulate a real-time feed. With ~476k rows this
# alone adds at least ~8 minutes; set to 0 to send as fast as possible.
SEND_DELAY_S = 0.001

start = time.perf_counter()
print('Sending green trips data to redpanda...')
for row in df_green.itertuples(index=False):
    producer.send('green-trips', value=row._asdict())
    if SEND_DELAY_S:
        time.sleep(SEND_DELAY_S)
# send() only buffers records; flush so everything has actually been delivered
# before timing stops and before the Spark cells below read the topic
producer.flush()
end = time.perf_counter()
print(f'Sending green trips data took {(end - start):.2f} seconds')

Sending green trips data to redpanda...
Sending green trips data took 716.72 seconds


Sending green trips data to redpanda...
Sending green trips data took 22.40 seconds

In [7]:
# create a local spark session with the Kafka connector matching the installed PySpark
# NOTE(review): the _2.12 Scala suffix assumes a Spark build for Scala 2.12 — adjust otherwise
pyspark_version = pyspark.__version__
kafka_jar_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:" + pyspark_version

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .getOrCreate()
)

24/03/27 12:40:01 WARN Utils: Your hostname, Dimis-MacBook-Pro-16.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
24/03/27 12:40:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.5.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/dimi/.ivy2/cache
The jars for the packages stored in: /Users/dimi/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4baf95e6-d81a-4242-81e5-fa54839d563a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 257ms :: artifacts dl 7ms
	:: mo

In [18]:
# connecting to the stream: raw Kafka records of the green-trips topic,
# read from the beginning of the topic
green_stream = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "green-trips",
        "startingOffsets": "earliest",
    })
    .load()
)

In [11]:
# checking if stream is connected and consuming messages and peek at the 1st message
def peek(mini_batch, batch_id):
    """foreachBatch callback that prints the first row of each micro-batch.

    Args:
        mini_batch: the micro-batch DataFrame passed in by foreachBatch.
        batch_id: micro-batch id supplied by Spark (unused here).
    """
    # take(1) alone is enough; the previous standalone isEmpty() call ran an
    # extra Spark job and its result was thrown away
    first_row = mini_batch.take(1)
    print("Peeking at the first row: ")
    if first_row:
        print(first_row[0])
query_init = green_stream.writeStream.foreachBatch(peek).start()
# block until the data already in the topic has gone through peek(), instead of
# hoping a fixed 1-second sleep is long enough for the first micro-batch
query_init.processAllAvailable()

24/03/27 12:40:23 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/46/1b72q7597h1_zq7nb54zfr8h0000gn/T/temporary-82f1a811-dc69-436e-b3f2-27a5430e75be. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/27 12:40:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/27 12:40:23 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Peeking at the first row: 
Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 27, 12, 21, 26, 431000), timestampType=0)


In [12]:
# stop the peek query started above, then show the raw Kafka record schema
# (`value` is still binary and has to be decoded before use)
query_init.stop()
green_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


#### q6

In [13]:
# defining the schema of the JSON payload produced from df_green rows;
# the datetime columns are kept as plain strings
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [19]:
# parsing the stream: decode the Kafka value bytes as JSON with `schema`, flatten
# the fields, and keep the Kafka record timestamp alongside them.
# Guarded so that re-running this cell is a no-op rather than an error: once parsed,
# green_stream no longer has a `value` column to decode.
if "value" in green_stream.columns:
    green_stream = (
        green_stream
        .select(
            F.from_json(F.col("value").cast('STRING'), schema).alias("data"),
            F.col("timestamp").alias("timestamp"),
        )
        .select("data.*", "timestamp")
    )

In [20]:
query_parsed = green_stream.writeStream.foreachBatch(peek).start()
# block until the data already in the topic has gone through peek(), instead of
# relying on a fixed 1-second sleep
query_parsed.processAllAvailable()

24/03/27 12:41:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/46/1b72q7597h1_zq7nb54zfr8h0000gn/T/temporary-03baea21-c861-41bc-9e58-64c0f173fd3a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/27 12:41:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/27 12:41:27 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Peeking at the first row: 
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0, timestamp=datetime.datetime(2024, 3, 27, 12, 21, 26, 431000))


In [21]:
# stop the parsed-stream peek query, then confirm the flattened schema
# (the JSON payload fields plus the Kafka record timestamp)
query_parsed.stop()
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)


#### q7

In [22]:
# top 10 drop-off locations per 5-minute tumbling window of the Kafka record timestamp
five_minute_window = F.window("timestamp", "5 minutes")
popular_destinations = (
    green_stream
    .groupBy("DOLocationID", five_minute_window)
    .count()
    .orderBy(F.col("count").desc())
    .limit(10)
)

In [23]:
# print the top-10 table to the console; complete mode re-emits the whole
# aggregate each batch (sorting a streaming aggregate requires complete mode)
query_q7 = (
    popular_destinations
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)
# wait until the data already in the topic has been aggregated and printed, then
# stop the query so it does not keep running in the background (like the peek
# queries above); a fixed awaitTermination timeout may cut the first batch short
query_q7.processAllAvailable()
query_q7.stop()

24/03/27 12:41:37 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/46/1b72q7597h1_zq7nb54zfr8h0000gn/T/temporary-d6644958-06ad-480b-a578-6ba66e149113. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/27 12:41:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/27 12:41:37 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+------------------------------------------+-----+
|DOLocationID|window                                    |count|
+------------+------------------------------------------+-----+
|74          |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|8660 |
|42          |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|7728 |
|41          |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|7003 |
|74          |{2024-03-27 12:20:00, 2024-03-27 12:25:00}|6520 |
|129         |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|6283 |
|75          |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|6233 |
|7           |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|5988 |
|42          |{2024-03-27 12:20:00, 2024-03-27 12:25:00}|5696 |
|166         |{2024-03-27 12:25:00, 2024-03-27 12:30:00}|5385 |
|41          |{2024-03-27 12:20:00, 2024-03-27 12:25:00}|5174 |
+------------+------------------------------------------+-----+


False