In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [2]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

t1 = time.time()
print(f'took {(t1 - t0 - 0.05 * 10):.2f} seconds sending messages')

t2 = time.time()

producer.flush()

t3 = time.time()
print(f'took {(t3 - t2):.2f} seconds flushing')

print(f'took {(t3 - t0):.2f} seconds to complete the process')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.03 seconds sending messages
took 0.00 seconds flushing
took 0.53 seconds to complete the process


In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

In [None]:
import gzip

with gzip.open('green_tripdata_2019-10.csv.gz', 'rt', newline='') as csv_file:
    csv_data = csv_file.read()
    with open('green_tripdata_2019-10.csv', 'wt') as out_file:
         out_file.write(csv_data)

In [3]:
!wc -l green_tripdata_2019-10.csv

  476387 green_tripdata_2019-10.csv


In [4]:
import pandas as pd
pd.DataFrame.iteritems = pd.DataFrame.items

In [5]:
columns = ["lpep_pickup_datetime", "lpep_dropoff_datetime", "PULocationID", "DOLocationID", "passenger_count", "trip_distance", "tip_amount"]
df_green = pd.read_csv('green_tripdata_2019-10.csv', usecols=columns)

In [6]:
df_green.head(10)

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26
5,2019-10-01 00:35:01,2019-10-01 00:43:40,65,49,1.0,1.47,1.86
6,2019-10-01 00:28:09,2019-10-01 00:30:49,7,179,1.0,0.6,1.0
7,2019-10-01 00:28:26,2019-10-01 00:32:01,41,74,1.0,0.56,0.0
8,2019-10-01 00:14:01,2019-10-01 00:26:16,255,49,1.0,2.42,0.0
9,2019-10-01 00:03:03,2019-10-01 00:17:13,130,131,1.0,3.4,2.85


In [22]:
t0 = time.time()

topic_name = 'green-trips'

for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)

t1 = time.time()

print(f'took {(t1 - t0):.2f} seconds to send the data')

took 63.38 seconds to send the data


In [7]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
	.config("spark.executor.memory", "4g") \
	.config("spark.driver.memory", "4g") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()



:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/djr/.ivy2/cache
The jars for the packages stored in: /Users/djr/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d128144e-7858-4c34-80a4-c5aec902cdc7;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 

In [8]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [9]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

In [10]:
query1 = green_stream.writeStream.foreachBatch(peek).start()

2024-03-23 12:55:16,081 WARN streaming.ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/80/gqm7t15x6nb4qkwkfb9v2_2r0000gn/T/temporary-e927a6fa-81c5-4767-a027-46d8926ceace. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
2024-03-23 12:55:16,119 WARN streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 17, 13, 51, 3, 982000), timestampType=0)
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


In [11]:
query1.stop()

In [12]:
green_stream1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [13]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [14]:
from pyspark.sql import functions as F

green_stream1 = green_stream1 \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [15]:
query2 = green_stream1.writeStream.foreachBatch(peek).start()

2024-03-23 12:55:35,507 WARN streaming.ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/80/gqm7t15x6nb4qkwkfb9v2_2r0000gn/T/temporary-d013bb88-c573-425e-a304-ad077279106f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
2024-03-23 12:55:35,514 WARN streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
query2.stop()

In [17]:
popular_destinations = green_stream1 \
    .withColumn("timestamp", F.current_timestamp()) \
    .select("timestamp", "DOLocationID") \
    .groupBy([F.window(F.col("timestamp"), "5 minutes"), "DOLocationID"]) \
    .count() \
    .orderBy("count", ascending=False)

In [18]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

2024-03-23 12:56:13,525 WARN streaming.ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/80/gqm7t15x6nb4qkwkfb9v2_2r0000gn/T/temporary-10eefb0a-8e60-4fae-97af-c510a1780f41. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
2024-03-23 12:56:13,527 WARN streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+------+
|window                                    |DOLocationID|count |
+------------------------------------------+------------+------+
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|74          |638078|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|42          |573067|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|41          |505904|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|75          |461603|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|129         |428991|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|7           |414963|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|166         |390433|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|236         |284699|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|223         |271427|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|238         |263448|
|{2024-03-23 12:55:00, 2024-03-23 13:00:00}|82          |2

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/djr/miniconda3/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [19]:
query.stop()