In [36]:
from typing import List, Dict
from decimal import Decimal
from datetime import datetime


class Ride:
    def __init__(self, arr: List[str]):
        self.vendor_id = int(arr[0])
        self.tpep_pickup_datetime = datetime.strptime(arr[1], "%Y-%m-%d %H:%M:%S")
        self.tpep_dropoff_datetime = datetime.strptime(arr[2], "%Y-%m-%d %H:%M:%S")
        self.passenger_count = int(arr[3])
        self.trip_distance = Decimal(arr[4])
        self.rate_code_id = int(arr[5])
        self.store_and_fwd_flag = arr[6]
        self.pu_location_id = int(arr[7])
        self.do_location_id = int(arr[8])
        self.payment_type = arr[9]
        self.fare_amount = Decimal(arr[10])
        self.extra = Decimal(arr[11])
        self.mta_tax = Decimal(arr[12])
        self.tip_amount = Decimal(arr[13])
        self.tolls_amount = Decimal(arr[14])
        self.improvement_surcharge = Decimal(arr[15])
        self.total_amount = Decimal(arr[16])
        self.congestion_surcharge = Decimal(arr[17])

    @classmethod
    def from_dict(cls, d: Dict):
        return cls(arr=[
            d['vendor_id'],
            d['tpep_pickup_datetime'],
            d['tpep_dropoff_datetime'],
            d['passenger_count'],
            d['trip_distance'],
            d['rate_code_id'],
            d['store_and_fwd_flag'],
            d['pu_location_id'],
            d['do_location_id'],
            d['payment_type'],
            d['fare_amount'],
            d['extra'],
            d['mta_tax'],
            d['tip_amount'],
            d['tolls_amount'],
            d['improvement_surcharge'],
            d['total_amount'],
            d['congestion_surcharge'],
        ]
        )

    def __repr__(self):
        return f'{self.__class__.__name__}: {self.__dict__}'


def ride_to_dict(ride: Ride, ctx):
    return ride.__dict__

In [37]:
data = {
    'vendor_id': '123',
    'tpep_pickup_datetime': '2020-07-01 00:25:32',
    'tpep_dropoff_datetime': '2020-07-01 00:33:39',
    'passenger_count': '2',
    'trip_distance': '5.0',
    'rate_code_id': '1',
    'store_and_fwd_flag': 'N',
    'pu_location_id': '101',
    'do_location_id': '102',
    'payment_type': 'Credit Card',
    'fare_amount': '15.00',
    'extra': '1.50',
    'mta_tax': '0.50',
    'tip_amount': '2.00',
    'tolls_amount': '0.00',
    'improvement_surcharge': '0.30',
    'total_amount': '19.30',
    'congestion_surcharge': '2.50',
}

# Sử dụng phương thức from_dict để tạo một đối tượng Ride từ từ điển data
ride = Ride.from_dict(data)

# Đối tượng ride bây giờ chứa thông tin từ từ điển data
print(ride.vendor_id)  # Kết quả: 123
print(ride.tpep_pickup_datetime)  # Kết quả: 2023-09-25 12:00:00
# ...và cùng các thuộc tính khác

123
2020-07-01 00:25:32


In [42]:
ride

Ride: {'vendor_id': 1, 'tpep_pickup_datetime': datetime.datetime(2020, 7, 1, 0, 25, 32), 'tpep_dropoff_datetime': datetime.datetime(2020, 7, 1, 0, 33, 39), 'passenger_count': 1, 'trip_distance': Decimal('1.50'), 'rate_code_id': 1, 'store_and_fwd_flag': 'N', 'pu_location_id': 238, 'do_location_id': 75, 'payment_type': '2', 'fare_amount': Decimal('8'), 'extra': Decimal('0.5'), 'mta_tax': Decimal('0.5'), 'tip_amount': Decimal('0'), 'tolls_amount': Decimal('0'), 'improvement_surcharge': Decimal('0.3'), 'total_amount': Decimal('9.3'), 'congestion_surcharge': Decimal('0')}

In [38]:
import csv
import os
from typing import List
# from ride import Ride

import os

CURRENT_FILE_PATH = os.getcwd()
INPUT_DATA_PATH = os.path.join(CURRENT_FILE_PATH, 'resources', 'data', 'rides.csv')


def read_rides(resource_path: str = INPUT_DATA_PATH) -> List[Ride]:
    rides = []
    with open(resource_path, 'r') as f:
        reader = csv.reader(f)
        header = next(reader)  # skip the header row
        for row in reader:
            rides.append(Ride(arr=row))
    return rides

In [39]:
rides = []
with open(INPUT_DATA_PATH, 'r') as f:
    reader = csv.reader(f)
    header = next(reader)  # skip the header row
    for row in reader:
        row_0 = row
        break

In [40]:
row_0

['1',
 '2020-07-01 00:25:32',
 '2020-07-01 00:33:39',
 '1',
 '1.50',
 '1',
 'N',
 '238',
 '75',
 '2',
 '8',
 '0.5',
 '0.5',
 '0',
 '0',
 '0.3',
 '9.3',
 '0']

In [41]:
ride = Ride(row_0)
ride

Ride: {'vendor_id': 1, 'tpep_pickup_datetime': datetime.datetime(2020, 7, 1, 0, 25, 32), 'tpep_dropoff_datetime': datetime.datetime(2020, 7, 1, 0, 33, 39), 'passenger_count': 1, 'trip_distance': Decimal('1.50'), 'rate_code_id': 1, 'store_and_fwd_flag': 'N', 'pu_location_id': 238, 'do_location_id': 75, 'payment_type': '2', 'fare_amount': Decimal('8'), 'extra': Decimal('0.5'), 'mta_tax': Decimal('0.5'), 'tip_amount': Decimal('0'), 'tolls_amount': Decimal('0'), 'improvement_surcharge': Decimal('0.3'), 'total_amount': Decimal('9.3'), 'congestion_surcharge': Decimal('0')}

In [43]:
import pyspark.sql.types as T

PRODUCE_TOPIC_RIDES_CSV = CONSUME_TOPIC_RIDES_CSV = 'rides_csv'
BOOTSTRAP_SERVERS = 'localhost:9092'

TOPIC_WINDOWED_VENDOR_ID_COUNT = 'vendor_counts_windowed'

RIDE_SCHEMA1 = T.StructType(
    [T.StructField("vendor_id", T.IntegerType()),
     T.StructField('tpep_pickup_datetime', T.StringType()),
     T.StructField('tpep_dropoff_datetime', T.StringType()),
     T.StructField("passenger_count", T.IntegerType()),
     T.StructField("trip_distance", T.FloatType()),
     T.StructField("rate_code_id", T.IntegerType()),
     T.StructField("store_and_fwd_flag", T.StringType()),
     T.StructField("pu_location_id", T.IntegerType()),
     T.StructField("do_location_id", T.IntegerType()),
     T.StructField("payment_type", T.StringType()),
     T.StructField("fare_amount", T.FloatType()),
     T.StructField("extra", T.FloatType()),
     T.StructField("mta_tax", T.FloatType()),
     T.StructField("tip_amount", T.FloatType()),
     T.StructField("tolls_amount", T.FloatType()),
     T.StructField("improvement_surcharge", T.FloatType()),
     T.StructField("total_amount", T.FloatType()),
     T.StructField("congestion_surcharge", T.FloatType()),
     ])

RIDE_SCHEMA = T.StructType(
 [T.StructField("vendor_id", T.IntegerType()),
  T.StructField('tpep_pickup_datetime', T.TimestampType()),
  T.StructField('tpep_dropoff_datetime', T.TimestampType()),
  T.StructField("passenger_count", T.IntegerType()),
  T.StructField("trip_distance", T.FloatType()),
  T.StructField("payment_type", T.IntegerType()),
  T.StructField("total_amount", T.FloatType()),
  ])

In [44]:
def read_from_kafka(consume_topic: str):
    # Spark Streaming DataFrame, connect to Kafka topic served at host in bootrap.servers option
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .load()
    return df_stream

In [46]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [47]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

In [53]:
def read_from_kafka(consume_topic: str):
    # Spark Streaming DataFrame, connect to Kafka topic served at host in bootrap.servers option
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .load()
    return df_stream

In [55]:
df_stream = read_from_kafka(CONSUME_TOPIC_RIDES_CSV)
df_stream.show(5)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka