### Question 1: Redpanda version

In [23]:
!docker exec redpanda-1 rpk help

rpk is the Redpanda CLI & toolbox

Usage:
  rpk [command]

Available Commands:
  acl         Manage ACLs and SASL users
  cloud       Interact with Redpanda cloud
  cluster     Interact with a Redpanda cluster
  container   Manage a local container cluster
  debug       Debug the local Redpanda process
  generate    Generate a configuration template for related services
  group       Describe, list, and delete consumer groups and manage their offsets
  help        Help about any command
  iotune      Measure filesystem performance and create IO configuration file
  plugin      List, download, update, and remove rpk plugins
  redpanda    Interact with a local Redpanda process
  topic       Create, delete, produce to and consume from Redpanda topics
  version     Check the current version
  wasm        Deploy and remove inline WASM engine scripts

Flags:
  -h, --help      Help for rpk
  -v, --verbose   Enable verbose logging (default: false)

Use "rpk [command] --help" for more informati

In [24]:
!docker exec redpanda-1 rpk version

v22.3.5 (rev 28b2443)


### Question 2. Creating a topic

In [1]:
!docker exec redpanda-1 rpk topic create test-topic


TOPIC       STATUS
test-topic  OK


### Question 3. Connecting to the Kafka server
_ _ _

In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

### Question 4. Sending data to the stream

In [3]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
    
t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')
print("data sent")
producer.flush()

t2 = time.time()
print(f'took {(t2 - t1):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.51 seconds
data sent
took 0.00 seconds


### Reading data with rpk

In [None]:
!docker exec redpanda-1 rpk topic consume test-topic

{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1712609832603,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1712609832657,
  "partition": 0,
  "offset": 1
}
{
  "topic": "test-topic",
  "value": "{\"number\": 2}",
  "timestamp": 1712609832707,
  "partition": 0,
  "offset": 2
}
{
  "topic": "test-topic",
  "value": "{\"number\": 3}",
  "timestamp": 1712609832758,
  "partition": 0,
  "offset": 3
}
{
  "topic": "test-topic",
  "value": "{\"number\": 4}",
  "timestamp": 1712609832809,
  "partition": 0,
  "offset": 4
}
{
  "topic": "test-topic",
  "value": "{\"number\": 5}",
  "timestamp": 1712609832860,
  "partition": 0,
  "offset": 5
}
{
  "topic": "test-topic",
  "value": "{\"number\": 6}",
  "timestamp": 1712609832910,
  "partition": 0,
  "offset": 6
}
{
  "topic": "test-topic",
  "value": "{\"number\": 7}",
  "timestamp": 1712609832961,
  "partition": 0,
  "offset": 7
}
{
  "topic": "test-topic",
  "va

### Question 5: Sending the Trip Data

In [None]:
!docker exec redpanda-1 rpk topic create green-trips


In [12]:
# Create the directory (replace "data" with your desired directory name)
!mkdir data

# Download the file with the directory included in the path
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz -O "data/green_tripdata_2019-10.csv.gz"


--2024-04-07 11:45:50--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240407%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240407T104549Z&X-Amz-Expires=300&X-Amz-Signature=429d84358e8c00b2d5c7b61321386bb526b8710e071860e7b57231bd5b85e69f&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-04-07 11:45:51--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e

In [12]:
import os

# Replace "../../../../" with the actual absolute path to your home directory
home_directory = "../../../../../"
os.environ["SPARK_HOME"] = os.path.join(home_directory, "spark-3.3.2-bin-hadoop3")

In [13]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [14]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

24/04/08 22:07:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
# Define path to the file, including the .gz extension
file_path = "data/green_tripdata_2019-10.csv.gz"

# Read the data as a DataFrame with options for header and compression
# df = spark.read.option("header", True).option("compression", "gzip").csv(file_path)
df = spark.read.option("header", "true").option("compression", "gzip").option("inferSchema", "true").csv(file_path)


# Show the schema and the first few rows (optional)
df.printSchema()
df.show(3)

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+------------------+----------+-

In [6]:
columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'PULocationID','DOLocationID','passenger_count','trip_distance','tip_amount']

In [7]:
# Select specific columns
selected_columns = df.select(columns)
print("Selected Columns:")
selected_columns.show(3)

Selected Columns:
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:26:02|  2019-10-01 00:39:58|         112|         196|              1|         5.88|       0.0|
| 2019-10-01 00:18:11|  2019-10-01 00:22:38|          43|         263|              1|          0.8|       0.0|
| 2019-10-01 00:09:31|  2019-10-01 00:24:47|         255|         228|              2|          7.5|       0.0|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
only showing top 3 rows



In [49]:
for row in selected_columns.collect():
    row_dict = row.asDict()
    data = {}
    for column_name, column_value in row_dict.items():
        data[column_name] = "{}".format(column_value)
    print(data)
    break

                                                                                

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': '112', 'DOLocationID': '196', 'passenger_count': '1', 'trip_distance': '5.88', 'tip_amount': '0.0'}


In [8]:
t0 = time.time()

topic_name = 'green-trips'

for row in selected_columns.collect():
    row_dict = row.asDict()
    data = {}
    for column_name, column_value in row_dict.items():
        data[column_name] = "{}".format(column_value)
    producer.send(topic_name, value=data)
    

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds to send data')

                                                                                

took 181.79 seconds to send data


In [9]:
spark.stop()

In [17]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
# kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    # .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

IndentationError: unexpected indent (319676990.py, line 12)

In [18]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        

In [21]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/04/08 16:59:53 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-76a819d8-bac0-48e2-953a-53cc0dc4a6c2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:59:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
query.stop()

In [13]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [14]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [15]:
green_stream

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [18]:
spark.stop()