In [1]:
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Kafka connection settings used by the stream reader: broker address and topic name.
server, topic = 'localhost:9092', 'green-trips'

In [3]:
# Build a local Spark session with the Kafka source connector on the classpath.
# The connector version must match the installed PySpark version, and its Scala
# suffix must match the Scala build of Spark: PyPI PySpark 3.x ships a Scala 2.12
# build, while Spark 4.x is built for Scala 2.13 only (a "_2.12" coordinate for a
# 4.x version does not exist and would fail to resolve).
# NOTE(review): a custom Scala 2.13 build of Spark 3.x would still need "2.13" here.
pyspark_version = pyspark.__version__
scala_binary_version = "2.13" if int(pyspark_version.split(".")[0]) >= 4 else "2.12"
kafka_jar_package = (
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary_version}:{pyspark_version}"
)

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [4]:
# Subscribe to the Kafka topic as a streaming DataFrame. "earliest" replays the
# topic from its first offset, so messages produced before this query started
# are included.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", server)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [5]:
def peek(mini_batch, batch_id):
    """Display the first row of a streaming micro-batch, if it has any rows.

    Meant to be used as a ``foreachBatch`` callback: Spark calls it with the
    micro-batch DataFrame and the batch id. ``batch_id`` is not used.
    """
    rows = mini_batch.take(1)
    if not rows:
        # Empty micro-batch: nothing to show.
        return
    display(rows[0])

In [6]:
# Start a preview query that displays the first raw Kafka record of each micro-batch.
query = (
    green_stream
    .writeStream
    .foreachBatch(peek)
    .start()
)

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 19, 11, 29, 5, 609000), timestampType=0)

In [7]:
# Stop the raw-record preview query started above before defining the next stream.
query.stop()

### Question 6: Parse the Kafka JSON payload with a schema

In [8]:
# Schema of the JSON messages on the topic. Pickup/dropoff datetimes are kept
# as strings; every field is nullable (the StructField default).
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [9]:
# The Kafka `value` column is binary: cast it to a JSON string, parse it against
# `schema`, then flatten the resulting struct into top-level columns.
payload_json = green_stream["value"].cast("string")
green_stream_with_schema = (
    green_stream
    .select(F.from_json(payload_json, schema).alias("data"))
    .select("data.*")
)


In [10]:
# Start a preview query that displays the first parsed row of each micro-batch.
query = (
    green_stream_with_schema
    .writeStream
    .foreachBatch(peek)
    .start()
)

Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)

In [11]:
# Stop the parsed-row preview query started above before building the aggregation.
query.stop()

### Question 7: Most popular drop-off locations per 5-minute window

In [12]:
# Stamp each row with processing time (when Spark handles it), not the trip's
# own pickup/dropoff time, so the windows below are processing-time windows.
green_stream_timestamp = (
    green_stream_with_schema
    .withColumn("timestamp", F.current_timestamp())
)

In [13]:
# Count rows per drop-off location within 5-minute tumbling windows (no slide
# duration given), with the busiest window/location pairs first.
popular_destinations = (
    green_stream_timestamp
    .groupBy(
        F.window(F.col("timestamp"), "5 minutes"),
        F.col("DOLocationID"),
    )
    .count()
    .orderBy(F.col("count").desc())
)

In [None]:
# Print the full, re-sorted aggregation table to the console after every trigger.
# "complete" output mode is required here because the result is sorted after an
# aggregation.
#
# awaitTermination() without a timeout blocks the kernel forever, so the notebook
# could never finish a Restart & Run All. An interrupt would also leave the query
# running in the background. Bound the wait, then always stop the query.
STREAM_TIMEOUT_SECONDS = 60

query = (
    popular_destinations
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)
try:
    query.awaitTermination(STREAM_TIMEOUT_SECONDS)
finally:
    query.stop()

![Output of question 7](img/Q7.JPG)