In [25]:
from kafka import KafkaConsumer
from time import sleep
from json import dumps,loads
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
import json

In [9]:
class Taxi_data:
    """Plain attribute container for one NYC taxi trip record.

    Each constructor argument is stored unchanged on the instance under the
    same name. ``VendorID``, ``tpep_pickup_datetime`` and
    ``tpep_dropoff_datetime`` must be supplied; they are the fields the
    notebook's JSON schema lists as required. Every other field defaults to
    ``None`` so a record that lacks an optional property can still be built.

    Attributes are assigned in the order of the parameter list, so
    ``vars(instance)`` yields the fields in that same order.
    """

    def __init__(self, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count=None,
                 trip_distance=None, RatecodeID=None, store_and_fwd_flag=None, PULocationID=None,
                 DOLocationID=None, payment_type=None, fare_amount=None, extra=None, mta_tax=None,
                 tip_amount=None, tolls_amount=None, improvement_surcharge=None, total_amount=None,
                 congestion_surcharge=None, airport_fee=None):
        # Required by the schema.
        self.VendorID = VendorID
        self.tpep_pickup_datetime = tpep_pickup_datetime
        self.tpep_dropoff_datetime = tpep_dropoff_datetime
        # Optional trip details; None when the source record omitted them.
        self.passenger_count = passenger_count
        self.trip_distance = trip_distance
        self.RatecodeID = RatecodeID
        self.store_and_fwd_flag = store_and_fwd_flag
        self.PULocationID = PULocationID
        self.DOLocationID = DOLocationID
        self.payment_type = payment_type
        # Optional monetary amounts; None when the source record omitted them.
        self.fare_amount = fare_amount
        self.extra = extra
        self.mta_tax = mta_tax
        self.tip_amount = tip_amount
        self.tolls_amount = tolls_amount
        self.improvement_surcharge = improvement_surcharge
        self.total_amount = total_amount
        self.congestion_surcharge = congestion_surcharge
        self.airport_fee = airport_fee
        

In [39]:
def dict_to_data(dict, ctx):
    """Build a ``Taxi_data`` record from a decoded message dictionary.

    Passed as ``from_dict`` to the JSON deserializer in this notebook.

    Args:
        dict: Mapping of field name to value for one trip. The name shadows
            the built-in ``dict``; it is kept so existing callers that pass
            it by keyword keep working.
        ctx: Serialization context supplied by the caller; not used here.

    Returns:
        A ``Taxi_data`` instance. Fields missing from the mapping, other than
        the three required ones, are set to ``None``.

    Raises:
        KeyError: If ``VendorID``, ``tpep_pickup_datetime`` or
            ``tpep_dropoff_datetime`` is missing.
    """
    # The notebook's schema marks only the first three fields as required, so
    # a schema-valid message may omit any of these. Order matches the
    # positional order of the Taxi_data constructor.
    optional_fields = (
        'passenger_count',
        'trip_distance',
        'RatecodeID',
        'store_and_fwd_flag',
        'PULocationID',
        'DOLocationID',
        'payment_type',
        'fare_amount',
        'extra',
        'mta_tax',
        'tip_amount',
        'tolls_amount',
        'improvement_surcharge',
        'total_amount',
        'congestion_surcharge',
        'airport_fee',
    )
    return Taxi_data(dict['VendorID'],
                     dict['tpep_pickup_datetime'],
                     dict['tpep_dropoff_datetime'],
                     *(dict.get(field) for field in optional_fields))

In [40]:
# JSON Schema (declared as draft 2020-12) for one NYC taxi trip message; it is
# passed to JSONDeserializer further down. Of the 19 properties, the two
# datetime fields and store_and_fwd_flag are typed "string" and the rest
# "number". Only VendorID and the pickup/dropoff datetimes are "required".
schema_str = """{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "nyc_taxi",
    "description": "NYC Taxi Data",
    "type": "object",
    "properties": {
      "VendorID": {
        "description": "A code indicating the TPEP provider that provided the record.",
        "type": "number"
      },
      "tpep_pickup_datetime": {
        "description": "The date and time when the meter was engaged. ",
        "type": "string"
      },
      "tpep_dropoff_datetime": {
        "description": "The date and time when the meter was disengaged.",
        "type": "string"
      },
      "passenger_count": {
        "description": "The number of passengers in the vehicle. ",
        "type": "number"
      },
      "trip_distance": {
        "description": "The elapsed trip distance in miles reported by the taximeter.",
        "type": "number"
      },
      "RatecodeID": {
        "description": "The final rate code in effect at the end of the trip.",
        "type": "number"
      },
      "store_and_fwd_flag": {
        "description": "This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka store and forward, because the vehicle did not have a connection to the server.",
        "type": "string"
      },
      "PULocationID": {
        "description": "TLC Taxi Zone in which the taximeter was engaged",
        "type": "number"
      },
      "DOLocationID": {
        "description": "TLC Taxi Zone in which the taximeter was disengaged",
        "type": "number"
      },
      "payment_type": {
        "description": "A numeric code signifying how the passenger paid for the trip.",
        "type": "number"
      },
      "fare_amount": {
        "description": "The time-and-distance fare calculated by the meter.",
        "type": "number"
      },
      "extra": {
        "description": "Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges.",
        "type": "number"
      },
      "mta_tax": {
        "description": "$0.50 MTA tax that is automatically triggered based on the metered rate in use.",
        "type": "number"
      },
      "tip_amount": {
        "description": "This field is automatically populated for credit card tips. Cash tips are not included.",
        "type": "number"
      },
      "tolls_amount": {
        "description": "Total amount of all tolls paid in trip.",
        "type": "number"
      },
      "improvement_surcharge": {
        "description": "$0.30 improvement surcharge assessed trips at the flag drop. The improvement surcharge began being levied in 2015.",
        "type": "number"
      },
      "total_amount": {
        "description": "The total amount charged to passengers. Does not include cash tips.",
        "type": "number"
      },
      "congestion_surcharge": {
        "description": "Total amount collected in trip for NYS congestion surcharge.",
        "type": "number"
      },
      "airport_fee": {
        "description": "$1.25 for pick up only at LaGuardia and John F. Kennedy Airports",
        "type": "number"
      }
    },
    "required": ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]
  }"""

In [41]:
def assignment_callback(consumer, partitions):
    """Print one line per assigned partition, naming its topic and number.

    Args:
        consumer: The consumer the assignment belongs to; not used here.
        partitions: Iterable of objects exposing ``topic`` and ``partition``.
    """
    assigned = ((entry.topic, entry.partition) for entry in partitions)
    for topic_name, partition_id in assigned:
        print(f'Assigned to {topic_name}, partition {partition_id}')

In [43]:

# Schema Registry connection; the client is handed to the deserializer below.
sr_config = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(sr_config)

# Settings for the Kafka consumer created in the next cell.
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test',
    'auto.offset.reset': 'smallest',
}

# Deserializer built from schema_str; each decoded dict is converted to a
# Taxi_data object through dict_to_data.
json_deserializer = JSONDeserializer(
    schema_str,
    from_dict=dict_to_data,
    schema_registry_client=schema_registry_client,
)

In [52]:
# Consume taxi records from the topic and write each one to its own JSON file,
# named after the message offset, until interrupted with Ctrl-C / kernel
# interrupt.
topic = 'hello_topic'
consumer = Consumer(config)
# Register the callback so partition assignments are actually reported.
consumer.subscribe([topic], on_assign=assignment_callback)

try:
    while True:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        # A returned message can carry an error instead of a payload; surface
        # it rather than feeding it to the deserializer.
        if msg.error():
            raise KafkaException(msg.error())

        data = json_deserializer(msg.value(), SerializationContext(topic, MessageField.VALUE))
        if data is None:
            # Nothing was decoded for this message (e.g. empty payload); skip it.
            continue

        # Taxi_data stores exactly the record fields as instance attributes,
        # in declaration order, so its attribute dict is the JSON payload.
        dict_data = dict(vars(data))

        # NOTE(review): the file name uses only the offset. If the topic has
        # more than one partition, offsets can repeat and an earlier file
        # would be overwritten - confirm the topic is single-partition.
        # NOTE(review): absolute, user-specific path kept from the original;
        # consider moving it to a configurable directory.
        # Writing to json
        with open("/home/dana123/python_code/nyc_taxi_project/kafka_nyc_taxi_data/nyc_taxi_{}.json".format(msg.offset()), "w") as outfile:
            json.dump(dict_data, outfile)
        print("Done writing kafka_nyc_taxi_data/nyc_taxi_{}.json".format(msg.offset()))
except KeyboardInterrupt:
    # Interrupt is the normal way to stop the loop.
    pass
finally:
    # Always close the consumer, even when an error escapes the loop.
    consumer.close()

Done writing kafka_nyc_taxi_data/nyc_taxi_11.json
Done writing kafka_nyc_taxi_data/nyc_taxi_12.json
Done writing kafka_nyc_taxi_data/nyc_taxi_13.json
Done writing kafka_nyc_taxi_data/nyc_taxi_14.json
Done writing kafka_nyc_taxi_data/nyc_taxi_15.json
Done writing kafka_nyc_taxi_data/nyc_taxi_16.json
Done writing kafka_nyc_taxi_data/nyc_taxi_17.json
Done writing kafka_nyc_taxi_data/nyc_taxi_18.json
Done writing kafka_nyc_taxi_data/nyc_taxi_19.json
Done writing kafka_nyc_taxi_data/nyc_taxi_20.json
Done writing kafka_nyc_taxi_data/nyc_taxi_21.json
Done writing kafka_nyc_taxi_data/nyc_taxi_22.json
Done writing kafka_nyc_taxi_data/nyc_taxi_23.json
Done writing kafka_nyc_taxi_data/nyc_taxi_24.json
Done writing kafka_nyc_taxi_data/nyc_taxi_25.json
Done writing kafka_nyc_taxi_data/nyc_taxi_26.json
Done writing kafka_nyc_taxi_data/nyc_taxi_27.json
Done writing kafka_nyc_taxi_data/nyc_taxi_28.json
Done writing kafka_nyc_taxi_data/nyc_taxi_29.json
Done writing kafka_nyc_taxi_data/nyc_taxi_30.json
