In [2]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
import time 
from kafka import KafkaProducer
from pyspark.sql import types
from time import sleep

## load data

In [6]:
# Configuration shared by the producer and consumer cells.
TOPIC_NAME = 'green-trips2'
TEST_LOAD = False  # when True, load_data() publishes only the first few rows

print(' load green trips data to df..')
df = pd.read_csv('green_tripdata_2019-10.csv')

# Peek at the first three rows, each shown as a {column: value} dict.
for position, record in enumerate(df.head(3).itertuples(index=False), start=1):
    record_dict = dict(zip(record._fields, record))
    print(position, record_dict, '')

 load green trips data to df..
1 {'VendorID': 2.0, 'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'store_and_fwd_flag': 'N', 'RatecodeID': 1.0, 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'fare_amount': 18.0, 'extra': 0.5, 'mta_tax': 0.5, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'ehail_fee': nan, 'improvement_surcharge': 0.3, 'total_amount': 19.3, 'payment_type': 2.0, 'trip_type': 1.0, 'congestion_surcharge': 0.0} 
2 {'VendorID': 1.0, 'lpep_pickup_datetime': '2019-10-01 00:18:11', 'lpep_dropoff_datetime': '2019-10-01 00:22:38', 'store_and_fwd_flag': 'N', 'RatecodeID': 1.0, 'PULocationID': 43, 'DOLocationID': 263, 'passenger_count': 1.0, 'trip_distance': 0.8, 'fare_amount': 5.0, 'extra': 3.25, 'mta_tax': 0.5, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'ehail_fee': nan, 'improvement_surcharge': 0.3, 'total_amount': 9.05, 'payment_type': 2.0, 'trip_type': 1.0, 'congestion_surcharge': 0.0} 
3 {'VendorI

## Check the Kafka connection


In [7]:
#check kafka connection

print('check kafka connection..')
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'
# Every message value is JSON-encoded by json_serializer before it is sent.
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=json_serializer)
# Shown as the cell output: whether the producer is connected to the bootstrap broker.
producer.bootstrap_connected()

check kafka connection..


True

## Load data into Kafka

In [8]:
print('load data to kafka..')
## load data to kafka

# Columns copied from each CSV row into the Kafka message, in this order.
MESSAGE_FIELDS = (
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
)
TEST_LOAD_ROWS = 4  # rows published when TEST_LOAD is True


def load_data(data=None, topic=None, max_rows=None, kafka_producer=None):
    """Publish selected trip columns to Kafka, one JSON message per row.

    Args:
        data: DataFrame to publish; defaults to the global ``df``.
        topic: Kafka topic; defaults to ``TOPIC_NAME``.
        max_rows: publish at most this many rows. When None, this is
            ``TEST_LOAD_ROWS`` if ``TEST_LOAD`` is True, otherwise every row.
        kafka_producer: object with ``send``/``flush`` (a KafkaProducer);
            defaults to the global ``producer``.

    Missing values (NaN) are sent as JSON ``null``. Otherwise ``json.dumps``
    would emit the non-standard token ``NaN``, which strict JSON parsers reject.
    Delivery failures reported by the producer are counted and printed
    after the final flush instead of being silently dropped.
    """
    data = df if data is None else data
    topic = TOPIC_NAME if topic is None else topic
    kafka_producer = producer if kafka_producer is None else kafka_producer
    if max_rows is None and TEST_LOAD:
        max_rows = TEST_LOAD_ROWS

    # Select the message columns once instead of building a dict of every column per row.
    rows = data[list(MESSAGE_FIELDS)]
    if max_rows is not None:
        rows = rows.head(max_rows)

    failures = []
    started = time.time()
    # itertuples yields native Python scalars, which json.dumps can serialize.
    for values in rows.itertuples(index=False, name=None):
        message = {
            field: (None if pd.isna(value) else value)
            for field, value in zip(MESSAGE_FIELDS, values)
        }
        future = kafka_producer.send(topic, value=message)
        # send() is asynchronous: record delivery errors rather than losing them.
        future.add_errback(failures.append)

    kafka_producer.flush()
    elapsed = time.time() - started
    print(f'**end-end took {elapsed:.2f} seconds')
    if failures:
        print(f'**{len(failures)} message(s) failed to send; first error: {failures[0]!r}')

# Publish the trip rows to TOPIC_NAME (TEST_LOAD limits this to a few rows).
load_data()

load data to kafka..
**end-end took 64.59 seconds


## Create the Spark consumer

In [9]:
## create spark consumer
print('create spark consumer...')
# Pin the Kafka connector to the installed PySpark version. The "_2.12" suffix
# presumably must match the Scala build of PySpark; verify when upgrading.
pyspark_version = pyspark.__version__
kafka_jar_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:" + pyspark_version
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer2")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

create spark consumer...
:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.5.1/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/amohan/.ivy2/cache
The jars for the packages stored in: /Users/amohan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6fe16dba-d99d-4dc2-9e5e-db57ea5e03d5;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 546ms :: artifacts dl 13ms
	

## Read the Kafka topic as a Spark stream

In [14]:
## read data
print('read data in spark streaming...')
# Subscribe to the topic that load_data() publishes to. "earliest" makes the
# query start from the beginning of the topic, so messages produced before this
# query started are included.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", server)  # same broker as the producer cell
    .option("subscribe", TOPIC_NAME)
    .option("startingOffsets", "earliest")
    .load()
)

read data in spark streaming...


In [12]:
# The Kafka source exposes key/value as binary plus topic/partition/offset metadata.
print('schema for raw data')
green_stream.printSchema()
print(' green_stream ', green_stream)

schema for raw data
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

 green_stream  DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]


## Look at the first record

In [15]:
def my_peek(mini_batch, batch_id):
    """foreachBatch callback: print the batch id and the batch's first row.

    Empty micro-batches print nothing.
    """
    head = mini_batch.take(1)
    if not head:
        return
    print(batch_id, head[0])

print('read raw data writeStream.foreachBatch')
query1_peek = green_stream.writeStream.foreachBatch(my_peek).start()
print('query1_peek(for raw data) is active? ',query1_peek.isActive)
try:
    # Let the peek run for up to 30 seconds. Unlike time.sleep, awaitTermination
    # re-raises the exception if the streaming query fails meanwhile.
    query1_peek.awaitTermination(30)
finally:
    # Always stop the query, even if the wait is interrupted, so it does not keep
    # running in the background of the kernel.
    query1_peek.stop()
print('query1_peek(for raw data) is still active? ',query1_peek.isActive)

read raw data writeStream.foreachBatch
query1_peek(for raw data) is active?  True


24/03/30 17:52:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/w7/bjnp92015253vpp0xtyzp16c0000gn/T/temporary-2c912429-1ef8-4e40-8d32-9388d8716523. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/30 17:52:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/30 17:52:38 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

0 Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips2', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 30, 17, 38, 8, 323000), timestampType=0)
query1_peek(for raw data) is still active?  False


## Add a schema to the DataFrame

In [16]:
print('add schema to data...')
### add schema to data
# Field names mirror the keys of the JSON messages built by load_data();
# the pickup/dropoff datetimes are kept as plain strings.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

# Decode the binary Kafka value as a JSON string, then flatten the parsed struct.
green_stream_withschema = (
    green_stream
    .select(F.from_json(F.col("value").cast(types.StringType()), schema).alias("data"))
    .select("data.*")
)

print('schema after attaching schema to data')
green_stream_withschema.printSchema()

add schema to data...
schema after attaching schema to data
root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



## look at the data again (with schema)

In [17]:
print('read raw data writeStream.foreachBatch (with schema)')
query2_peek = green_stream_withschema.writeStream.foreachBatch(my_peek).start()
print('query2_peek (with schema) is active? ',query2_peek.isActive)
try:
    # Let the peek run for up to 30 seconds. Unlike time.sleep, awaitTermination
    # re-raises the exception if the streaming query fails meanwhile.
    query2_peek.awaitTermination(30)
finally:
    # Always stop the query, even if the wait is interrupted, so it does not keep
    # running in the background of the kernel.
    query2_peek.stop()
print('query2_peek (with schema) is still active? ',query2_peek.isActive)

read raw data writeStream.foreachBatch (with schema)
query2_peek (with schema) is active?  True


24/03/30 17:56:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/w7/bjnp92015253vpp0xtyzp16c0000gn/T/temporary-8f17cccc-fa81-497e-ab1a-2a7b497c0598. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/30 17:56:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/30 17:56:54 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


0 Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
query2_peek (with schema) is still active?  False
