In [3]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [4]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    t2 = time.time()
    print(t2 - t0)
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

0.0001964569091796875
Sent: {'number': 0}
0.06194496154785156
Sent: {'number': 1}
0.11256718635559082
Sent: {'number': 2}
0.16314053535461426
Sent: {'number': 3}
0.2136368751525879
Sent: {'number': 4}
0.2645430564880371
Sent: {'number': 5}
0.3150327205657959
Sent: {'number': 6}
0.3656182289123535
Sent: {'number': 7}
0.4162154197692871
Sent: {'number': 8}
0.4667344093322754
Sent: {'number': 9}
took 0.52 seconds


In [5]:
import pandas as pd

df_green = pd.read_csv('oct2019.gz', compression='gzip')


  df_green = pd.read_csv('oct2019.gz', compression='gzip')


In [None]:
df_green

In [6]:
selected_columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

df_green = df_green[selected_columns]

In [None]:
topic_name = 'green-trips'
t0 = time.time()

for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)
    # print(f"Sent: {row_dict}") 

producer.flush()

t1 = time.time() 
print(f'Took {(t1 - t0):.2f} seconds')

Took 91.36 seconds


In [8]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/mikaelnystrom/bin/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mikaelnystrom/.ivy2/cache
The jars for the packages stored in: /home/mikaelnystrom/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d8928437-7dad-4e11-a33b-86742572e3f1;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in c

24/03/18 12:21:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [10]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/18 12:23:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9199bf29-fa15-43fc-a87b-96ab33b2e6c6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 12:23:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
query.stop()

In [12]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [13]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [14]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/18 12:23:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-afba0b12-046a-4b7c-a8a7-7f6e2209739c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 12:23:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


In [None]:
query = green_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 3) \
    .start()

# Let the query run for a short duration and then stop it
query.awaitTermination(timeout=120)  # Waits for 10 seconds before stopping automatically
query.stop()


In [15]:
green_stream_with_timestamp = green_stream.withColumn("timestamp", F.current_timestamp())

popular_destinations = green_stream_with_timestamp.groupBy(
    F.window(F.col("timestamp"), "5 minutes"),
    F.col("DOLocationID")
).count().orderBy(F.desc("count"))

query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

24/03/18 12:23:52 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2f979d04-ed21-4581-9bb2-f53c7f578a26. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 12:23:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|74          |35482|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|42          |31884|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|41          |28122|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|75          |25680|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|129         |23860|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|7           |23066|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|166         |21690|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|236         |15826|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|223         |15084|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|238         |14636|
|{2024-03-18 12:20:00, 2024-03-18 12:25:00}|82          |14584|
|{2024-