In [5]:
import pyspark
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import from_json, col, udf
import json


# The Kafka connector coordinate is derived from the installed PySpark version
# so that the package requested below matches the running Spark release.
pyspark_version = pyspark.__version__
kafka_jar_package = (
    "org.apache.spark:spark-sql-kafka-0-10_2.12:" + pyspark_version
)

# Create (or reuse) the session, requesting the Kafka connector through
# spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [6]:
# Streaming source: the "coba" topic on the broker at localhost:9092,
# read with startingOffsets set to "earliest".
kafka_source = spark.readStream.format("kafka")
kafka_source = kafka_source.option("kafka.bootstrap.servers", "localhost:9092")
kafka_source = kafka_source.option("subscribe", "coba")
kafka_source = kafka_source.option("startingOffsets", "earliest")
green_stream = kafka_source.load()

To check that we can consume from the stream, let's look at the first record in it.

In Spark streaming, the stream is represented as a sequence of small batches, each batch being a small RDD (or a small dataframe).

So we can execute a function over each mini-batch. Let's run `take(1)` on each batch to see what we have in the stream:


In [7]:
def peek(mini_batch, batch_id):
    """Print the first row of a mini-batch; print nothing for an empty batch.

    Args:
        mini_batch: Object exposing ``take(n)``; in this notebook it is the
            batch passed in by ``foreachBatch``.
        batch_id: Batch identifier supplied by ``foreachBatch``. Accepted to
            satisfy the callback signature but not used.
    """
    head_rows = mini_batch.take(1)

    # Guard clause: nothing to show when the batch carried no rows.
    if not head_rows:
        return

    print(head_rows[0])


# Start a streaming query that hands every mini-batch to `peek`; the returned
# handle is kept in `query`.
query = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

24/03/21 03:31:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c6336821-bff3-40ac-9925-da47be54d82f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/21 03:31:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/21 03:31:55 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Row(key=None, value=bytearray(b'{"Index": 0, "lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='coba', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 21, 3, 31, 5, 538000), timestampType=0)


In [None]:
# green_stream.select(col('value')).writeStream.foreachBatch(peek).start()

The data is JSON, but currently it's in binary format. We need to parse it and turn it into a streaming dataframe with proper columns.

Similarly to PySpark, we define the schema


In [8]:
from pyspark.sql import types as T

# Schema of the JSON payload carried in each Kafka message value.
# Every field is nullable; the two datetime fields are kept as strings.
schema = T.StructType(
    [
        T.StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("Index", T.IntegerType()),
            ("lpep_pickup_datetime", T.StringType()),
            ("lpep_dropoff_datetime", T.StringType()),
            ("PULocationID", T.IntegerType()),
            ("DOLocationID", T.IntegerType()),
            ("passenger_count", T.DoubleType()),
            ("trip_distance", T.DoubleType()),
            ("tip_amount", T.DoubleType()),
        )
    ]
)

In [9]:
from pyspark.sql import functions as F

# This cell rebinds `green_stream` to the parsed frame, so on a second run the
# name no longer refers to the raw Kafka frame and `value` cannot be resolved.
# Keep the raw frame under its own name the first time through (while the
# `value` column is still present) and always parse from that, which makes the
# cell safe to re-run. `green_stream` still ends up as the parsed frame.
if "value" in green_stream.columns:
    raw_green_stream = green_stream

# Kafka delivers the payload as binary: cast it to a string, parse it with
# `schema`, then flatten the resulting struct into top-level columns.
green_stream = raw_green_stream.select(
    F.from_json(F.col("value").cast("STRING"), schema).alias("data")
).select("data.*")

green_stream.writeStream.foreachBatch(peek).start()

24/03/21 03:33:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6047cb93-db15-40d4-8bd1-03e37ecfa848. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/21 03:33:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f0b3a681150>

24/03/21 03:33:20 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Row(Index=0, lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


24/03/21 03:33:57 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.listOffsets(KafkaOffsetReaderAdmin.scala:88)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$fetchLatestOffsets$1(KafkaOffsetReaderAdmin.scala:332)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:501)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518)
	at org.apache.spark.sql.kafka010.KafkaOf

In [None]:
def clean_and_parse_json(data):
    """Build a Column that strips backslashes from ``data`` and parses it as JSON.

    ``data`` is a Spark ``Column`` (an expression), not a Python bytes/str
    value, so Python string methods such as ``.decode()`` or ``.replace()``
    cannot be applied to it. The cleaning is therefore expressed with Column
    functions that Spark evaluates per row.

    Args:
        data: Column holding the message payload (binary or string).

    Returns:
        A struct Column produced by ``from_json`` with the notebook-level
        ``schema``. Because this only builds an expression, nothing is parsed
        here; per the ``from_json`` documentation an unparseable string yields
        null rather than raising, so no try/except is needed.
    """
    # Binary -> string, the Column equivalent of decoding the bytes.
    data_str = data.cast("string")
    # The original two-step replace (double backslash -> single, then
    # single -> empty) nets out to removing every backslash; the regex below
    # matches one literal backslash.
    # NOTE(review): this also removes legitimate JSON escapes inside string
    # values - confirm that is intended for this topic's payloads.
    clean_data = F.regexp_replace(data_str, r"\\", "")
    return from_json(clean_data, schema)

In [None]:
# Keep only the `value` column, then attach the parsed payload as `data`.
# NOTE(review): this selects `value` from `green_stream`, so it needs
# `green_stream` to still expose the raw Kafka columns - confirm against the
# cells run before this one.
parsed_stream = (
    green_stream
    .select(col("value"))
    .withColumn("data", clean_and_parse_json(col("value")))
)
query = (
    parsed_stream.writeStream
    .foreachBatch(peek)
    .start()
)

In [None]:
# from pyspark.sql import functions as F

# green_stream = green_stream \
#   .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
#   .select("data.*")
# green_stream.show(truncate=False)
# green_stream.printSchema()


# final_df = green_stream.select(
# F.col("data.Index").cast("integer"),
# F.col("data.lpep_pickup_datetime").cast("timestamp"),
# F.col("data.lpep_dropoff_datetime").cast("timestamp"),
# F.col("data.PULocationID").cast("integer"),
# F.col("data.DOLocationID").cast("integer"),
# F.col("data.passenger_count").cast("double"),
# F.col("data.trip_distance").cast("double"),
# F.col("data.tip_amount").cast("double"))

#     F.col("data.Index").cast("integer"),
# F.col("data.lpep_pickup_datetime").cast("timestamp"),
# F.col("data.lpep_dropoff_datetime").cast("timestamp"),
# F.col("data.PULocationID").cast("integer"),
# F.col("data.DOLocationID").cast("integer"),
# F.col("data.passenger_count").cast("double"),
# F.col("data.trip_distance").cast("double"),
# F.col("data.tip_amount").cast("double"))

# Start a streaming query over the current `green_stream`, printing the first
# row of each mini-batch via `peek`; `query` is rebound to this new handle.
query = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

In [None]:
# Stop every streaming query still active in this session, not only the one
# currently bound to `query`: earlier cells start queries whose handles are
# later overwritten or never stored, and those would otherwise keep running.
for active_query in spark.streams.active:
    active_query.stop()