In [None]:
# https://ruan.dev/blog/2023/05/17/running-a-multi-broker-kafka-cluster-on-docker
# docker compose -f docker-compose3.yml up -d
# docker exec -it broker-1 kafka-topics --create --topic NYTickets --bootstrap-server broker-1:29091 --partitions 3 --replication-factor 3

# drew99@drew99-HPl:~/School/BigData/project/task4-kafka$ docker exec -it broker-1 kafka-topics --list --bootstrap-server broker-1:29091
# NYTickets
# drew99@drew99-HPl:~/School/BigData/project/task4-kafka$ docker exec -it broker-1 kafka-topics --list --bootstrap-server broker-2:29092
# NYTickets
# drew99@drew99-HPl:~/School/BigData/project/task4-kafka$ docker exec -it broker-1 kafka-topics --list --bootstrap-server broker-3:29093
# NYTickets

# Docker keeps its data under /var/lib/docker; to clean up afterwards:
# docker container prune
# docker volume prune
# docker network prune

In [15]:
%%writefile producer.py
import confluent_kafka as kafka
import socket
import json, os, time

# producer = kafka.Producer({
#     'bootstrap.servers': "localhost:29092",
#     'client.id': socket.gethostname()
# })
# Topic that receives one JSON-encoded message per ticket.
topic = "NYTickets"

# Producer bootstrapped against three localhost listeners (ports 9091-9093);
# the client id is this machine's host name.
producer_settings = {
    'bootstrap.servers': "localhost:9091,localhost:9092,localhost:9093",
    'client.id': socket.gethostname(),
}
producer = kafka.Producer(producer_settings)

def acked(err, msg):
    """Delivery-report callback handed to ``producer.produce``.

    Args:
        err: delivery error, or None when the message was delivered.
        msg: the message the report refers to.

    Prints one line describing the outcome; returns nothing.
    """
    if err is None:
        print(f"Message produced: {msg!s}")
        return
    print(f"Failed to deliver message: {msg!s}: {err!s}")

def send_tickets(file, delay=1.0):
    """Stream every data row of a CSV file to the Kafka topic as JSON.

    The first non-blank row is taken as the header. Each later row becomes a
    ``{column: value}`` dict, serialized with ``json.dumps`` and produced with
    the row's first field (the Summons Number) as the message key.

    Args:
        file: path of the CSV file to read.
        delay: seconds to sleep after each message (default 1.0, the original
            fixed rate); pass 0 to send as fast as possible.
    """
    import csv  # local import: this block does not own the file's import header

    columns = None
    # newline="" is what the csv module requires so quoted fields that contain
    # line breaks are parsed correctly.
    with open(file, newline="") as f:
        # csv.reader honours quoting, so a field such as "BROADWAY, NYC" stays
        # one value instead of being split on its embedded comma.
        for row in csv.reader(f):
            vals = [v.strip() for v in row]
            if not any(vals):
                continue  # blank line: nothing to send
            if columns is None:
                columns = vals
                continue
            key = vals[0]  # Use Summons Number as key
            msg = dict(zip(columns, vals))
            val = json.dumps(msg)
            try:
                producer.produce(topic, key=key, value=val, callback=acked)
            except BufferError:
                # Local queue is full: serve delivery callbacks to free space,
                # then retry once.
                producer.poll(1)
                producer.produce(topic, key=key, value=val, callback=acked)
            producer.poll(0)  # trigger any pending delivery callbacks
            print(f"Sent data {key}")#:{val}")
            if delay:
                time.sleep(delay)

data_dir = "../data"
# Sorted so the files are streamed in a reproducible order; os.listdir returns
# entries in arbitrary order.
files = sorted(os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith(".csv"))

try:
    for file in files:
        send_tickets(file)
        print(f"Finished sending data for {file}")
finally:
    # Always deliver whatever is still queued, even when sending is interrupted
    # (Ctrl+C) or a file fails part-way through.
    producer.flush()

print("Finished sending all data")


Overwriting producer.py


In [16]:
%%writefile consumer_all.py
import confluent_kafka as kafka, socket
import os, socket, json
from datetime import date

consumer = kafka.Consumer({
    'bootstrap.servers': "localhost:9091,localhost:9092,localhost:9093",
    'client.id': socket.gethostname(),
    # Dedicated consumer group for this script. Kafka divides a topic's
    # partitions (and tracks committed offsets) per group, so sharing one
    # group id with another analysis script would leave each of them with
    # only part of the stream.
    'group.id': 'nytickets_consumer_all',
    # A group with no committed offsets starts from the oldest message.
    'auto.offset.reset': 'earliest'
})

topic = "NYTickets"
consumer.subscribe([topic])

# Tumbling-window statistics over all tickets: prints the average vehicle year
# of every `window_size` valid tickets and announces each new oldest / newest
# vehicle seen so far.
window_size = 12
sum_year, count = 0, 0
min_y, max_y = 3000, 0  # sentinels, replaced by the first valid year
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # poll timed out with nothing to read
        if msg.error():
            raise Exception(msg.error())

        raw_key = msg.key()
        key = raw_key.decode('utf-8') if raw_key is not None else None
        ticket = json.loads(msg.value().decode('utf-8'))

        # A missing or non-numeric year is reported and skipped rather than
        # crashing the consumer.
        try:
            year = int(ticket["Vehicle Year"])
        except (KeyError, TypeError, ValueError):
            print(f"Found invalid year {ticket.get('Vehicle Year')!r} for ticket {key}")
            continue

        curr_year = date.today().year
        if year < 1885 or year > curr_year: # First car was made in 1885
            print(f"Found invalid year {year} for ticket {key}")
            continue

        sum_year += year
        count += 1

        if year < min_y:
            min_y = year
            print(f"Found new oldest vehicle {ticket['Vehicle Make']} {year}")
        if year > max_y:
            max_y = year
            print(f"Found new newest vehicle {ticket['Vehicle Make']} {year}")

        # count is reset with every window, so it equals window_size here.
        if count % window_size == 0:
            avg_year = sum_year / window_size
            print(f"Received {window_size} tickets. Average Vehicle Year: {avg_year:.2f}")
            sum_year = 0
            count = 0

except KeyboardInterrupt:
    print("Consumer interrupted.")
finally:
    consumer.close()

print("Finished receiving all data")


Overwriting consumer_all.py


In [17]:
%%writefile consumer_borough.py
import confluent_kafka as kafka, socket
import os, socket, json

consumer = kafka.Consumer({
    'bootstrap.servers': "localhost:9091,localhost:9092,localhost:9093",
    'client.id': socket.gethostname(),
    # Dedicated consumer group for this script. Kafka divides a topic's
    # partitions (and tracks committed offsets) per group, so sharing one
    # group id with another analysis script would leave each of them with
    # only part of the stream.
    'group.id': 'nytickets_consumer_borough',
    # A group with no committed offsets starts from the oldest message.
    'auto.offset.reset': 'earliest'
})

topic = "NYTickets"
consumer.subscribe([topic])

# Rolling statistics per borough: once a borough has `window_size` valid
# tickets, every further valid ticket prints mean / std-dev / min / max of the
# vehicle years in its most recent `window_size` tickets.
from collections import deque
from datetime import date

window_size = 12
values = {} # key: borough, value: deque of the last `window_size` valid vehicle years
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # poll timed out with nothing to read
        if msg.error():
            raise Exception(msg.error())

        ticket = json.loads(msg.value().decode('utf-8'))
        key = ticket["Violation County"]

        # Missing, non-numeric or out-of-range years are reported and kept out
        # of the window so they cannot distort the statistics. 1885 is the
        # year of the first car; the upper bound is the current year.
        try:
            year = int(ticket["Vehicle Year"])
        except (TypeError, ValueError):
            year = None
        if year is None or year < 1885 or year > date.today().year:
            print(f"Found invalid year {ticket['Vehicle Year']!r} for borough {key}")
            continue

        if key not in values:
            # maxlen makes the deque drop its oldest entry automatically.
            values[key] = deque(maxlen=window_size)
        window = values[key]
        window.append(year)

        if len(window) < window_size:
            continue

        avg_year = sum(window) / window_size
        # Population standard deviation of the window.
        std_dev = (sum((v - avg_year) ** 2 for v in window) / window_size) ** 0.5
        min_year = min(window)
        max_year = max(window)

        print(f"Received {window_size} tickets for {key}. Average Vehicle Year: {avg_year:.2f} +/- {std_dev:.2f}. Min: {min_year}, Max: {max_year}")

except KeyboardInterrupt:
    print("Consumer interrupted.")
finally:
    consumer.close()

print("Finished receiving all data")


Overwriting consumer_borough.py


In [18]:
%%writefile consumer_street.py
import confluent_kafka as kafka, socket
import os, socket, json

consumer = kafka.Consumer({
    'bootstrap.servers': "localhost:9091,localhost:9092,localhost:9093",
    'client.id': socket.gethostname(),
    # Dedicated consumer group for this script. Kafka divides a topic's
    # partitions (and tracks committed offsets) per group, so sharing one
    # group id with another analysis script would leave each of them with
    # only part of the stream.
    'group.id': 'nytickets_consumer_street',
    # A group with no committed offsets starts from the oldest message.
    'auto.offset.reset': 'earliest'
})

topic = "NYTickets"
consumer.subscribe([topic])

# Streets to track, one per line; stored upper-cased for comparison.
_street_names = """
Broadway
3rd Ave
5th Ave
Madison Ave
Lexington Ave
2nd Ave
1st Ave
Queens Blvd
8th Ave
7th Ave
"""
# filter(None, ...) drops the empty strings produced by the leading and
# trailing newlines of the literal.
streets = [name.strip().upper() for name in filter(None, _street_names.split("\n"))]

# Rolling statistics per tracked street: once a street has `window_size` valid
# tickets, every further valid ticket prints mean / std-dev / min / max of the
# vehicle years in its most recent `window_size` tickets.
from collections import deque
from datetime import date

window_size = 12
values = {} # key: street, value: deque of the last `window_size` valid vehicle years
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # poll timed out with nothing to read
        if msg.error():
            raise Exception(msg.error())

        ticket = json.loads(msg.value().decode('utf-8'))
        # NOTE(review): the public NYC parking-violations CSV calls this
        # column "Street Name" - confirm the produced records really carry a
        # "Street" field, otherwise this raises KeyError.
        key = ticket["Street"].upper()
        if key not in streets:
            continue

        # Missing, non-numeric or out-of-range years are reported and kept out
        # of the window so they cannot distort the statistics. 1885 is the
        # year of the first car; the upper bound is the current year.
        try:
            year = int(ticket["Vehicle Year"])
        except (TypeError, ValueError):
            year = None
        if year is None or year < 1885 or year > date.today().year:
            print(f"Found invalid year {ticket['Vehicle Year']!r} for street {key}")
            continue

        if key not in values:
            # maxlen makes the deque drop its oldest entry automatically.
            values[key] = deque(maxlen=window_size)
        window = values[key]
        window.append(year)

        if len(window) < window_size:
            continue

        avg_year = sum(window) / window_size
        # Population standard deviation of the window.
        std_dev = (sum((v - avg_year) ** 2 for v in window) / window_size) ** 0.5
        min_year = min(window)
        max_year = max(window)

        print(f"Received {window_size} tickets for {key}. Average Vehicle Year: {avg_year:.2f} +/- {std_dev:.2f}. Min: {min_year}, Max: {max_year}")

except KeyboardInterrupt:
    print("Consumer interrupted.")
finally:
    consumer.close()

print("Finished receiving all data")


Overwriting consumer_street.py


In [None]:
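# NOTE: earlier draft of consumer_knn.py, left commented out for reference only; the active version is in a later cell.
# %%writefile consumer_knn.py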
# import confluent_kafka as kafka
# import json
# import numpy as np
# from collections import defaultdict
# from sklearn.metrics import mean_squared_error

# class VehicleYearInferenceConsumer:
#     def __init__(self, topic, bootstrap_servers, group_id):
#         self.consumer = kafka.Consumer({
#             'bootstrap.servers': bootstrap_servers,
#             'group.id': group_id,
#             'auto.offset.reset': 'earliest'
#         })
#         # self.producer = kafka.Producer({
#         #     'bootstrap.servers': bootstrap_servers
#         # })
#         self.topic = topic
#         self.consumer.subscribe([topic])
#         self.samples = defaultdict(list)
#         self.centroids = {}
#         self.sample_count = defaultdict(int)
#         self.total_samples = 0
#         self.warmup_complete = False
#         # zero missing values columns
#         self.columns = ['Registration State', 'Plate Type', 'Violation Code',
#                     'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
#                     'Vehicle Expiration Date', 'Violation Precinct', 'Issuer Precinct',
#                     'Issuer Code', 'Violation Time', 'Date First Observed', 'Law Section',
#                     'Vehicle Year', 'Feet From Curb']
#         # numeric columns
#         self.columns = ['Violation Code', 'Street Code1', 'Street Code2',
#                         'Street Code3', 'Vehicle Expiration Date', 'Violation Location',
#                         'Violation Precinct', 'Issuer Precinct', 'Issuer Code',
#                         'Date First Observed', 'Law Section', 'Unregistered Vehicle?',
#                         'Vehicle Year', 'Feet From Curb']
#         # useful columns
#         # ...

#     def collect_samples(self, record):
#         vehicle_year = int(record['Vehicle Year'])
#         if vehicle_year != 0:
#             features = self.extract_features(record)
#             self.samples[vehicle_year].append(features)
#             self.sample_count[vehicle_year] += 1
#             if self.sample_count[vehicle_year] == 1000:
#                 self.calculate_centroid(vehicle_year)

#     def extract_features(self, record):
#         features = []
#         for column in self.columns:
#             value = record.get(column, 0)
#             features.append(float(value) if value else 0.0)
#         return np.array(features)

#     def calculate_centroid(self, vehicle_year):
#         self.centroids[vehicle_year] = np.mean(self.samples[vehicle_year], axis=0)
#         self.samples[vehicle_year] = []
#         self.warmup_complete = True

#     def update_centroid(self, vehicle_year, new_sample):
#         if vehicle_year in self.centroids:
#             current_centroid = self.centroids[vehicle_year]
#             new_centroid = (0.99 * current_centroid + 0.01 * new_sample) / 1.0
#             self.centroids[vehicle_year] = new_centroid

#     def infer_vehicle_year(self, features):
#         distances = {year: np.linalg.norm(features - centroid) for year, centroid in self.centroids.items()}
#         return min(distances, key=distances.get)

#     def evaluate_model(self):
#         known_years = []
#         inferred_years = []
#         for year, samples in self.samples.items():
#             for sample in samples[:100]:
#                 inferred_year = self.infer_vehicle_year(sample)
#                 known_years.append(year)
#                 inferred_years.append(inferred_year)
#         rmse = mean_squared_error(known_years, inferred_years, squared=False)
#         print(f"RMSE: {rmse}")

#     def run(self):
#         try:
#             while True:
#                 msg = self.consumer.poll(timeout=1.0)
#                 if msg is None:
#                     continue
#                 if msg.error():
#                     raise Exception(msg.error())
#                 else:
#                     record = json.loads(msg.value().decode('utf-8'))
#                     vehicle_year = int(record['Vehicle Year'])
#                     self.total_samples += 1

#                     if vehicle_year == 0:
#                         if self.warmup_complete:
#                             features = self.extract_features(record)
#                             inferred_year = self.infer_vehicle_year(features)
#                             record['Vehicle Year'] = inferred_year
#                             # self.producer.produce('inferred_vehicle_years', key=str(record['Summons Number']), value=json.dumps(record))
#                             print(f"Inferred vehicle year {inferred_year} for ticket {record['Summons Number']}")
#                     else:
#                         self.collect_samples(record)
#                         if self.sample_count[vehicle_year] > 1000:
#                             self.update_centroid(vehicle_year, self.extract_features(record))

#                     if self.total_samples % 3000 == 0:
#                         self.evaluate_model()
#         except KeyboardInterrupt:
#             print("Consumer interrupted.")
#         finally:
#             self.consumer.close()
#             print("Finished receiving all data")

# if __name__ == "__main__":
#     consumer = VehicleYearInferenceConsumer(
#         topic='NYTickets',
#         bootstrap_servers='localhost:9091,localhost:9092,localhost:9093',
#         group_id='vehicle_year_inference'
#     )
#     consumer.run()


In [12]:
%%writefile consumer_knn.py
import confluent_kafka as kafka
import json
import numpy as np
from collections import defaultdict
from sklearn.metrics import mean_squared_error
from datetime import datetime
from sklearn.preprocessing import OneHotEncoder

class VehicleYearInferenceConsumer:
    """Kafka consumer that fills in missing vehicle years with nearest centroids.

    Tickets with a non-zero 'Vehicle Year' are converted to feature vectors and
    buffered per year. When a year reaches CENTROID_BATCH samples its centroid
    is computed, and the centroid is refreshed from later batches. Once
    MIN_CENTROIDS centroids exist, tickets whose year is 0 or unparsable are
    assigned the year of the closest centroid (Euclidean distance).
    """

    CENTROID_BATCH = 1000  # samples per year before a centroid is computed / refreshed
    MIN_CENTROIDS = 10     # centroids required before inference starts
    EVAL_PER_YEAR = 100    # max buffered samples per year used by evaluate_model
    EVAL_EVERY = 3000      # evaluate after every this many consumed messages

    def __init__(self, topic, bootstrap_servers, group_id):
        """Subscribe to ``topic`` and fit the one-hot encoders.

        Args:
            topic: Kafka topic to subscribe to.
            bootstrap_servers: value for the consumer's 'bootstrap.servers'.
            group_id: value for the consumer's 'group.id'.
        """
        self.consumer = kafka.Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest'
        })
        # self.producer = kafka.Producer({
        #     'bootstrap.servers': bootstrap_servers
        # })
        self.topic = topic
        self.consumer.subscribe([topic])
        self.samples = defaultdict(list)      # year -> feature vectors not yet folded into a centroid
        self.centroids = {}                   # year -> centroid feature vector
        self.sample_count = defaultdict(int)  # year -> number of labelled samples seen
        self.total_samples = 0                # all consumed messages, labelled or not
        self.warmup_complete = False          # True once MIN_CENTROIDS centroids exist

        self.categorical_columns = ['Registration State', 'Plate Type', 'Vehicle Body Type', 'Vehicle Make',
                                    'Issuing Agency', 'Vehicle Color']
        self.numeric_columns = ['Violation Code', 'Street Code1', 'Street Code2', 'Street Code3',
                                'Violation Precinct', 'Issuer Precinct', 'Issuer Code',
                                'Feet From Curb', 'Violation Location', 'Unregistered Vehicle?']
        self.date_columns = ['Issue Date', 'Vehicle Expiration Date', 'Date First Observed']
        self.time_columns = ['Violation Time']

        self.onehot_encoders = {col: OneHotEncoder(handle_unknown='ignore') for col in self.categorical_columns}
        self.prepare_encoders()

    def prepare_encoders(self):
        """Fit each one-hot encoder on the categories listed in unique_values.json."""
        # Load unique values from JSON file
        with open('unique_values.json', 'r') as f:
            unique_values = json.load(f)

        # Replace NaN (which is a float) with "Unknown"
        for key, values in unique_values.items():
            unique_values[key] = ["Unknown" if isinstance(v, float) and np.isnan(v) else v for v in values]

        # Fit one-hot encoders with the unique values
        for column, encoder in self.onehot_encoders.items():
            unique_value_array = np.array(unique_values[column]).reshape(-1, 1)
            encoder.fit(unique_value_array)

    def extract_features(self, record):
        """Convert a ticket dict into a 1-D numeric feature array.

        Layout: numeric columns, one-hot blocks of the categorical columns,
        (year, month, day) per date column, minutes since midnight per time
        column. Missing or malformed values fall back to a default instead of
        raising.
        """
        features = []

        # Handle numeric columns; empty, unparsable or non-finite values become 0.0
        for column in self.numeric_columns:
            value = record.get(column, 0)
            try:
                number = float(value) if value else 0.0
            except (TypeError, ValueError):
                number = 0.0
            features.append(number if np.isfinite(number) else 0.0)

        # Handle categorical columns with one-hot encoding. An empty value is
        # mapped to "Unknown", the same placeholder prepare_encoders uses for
        # missing categories.
        for column in self.categorical_columns:
            value = record.get(column) or 'Unknown'
            encoded = self.onehot_encoders[column].transform([[value]]).toarray().flatten()
            features.extend(encoded.tolist())

        # Handle date columns: accepts YYYYMMDD or MM/DD/YYYY, else 1970-01-01
        for column in self.date_columns:
            date_str = record.get(column, '01/01/1970')
            try:
                if '/' not in date_str:
                    date_obj = datetime.strptime(date_str, '%Y%m%d')
                else:
                    date_obj = datetime.strptime(date_str, '%m/%d/%Y')
            except (TypeError, ValueError):
                date_obj = datetime.strptime('01/01/1970', '%m/%d/%Y')
            features.extend([date_obj.year, date_obj.month, date_obj.day])

        # Handle time columns
        for column in self.time_columns:
            time_str = record.get(column, '0000A')
            minutes = self.convert_to_minutes(time_str)
            features.append(minutes)

        return np.array(features)

    def convert_to_minutes(self, time_str):
        """Convert an 'HHMM' + 'A'/'P' string (e.g. '0130P') to minutes since midnight.

        Returns 0 when the string is empty or cannot be parsed.
        """
        try:
            hours = int(time_str[:2])
            mins = int(time_str[2:4])
            meridiem = time_str[-1]
        except (TypeError, ValueError, IndexError):
            return 0
        if meridiem == 'P' and hours != 12:
            hours += 12  # 1 PM - 11 PM
        elif meridiem == 'A' and hours == 12:
            hours = 0    # 12 AM is the first hour of the day
        return hours * 60 + mins

    def collect_samples(self, record):
        """Buffer a labelled ticket and create its year's centroid at CENTROID_BATCH samples."""
        vehicle_year = int(record['Vehicle Year'])
        if vehicle_year != 0:
            features = self.extract_features(record)
            self.samples[vehicle_year].append(features)
            self.sample_count[vehicle_year] += 1
            if self.sample_count[vehicle_year] == self.CENTROID_BATCH and vehicle_year not in self.centroids:
                self.calculate_centroid(vehicle_year)

    def calculate_centroid(self, vehicle_year):
        """Set the year's centroid to the mean of its buffered samples and clear the buffer."""
        self.centroids[vehicle_year] = np.mean(self.samples[vehicle_year], axis=0)
        self.samples[vehicle_year] = []
        if len(self.centroids) >= self.MIN_CENTROIDS:
            self.warmup_complete = True
        print(f"Calculated centroid for vehicle year {vehicle_year}")

    def update_centroid(self, vehicle_year):
        """Fold the year's buffered samples into its centroid as a count-weighted mean."""
        if vehicle_year in self.centroids:
            new_samples = np.array(self.samples[vehicle_year])
            new_count = len(new_samples)
            if new_count == 0:
                # Nothing buffered: the mean of an empty array is NaN and would
                # wipe out the centroid.
                return
            current_centroid = self.centroids[vehicle_year]
            total_count = self.sample_count[vehicle_year]
            current_count = total_count - new_count  # samples already in the centroid

            updated_centroid = (current_centroid * current_count + new_samples.mean(axis=0) * new_count) / (current_count + new_count)

            self.centroids[vehicle_year] = updated_centroid
            self.samples[vehicle_year] = []
            print(f"Updated centroid for vehicle year {vehicle_year}")

    def infer_vehicle_year(self, features):
        """Return the year whose centroid is closest (Euclidean norm) to ``features``."""
        distances = {year: np.linalg.norm(features - centroid) for year, centroid in self.centroids.items()}
        return min(distances, key=distances.get)

    def evaluate_model(self):
        """Print RMSE and mean absolute error on the currently buffered samples.

        Uses up to EVAL_PER_YEAR buffered samples per year, i.e. samples that
        have not yet been folded into a centroid.
        """
        known_years = []
        inferred_years = []
        for year, samples in self.samples.items():
            for sample in samples[:self.EVAL_PER_YEAR]:
                known_years.append(year)
                inferred_years.append(self.infer_vehicle_year(sample))
        if not known_years:
            print("No buffered samples to evaluate")
            return
        errors = np.asarray(known_years, dtype=float) - np.asarray(inferred_years, dtype=float)
        # RMSE computed with numpy: mean_squared_error(..., squared=False) is
        # not accepted by current scikit-learn releases.
        rmse = float(np.sqrt(np.mean(errors ** 2)))
        print(f"RMSE: {rmse}")
        abs_avg_error = np.mean(np.abs(errors))
        print(f"Mean Absolute Error: {abs_avg_error}")

    def run(self):
        """Consume messages until interrupted, learning centroids and inferring years."""
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    raise Exception(msg.error())

                record = json.loads(msg.value().decode('utf-8'))
                try:
                    vehicle_year = int(record['Vehicle Year'])
                except (TypeError, ValueError):
                    # Treat an unparsable year like a missing one.
                    vehicle_year = 0
                    print(f"Invalid vehicle year {record['Vehicle Year']} for ticket {record['Summons Number']}")
                self.total_samples += 1

                if vehicle_year == 0:
                    if self.warmup_complete:
                        features = self.extract_features(record)
                        inferred_year = self.infer_vehicle_year(features)
                        record['Vehicle Year'] = inferred_year
                        # self.producer.produce('inferred_vehicle_years', key=str(record['Summons Number']), value=json.dumps(record))
                        print(f"Inferred vehicle year {inferred_year} for ticket {record['Summons Number']}")
                else:
                    self.collect_samples(record)
                    # sample_count was already incremented by collect_samples,
                    # so this fires on a year's 1999th, 2999th, ... sample.
                    if (self.sample_count[vehicle_year]+1) % self.CENTROID_BATCH == 0 and vehicle_year in self.centroids:
                        self.update_centroid(vehicle_year)

                if self.total_samples % self.EVAL_EVERY == 0 and self.warmup_complete:
                    self.evaluate_model()
        except KeyboardInterrupt:
            print("Consumer interrupted.")
        finally:
            self.consumer.close()
            print("Finished receiving all data")

if __name__ == "__main__":
    # Script entry point: build the consumer from its settings and consume
    # until interrupted.
    settings = {
        'topic': 'NYTickets',
        'bootstrap_servers': 'localhost:9091,localhost:9092,localhost:9093',
        'group_id': 'vehicle_year_inference',
    }
    consumer = VehicleYearInferenceConsumer(**settings)
    consumer.run()


Overwriting consumer_knn.py


In [5]:
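# NOTE: earlier draft of consumer_sgd.py, left commented out for reference only; the active version is in the next cell.
# %%writefile consumer_sgd.py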
# import confluent_kafka as kafka
# import json
# import numpy as np
# from sklearn.preprocessing import OneHotEncoder, StandardScaler
# from sklearn.linear_model import SGDRegressor
# from sklearn.metrics import mean_squared_error
# from datetime import datetime

# class VehicleYearInferenceWithSGD:
#     def __init__(self, topic, bootstrap_servers, group_id):
#         self.consumer = kafka.Consumer({
#             'bootstrap.servers': bootstrap_servers,
#             'group.id': group_id,
#             'auto.offset.reset': 'earliest'
#         })
#         self.topic = topic
#         self.consumer.subscribe([topic])
        
#         self.categorical_columns = ['Registration State', 'Plate Type', 'Vehicle Body Type', 'Vehicle Make', 
#                                     'Issuing Agency', 'Vehicle Color']
#         self.numeric_columns = ['Violation Code', 'Street Code1', 'Street Code2', 'Street Code3', 
#                                 'Violation Precinct', 'Issuer Precinct', 'Issuer Code', 
#                                 'Feet From Curb', 'Violation Location', 'Unregistered Vehicle?']
#         self.date_columns = ['Issue Date', 'Vehicle Expiration Date', 'Date First Observed']
#         self.time_columns = ['Violation Time']

#         self.onehot_encoders = {col: OneHotEncoder(handle_unknown='ignore') for col in self.categorical_columns}
#         self.scaler = StandardScaler()
#         self.model = SGDRegressor()
#         self.total_samples = 0
#         self.eval_samples_collected = 0
#         self.eval_features = []
#         self.eval_labels = []
#         self.mean_shift = 2000
#         self.prepare_encoders()

#     def prepare_encoders(self):
#         with open('unique_values.json', 'r') as f:
#             unique_values = json.load(f)
        
#         for key, values in unique_values.items():
#             unique_values[key] = ["Unknown" if isinstance(v, float) and np.isnan(v) else v for v in values]
        
#         for column, encoder in self.onehot_encoders.items():
#             unique_value_array = np.array(unique_values[column]).reshape(-1, 1)
#             encoder.fit(unique_value_array)

#     def extract_features(self, record):
#         features = []

#         for column in self.numeric_columns:
#             value = record.get(column, 0)
#             try:
#                 features.append(float(value) if value else 0.0)
#             except ValueError:
#                 features.append(0.0)
        
#         for column in self.categorical_columns:
#             value = record.get(column, 'Unknown')
#             encoded = self.onehot_encoders[column].transform([[value]]).toarray().flatten()
#             features.extend(encoded.tolist())
        
#         for column in self.date_columns:
#             date_str = record.get(column, '01/01/1970')
#             try:
#                 if '/' not in date_str:
#                     date_obj = datetime.strptime(date_str, '%Y%m%d')
#                 else:
#                     date_obj = datetime.strptime(date_str, '%m/%d/%Y')
#             except ValueError:
#                 date_obj = datetime.strptime('01/01/1970', '%m/%d/%Y')
#             features.extend([date_obj.year, date_obj.month, date_obj.day])
        
#         for column in self.time_columns:
#             time_str = record.get(column, '0000A')
#             minutes = self.convert_to_minutes(time_str)
#             features.append(minutes)
        
#         return np.array(features)

#     def convert_to_minutes(self, time_str):
#         if time_str[-1] == 'P' and int(time_str[:2]) != 12:
#             minutes = (int(time_str[:2]) + 12) * 60 + int(time_str[2:4])
#         elif time_str[-1] == 'A' and int(time_str[:2]) == 12:
#             minutes = int(time_str[2:4])
#         else:
#             minutes = int(time_str[:2]) * 60 + int(time_str[2:4])
#         return minutes
    
#     def collect_samples(self, record):
#         vehicle_year = int(record['Vehicle Year'])
#         features = self.extract_features(record)
#         if vehicle_year != 0:
#             self.model.partial_fit([features], [vehicle_year-self.mean_shift])

#     def infer_vehicle_year(self, features):
#         return self.model.predict([features])[0] + self.mean_shift

#     def evaluate_model(self):
#         known_years = []
#         inferred_years = []
#         for features, actual_year in zip(self.eval_features, self.eval_labels):
#             inferred_year = self.infer_vehicle_year(features)
#             known_years.append(actual_year)
#             inferred_years.append(inferred_year)
#         rmse = mean_squared_error(known_years, inferred_years, squared=False)
#         print(f"RMSE: {rmse}")
#         abs_avg_error = np.mean(np.abs(np.array(known_years) - np.array(inferred_years)))
#         print(f"Mean Absolute Error: {abs_avg_error}")
#         self.eval_features = []
#         self.eval_labels = []

#     def run(self):
#         try:
#             while True:
#                 msg = self.consumer.poll(timeout=1.0)
#                 if msg is None:
#                     continue
#                 if msg.error():
#                     raise Exception(msg.error())
#                 else:
#                     record = json.loads(msg.value().decode('utf-8'))
#                     try:
#                         vehicle_year = int(record['Vehicle Year'])
#                     except ValueError:
#                         vehicle_year = 0
#                         print(f"Invalid vehicle year {record['Vehicle Year']} for ticket {record['Summons Number']}")
#                     self.total_samples += 1

#                     if self.total_samples % 3000 < 100:
#                         if vehicle_year != 0:
#                             features = self.extract_features(record)
#                             self.eval_features.append(features)
#                             self.eval_labels.append(vehicle_year)
#                             self.eval_samples_collected += 1
#                             if self.eval_samples_collected == 100:
#                                 self.evaluate_model()
#                                 self.eval_samples_collected = 0
#                     else:
#                         if vehicle_year == 0 and self.total_samples > 1000:
#                             features = self.extract_features(record)
#                             inferred_year = self.infer_vehicle_year(features)
#                             record['Vehicle Year'] = inferred_year
#                             print(f"Inferred vehicle year {inferred_year} for ticket {record['Summons Number']}")
#                         else:
#                             self.collect_samples(record)

#         except KeyboardInterrupt:
#             print("Consumer interrupted.")
#         finally:
#             self.consumer.close()
#             print("Finished receiving all data")

# if __name__ == "__main__":
#     consumer = VehicleYearInferenceWithSGD(
#         topic='NYTickets',
#         bootstrap_servers='localhost:9091,localhost:9092,localhost:9093',
#         group_id='vehicle_year_inference'
#     )
#     consumer.run()


Overwriting consumer_sgd.py


In [2]:
%%writefile consumer_sgd.py
import confluent_kafka as kafka
import json
import numpy as np
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import mean_squared_error
from datetime import datetime

class VehicleYearInferenceWithSGD:
    """Kafka consumer that fills in missing 'Vehicle Year' values with an online SGD regressor.

    Records are read as JSON from a Kafka topic.  The first ``WARMUP_SIZE``
    records with a known (non-zero) year are buffered to fit the feature
    scaler and give the model its first batch.  After that, records with a
    known year keep training the model one at a time, records whose year is 0
    or unparseable get a predicted year, and a periodic window of known-year
    records is held out to report RMSE / mean absolute error.
    """

    # Known-year records buffered before the scaler and model are first fitted.
    WARMUP_SIZE = 1000
    # An evaluation window opens every EVAL_PERIOD consumed records ...
    EVAL_PERIOD = 3000
    # ... and covers the first EVAL_WINDOW record counts of each period.
    EVAL_WINDOW = 100
    # Held-out known-year records needed before an evaluation is printed.
    EVAL_BATCH_SIZE = 100

    def __init__(self, topic, bootstrap_servers, group_id):
        """Create the Kafka consumer, subscribe to ``topic`` and set up model state.

        Args:
            topic: Name of the Kafka topic to subscribe to.
            bootstrap_servers: Broker list passed to ``bootstrap.servers``.
            group_id: Kafka consumer group id.

        Side effects:
            Reads ``unique_values.json`` from the working directory (through
            ``prepare_encoders``) to fit the one-hot encoders.
        """
        self.consumer = kafka.Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest'
        })
        self.topic = topic
        self.consumer.subscribe([topic])

        self.categorical_columns = ['Registration State', 'Plate Type', 'Vehicle Body Type', 'Vehicle Make',
                                    'Issuing Agency', 'Vehicle Color']
        self.numeric_columns = ['Violation Code', 'Street Code1', 'Street Code2', 'Street Code3',
                                'Violation Precinct', 'Issuer Precinct', 'Issuer Code',
                                'Feet From Curb', 'Violation Location', 'Unregistered Vehicle?']
        self.date_columns = ['Issue Date', 'Vehicle Expiration Date', 'Date First Observed']
        self.time_columns = ['Violation Time']

        self.onehot_encoders = {col: OneHotEncoder(handle_unknown='ignore') for col in self.categorical_columns}
        self.scaler = StandardScaler()
        self.model = SGDRegressor()
        self.total_samples = 0
        self.eval_samples_collected = 0
        self.eval_features = []
        self.eval_labels = []
        self.warmup_samples = []
        self.warmup_labels = []
        # Labels are trained as (year - mean_shift) and shifted back on prediction.
        self.mean_shift = 2000
        self.warmup_complete = False
        self.prepare_encoders()

    def prepare_encoders(self):
        """Fit every one-hot encoder on the value list stored in ``unique_values.json``.

        The file is read as a mapping from column name to a list of values;
        float NaN entries are replaced with the string ``"Unknown"`` before
        fitting.
        """
        with open('unique_values.json', 'r') as f:
            unique_values = json.load(f)

        for key, values in unique_values.items():
            unique_values[key] = ["Unknown" if isinstance(v, float) and np.isnan(v) else v for v in values]

        for column, encoder in self.onehot_encoders.items():
            unique_value_array = np.array(unique_values[column]).reshape(-1, 1)
            encoder.fit(unique_value_array)

    @staticmethod
    def _parse_vehicle_year(record):
        """Return the record's 'Vehicle Year' as an int, or 0 when missing or invalid."""
        try:
            return int(record.get('Vehicle Year'))
        except (TypeError, ValueError):
            return 0

    def extract_features(self, record):
        """Turn one ticket record into a flat numeric feature vector.

        Layout, in order: one float per numeric column, the one-hot encoding
        of each categorical column, (year, month, day) for each date column
        and the minutes value of each time column.

        Args:
            record: Mapping of column name to raw value.

        Returns:
            1-D ``numpy.ndarray`` of features.
        """
        features = []

        for column in self.numeric_columns:
            value = record.get(column, 0)
            try:
                # Falsy values (missing, empty string, 0) count as 0.0.
                features.append(float(value) if value else 0.0)
            except (TypeError, ValueError):
                features.append(0.0)

        for column in self.categorical_columns:
            value = record.get(column, 'Unknown')
            encoded = self.onehot_encoders[column].transform([[value]]).toarray().flatten()
            features.extend(encoded.tolist())

        for column in self.date_columns:
            date_str = record.get(column, '01/01/1970')
            try:
                date_format = '%m/%d/%Y' if '/' in date_str else '%Y%m%d'
                date_obj = datetime.strptime(date_str, date_format)
            except (TypeError, ValueError):
                # Unparseable or non-string dates fall back to 1970-01-01
                # instead of aborting the whole consumer.
                date_obj = datetime(1970, 1, 1)
            features.extend([date_obj.year, date_obj.month, date_obj.day])

        for column in self.time_columns:
            time_str = record.get(column, '0000A')
            features.append(self.convert_to_minutes(time_str))

        return np.array(features)

    def convert_to_minutes(self, time_str):
        """Convert an ``HHMM`` string with a trailing ``A``/``P`` to minutes since midnight.

        Args:
            time_str: Time such as ``'0130P'``; a value without an ``A``/``P``
                suffix is read as plain ``HHMM``.

        Returns:
            Minutes as an int, or 0 when ``time_str`` is empty, too short,
            non-numeric or not a string.
        """
        try:
            hours = int(time_str[:2])
            minutes = int(time_str[2:4])
            meridiem = time_str[-1]
        except (IndexError, TypeError, ValueError):
            return 0
        if meridiem == 'P' and hours != 12:
            hours += 12
        elif meridiem == 'A' and hours == 12:
            hours = 0
        return hours * 60 + minutes

    def warmup_phase(self, record):
        """Buffer one known-year record; fit scaler and model once the buffer is full.

        Records whose year is 0, missing or unparseable are ignored.  When
        ``WARMUP_SIZE`` samples are buffered the scaler is fitted on them, the
        model receives them as its first ``partial_fit`` batch,
        ``warmup_complete`` is set and the buffers are released.
        """
        vehicle_year = self._parse_vehicle_year(record)
        if vehicle_year == 0:
            # Nothing to learn from an unlabelled record; skip feature extraction too.
            return
        self.warmup_samples.append(self.extract_features(record))
        self.warmup_labels.append(vehicle_year - self.mean_shift)
        if len(self.warmup_samples) >= self.WARMUP_SIZE:
            self.scaler.fit(self.warmup_samples)
            scaled_samples = self.scaler.transform(self.warmup_samples)
            self.model.partial_fit(scaled_samples, self.warmup_labels)
            self.warmup_complete = True
            self.warmup_samples = []
            self.warmup_labels = []

    def collect_samples(self, record):
        """Train the model on a single known-year record (no-op when the year is 0/invalid)."""
        vehicle_year = self._parse_vehicle_year(record)
        if vehicle_year == 0:
            return
        features = self.extract_features(record)
        scaled_features = self.scaler.transform([features])
        self.model.partial_fit(scaled_features, [vehicle_year - self.mean_shift])

    def infer_vehicle_year(self, features):
        """Predict the vehicle year for one feature vector (model output plus ``mean_shift``)."""
        scaled_features = self.scaler.transform([features])
        return self.model.predict(scaled_features)[0] + self.mean_shift

    def evaluate_model(self):
        """Print RMSE and mean absolute error on the held-out samples, then clear them.

        The metrics are computed with NumPy so the result does not depend on
        the ``squared=False`` argument of ``mean_squared_error``, which newer
        scikit-learn releases no longer accept.
        """
        if not self.eval_features:
            return
        known_years = np.asarray(self.eval_labels, dtype=float)
        inferred_years = np.array(
            [self.infer_vehicle_year(features) for features in self.eval_features],
            dtype=float,
        )
        errors = known_years - inferred_years
        rmse = float(np.sqrt(np.mean(errors ** 2)))
        print(f"RMSE: {rmse}")
        abs_avg_error = float(np.mean(np.abs(errors)))
        print(f"Mean Absolute Error: {abs_avg_error}")
        self.eval_features = []
        self.eval_labels = []

    def run(self):
        """Consume messages until interrupted, training, evaluating and inferring as they arrive.

        Raises:
            Exception: wrapping the Kafka error when a polled message reports one.
        """
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    raise Exception(msg.error())

                record = json.loads(msg.value().decode('utf-8'))
                raw_year = record.get('Vehicle Year')
                try:
                    vehicle_year = int(raw_year)
                except (TypeError, ValueError):
                    vehicle_year = 0
                    print(f"Invalid vehicle year {raw_year} for ticket {record.get('Summons Number')}")
                self.total_samples += 1

                if not self.warmup_complete:
                    self.warmup_phase(record)
                    continue

                if self.total_samples % self.EVAL_PERIOD < self.EVAL_WINDOW:
                    # Inside the evaluation window: hold known-year records out
                    # of training so they can be used to score the model.
                    if vehicle_year != 0:
                        self.eval_features.append(self.extract_features(record))
                        self.eval_labels.append(vehicle_year)
                        self.eval_samples_collected += 1
                        if self.eval_samples_collected == self.EVAL_BATCH_SIZE:
                            self.evaluate_model()
                            self.eval_samples_collected = 0
                elif vehicle_year == 0:
                    features = self.extract_features(record)
                    inferred_year = int(self.infer_vehicle_year(features))
                    record['Vehicle Year'] = inferred_year
                    print(f"Inferred vehicle year {inferred_year} for ticket {record.get('Summons Number')}")
                else:
                    self.collect_samples(record)

        except KeyboardInterrupt:
            print("Consumer interrupted.")
        finally:
            self.consumer.close()
            print("Finished receiving all data")

if __name__ == "__main__":
    # Script entry point: subscribe to the NYTickets topic through the three
    # local broker addresses and process messages until interrupted.
    consumer = VehicleYearInferenceWithSGD(
        'NYTickets',                                      # topic
        'localhost:9091,localhost:9092,localhost:9093',   # bootstrap_servers
        'vehicle_year_inference',                         # group_id
    )
    consumer.run()


Overwriting consumer_sgd.py


In [9]:
%%writefile consumer_ipca_knn.py
import confluent_kafka as kafka
import json
import numpy as np
from sklearn.decomposition import IncrementalPCA
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import mean_squared_error
from datetime import datetime

class VehicleYearInferenceWithIPCAKNN:
    """Kafka consumer that fills in missing 'Vehicle Year' values with IncrementalPCA + kNN.

    Records with a known (non-zero) year are buffered.  Every ``BATCH_SIZE``
    buffered records the scaler and IncrementalPCA are updated incrementally
    and the kNN regressor is re-fitted on that batch.  Records whose year is 0
    or unparseable get a predicted year once the models have been fitted, and
    a periodic window scores the model on the records currently buffered.
    """

    # Known-year records buffered before the models are (re)fitted.
    BATCH_SIZE = 1000
    # An evaluation window opens every EVAL_PERIOD consumed records ...
    EVAL_PERIOD = 3000
    # ... and covers the first EVAL_WINDOW record counts of each period.
    EVAL_WINDOW = 100
    # Minimum number of buffered records needed to run an evaluation.
    MIN_EVAL_SAMPLES = 100

    def __init__(self, topic, bootstrap_servers, group_id):
        """Create the Kafka consumer, subscribe to ``topic`` and set up model state.

        Args:
            topic: Name of the Kafka topic to subscribe to.
            bootstrap_servers: Broker list passed to ``bootstrap.servers``.
            group_id: Kafka consumer group id.

        Side effects:
            Reads ``unique_values.json`` from the working directory (through
            ``prepare_encoders``) to fit the one-hot encoders.
        """
        self.consumer = kafka.Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest'
        })
        self.topic = topic
        self.consumer.subscribe([topic])

        self.categorical_columns = ['Registration State', 'Plate Type', 'Vehicle Body Type', 'Vehicle Make',
                                    'Issuing Agency', 'Vehicle Color']
        self.numeric_columns = ['Violation Code', 'Street Code1', 'Street Code2', 'Street Code3',
                                'Violation Precinct', 'Issuer Precinct', 'Issuer Code',
                                'Feet From Curb', 'Violation Location', 'Unregistered Vehicle?']
        self.date_columns = ['Issue Date', 'Vehicle Expiration Date', 'Date First Observed']
        self.time_columns = ['Violation Time']

        self.onehot_encoders = {col: OneHotEncoder(handle_unknown='ignore') for col in self.categorical_columns}
        self.scaler = StandardScaler()
        self.ipca = IncrementalPCA(n_components=10)
        self.knn = KNeighborsRegressor(n_neighbors=5)
        # Holds (features, year - mean_shift) pairs gathered since the last
        # model update or evaluation.
        self.buffer = []
        self.total_samples = 0
        # Labels are stored as (year - mean_shift) and shifted back on prediction.
        self.mean_shift = 2000
        self.already_fit = False
        self.prepare_encoders()

    def prepare_encoders(self):
        """Fit every one-hot encoder on the value list stored in ``unique_values.json``.

        The file is read as a mapping from column name to a list of values;
        float NaN entries are replaced with the string ``"Unknown"`` before
        fitting.
        """
        with open('unique_values.json', 'r') as f:
            unique_values = json.load(f)

        for key, values in unique_values.items():
            unique_values[key] = ["Unknown" if isinstance(v, float) and np.isnan(v) else v for v in values]

        for column, encoder in self.onehot_encoders.items():
            unique_value_array = np.array(unique_values[column]).reshape(-1, 1)
            encoder.fit(unique_value_array)

    def extract_features(self, record):
        """Turn one ticket record into a flat numeric feature vector.

        Layout, in order: one float per numeric column, the one-hot encoding
        of each categorical column, (year, month, day) for each date column
        and the minutes value of each time column.

        Args:
            record: Mapping of column name to raw value.

        Returns:
            1-D ``numpy.ndarray`` of features.
        """
        features = []

        for column in self.numeric_columns:
            value = record.get(column, 0)
            try:
                # Falsy values (missing, empty string, 0) count as 0.0.
                features.append(float(value) if value else 0.0)
            except (TypeError, ValueError):
                features.append(0.0)

        for column in self.categorical_columns:
            value = record.get(column, 'Unknown')
            encoded = self.onehot_encoders[column].transform([[value]]).toarray().flatten()
            features.extend(encoded.tolist())

        for column in self.date_columns:
            date_str = record.get(column, '01/01/1970')
            try:
                date_format = '%m/%d/%Y' if '/' in date_str else '%Y%m%d'
                date_obj = datetime.strptime(date_str, date_format)
            except (TypeError, ValueError):
                # Unparseable or non-string dates fall back to 1970-01-01
                # instead of aborting the whole consumer.
                date_obj = datetime(1970, 1, 1)
            features.extend([date_obj.year, date_obj.month, date_obj.day])

        for column in self.time_columns:
            time_str = record.get(column, '0000A')
            features.append(self.convert_to_minutes(time_str))

        return np.array(features)

    def convert_to_minutes(self, time_str):
        """Convert an ``HHMM`` string with a trailing ``A``/``P`` to minutes since midnight.

        Args:
            time_str: Time such as ``'0130P'``; a value without an ``A``/``P``
                suffix is read as plain ``HHMM``.

        Returns:
            Minutes as an int, or 0 when ``time_str`` is empty, too short,
            non-numeric or not a string.
        """
        try:
            hours = int(time_str[:2])
            minutes = int(time_str[2:4])
            meridiem = time_str[-1]
        except (IndexError, TypeError, ValueError):
            return 0
        if meridiem == 'P' and hours != 12:
            hours += 12
        elif meridiem == 'A' and hours == 12:
            hours = 0
        return hours * 60 + minutes

    def update_models(self):
        """Update scaler and IPCA with the buffered batch, re-fit kNN on it, then clear the buffer."""
        buffer_features, buffer_labels = zip(*self.buffer)
        buffer_features = np.array(buffer_features)
        buffer_labels = np.array(buffer_labels)

        # Scaler and IPCA accumulate statistics across batches.
        self.scaler.partial_fit(buffer_features)
        scaled_features = self.scaler.transform(buffer_features)
        self.ipca.partial_fit(scaled_features)

        # Project the batch with the freshly updated IPCA.
        transformed_features = self.ipca.transform(scaled_features)

        # kNN has no incremental API: fit() replaces the previous neighbours,
        # so the regressor only ever knows the most recent batch.
        self.knn.fit(transformed_features, buffer_labels)

        self.buffer = []

    def infer_vehicle_year(self, features):
        """Predict the vehicle year for one feature vector (kNN output plus ``mean_shift``)."""
        scaled_features = self.scaler.transform([features])
        transformed_features = self.ipca.transform(scaled_features)
        return self.knn.predict(transformed_features)[0] + self.mean_shift

    def evaluate_model(self):
        """Print RMSE and mean absolute error on the buffered samples, then discard them.

        The buffer only contains samples gathered since the last model update,
        so they have not been used for fitting.  The metrics are computed with
        NumPy so the result does not depend on the ``squared=False`` argument
        of ``mean_squared_error``, which newer scikit-learn releases no longer
        accept.
        """
        if not self.buffer:
            return
        known_years = np.array(
            [shifted_year + self.mean_shift for _, shifted_year in self.buffer],
            dtype=float,
        )
        inferred_years = np.array(
            [self.infer_vehicle_year(features) for features, _ in self.buffer],
            dtype=float,
        )
        errors = known_years - inferred_years
        rmse = float(np.sqrt(np.mean(errors ** 2)))
        print(f"RMSE: {rmse}")
        abs_avg_error = float(np.mean(np.abs(errors)))
        print(f"Mean Absolute Error: {abs_avg_error}")
        self.buffer = []

    def run(self):
        """Consume messages until interrupted, buffering, fitting, evaluating and inferring.

        Raises:
            Exception: wrapping the Kafka error when a polled message reports one.
        """
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    raise Exception(msg.error())

                record = json.loads(msg.value().decode('utf-8'))
                raw_year = record.get('Vehicle Year')
                try:
                    vehicle_year = int(raw_year)
                except (TypeError, ValueError):
                    vehicle_year = 0
                    print(f"Invalid vehicle year {raw_year} for ticket {record.get('Summons Number')}")
                self.total_samples += 1

                if vehicle_year == 0:
                    # Unknown year: predict it, but only once models exist.
                    if self.already_fit:
                        features = self.extract_features(record)
                        inferred_year = self.infer_vehicle_year(features)
                        record['Vehicle Year'] = int(inferred_year)
                        print(f"Inferred vehicle year {record['Vehicle Year']} for ticket {record.get('Summons Number')}")
                    continue

                features = self.extract_features(record)
                self.buffer.append((features, vehicle_year - self.mean_shift))

                if len(self.buffer) >= self.BATCH_SIZE:
                    self.update_models()
                    self.already_fit = True

                # Evaluating needs fitted models; without the already_fit guard
                # an evaluation window reached before the first full batch
                # would fail inside the unfitted scaler.
                if (self.already_fit
                        and len(self.buffer) >= self.MIN_EVAL_SAMPLES
                        and self.total_samples % self.EVAL_PERIOD < self.EVAL_WINDOW):
                    self.evaluate_model()

        except KeyboardInterrupt:
            print("Consumer interrupted.")
        finally:
            self.consumer.close()
            print("Finished receiving all data")

if __name__ == "__main__":
    # Script entry point: subscribe to the NYTickets topic through the three
    # local broker addresses and process messages until interrupted.
    # NOTE(review): consumer_sgd.py is launched with the same group_id
    # ('vehicle_year_inference'); consumers in one Kafka group share
    # partitions and committed offsets, so confirm the two scripts are never
    # expected to each see the full stream.
    consumer = VehicleYearInferenceWithIPCAKNN(
        'NYTickets',                                      # topic
        'localhost:9091,localhost:9092,localhost:9093',   # bootstrap_servers
        'vehicle_year_inference',                         # group_id
    )
    consumer.run()


Overwriting consumer_ipca_knn.py


In [None]:
# Execute producer.py (written by the %%writefile cell near the top of the
# notebook) inside this kernel. Its send loop sleeps one second after every
# message, so expect this cell to stay busy for a long time.
%run producer.py

In [None]:
# Run the three consumer scripts one after another; %run does not return
# until the script it started has finished.
# NOTE(review): these scripts are not shown in this part of the notebook. If
# they poll in an endless loop the way consumer_sgd.py and
# consumer_ipca_knn.py do, each later line only starts after the previous
# consumer has been interrupted -- confirm that is the intended workflow.
%run consumer_all.py
%run consumer_borough.py
%run consumer_street.py