## Question 3. Connecting to the Kafka server

In [17]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

## Question 4. Sending data to the stream

In [2]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.51 seconds


In [None]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

### Sending the taxi data

In [18]:
import pandas as pd

In [19]:
columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'tip_amount']

In [20]:
df_green = pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip', usecols=columns)

In [21]:
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}


In [None]:
# t0 = time.time()

# topic_name = 'green-trips'

# for row in df_green.itertuples(index=False):
#     message = {col: getattr(row, col) for col in row._fields}
#     producer.send(topic_name, value=message)
#     # print(f"Sent: {message}")

# producer.flush()

# t1 = time.time()
# print(f'took {(t1 - t0):.2f} seconds')

### Creating the PySpark consumer

In [22]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

In [23]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [24]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/19 02:49:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ab2a0b4b-8879-48d3-850b-62b27240f3c7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 02:49:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 02:49:35 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 17, 18, 4, 51, 844000), timestampType=0)


### Question 6. Parsing the data

In [25]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [30]:
from pyspark.sql import functions as F

In [31]:
green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `value` cannot be resolved. Did you mean one of the following? [`tip_amount`, `DOLocationID`, `PULocationID`, `trip_distance`, `passenger_count`].;
'Project [from_json(StructField(lpep_pickup_datetime,StringType,true), StructField(lpep_dropoff_datetime,StringType,true), StructField(PULocationID,IntegerType,true), StructField(DOLocationID,IntegerType,true), StructField(passenger_count,DoubleType,true), StructField(trip_distance,DoubleType,true), StructField(tip_amount,DoubleType,true), cast('value as string), Some(Etc/UTC)) AS data#134]
+- Project [data#109.lpep_pickup_datetime AS lpep_pickup_datetime#111, data#109.lpep_dropoff_datetime AS lpep_dropoff_datetime#112, data#109.PULocationID AS PULocationID#113, data#109.DOLocationID AS DOLocationID#114, data#109.passenger_count AS passenger_count#115, data#109.trip_distance AS trip_distance#116, data#109.tip_amount AS tip_amount#117]
   +- Project [from_json(StructField(lpep_pickup_datetime,StringType,true), StructField(lpep_dropoff_datetime,StringType,true), StructField(PULocationID,IntegerType,true), StructField(DOLocationID,IntegerType,true), StructField(passenger_count,DoubleType,true), StructField(trip_distance,DoubleType,true), StructField(tip_amount,DoubleType,true), cast(value#75 as string), Some(Etc/UTC)) AS data#109]
      +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@53373646, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@5df0c250, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=green-trips], [key#74, value#75, topic#76, partition#77, offset#78L, timestamp#79, timestampType#80], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@113df02d,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> green-trips, startingOffsets -> earliest),None), kafka, [key#67, value#68, topic#69, partition#70, offset#71L, timestamp#72, timestampType#73]


In [32]:
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



### Question 7: Most popular destination

In [65]:
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

In [66]:
popular_destinations = green_stream.groupBy(F.window("timestamp", "5 minutes"), "DOLocationID") \
            .agg(F.count("DOLocationID").alias("DOLocationID_count")) \
            .orderBy(F.desc("DOLocationID_count"))

In [67]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

24/03/19 04:09:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-af398b18-ca42-4529-8cd5-08408757b0f2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 04:09:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 04:09:27 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+------------------+
|window                                    |DOLocationID|DOLocationID_count|
+------------------------------------------+------------+------------------+
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|74          |35482             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|42          |31884             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|41          |28122             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|75          |25680             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|129         |23860             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|7           |23066             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|166         |21690             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|236         |15826             |
|{2024-03-19 04:05:00, 2024-03-19 04:10:00}|223         

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/gianluca/spark/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gianluca/spark/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gianluca/anaconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 