In [11]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to JSON and return it as UTF-8 encoded bytes.

    Installed as the producer's ``value_serializer`` so every message value is
    sent as a JSON document.
    """
    text = json.dumps(data)
    return text.encode('utf-8')

# Address of the Kafka broker the producer bootstraps from.
server = 'localhost:9092'

# Every message value is passed through json_serializer before it is sent.
producer_config = {
    'bootstrap_servers': [server],
    'value_serializer': json_serializer,
}
producer = KafkaProducer(**producer_config)

In [12]:
# Sanity check: displays True once the producer has connected to the bootstrap broker.
producer.bootstrap_connected()

True

In [13]:
t0 = time.time()

topic_name = 'test-topic'

# send() is asynchronous and returns a future; keep them so failed deliveries
# can be surfaced instead of being silently dropped.
futures = []
for i in range(10):
    message = {'number': i}
    futures.append(producer.send(topic_name, value=message))
    print(f"Sent: {message}")
    time.sleep(0.05)  # small pause between messages to simulate a slow stream

producer.flush()

t1 = time.time()

# flush() waits for outstanding requests but does not raise for records that
# failed; .get() re-raises the delivery error of any unacknowledged record.
for future in futures:
    future.get(timeout=10)

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}


In [14]:
# Wall-clock time of the test-topic send loop, including the per-message sleeps.
elapsed = t1 - t0
print(f'took {elapsed:.2f} seconds')

took 0.51 seconds


In [5]:
import pandas as pd

In [6]:
# low_memory=False makes pandas infer each column's dtype from the whole file
# rather than chunk by chunk, avoiding the mixed-types DtypeWarning.
df = pd.read_csv("green_tripdata_2019-10.csv", low_memory=False)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [7]:
# Only these fields are published to Kafka; they must match the consumer's schema.
green_columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]
df_green = df[green_columns]

In [9]:
t0 = time.time()

# Publish every trip row as one JSON message on the "green-trips" topic.
for trip in df_green.itertuples(index=False):
    producer.send("green-trips", value=trip._asdict())

# Wait for all buffered records to be transmitted before stopping the clock.
producer.flush()
t1 = time.time()
print(f'data sent in {(t1 - t0):.2f} seconds')

data sent in 60.09 seconds


In [15]:
import pyspark
from pyspark.sql import SparkSession

In [16]:
# Kafka connector coordinates matching the installed PySpark release.
# NOTE(review): the artifact suffix assumes a Scala 2.12 Spark build — confirm for newer Spark versions.
pyspark_version = pyspark.__version__
kafka_jar_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:{}".format(pyspark_version)

In [18]:
# Local Spark session using all cores, with the Kafka connector pulled in via spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [20]:
# Streaming DataFrame over the raw Kafka records of the "green-trips" topic,
# read from the earliest available offset. Reuses `server` (the producer's
# broker address) so producer and consumer cannot drift apart.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", server)
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [21]:
def peek(mini_batch, batch_id):
    """foreachBatch callback: print the first row of a micro-batch, if it has any.

    ``batch_id`` is part of the foreachBatch callback signature and is unused here.
    """
    rows = mini_batch.take(1)
    if not rows:
        return
    print(rows[0])

In [22]:
# Print the first raw Kafka record of each micro-batch to inspect the message format.
peek_writer = green_stream.writeStream.foreachBatch(peek)
query = peek_writer.start()

24/04/07 18:44:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3fa513cf-7380-45a5-87fc-153727a4ff2a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/07 18:44:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 4, 7, 15, 47, 38, 655000), timestampType=0)


In [23]:
# Stop the raw-record peek query before transforming the stream.
query.stop()

In [24]:
from pyspark.sql import types

In [25]:
# Schema of the JSON message values; the pickup/dropoff datetimes are kept as strings.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [26]:
from pyspark.sql import functions as F

In [27]:
# Decode the Kafka value bytes as JSON using `schema` and flatten the struct into columns.
# NOTE: this rebinds green_stream, so re-running the cell alone fails (the parsed
# stream no longer has a "value" column).
json_payload = F.col("value").cast('STRING')
green_stream = (
    green_stream
    .select(F.from_json(json_payload, schema).alias("data"))
    .select("data.*")
)

In [28]:
# Print the first parsed record of each micro-batch to confirm the schema applied cleanly.
parsed_peek_writer = green_stream.writeStream.foreachBatch(peek)
query = parsed_peek_writer.start()

24/04/07 18:49:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9f681eba-b15c-4e02-bdac-90ffaedaf913. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/07 18:49:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


In [30]:
# Stop the parsed-record peek query.
query.stop()

In [31]:
# Stamp each record with the time Spark processes it (processing time, not trip time);
# this is the column the 5-minute windows are computed on.
processing_time = F.current_timestamp()
green_stream = green_stream.withColumn("timestamp", processing_time)

In [33]:
# Count trips per drop-off location within 5-minute windows of the "timestamp" column,
# most frequent first.
window_5min = F.window(F.col("timestamp"), "5 minutes")
grouped_green_stream = (
    green_stream
    .groupBy(window_5min, F.col("DOLocationID"))
    .count()
    .orderBy(F.col("count").desc())
)

In [None]:
# Stream the windowed drop-off counts to the console. "complete" mode re-emits the
# whole (sorted) result table on each trigger; truncate=false shows the full window values.
query = (
    grouped_green_stream
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# An unbounded awaitTermination() blocks the kernel until the query ends, which a
# Kafka-backed query does not do on its own, so the following query.stop() cell could
# never run. Wait a bounded number of seconds instead; the query keeps running afterwards.
AWAIT_SECONDS = 60
query.awaitTermination(AWAIT_SECONDS)

24/04/07 19:00:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-70e5643c-33ef-49e9-9131-e4929dd7b913. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/07 19:00:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|74          |35482|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|42          |31884|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|41          |28122|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|75          |25680|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|129         |23860|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|7           |23066|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|166         |21690|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|236         |15826|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|223         |15084|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|238         |14636|
|{2024-04-07 19:00:00, 2024-04-07 19:05:00}|82          |14584|
|{2024-

In [None]:
# Stop the console streaming query.
query.stop()