In [1]:
import json
import time
import pyspark

In [2]:
from kafka import KafkaProducer

In [3]:
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

In [4]:
server = 'localhost:9092'

In [5]:
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)
producer.bootstrap_connected()

True

In [7]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    t_before_send = time.time()
    producer.send(topic_name, value=message)
    t_after_send = time.time()
    print(f'send msg took {(t_after_send - t_before_send):.9f} seconds')
    print(f"Sent: {message}")
    time.sleep(0.05)

t_before_flush = time.time()
producer.flush()
t_after_flush = time.time()
print(f'flush took {(t_after_send - t_before_send):.9f} seconds')

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

send msg took 0.000821114 seconds
Sent: {'number': 0}
send msg took 0.000353098 seconds
Sent: {'number': 1}
send msg took 0.000165939 seconds
Sent: {'number': 2}
send msg took 0.000200987 seconds
Sent: {'number': 3}
send msg took 0.000196695 seconds
Sent: {'number': 4}
send msg took 0.000252247 seconds
Sent: {'number': 5}
send msg took 0.000423908 seconds
Sent: {'number': 6}
send msg took 0.000567913 seconds
Sent: {'number': 7}
send msg took 0.000522137 seconds
Sent: {'number': 8}
send msg took 0.000603199 seconds
Sent: {'number': 9}
flush took 0.000603199 seconds
took 0.54 seconds


In [6]:
import pandas as pd
from settings import PRODUCE_TOPIC_GREEN_TRIPS, CONSUME_TOPIC_GREEN_TRIPS, INPUT_DATA_PATH

In [9]:
df = pd.read_csv('resources/green_tripdata_2019-10.csv.gz', compression='gzip', header=0, sep=',', quotechar='"', dtype={'store_and_fwd_flag': str})
df_green = df.loc[:, ["lpep_pickup_datetime","lpep_dropoff_datetime","PULocationID","DOLocationID","passenger_count","trip_distance","tip_amount"]]
t0_send_data = time.time()
count_data = 0
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    # print(row_dict)
    producer.send(PRODUCE_TOPIC_GREEN_TRIPS, value=row_dict)
    count_data = count_data + 1

t1_send_data = time.time()
print(f"Sent total {count_data} records to {PRODUCE_TOPIC_GREEN_TRIPS} topic took {(t1_send_data - t0_send_data):.2f} seconds")



Sent total 476386 records to green-trips topic took 19.78 seconds


In [7]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-avro_2.12:3.5.1 pyspark-shell'

In [8]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/Users/trankiendang/Projects/DTC-DE-ZoomCamp-2024/module-06/module-06-env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/trankiendang/.ivy2/cache
The jars for the packages stored in: /Users/trankiendang/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f531f4a1-2573-483b-9b12-0db5d84a69d6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11

In [9]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", CONSUME_TOPIC_GREEN_TRIPS) \
    .option("startingOffsets", "earliest") \
    .load()

In [10]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/04/10 00:09:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/zn/wwsm9k291x1f9qdgt458y3vh0000gn/T/temporary-61b0946e-ac30-488e-adce-c9dc320b929a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/10 00:09:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/04/10 00:09:20 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 4, 9, 10, 47, 7, 979000), timestampType=0)


In [13]:
print(query)

<pyspark.sql.streaming.query.StreamingQuery object at 0x000001BE81C36650>


In [11]:
query.stop()

In [12]:
green_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [15]:
from pyspark.sql import functions as F

green_stream_encoded = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [16]:
green_stream_encoded

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [18]:
green_stream_encoded.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [18]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [19]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuer

In [None]:
write_query = sink_console(green_stream, output_mode='append')

24/04/09 11:02:48 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/zn/wwsm9k291x1f9qdgt458y3vh0000gn/T/temporary-01912a75-6f3c-44dc-b4c8-6f32ef567cd1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/09 11:02:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/04/09 11:02:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|2019-10-01 00:26:02 |2019-10-01 00:39:58  |112         |196         |1.0            |5.88         |0.0       |
|2019-10-01 00:18:11 |2019-10-01 00:22:38  |43          |263         |1.0            |0.8          |0.0       |
|2019-10-01 00:09:31 |2019-10-01 00:24:47  |255         |228         |2.0            |7.5          |0.0       |
|2019-10-01 00:37:40 |2019-10-01 00:41:49  |181         |181         |1.0            |0.9          |0.0       |
|2019-10-01 00:08:13 |2019-10-01 00:17:56  |97          |188         |1.0            |2.52         |2.26      |
|2019-1

24/04/09 11:02:56 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 7364 milliseconds


In [24]:
write_query.awaitTermination()

24/04/09 13:41:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 946665 ms exceeds timeout 120000 ms
24/04/09 13:41:31 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/09 13:48:17 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [23]:
write_query.stop()

In [None]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [19]:
green_stream_encoded_with_ts = green_stream \
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", F.current_timestamp()) # Add timestamp column

In [20]:
popular_destinations = green_stream_encoded_with_ts \
    .groupBy(F.window(F.col("timestamp"), "5 minutes"), F.col("DOLocationID")) \
    .count() \
    .orderBy(F.desc("count")) \
    .select("window", "DOLocationID", "count")

In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

24/04/10 00:15:48 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/zn/wwsm9k291x1f9qdgt458y3vh0000gn/T/temporary-88c29e48-ac41-4d36-b08f-e23898398ec8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/10 00:15:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/04/10 00:15:48 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|74          |17741|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|42          |15942|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|41          |14061|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|75          |12840|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|129         |11930|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|7           |11533|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|166         |10845|
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|236         |7913 |
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|223         |7542 |
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|238         |7318 |
|{2024-04-10 00:15:00, 2024-04-10 00:20:00}|82          |7292 |
|{2024-

In [22]:
query.awaitTermination()

In [None]:
print(query)