# Streaming: N

In [2]:
!pip install confluent_kafka sseclient



### Imports

In [3]:
import json

from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, IntegerType
from shapely.strtree import STRtree

from shapely import Polygon
from shapely import Point

ModuleNotFoundError: No module named 'shapely'

In [None]:
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.admin import AdminClient, NewTopic
from uuid import uuid4
import sys, random

brokers = "kafka1:9092,kafka2:9093"

In [None]:
pconf = {
    'bootstrap.servers': brokers,
    'partitioner': 'murmur2_random',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer':  StringSerializer('utf_8')
}

In [4]:
p = SerializingProducer(pconf)

In [7]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("DF2_Practice") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # OK for exploration, not great for performance
spark.conf.set("spark.sql.repl.eagerEval.truncate", 500)

### Reading in data

In [5]:
taxies = "sample.csv"

In [8]:
# Reading in borough dataset
with open('nyc-boroughs.geojson') as file:
    boroughs = json.load(file)

features = boroughs['features']
properties = [feature['properties'] for feature in features]
geometry = [feature['geometry'] for feature in features]

rowNumberWindow = Window.orderBy(F.lit("a"))

properties_df = spark.createDataFrame(properties).withColumn("rid",F.row_number().over(rowNumberWindow))
geometry_df = spark.createDataFrame(geometry).withColumn("rid",F.row_number().over(rowNumberWindow))
borough_base_df = properties_df.join(geometry_df,("rid")).drop("rid") #rid - row_id

In [15]:
# Finding the area size of the boroughs
def calculate_area(coordinates):
    polygon = Polygon(list(coordinates[0]))
    return polygon.area

calculate_area_udf = F.udf(calculate_area,DoubleType())

borough_df_sorted = borough_base_df.withColumn("area",calculate_area_udf(F.col("coordinates"))) \
                            .sort(F.col("area").desc()) \
                            .withColumn("row_id",F.row_number().over(rowNumberWindow) - 1)

# https://shapely.readthedocs.io/en/stable/strtree.html
geomtree = STRtree([Polygon(coords["coordinates"][0]) for coords in borough_df_sorted.select("coordinates").collect()])
borough_df_sorted

@id,borough,boroughCode,coordinates,type,area,row_id
http://nyc.pediacities.com/Resource/Borough/Queens,Queens,4,"[[[-73.89145055584646, 40.776372179016676], [-73.89145185712489, 40.776512769270866], [-73.8911204433948, 40.777124699736], [-73.8905625059536, 40.77740392096429], [-73.89035193082614, 40.77780032027483], [-73.89008729416801, 40.77778419360687], [-73.88947105399761, 40.77553555096723], [-73.88978900756067, 40.77423686116983], [-73.89019171520673, 40.77364687622498], [-73.88985169959679, 40.77375329736393], [-73.88945486903738, 40.773532951127166], [-73.8858076228666, 40.774022224475374], [-73...",Polygon,0.0271938732263071,0
http://nyc.pediacities.com/Resource/Borough/Brooklyn,Brooklyn,3,"[[[-73.95439555417089, 40.739114772522505], [-73.95198794354425, 40.73874762097681], [-73.94652352854787, 40.73692685395815], [-73.94645392267348, 40.73680959046116], [-73.9471998336712, 40.73535517811413], [-73.94706532915389, 40.73440199281923], [-73.94659806707354, 40.734442224347276], [-73.946736923407, 40.735294943719836], [-73.94692902737472, 40.735313140683736], [-73.94662663786626, 40.73586558516741], [-73.94644310074186, 40.7358131844557], [-73.94617195881511, 40.73620288877423], [-7...",Polygon,0.0187055890190034,1
http://nyc.pediacities.com/Resource/Borough/Staten_Island,Staten Island,5,"[[[-74.08221272914938, 40.64828016229008], [-74.08142228203805, 40.64850472594939], [-74.08072838762374, 40.64827487384626], [-74.07980996428705, 40.648383312987924], [-74.07899546333259, 40.648142554422414], [-74.0765065715286, 40.646968818183346], [-74.074452825637, 40.645067488723235], [-74.07395839976468, 40.645193205445516], [-74.07359107919278, 40.64499892804299], [-74.07349851367621, 40.6450833734751], [-74.07385653684726, 40.645424816099606], [-74.07333813856985, 40.64578311616224], [...",Polygon,0.016035476019122,2
http://nyc.pediacities.com/Resource/Borough/Bronx,Bronx,2,"[[[-73.87294860352858, 40.90444102266826], [-73.85946778700769, 40.90051720926688], [-73.85907692341046, 40.90100799310065], [-73.85941984091325, 40.90139312501249], [-73.85937696525053, 40.90156521816433], [-73.85902704152225, 40.90144490975347], [-73.85886401316355, 40.90171169504956], [-73.85946389958121, 40.90193387763358], [-73.85957882948261, 40.902440842771384], [-73.85926432541882, 40.902669479896545], [-73.8587282483346, 40.9022889705702], [-73.85816000004857, 40.902305710202725], [-...",Polygon,0.0114058694602546,3
http://nyc.pediacities.com/Resource/Borough/Manhattan,Manhattan,1,"[[[-73.92640556921117, 40.87762147653734], [-73.92629548795446, 40.87748846858917], [-73.92645784900961, 40.877258189008735], [-73.92619292267656, 40.877064419414374], [-73.92555292248268, 40.87710258898463], [-73.92508215701339, 40.87739761061557], [-73.92457036990588, 40.877428912671306], [-73.92329476733927, 40.87726443545978], [-73.92239768169318, 40.876780223141196], [-73.92244468226002, 40.87569183576377], [-73.9226680400197, 40.874981991369474], [-73.92261383040437, 40.87440727210196],...",Polygon,0.0058590779960357,4
http://nyc.pediacities.com/Resource/Borough/Queens,Queens,4,"[[[-73.8049919885511, 40.5967523588775], [-73.80487565214918, 40.59674853593164], [-73.80511185479914, 40.597151047998096], [-73.8047111222805, 40.597674663195114], [-73.80393338922406, 40.597943498583916], [-73.80297943325749, 40.59884430455297], [-73.80167092680179, 40.59889630926943], [-73.80094867862822, 40.59915166461987], [-73.80091500623968, 40.59939076113576], [-73.80077654707625, 40.599162360478296], [-73.8004952637198, 40.599204368068555], [-73.80047379227797, 40.59933155912372], [-...",Polygon,0.0022440271285791,5
http://nyc.pediacities.com/Resource/Borough/Queens,Queens,4,"[[[-73.82337592129355, 40.638987655897566], [-73.82277105438688, 40.635576914085036], [-73.82209868083655, 40.6340723011468], [-73.82184604252807, 40.63204018843733], [-73.82107505533322, 40.6297337745662], [-73.81991014068154, 40.62784065311699], [-73.8179641483957, 40.62710556569674], [-73.81800752228979, 40.62647634385673], [-73.81769796593271, 40.62596162249622], [-73.81611483429552, 40.62460465517299], [-73.81608643470798, 40.62431541030932], [-73.81672998058578, 40.62387151407256], [-73...",Polygon,0.0004664525482037259,6
http://nyc.pediacities.com/Resource/Borough/Manhattan,Manhattan,1,"[[[-73.92133752419281, 40.800852107502166], [-73.92031465521232, 40.79937545928999], [-73.91662985247804, 40.79785813943708], [-73.9154547096642, 40.79706346603097], [-73.91443827233314, 40.79566551786385], [-73.91380428276815, 40.794511846455876], [-73.91378385370597, 40.793836285482065], [-73.91514134426112, 40.792012482994856], [-73.91630577076108, 40.791117369952566], [-73.91617092499695, 40.79101419021154], [-73.91686540584891, 40.790362063464165], [-73.9171062231295, 40.79046861673676],...",Polygon,0.0002327165585676201,7
http://nyc.pediacities.com/Resource/Borough/Bronx,Bronx,2,"[[[-73.88885148496335, 40.79870632895875], [-73.8882134885128, 40.798665304638554], [-73.88839250519695, 40.798566297269936], [-73.8882335331365, 40.79824288428591], [-73.88755967580654, 40.79836873243706], [-73.88665108524592, 40.798038196699906], [-73.88375741590161, 40.795708565419794], [-73.88362347511054, 40.795672726309924], [-73.88365516756662, 40.795967230065116], [-73.88320394303747, 40.795544997462244], [-73.8820963368747, 40.795159861043004], [-73.87980493377019, 40.794892251170346...",Polygon,0.00017978427201030463,8
http://nyc.pediacities.com/Resource/Borough/Brooklyn,Brooklyn,3,"[[[-73.84734350666992, 40.62909473971626], [-73.84984630167423, 40.62622449074596], [-73.84727837727986, 40.62662166979185], [-73.84504991718364, 40.62541106188365], [-73.84358010755514, 40.62317816418441], [-73.84614754008332, 40.62222360960777], [-73.84847781581281, 40.62241878479054], [-73.85665510191754, 40.61999364918835], [-73.85947697626031, 40.6186652168382], [-73.86107132214275, 40.61771207937058], [-73.86364988794935, 40.61787877364238], [-73.86404106276495, 40.61920759671407], [-73...",Polygon,0.00012096573285815688,9


In [30]:
def findBorough(x,y):
    point = Point(x,y)
    nearestLocIndex = geomtree.nearest(point)
    borough = borough_df_sorted.collect()[nearestLocIndex]["borough"]
    return borough

### Constructing the ride

In [32]:
from datetime import datetime  
  #2023-10-13T08:16:13Z
def construct_ride(row):
    time_stamp = time.time()
    date_time = datetime.fromtimestamp(time_stamp)
    str_date_time = date_time.strftime("%Y-%m-%dT%H:%M:%SZ") #"%d-%m-%Y, %H:%M:%S"
    ride = {"vendor_id": row[2],
             "pickup_datetime": row[5],
             "droppff_datetime":row[5],
            "pickup_longitude": float(row[10]),
            "pickup_latitude": float(row[11]),
            "dropoff_longitude": float(row[12]),
            "dropoff_latitude": float(row[13]),
            "start_borough": findBorough(float(row[10]), float(row[11])),
            "end_borough": findBorough(float(row[12]), float(row[13])),
             "timestamp":str_date_time
             }
    return ride

### Starting the stream

In [33]:
import csv, json
import time
n = 0
with open(taxies) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader, None) # Skipping the headers
    try:
        for row in csv_reader:
            #print(row)
            if n == 10:
               break
            ride = construct_ride(row)
            print(ride)
            p.produce('ride', value=json.dumps(ride))
            p.poll(0)
            #p.flush()
            time.sleep(0.5)
            n = n + 1
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))


{'vendor_id': 'CMT', 'pickup_datetime': '2013-01-01 15:11:48', 'droppff_datetime': '2013-01-01 15:11:48', 'pickup_longitude': -73.978165, 'pickup_latitude': 40.757977, 'dropoff_longitude': -73.989838, 'dropoff_latitude': 40.751171, 'start_borough': 'Manhattan', 'end_borough': 'Manhattan', 'timestamp': '2024-05-07T07:49:17Z'}
{'vendor_id': 'CMT', 'pickup_datetime': '2013-01-06 00:18:35', 'droppff_datetime': '2013-01-06 00:18:35', 'pickup_longitude': -74.006683, 'pickup_latitude': 40.731781, 'dropoff_longitude': -73.994499, 'dropoff_latitude': 40.75066, 'start_borough': 'Manhattan', 'end_borough': 'Manhattan', 'timestamp': '2024-05-07T07:49:18Z'}
{'vendor_id': 'CMT', 'pickup_datetime': '2013-01-05 18:49:41', 'droppff_datetime': '2013-01-05 18:49:41', 'pickup_longitude': -74.004707, 'pickup_latitude': 40.73777, 'dropoff_longitude': -74.009834, 'dropoff_latitude': 40.726002, 'start_borough': 'Manhattan', 'end_borough': 'Manhattan', 'timestamp': '2024-05-07T07:49:19Z'}
{'vendor_id': 'CMT', 

### Use only if you need to purge all the messages in the queue

In [None]:
brokers = "kafka1:9092,kafka2:9093"

admin_client = AdminClient({"bootstrap.servers":brokers})
admin_client.delete_topics(topics=['stock'])