In [7]:
import pandas as pd
import datetime
import threading
import time
from queue import Queue
from collections import defaultdict

class Message:
    def __init__(self, timestamp: int, userId: str, type: str, product_Id: str, brand: str, category: str):
        self.timestamp = timestamp
        self.userId = userId
        self.type = type
        self.product_Id = product_Id
        self.brand = brand
        self.category = category

    def __str__(self):
        return f"Message:\n\tUser Id: {self.userId}\n\tAt Timestamp: {self.timestamp}\n\tType: {self.type}\n\tProduct Id: {self.product_Id}\n\tBrand: {self.brand}\n\tCategory: {self.category}"

class KafkaBroker:
    def __init__(self):
        self.message_queues = {}

    def produce_message(self, message: Message, topic: str) -> None:
        if topic not in self.message_queues:
            self.message_queues[topic] = Queue()
        self.message_queues[topic].put(message)

    def consume_messages(self):
        while True:
            messages_exist = False
            for index, message_queue in self.message_queues.items():
                if not message_queue.empty():
                    messages_exist = True
                    yield message_queue.get()
            if not messages_exist:
                time.sleep(0.01)
                continue
            yield None  # Yield None when there are no more messages to consume

class KafkaProducer:
    def __init__(self, broker: KafkaBroker, topic: str, data: pd.DataFrame, name: str):
        self.broker = broker
        self.topic = topic
        self.data = data
        self.thread = None
        self.name = name  # Add the 'name' attribute

    def start(self):
        self.thread = threading.Thread(target=self.produce_messages)
        self.thread.start()

    def send(self, message: Message):
        #print(f"Sending message from {self.name}: {message}")
        self.broker.produce_message(message, self.topic)

    def produce_messages(self):
        df = self.data.sample(10000)

        for _, row in df.iterrows():
            timestamp = int(pd.Timestamp(row['event_time']).timestamp())
            user_id = str(row['user_id'])
            event_type = str(row['event_type'])
            product_id = row['product_id']
            brand = row['brand']
            category = row['category_code']
            message = Message(timestamp, user_id, event_type, product_id, brand, category)
            self.send(message)
            time.sleep(0.001)
        time.sleep(1)

class KafkaConsumer:
    def __init__(self, broker: KafkaBroker, topic: str, layer_type: str, name: str):
        self.broker = broker
        self.topic = topic
        self.queue = Queue()
        self.thread = None
        self.layer_type = layer_type
        self.name = name  # Add the 'name' attribute

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def run(self):
        for message in self.broker.consume_messages():
            if message is not None:
                #print(f"{self.layer_type} Layer {self.name} received message: {message}")
                self.queue.put(message)

    def poll(self, timeout=1):
        try:
            return self.queue.get(timeout=timeout)
        except:
            return None

class BatchLayer:
    def __init__(self, start_date, end_date):
        self.popularity = {}
        self.buckets = {}
        self.user_interactions = defaultdict(set)
        self.start_date = start_date
        self.end_date = end_date

    def precompute_trending(self, min_change_threshold=10):
        trending_items = {}
        if len(self.buckets) >= 2:
            for bucket in range(len(self.buckets) - 1):
                current_bucket = self.buckets.get(bucket, {})
                next_bucket = self.buckets.get(bucket + 1, {})
                trending_items[bucket] = {
                    product: ((next_bucket.get(product, 0) - current_bucket.get(product, 0)) / max(current_bucket.get(product, 0) + next_bucket.get(product, 0), 1))
                    for product in set(list(current_bucket.keys()) + list(next_bucket.keys()))
                    if current_bucket.get(product, 0) > 0 and abs((next_bucket.get(product, 0) - current_bucket.get(product, 0))) >= min_change_threshold
                }

            trending_flat = {}
            for items in trending_items.values():
                trending_flat.update(items)

            top_trending_items = dict(sorted(trending_flat.items(), key=lambda item: item[1], reverse=True)[:5])
            product_ids = list(top_trending_items.keys())
            trending_percentages = list(top_trending_items.values())

            # Ensure trending percentages are positive
            trending_percentages = [percentage if percentage < -1 else -1 * percentage for percentage in trending_percentages]

            return product_ids, trending_percentages
        else:
            print("Insufficient data to compute trending.")
            return [], []

    def process_realtime_event(self, message):
        event_time = pd.to_datetime(message.timestamp, unit='s')
        event_type = message.type
        product_id = message.product_Id
        user_id = message.userId

        if event_type in ['view', 'cart']:
            self.user_interactions[user_id].add(product_id)

        if event_type not in ['view', 'cart']:
            if self.start_date <= event_time.date() <= self.end_date:
                # Update popularity count for the product
                self.popularity[product_id] = self.popularity.get(product_id, 0) + 1

                # Update popularity for each product in the current bucket
                bucket_start = (event_time.date() - self.start_date).days // 7
                self.buckets.setdefault(bucket_start, {})
                for prod_id in self.popularity:
                    self.buckets[bucket_start][prod_id] = self.popularity[prod_id]

class SpeedLayer:
    def __init__(self, max_interactions=5):
        self.max_interactions = max_interactions
        self.user_interactions = defaultdict(list)

    def process_realtime_event(self, message):
        product_id = message.product_Id
        brand = message.brand
        category = message.category

        user_id = message.userId
        if user_id not in self.user_interactions:
            self.user_interactions[user_id] = [(product_id, brand, category)]
        else:
            self.user_interactions[user_id].append((product_id, brand, category))
            if len(self.user_interactions[user_id]) > self.max_interactions:
                self.user_interactions[user_id] = self.user_interactions[user_id][-self.max_interactions:]

    def find_similar_products(self, user_id, num_similar=5):
        user_interactions = self.user_interactions.get(user_id, [])
        similar_products = defaultdict(float)
        for u_id, interactions in self.user_interactions.items():
            if u_id != user_id:
                common_interactions = len(set(interactions).intersection(user_interactions))
                similarity_score = common_interactions / (len(user_interactions) + len(interactions) - common_interactions)
                for interaction in interactions:
                    product_id, brand, category = interaction
                    similar_products[product_id] += similarity_score

        similar_products = dict(sorted(similar_products.items(), key=lambda item: item[1], reverse=True))
        return list(similar_products.keys())[:num_similar]

class ServingLayer:
    def __init__(self):
        self.precomputed_results = []

    def store_precomputed_results(self, product_ids, trending_percentages):
        self.precomputed_results = list(zip(product_ids, trending_percentages))
        print("Precomputed results stored.")

    def __str__(self):
        return str(self.precomputed_results)


if __name__ == "__main__":
    data = pd.read_csv('2019-Nov.csv')
    broker = KafkaBroker()
    consumer_batch = KafkaConsumer(broker, 'user_activities', layer_type='Batch', name='Consumer Batch')
    consumer_speed = KafkaConsumer(broker, 'user_activities', layer_type='Speed', name='Consumer Speed')
    producer = KafkaProducer(broker, 'user_activities', data, name='Producer')

    producer.start()
    consumer_batch.start()
    consumer_speed.start()

    batch_layer = BatchLayer(start_date=datetime.date(2019, 11, 1), end_date=datetime.date(2019, 11, 30))
    speed_layer = SpeedLayer()

    timeout_seconds = 20
    consumed_messages = 0
    start_time = time.time()

    while True:
        message_batch = consumer_batch.poll()
        message_speed = consumer_speed.poll()

        if message_batch:
            consumed_messages += 1
            #print(f"[#{consumed_messages}] {consumer_batch.name} received: {message_batch}")
            batch_layer.process_realtime_event(message_batch)
        if message_speed:
            consumed_messages += 1
            #print(f"[#{consumed_messages}] {consumer_speed.name} received: {message_speed}")
            speed_layer.process_realtime_event(message_speed)

        if not message_batch and not message_speed and time.time() - start_time >= timeout_seconds:
            break

        time.sleep(0.01)



    print("Batch Layer Results:")
    min_change_threshold = 1
    product_ids, trending_percentages = batch_layer.precompute_trending(min_change_threshold)
    print("Product IDs:", product_ids)
    print("Trending Percentages:", trending_percentages)

    print("Speed Layer Results:")
    user_id = '532647354'
    similar_products = speed_layer.find_similar_products(user_id, num_similar=5)
    print("Similar Product Recommendations for User {}: {}".format(user_id, similar_products))

    serving_layer = ServingLayer()

    serving_layer.store_precomputed_results(product_ids, trending_percentages)

    print("Serving Layer Results:")
    stored_results = serving_layer.__str__()
    print("Stored Results:", stored_results)

    # Extracting the product with the highest trending percentage
    highest_trending_product = None
    highest_trending_percentage = 0
    for product_id, trending_percentage in zip(product_ids, trending_percentages):
        if trending_percentage > highest_trending_percentage:
            highest_trending_product = product_id
            highest_trending_percentage = trending_percentage
    print("Product with the Highest Trending Percentage:", highest_trending_product)

    # Extracting the most similar product from the speed layer
    most_similar_product = similar_products[0]
    print("Most Similar Product from Speed Layer:", most_similar_product)

Batch Layer Results:
Product IDs: [2702277, 14700042, 1004875, 1201504, 26202882]
Trending Percentages: [-0.3333333333333333, 1.0, 1.0, 1.0, 1.0]
Speed Layer Results:
Similar Product Recommendations for User 532647354: [2601036, 100004742, 16700606, 1005003, 1004403]
Precomputed results stored.
Serving Layer Results:
Stored Results: [(2702277, -0.3333333333333333), (14700042, 1.0), (1004875, 1.0), (1201504, 1.0), (26202882, 1.0)]
Product with the Highest Trending Percentage: 14700042
Most Similar Product from Speed Layer: 2601036


In [None]:
#Display results to the user
#Batch Layer and speed Layer emit scores for products based on what was the most trending 
#print out which class name recives messages 

In [None]:
#https://github.com/avbatchelor/trail-conditions
