In [None]:
pip install kafka

In [None]:
pip install kafka-python

In [None]:
pip install requests requests_oauthlib kafka

In [None]:
pip install git+https://github.com/dpkp/kafka-python.git

In [None]:
import json
import time 

In [None]:
from kafka import KafkaProducer

In [None]:
def json_serializer(data):
    """Serialise ``data`` to JSON text and return it as UTF-8 bytes.

    Used as the Kafka producer's ``value_serializer``.

    Args:
        data: Any object accepted by ``json.dumps``.

    Returns:
        bytes: The UTF-8 encoding of the JSON document.

    Raises:
        TypeError: If ``data`` contains a value ``json.dumps`` cannot encode.
    """
    json_text = json.dumps(data)
    return json_text.encode('utf-8')

# Address (host:port) of the Kafka broker the producer bootstraps from.
server = 'localhost:9092'

# Producer whose message values are passed through json_serializer before sending.
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=json_serializer)

# Kept as the cell's last expression so the connection check result is displayed.
producer.bootstrap_connected()

In [None]:
# Smoke test: send ten small JSON messages to a test topic and report how long
# each send call, the final flush, and the whole run took.
#
# Elapsed times are measured with time.perf_counter(), a monotonic
# high-resolution clock intended for timing short intervals. time.time() is
# wall-clock time, which can be adjusted while the cell runs and may be too
# coarse to resolve a single send call.
t0 = time.perf_counter()

topic_name = 'test-topic'

for i in range(10):
    t1a = time.perf_counter()
    message = {'number': i}
    # send() only queues the message; delivery is forced by flush() below.
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    t1b = time.perf_counter()
    print(f'Sending took {(t1b - t1a):.2f} seconds')
    # Short pause between messages; it is included in the total below.
    time.sleep(0.05)

tPreFlush = time.perf_counter()
producer.flush()
tPostFlush = time.perf_counter()
print(f'Flushing took {(tPostFlush - tPreFlush):.2f} seconds')

tEnd = time.perf_counter()
print(f'Total took {(tEnd - t0):.2f} seconds')

In [None]:
import pandas as pd
import datetime

In [None]:
# Python type for each trip column of interest, keyed by column name.
dtypes = dict(
    lpep_pickup_datetime=str,
    lpep_dropoff_datetime=str,
    PULocationID=int,
    DOLocationID=int,
    passenger_count=int,
    trip_distance=float,
    tip_amount=float,
)

In [None]:
# Names of the trip columns of interest, in order.
headers = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

In [None]:
# The two columns that hold pickup / drop-off datetimes.
dates = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
]

In [None]:
import pyspark
from pyspark.sql import SparkSession
# Build the Maven coordinate of the Spark–Kafka connector so that its version
# matches the installed PySpark release.
# NOTE(review): the "_2.12" suffix is hard-coded; confirm it matches the Scala
# build of the installed PySpark distribution.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all cores, with the Kafka connector requested through
# spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [None]:
# getOrCreate() returns the session already active in this kernel, if any;
# otherwise it builds one with default settings.
# NOTE(review): this looks redundant with the configured builder cell above. On a
# fresh kernel where that cell was skipped, the session created here would not
# carry the spark.jars.packages setting — confirm this cell is still needed.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load the gzipped trip CSV, taking column names from the first line.
# No schema or inferSchema option is passed, so by Spark's documented default
# the columns should all be read as strings — check df.dtypes to confirm.
df = spark.read.csv('green_tripdata_2019-10.csv.gz', header=True)

In [None]:
from pyspark.sql import functions as F

# Stamp every row with the time of processing.
df_with_timestamp = df.withColumn('timestamp', F.current_timestamp())

# Pull the whole Spark DataFrame into a pandas DataFrame on the driver.
pandas_df = df_with_timestamp.toPandas()

# Render the timestamp as plain text so the rows can be JSON-serialised.
timestamp_text = pandas_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')
pandas_df['timestamp'] = timestamp_text

In [None]:
# Show the converted pandas frame as the cell's output.
pandas_df

In [None]:
# List the dtype pandas assigned to each column.
pandas_df.dtypes

In [None]:
pip install pyspark

In [None]:
# Publish every row of pandas_df to the 'green-trips' topic as one JSON message
# per row, then report the row count and the elapsed time.
#
# The rows are deliberately not printed one by one: the frame holds the full
# dataset, so a print per row floods the notebook output and slows the loop.
# time.perf_counter() is used because it is a monotonic clock meant for
# measuring elapsed intervals.
t0 = time.perf_counter()

rows_sent = 0
for row in pandas_df.itertuples(index=False):
    # itertuples yields namedtuples; _asdict() maps each field name to its value.
    producer.send('green-trips', value=row._asdict())
    rows_sent += 1

# send() only queues messages; flush() blocks until they have been handed off.
producer.flush()

tEnd = time.perf_counter()
print(f'Sent {rows_sent} rows')
print(f'Total took {(tEnd - t0):.2f} seconds')

In [None]:
# Open a streaming read of the 'green-trips' topic, starting from the earliest
# available offset.
# The bootstrap address must be the Kafka broker the producer writes to
# (localhost:9092); the previous value, localhost:8080, pointed at a different
# port than every other Kafka client in this notebook.
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
def peek(mini_batch, batch_id):
    """Display the first row of a streaming micro-batch, if it has one.

    Intended as a ``foreachBatch`` callback.

    Args:
        mini_batch: Object exposing ``take(n)``; only ``take(1)`` is called.
        batch_id: Batch identifier supplied by the caller; not used.

    Returns:
        None. An empty batch is skipped silently.
    """
    head = mini_batch.take(1)
    if not head:
        return
    display(head[0])

In [None]:
# Start a streaming query that hands every micro-batch of green_stream to peek().
query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
# Stop the streaming query currently bound to `query`.
query.stop()

In [None]:
from pyspark.sql import types

In [None]:
# Schema applied by from_json to the JSON payload of each Kafka message.
# Every field is nullable (the StructField default).
# NOTE(review): the source CSV is read without schema inference, so the numeric
# fields may reach Kafka as JSON strings — confirm from_json actually populates
# the Integer/Double fields instead of returning nulls.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
    types.StructField("timestamp", types.StringType()),
])

In [None]:
from pyspark.sql import functions as F

# Decode the Kafka `value` column as a string, parse it as JSON using `schema`,
# and flatten the resulting struct into top-level columns.
# Note: this rebinds green_stream, so running the cell a second time applies
# the parse to the already-flattened frame.
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [None]:
# Start a new peek query; green_stream was rebound to the JSON-parsed projection
# in the previous cell, so peek() now shows parsed columns.
# NOTE(review): a later cell rebinds `query` without stopping this one first —
# confirm whether this query should be stopped before moving on.
query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
# Second streaming read of the 'green-trips' topic from the earliest offset,
# used for the destination aggregation.
popular_destinations = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Parse the Kafka `value` payload as JSON using `schema` and promote the parsed
# fields to top-level columns.
# Note: this rebinds popular_destinations, so running the cell a second time
# applies the parse to the already-flattened frame.
popular_destinations = (
    popular_destinations
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)




In [None]:
# Echo the streaming DataFrame as the cell's output.
popular_destinations

In [None]:
# Count rows per (DOLocationID, timestamp) pair, largest count first.
groupeddestionations = (
    popular_destinations
    .groupBy("DOLocationID", "timestamp")
    .count()
    .orderBy("count", ascending=False)
)


In [None]:
# Sort by count, largest first, and bind the result to the correctly spelled name.
# NOTE(review): `groupeddestionations` is already ordered this way where it is
# built, and the writeStream cell below reads the misspelled name, so this
# variable looks unused — confirm before removing or renaming.
groupeddestinations = groupeddestionations.orderBy("count", ascending=False)

In [None]:
# Stream the aggregated counts to the console sink. "complete" output mode
# re-emits the full result table on every trigger; truncate=False keeps long
# cell values from being cut off.
query = (
    groupeddestionations.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", False)
    .start()
)

# Block this cell until the query stops or fails.
query.awaitTermination()