In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to JSON text and return it as UTF-8 encoded bytes."""
    json_text = json.dumps(data)
    return json_text.encode('utf-8')

# Address of the Kafka broker the producer bootstraps from.
server = 'localhost:9092'

# Producer whose message values are encoded by json_serializer before sending.
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# Last expression of the cell, so its result is shown as the cell output.
producer.bootstrap_connected()

True

In [13]:
topic_name = 'test-topic'

t0 = time.time()

# Hand ten small test messages to the producer, echoing each one.
for number in range(10):
    message = {'number': number}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")

# No flush here: the following cell times producer.flush() on its own,
# so this measurement covers only the send() calls.
t1 = time.time()
elapsed = t1 - t0
print(f'took {elapsed:.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.00 seconds


In [12]:
topic_name = 'test-topic'

# Time the flush by itself, separately from the send() loop.
t0 = time.time()
producer.flush()
t1 = time.time()

elapsed = t1 - t0
print(f'took {elapsed:.2f} seconds')

took 0.00 seconds


Read the data

Data folder and file: Module06/green_tripdata_2019-10.csv

In [14]:
import pandas as pd

In [16]:
# Load the October 2019 green taxi trips.
# low_memory=False makes pandas infer each column's dtype from the whole file
# instead of chunk by chunk.
# NOTE(review): the saved output echoes this read_csv line, which looks like a
# mixed-dtype DtypeWarning — confirm that this is the warning being silenced.
df = pd.read_csv('Module06/green_tripdata_2019-10.csv', low_memory=False)
df.info()

  df = pd.read_csv('Module06/green_tripdata_2019-10.csv')


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 476386 entries, 0 to 476385
Data columns (total 20 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   VendorID               387007 non-null  float64
 1   lpep_pickup_datetime   476386 non-null  object 
 2   lpep_dropoff_datetime  476386 non-null  object 
 3   store_and_fwd_flag     387007 non-null  object 
 4   RatecodeID             387007 non-null  float64
 5   PULocationID           476386 non-null  int64  
 6   DOLocationID           476386 non-null  int64  
 7   passenger_count        387007 non-null  float64
 8   trip_distance          476386 non-null  float64
 9   fare_amount            476386 non-null  float64
 10  extra                  476386 non-null  float64
 11  mta_tax                476386 non-null  float64
 12  tip_amount             476386 non-null  float64
 13  tolls_amount           476386 non-null  float64
 14  ehail_fee              0 non-null   

In [17]:
# Columns kept for the Kafka messages; everything else is dropped.
ride_columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]
df = df[ride_columns]

df.head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [18]:
# Sanity check: (rows, columns) after the column selection.
df.shape

(476386, 7)

Now, we send the dataframe to a Kafka topic. We create the topic `green-rides-201910` for these messages.



In [None]:
t0 = time.time()

topic_name = 'green-rides-201910'

# Count the rows instead of printing each one: the dataframe has several
# hundred thousand rows, and one print per row floods the notebook output.
# NOTE(review): passenger_count has missing values; json.dumps emits them as a
# bare NaN token, which is not strict JSON — confirm the consumer tolerates it.
sent_count = 0
for row in df.itertuples(index=False):
    # Each row is a namedtuple; _asdict() gives the {column: value} mapping.
    row_dict = row._asdict()
    producer.send(topic_name, value=row_dict)
    sent_count += 1

# send() only queues records; flush() blocks until they have been sent, so the
# timing below covers the actual delivery.
producer.flush()

t1 = time.time()
print(f'Sent {sent_count} messages to {topic_name}')
print(f'took {(t1 - t0):.2f} seconds')

## Creating a PySpark consumer

Spark needs a library (jar) to be able to connect to Kafka, so we need to tell PySpark that it needs to use it:

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Build the Kafka connector coordinate from the installed PySpark version so
# the two versions always agree.
# NOTE(review): the "_2.12" suffix presumably has to match the Scala build of
# the installed Spark — confirm.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all cores, with the Kafka connector on the classpath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

Now we can connect to the stream:

In [None]:
# Streaming dataframe over the green-rides topic, read from the earliest offset.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-rides-201910")
    .option("startingOffsets", "earliest")
    .load()
)

So we can execute a function over each mini-batch. Let's run `take(1)` there to see what we have in the stream:

In [None]:
def peek(mini_batch, batch_id):
    """Print the first row of a streaming mini-batch, if it has one.

    Used as a ``foreachBatch`` callback; ``batch_id`` is accepted but not used.
    """
    sample = mini_batch.take(1)
    if not sample:
        # Empty batch: nothing to show.
        return
    print(sample[0])

# Start a streaming query that calls peek on every mini-batch of green_stream.
query = green_stream.writeStream.foreachBatch(peek).start()

Now let's stop the query, so it doesn't keep consuming messages from the stream

In [None]:
# Stop the peek query started in the previous cell.
query.stop()

## Parsing the data

The data is JSON, but currently it's in binary format. We need to parse it and turn it into a streaming dataframe with proper columns.

Similarly to PySpark, we define the schema

In [None]:
from pyspark.sql import types

# Schema of the JSON payload: one field per column sent by the producer.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

And apply this schema:

In [None]:
from pyspark.sql import functions as F

# Decode the Kafka "value" column as a JSON string, parse it with the schema,
# and flatten the resulting struct into top-level columns.
#
# The cell rebinds green_stream, so running it a second time would fail: the
# parsed frame no longer has a "value" column. Parse only while that raw
# column is still present, which makes the cell safe to re-run.
if "value" in green_stream.columns:
    green_stream = (
        green_stream
        .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
        .select("data.*")
    )

In [None]:
# Show the columns and types of green_stream after parsing.
green_stream.printSchema()

## Most popular destination

Now let's finally do some streaming analytics. We will see what's the most popular destination currently, based on our stream of data (which ideally we should have sent with delays, like we did in workshop 2).

This is how you can do it:

1. Add a column "timestamp" using the `current_timestamp` function
2. Group by:
    - 5 minutes window based on the timestamp column (`F.window(col("timestamp"), "5 minutes")`)
    - `"DOLocationID"`
3. Order by count

In [None]:
from pyspark.sql.functions import current_timestamp, col, window, count

In [None]:
# Add a "timestamp" column from current_timestamp(), used below for windowing.
green_stream_with_timestamp = green_stream.withColumn("timestamp", current_timestamp())

In [None]:
# Count rides per (5-minute window, drop-off location), most frequent first.
popular_destinations = (
    green_stream_with_timestamp
    .groupBy(window(col("timestamp"), "5 minutes"), col("DOLocationID"))
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
)

You can print the output to the console using this code

In [None]:
# Write the full aggregated result to the console on every trigger, without
# truncating column values.
query = (
    popular_destinations
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Wait a bounded time only: awaitTermination() with no timeout blocks this cell
# until the query ends, so the following query.stop() cell would never run.
# The timeout is in seconds.
query.awaitTermination(timeout=60)

In [None]:
# Stop the console query started in the previous cell.
query.stop()