In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [3]:
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

In [4]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

In [27]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "redpanda-1:29092") \
    .option("subscribe", "green-trips2") \
    .option("startingOffsets", "earliest") \
    .load()

In [28]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)
    

    if first_row:
        print(first_row[0])

In [29]:
query = green_stream.writeStream.foreachBatch(peek).start()

Row(key=None, value=bytearray(b'{"VendorID": 2.0, "lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "store_and_fwd_flag": "N", "RatecodeID": 1.0, "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "fare_amount": 18.0, "extra": 0.5, "mta_tax": 0.5, "tip_amount": 0.0, "tolls_amount": 0.0, "ehail_fee": NaN, "improvement_surcharge": 0.3, "total_amount": 19.3, "payment_type": 2.0, "trip_type": 1.0, "congestion_surcharge": 0.0}'), topic='green-trips2', partition=0, offset=0, timestamp=datetime.datetime(2024, 4, 7, 18, 44, 9, 163000), timestampType=0)


In [30]:
query.stop()

In [32]:
schema = types.StructType() \
    .add("VendorID", types.IntegerType()) \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [33]:
green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [34]:
print(green_stream)

DataFrame[VendorID: int, lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]


In [35]:
query = green_stream.writeStream.foreachBatch(peek).start()

Row(VendorID=None, lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
