# Question 3. Connecting to the Kafka server

In [1]:
import json
import math
import time

from kafka import KafkaProducer

def _nan_to_none(value):
    """Return ``value`` with every float NaN (at any nesting depth) replaced by None."""
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, dict):
        return {key: _nan_to_none(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        # json.dumps renders tuples as arrays anyway, so a list is equivalent.
        return [_nan_to_none(item) for item in value]
    return value


def json_serializer(data):
    """Encode a message value as UTF-8 JSON bytes for ``KafkaProducer``.

    Float NaN values (e.g. missing ``passenger_count`` entries coming out of
    pandas) are written as JSON ``null``. Plain ``json.dumps`` would emit the
    bare token ``NaN``, which is not valid JSON and is rejected by strict
    parsers on the consumer side.

    Args:
        data: A JSON-serializable value, typically a dict of column -> value.

    Returns:
        The JSON document encoded as UTF-8 ``bytes``.
    """
    return json.dumps(_nan_to_none(data)).encode('utf-8')

server = 'localhost:9092'

# Every message value is turned into bytes by json_serializer before sending.
producer = KafkaProducer(
    value_serializer=json_serializer,
    bootstrap_servers=[server],
)

# Last expression: shows whether the bootstrap connection succeeded.
producer.bootstrap_connected()

True

# Question 4. Sending data to the stream

In [5]:
t0 = time.time()

topic_name = 'test-topic'

# Send ten small JSON messages, pausing 50 ms after each send.
for n in range(10):
    message = dict(number=n)
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

# flush() blocks until the buffered records have been sent.
producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.50 seconds


In [10]:
# Download the October 2019 green taxi trip data (gzip-compressed CSV) from the
# DataTalksClub release mirror into the current working directory.
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

--2024-03-17 15:27:40--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240317%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240317T122741Z&X-Amz-Expires=300&X-Amz-Signature=080c3932d52e532b14841412e6d8ac3ac876aa69ab15e8a5f88fcfe7bf86fb0d&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-17 15:27:41--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e

In [11]:
# Confirm the downloaded archive is present in the working directory.
!ls

05-batch-homework.ipynb      green_tripdata_2019-10.csv.gz
06-streaming-homework.ipynb  metabase
data-engineering-zoomcamp    plugins
de-zoomcamp-practice	     pytube_downloads
fhv			     redpanda
fhv_tripdata_2019-10.csv     taxi_zone_lookup.csv
green_tripdata_2019-10.csv   venv


In [4]:
filename = 'green_tripdata_2019-10.csv.gz'

# Only these columns are loaded and forwarded to Kafka.
selected_columns = [
    'lpep_pickup_datetime', 'lpep_dropoff_datetime',
    'PULocationID', 'DOLocationID',
    'passenger_count', 'trip_distance', 'tip_amount',
]

In [5]:
import pandas as pd

In [6]:
# Load just the selected columns from the gzip-compressed CSV.
df_green = pd.read_csv(
    filename,
    usecols=selected_columns,
    compression='gzip',
)

In [7]:
# Preview the first rows to confirm only the selected columns were loaded.
df_green.head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [8]:
# Dtypes and non-null counts per column. In this load the datetime columns are
# `object` (strings) and passenger_count has missing values (NaN).
df_green.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 476386 entries, 0 to 476385
Data columns (total 7 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   lpep_pickup_datetime   476386 non-null  object 
 1   lpep_dropoff_datetime  476386 non-null  object 
 2   PULocationID           476386 non-null  int64  
 3   DOLocationID           476386 non-null  int64  
 4   passenger_count        387007 non-null  float64
 5   trip_distance          476386 non-null  float64
 6   tip_amount             476386 non-null  float64
dtypes: float64(3), int64(2), object(2)
memory usage: 25.4+ MB


In [23]:
t0 = time.time()

# Publish every trip row to `topic_name` as a dict keyed by column name.
for row in df_green.itertuples(index=False):
    producer.send(topic_name, value=row._asdict())

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 28.88 seconds


# Question 5. Sending the trip data

In [9]:
t0 = time.time()

# Publish every trip row to the `green-trips` topic that Spark reads below.
rows = df_green.itertuples(index=False)
for record in rows:
    producer.send("green-trips", value=dict(zip(record._fields, record)))

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 24.81 seconds


In [1]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
print(pyspark_version)

# Pin the Kafka connector to the installed PySpark version; the `_2.12`
# suffix selects the Scala 2.12 build of the artifact.
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local Spark session that pulls the Kafka connector in via spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

3.5.1


24/03/17 19:01:13 WARN Utils: Your hostname, debian resolves to a loopback address: 127.0.1.1; using 192.168.0.142 instead (on interface wlp0s20f3)
24/03/17 19:01:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/odeke/.ivy2/cache
The jars for the packages stored in: /home/odeke/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b21b7862-e2e4-4baf-a010-e14d2f0428fd;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/odeke/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 377ms :: artifacts dl 10ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#

In [2]:
# Check the running Spark version; the Kafka connector package was pinned to
# pyspark.__version__, so the two should agree.
print("Spark version:", spark.version)

Spark version: 3.5.1


In [3]:
# Subscribe to the green-trips topic, starting from its earliest offset.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "green-trips",
    "startingOffsets": "earliest",
}

green_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

In [4]:
def peek(mini_batch, batch_id):
    """foreachBatch sink: display the first row of a micro-batch, if it has one.

    Args:
        mini_batch: The micro-batch DataFrame handed over by foreachBatch.
        batch_id: The micro-batch id (unused).
    """
    rows = mini_batch.take(1)
    if not rows:
        return
    display(rows[0])

# Run the peek sink for up to 10 seconds, then stop the query. try/finally
# guarantees stop() runs even if awaitTermination is interrupted (e.g. a
# kernel interrupt) or raises, so no streaming query is left running.
query = green_stream.writeStream.foreachBatch(peek).start()

try:
    query.awaitTermination(10)
finally:
    query.stop()

24/03/17 19:01:16 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c7d48361-58c5-4705-96f0-6d8ba59502c9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/17 19:01:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/17 19:01:16 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 17, 18, 52, 29, 411000), timestampType=0)

# Question 6. Parsing the data

In [5]:
from pyspark.sql import types

# Shape of the JSON payload written by the producer. The datetime fields are
# kept as strings, which is how they appear in the raw Kafka value.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [6]:
from pyspark.sql import functions as F

# Decode the Kafka value bytes as a string, parse it with `schema`, and flatten
# the resulting struct into top-level columns.
# NOTE: this rebinds green_stream, so re-running the cell fails because the
# parsed stream no longer has a `value` column.
json_payload = F.col("value").cast('STRING')
parsed = F.from_json(json_payload, schema).alias("data")

green_stream = (
    green_stream
    .select(parsed)
    .select("data.*")
)

In [7]:
# Verify that the parsed columns and types match `schema`.
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [8]:
# Peek at the parsed stream for up to 10 seconds. try/finally ensures the query
# is stopped even if the wait is interrupted or raises.
query = green_stream.writeStream.foreachBatch(peek).start()

try:
    query.awaitTermination(10)
finally:
    query.stop()

24/03/17 19:01:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-417c1a1c-f720-4f34-a428-fcd4e8a53468. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/17 19:01:31 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/17 19:01:31 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)

# Question 7. Most popular destination

In [19]:
# Tag each record with the current processing time. The windows below are
# therefore based on when Spark processed the record, not on the trip's
# pickup/dropoff datetimes.
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

# Count trips per drop-off location within 5-minute windows.
grouped_df = (
    green_stream
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID")
    .count()
)

# Sorting a streaming DataFrame is only supported after an aggregation and in
# "complete" output mode, which is used below.
popular_destinations = grouped_df.orderBy(F.col("count").desc())

query = (
    popular_destinations
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Let the query print results for up to 30 seconds. try/finally guarantees
# stop() runs even if the wait is interrupted or raises, so the console sink
# is not left running in the background.
try:
    query.awaitTermination(30)
finally:
    query.stop()

24/03/17 19:14:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c83a5f93-478d-43c1-994c-9c66f91ef984. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/17 19:14:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/17 19:14:32 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|74          |17741|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|42          |15942|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|41          |14061|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|75          |12840|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|129         |11930|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|7           |11533|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|166         |10845|
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|236         |7913 |
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|223         |7542 |
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|238         |7318 |
|{2024-03-17 19:10:00, 2024-03-17 19:15:00}|82          |7292 |
|{2024-