In [1]:
import json
import time 

from kafka import KafkaProducer

In [2]:
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

In [8]:
producer.bootstrap_connected()

AttributeError: 'KafkaProducer' object has no attribute 'bootstrap_connected'

In [6]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.50 seconds


In [44]:
#we do not need a spark session to create the dataframe as we are no longer inferring the schema
#we want to leave all attributes as string
import pyspark
from pyspark.sql import SparkSession

In [45]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/17 16:58:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [28]:
df = spark.read \
    .option("header", "true") \
    .csv('green_tripdata_2019-10.csv')

In [None]:
.option("inferSchema", "true") \

In [29]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [30]:
df_green = df \
    .select(['lpep_pickup_datetime','lpep_dropoff_datetime','PULocationID','DOLocationID','passenger_count','trip_distance','tip_amount'])

In [31]:
df_green.show()

+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:26:02|  2019-10-01 00:39:58|         112|         196|              1|         5.88|         0|
| 2019-10-01 00:18:11|  2019-10-01 00:22:38|          43|         263|              1|          .80|         0|
| 2019-10-01 00:09:31|  2019-10-01 00:24:47|         255|         228|              2|         7.50|         0|
| 2019-10-01 00:37:40|  2019-10-01 00:41:49|         181|         181|              1|          .90|         0|
| 2019-10-01 00:08:13|  2019-10-01 00:17:56|          97|         188|              1|         2.52|      2.26|
| 2019-10-01 00:35:01|  2019-10-01 00:43:40|          65|          49|              1|         1.47|    

In [32]:
import pandas as pd

In [33]:
pandas_green_df = df_green.toPandas()

                                                                                

In [34]:
for row in pandas_green_df.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': '112', 'DOLocationID': '196', 'passenger_count': '1', 'trip_distance': '5.88', 'tip_amount': '0'}


In [39]:
topic_name = 'test-topic'

for row in pandas_green_df.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break
    
producer.send(topic_name, value=row_dict)
producer.flush()

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': '112', 'DOLocationID': '196', 'passenger_count': '1', 'trip_distance': '5.88', 'tip_amount': '0'}


In [43]:
t0 = time.time()

topic_name = 'green-trips'

for row in pandas_green_df.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 67.31 seconds


In [46]:
spark.stop()

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

In [3]:
kafka_jar_package

'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'

In [4]:
pyspark.__version__

'3.3.2'

In [5]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

24/03/18 18:37:59 WARN Utils: Your hostname, codespaces-45ac75 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/18 18:37:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/workspaces/GabrielZoomcamp2024/week_5_batch_processing/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/codespace/.ivy2/cache
The jars for the packages stored in: /home/codespace/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10c81952-0ca1-4bc9-8b49-cbea691df02b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:

24/03/18 18:38:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
# now we connect to the stream
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [7]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/18 18:38:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-43ad9533-564f-4128-bc51-5706d75d3840. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 18:38:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": "112", "DOLocationID": "196", "passenger_count": "1", "trip_distance": "5.88", "tip_amount": "0"}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 17, 16, 57, 3, 565000), timestampType=0)
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=None, DOLocationID=None, passenger_count=None, trip_distance=None, tip_amount=None)
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=None, DOLocationID=None, passenger_count=None, trip_distance=None, tip_amount=None)


In [11]:
query.stop()

In [8]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [9]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [12]:
query = green_stream.writeStream.foreachBatch(peek).start()

24/03/18 18:39:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f5ac44ef-80d8-488d-8311-8ab77d54f23b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 18:39:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
