```bash
docker compose up -d
docker exec -it redpanda-1 bash
```

In [11]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [12]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

t0p5 = time.time()

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

print(f'sending messages took {(t0p5 - t0):.2f} seconds')
print(f'flushing took {(t1 - t0p5):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.54 seconds
sending messages took 0.54 seconds
flushing took 0.00 seconds


In [13]:
import pandas as pd

df_green = pd.read_csv('./data/green_tripdata_2019-10.csv.gz', compression='gzip')

  df_green = pd.read_csv('./data/green_tripdata_2019-10.csv.gz', compression='gzip')


In [14]:
cols_to_use = ['lpep_pickup_datetime',
                'lpep_dropoff_datetime',
                'PULocationID',
                'DOLocationID',
                'passenger_count',
                'trip_distance',
                'tip_amount']

df_green = df_green[cols_to_use]

In [15]:
df_green

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.00
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.80,0.00
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.50,0.00
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.90,0.00
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26
...,...,...,...,...,...,...,...
476381,2019-10-31 23:30:00,2019-11-01 00:00:00,65,102,,7.04,0.00
476382,2019-10-31 23:03:00,2019-10-31 23:24:00,129,136,,0.00,0.00
476383,2019-10-31 23:02:00,2019-10-31 23:23:00,61,222,,3.90,0.00
476384,2019-10-31 23:42:00,2019-10-31 23:56:00,76,39,,3.08,0.00


In [16]:
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}


```
redpanda@1e7e76f85ee5:/$ rpk topic create green-trips
TOPIC        STATUS
green-trips  OK
```

In [17]:
t0 = time.time()

topic_name = 'green-trips'

for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)

t0p5 = time.time()

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

print(f'sending messages took {(t0p5 - t0):.2f} seconds')
print(f'flushing took {(t1 - t0p5):.2f} seconds')

Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-01 00:57:44', lpep_dropoff_datetime='2019-10-01 01:06:59', PULocationID=25, DOLocationID=228, passenger_count=1.0, trip_distance=2.62, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-01 14:32:40', lpep_dropoff_datetime='2019-10-01 14:36:14', PULocationID=25, DOLocationID=40, passenger_count=1.0, trip_distance=0.65, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-01 17:04:41', lpep_dropoff_datetime='2019-10-01 17:40:25', PULocationID=244, DOLocationID=233, passenger_count=1.0, trip_distance=9.05, tip_amount=7.11)
Row(lpep_pickup_datetime='2019-10-02 07:22:35', lpep_dropoff_datetime='2019-10-02 07:37:34', PULocationID=74, DOLocationID=263, passenger_count=1.0, trip_distance=2.21, tip_amount=2.26)
Row(lpep_pickup_datetime='2019-10-02 13:13:26', lpep_dropoff_datetime='2

Row(lpep_pickup_datetime='2019-10-31 11:06:00', lpep_dropoff_datetime='2019-10-31 11:51:00', PULocationID=61, DOLocationID=79, passenger_count=nan, trip_distance=6.92, tip_amount=0.0)


In [2]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

In [3]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

24/03/14 12:44:56 WARN Utils: Your hostname, Andromeda.local resolves to a loopback address: 127.0.0.1; using 172.26.4.116 instead (on interface en0)
24/03/14 12:44:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/abhirupghosh/.ivy2/cache
The jars for the packages stored in: /Users/abhirupghosh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4435bdd6-8030-4d13-ad31-afeb988f3df7;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/abhirupghosh/opt/anaconda3/envs/ml-zoomcamp/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1!spark-sql-kafka-0-10_2.12.jar (60ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.1/spark-token-provider-kafka-0

24/03/14 12:45:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [5]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/14 13:46:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/r6/4c778b992hv25hqynjsb5f380000gn/T/temporary-1cdb52c3-e328-48c2-9733-ed9bd9a238e6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 13:46:31 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/14 13:46:31 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 14, 11, 47, 59, 955000), timestampType=0)


In [6]:
query.stop()

In [7]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [8]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [9]:
query = green_stream.writeStream.foreachBatch(peek).start()

24/03/14 13:47:44 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/r6/4c778b992hv25hqynjsb5f380000gn/T/temporary-36da9eb2-e333-4345-a056-ab2fd55dd84c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 13:47:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/14 13:47:44 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
