In [3]:
import pyspark
from pyspark.sql import SparkSession

# Pin the Kafka connector to the installed PySpark version so the jar matches the
# running Spark. The `_2.12` suffix is the Scala binary version of the artifact and is
# assumed to match this Spark build (true for the 3.5.1 resolved below) — recheck on upgrade.
pyspark_version = pyspark.__version__
kafka_jar_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:" + pyspark_version

# Local-mode session; the connector jar is fetched via spark.jars.packages at startup.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

24/03/19 04:42:56 WARN Utils: Your hostname, codespaces-40bcf2 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/19 04:42:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/python/3.10.13/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/codespace/.ivy2/cache
The jars for the packages stored in: /home/codespace/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-414862ff-7d9a-4164-9c67-38b3d7f84d69;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 812ms :: artifacts dl 34

In [7]:
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
GREEN_TRIPS_TOPIC = "green-trips"

# Raw Kafka records for the green-trips topic; the JSON payload is in the binary
# `value` column. "earliest" replays the topic from the start whenever the query
# starts without an existing checkpoint (a temporary one is used here).
green_stream_a = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", GREEN_TRIPS_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

In [8]:
from pyspark.sql import types

# Expected JSON layout of each message value (presumably mirrors what the producer
# sends — confirm against it). All fields are nullable; the pickup/dropoff datetimes
# are kept as raw strings rather than parsed to timestamps.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [9]:
from pyspark.sql import functions as F

# Decode each Kafka value (bytes -> string) as JSON using `schema`, then flatten the
# struct into one column per field. from_json does not raise on malformed input; it
# yields nulls instead (the all-None row in the peek output below may be such a
# message — TODO confirm).
json_value = F.col("value").cast('STRING')
green_stream_b = (
    green_stream_a
    .select(F.from_json(json_value, schema).alias("data"))
    .select("data.*")
)

In [11]:
def peek(mini_batch, batch_id):
    """foreachBatch callback: print the first row of a micro-batch, if it has any.

    Args:
        mini_batch: the micro-batch DataFrame handed over by foreachBatch.
        batch_id: micro-batch id; required by the callback signature, unused here.
    """
    head = mini_batch.take(1)
    if not head:
        return
    print(head[0])

# Peek at what is currently in the topic, then stop. availableNow processes the
# existing backlog and terminates, so this query does not keep running (and printing)
# alongside the aggregation query started later. It also gets its own name, so the
# later `query = ...` cannot orphan a still-running handle.
peek_query = (
    green_stream_b.writeStream
    .foreachBatch(peek)
    .trigger(availableNow=True)
    .start()
)
peek_query.awaitTermination()

24/03/19 04:45:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8dd3e8cf-aec8-42b3-baab-6f3262837fe4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 04:45:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 04:45:19 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Row(lpep_pickup_datetime=None, lpep_dropoff_datetime=None, PULocationID=None, DOLocationID=None, passenger_count=None, trip_distance=None, tip_amount=None)


In [12]:
from pyspark.sql.functions import current_timestamp, col
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType

In [13]:
# Processing-time stamp, not trip time: current_timestamp() is fixed per query run /
# micro-batch, so every row of a batch shares one value (and one window below).
arrival_ts = current_timestamp()
stream_with_timestamp = green_stream_b.withColumn("timestamp", arrival_ts)

In [15]:
WINDOW_DURATION = "5 minutes"

# Count trips per drop-off zone in tumbling windows over the "timestamp" column.
# Resulting columns: DOLocationID, window, count.
grouped_stream = (
    stream_with_timestamp
    .groupBy(col("DOLocationID"), F.window(col("timestamp"), WINDOW_DURATION))
    .count()
)

In [16]:
# Busiest drop-off zones first. Sorting a streaming DataFrame is only allowed after
# an aggregation written in "complete" mode, which is how the query below emits it.
ordered_stream = grouped_stream.sort(F.desc("count"))

In [17]:
# Print the sorted per-window counts to the console. "complete" mode re-emits the
# whole result table on every trigger.
RUN_SECONDS = None  # None = run until interrupted/failed; e.g. 60 to stop automatically

query = (
    ordered_stream.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

try:
    query.awaitTermination(RUN_SECONDS)
finally:
    # Interrupting the cell aborts only this Python-side wait (see the KeyboardInterrupt
    # in the output below). Without an explicit stop() the streaming query presumably
    # keeps running in the JVM, so always stop it; stop() is safe on a terminated query.
    query.stop()

24/03/19 04:50:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a1cd7b2f-b60c-4f0e-a55f-e24e1552aa69. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 04:50:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 04:50:29 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+------------------------------------------+-----+
|DOLocationID|window                                    |count|
+------------+------------------------------------------+-----+
|74          |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|17741|
|42          |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|15942|
|41          |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|14061|
|75          |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|12840|
|129         |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|11930|
|7           |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|11533|
|166         |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|10845|
|236         |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|7913 |
|223         |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|7542 |
|238         |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|7318 |
|82          |{2024-03-19 04:50:00, 2024-03-19 04:55:00}|7292 |
|181   

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/python/3.10.13/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/python/3.10.13/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/python/3.10.13/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 