In [1]:
import pandas as pd
import numpy as np
import numbers
from datetime import datetime, date, timedelta

In [2]:
# November dataset
start_date = datetime(2019, 11, 1)
end_date = datetime(2019, 11, 9) 

nov_df = pd.read_csv('datasets/2019-Nov.csv')
nov_df = nov_df.dropna()

# Converting the event_time into datetime and removing the fractional seconds from the event_time column
nov_df["event_time"] = pd.to_datetime(nov_df["event_time"]).dt.strftime('%Y-%m-%d %H:%M:%S')

# Filtering the dataframe from start date to end date
nov_df_filtered = nov_df[(nov_df["event_time"] >= f"{start_date} 00:00:00") & (nov_df["event_time"] <= f"{end_date} 23:59:59")]

nov_df_filtered

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
66,2019-11-01 00:04:33,purchase,5766980,1487580013053083824,stationery.cartrige,italwax,1.98,564451209,861ab2f1-b2e5-886f-a93b-5b067eff081f
67,2019-11-01 00:04:33,purchase,5767494,1487580013053083824,stationery.cartrige,italwax,2.14,564451209,861ab2f1-b2e5-886f-a93b-5b067eff081f
68,2019-11-01 00:04:33,purchase,5775813,1487580013053083824,stationery.cartrige,italwax,1.98,564451209,861ab2f1-b2e5-886f-a93b-5b067eff081f
69,2019-11-01 00:04:33,purchase,5775814,1487580013053083824,stationery.cartrige,italwax,1.98,564451209,861ab2f1-b2e5-886f-a93b-5b067eff081f
242,2019-11-01 00:16:56,view,5885596,1487580006350586771,appliances.environment.vacuum,polarus,102.38,202438687,8dc848f5-bac3-44d7-9414-75d4e599abaf
...,...,...,...,...,...,...,...,...,...
1151230,2019-11-08 23:42:34,remove_from_cart,5712521,2007399943458784057,apparel.glove,nitrimax,10.16,514840020,84904b76-2b92-4f02-a81f-f07cdfbc8195
1151232,2019-11-08 23:42:39,remove_from_cart,5712695,2007399943458784057,apparel.glove,nitrimax,10.32,514840020,84904b76-2b92-4f02-a81f-f07cdfbc8195
1151269,2019-11-08 23:43:49,view,5873430,2007399943458784057,apparel.glove,benovy,7.46,435124065,91961db2-9246-4e09-a762-128aa8d14901
1151302,2019-11-08 23:44:59,remove_from_cart,5732026,1487580011970953351,furniture.bathroom.bath,domix,4.16,563851655,1d44a696-faba-4970-a392-271574c6ca71


In [3]:
import threading
import time
from queue import Queue, Empty
import os



class Message:
    """
    Class represents an individual action from the user in a message object way
    """


    def __init__(self, event_time, event_type, product_id, category_id, brand, price, user_id, user_session):
        """
        :param event_time: Represent the time of the event
        :param event_type: Represent the type of the event
        :param product_id: Represent the the id of a specific product
        :param category_id: Represent the id of a category
        :param brand: identifies product's brand
        :param price: price of the product
        :param user_id: User id
        :param user_session: Defines the user_session reference
        """
        self.event_time = event_time
        self.event_type = event_type
        self.product_id = product_id
        self.category_id = category_id
        self.brand = brand
        self.price = price
        self.user_id = user_id
        self.user_session = user_session
        

    # Getters for parameters
    def event_time_getter(self):
        return self.event_time

    def event_type_getter(self):
        return self.event_type
        
    def product_id_getter(self):
        return self.product_id

    def brand_getter(self):
        return self.brand

    def price_getter(self):
        return self.price

    def user_id_getter(self):
        return self.user_id

    def to_dict(self):
        return {
            "event_time": self.event_time,
            "event_type": self.event_type,
            "product_id": self.product_id,
            "category_id": self.category_id,
            "brand": self.brand,
            "price": self.price,
            "user_id": self.user_id,
            "user_session": self.user_session
        }

    def __str__(self):
        return str(self.to_dict())



class KafkaBroker:

    """
    Class mimics a Kafka broker, holds 2 queues for speed and batch layers.
    """

    
    def __init__(self):
        """
        Constructor holds batch and speed queues, each one is responsible for producing and consuming messages
        according to the specified queue.
        """
        self.message_queues_batch = {}
        self.message_queues_speed = {}

        
    # Producing messages according to received topics
    # The reason we have topics for producing messages is to check each topic's existance in each queue since we can have multiple topics
    def produce_message(self, message: Message, topic: str) -> None:

        if topic == "batch_topic":
            if topic not in self.message_queues_batch:
                self.message_queues_batch[topic] = Queue()
            
            self.message_queues_batch[topic].put(message)

        if topic == "speed_topic":
            if topic not in self.message_queues_speed:
                self.message_queues_speed[topic] = Queue()
            
            self.message_queues_speed[topic].put(message)

    # Checking callers and consuming messages
    def consume_messages(self, caller : str, interval: float = 1.0):
        
        if caller == "batch":
            while True:
                for topic, message in self.message_queues_batch.items():
                    try:
                        message = self.message_queues_batch[topic].get_nowait()
                        yield message
                    
                    except Empty:
                        pass
        
        if caller == "speed":
            while True:
                for topic, message in self.message_queues_speed.items():
                    try:
                        message = self.message_queues_speed[topic].get_nowait()
                        yield message
                    
                    except Empty:
                        pass



class KafkaProducer:
    """
    Class represents a Kafka producer which is responsible to produce messages 
    into Kafka Broker.
    """

    def __init__(self, broker: KafkaBroker, batch_topic: str, speed_topic: str):
        """
        :param broker: Represents a kafka broker
        :param batch_topic: Defines the batch layer's topic name
        :param speed_topic: Defines the speed layer's topic name
        """
        self.broker = broker
        self.batch_topic = batch_topic
        self.speed_topic = speed_topic
        self.thread = None

    # Starting produce_messages process in a thread
    def start(self):
        self.thread = threading.Thread(target=self.produce_messages)
        self.thread.start()

    
    def send(self, message: Message):
        """
        Method validates the batch and speed topics after which
        a message object is being sent to Kafka broker

        Parameters:
            :param message: Represents the message object from produce_messages method
        """
        if self.batch_topic:
            self.broker.produce_message(message, self.batch_topic)
        if self.speed_topic:
            self.broker.produce_message(message, self.speed_topic)

    
    def produce_messages(self):
        """
        Method creates message objects and sends to Kafka broker
        """
        for index, row in nov_df_filtered.iterrows():
            event_time = row["event_time"]
            event_type = row["event_type"]
            product_id = row["product_id"]
            category_id = row["category_id"]
            brand = row["brand"]
            price = row["price"]
            user_id = row["user_id"]
            user_session = row["user_session"]

            # Creating message object
            message_obj = Message(event_time, event_type, product_id, category_id, brand, price, user_id, user_session)

            # Sending it to KafkaBroker
            self.send(message_obj)
            time.sleep(0.1)


class BatchLayer:

    """
    Class represents a Batch layer which receives and keeps user acitivity for a long term.
    User activity is being pre-computed on a daily basis with help of the trending_logic method.
    Results are being passed into Serving layer.
    """
    
    def __init__(self, broker: KafkaBroker):
        """
        :param broker (KafkaBroker): Defines the Kafka broker
        """
        self.broker = broker
        self.queue = Queue()
        self.thread = None
        self.product_popularity = {}
        self.daily_purchase_count = {}

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

        
    # Consuming messages from kafka broker for the Batch layer
    # Consumed mesages are being put into queue
    def run(self):
        for message in self.broker.consume_messages("batch"):
            if message is not None:
                self.queue.put(message)

    # Pulling each message from the queue
    def poll(self, timeout=1):
        try:
            return self.queue.get(timeout=timeout) # Waiting 1 second before raising an exception
        except Empty:
            return None
        
    
    def accepting_messages(self):
        """
        Method accepts messages into Batch layer and evaluates product's popularity by purchase and cart activities
        Displays the total amount of accepted messages at the end.
        """
        counter = 0
        
        while True:
            
            if self.queue.empty():
                print(f"Accepted a total of {counter} messages for batch layer!")
                break
                
            while not self.queue.empty():
                message = self.poll()
                
                if message is None:
                    break
                    
                print(f"[BATCH LAYER] Received message\n{message}")
                print("")
                counter += 1
            
                event_time = message.event_time_getter()
                event_time = datetime.strptime(event_time, "%Y-%m-%d %H:%M:%S")
                event_type = message.event_type_getter()
                product_id = message.product_id_getter()
                user_id = message.user_id_getter()
                brand = message.brand_getter()
            

                if event_type == "purchase" or event_type == "cart":
                    date_key = event_time.date()

                    # Adding product id into product_popularity dict, and incrementing the value by 1 each time seen
                    self.product_popularity[product_id] = self.product_popularity.get(product_id, 0) + 1

                    # Getting the daily purchase count for the current date
                    self.daily_purchase_count[date_key] = self.daily_purchase_count.get(date_key, {})
                    self.daily_purchase_count[date_key][product_id] = self.daily_purchase_count[date_key].get(product_id, 0) + 1

            # Adding one second wait for the message queue re-check
            time.sleep(1)

    
    def trending_logic(self, start_date, end_date):
        """
        Args:
            start_date: Defines the start date for trending logic
            end_date: Defines the end date for trending logic
        """
        
        # Converting dates into date format so it will match with dataframe's date type
        start_date = start_date.date()
        end_date = end_date.date()
        
        min_threshold = 10
        trending_items = {} # Storing trending percentages on daily basis

        # Calculating daily trending percentages
        for date_key in sorted(self.daily_purchase_count.keys()):
            
            current_day_purchase = self.daily_purchase_count[date_key]
            next_day_purchase = self.daily_purchase_count.get(date_key + timedelta(days=1), {})
            
            trending_items[date_key] = {
                # Calculating the increase over past day
                product: ((next_day_purchase.get(product, 0) - current_day_purchase.get(product, 0)) / current_day_purchase.get(product, 1)) * 100
                
                for product in set(list(current_day_purchase.keys()) + list(next_day_purchase.keys()))
                if current_day_purchase.get(product, 0) > 0
            }

        
        # Getting the most popular items for the last day if data is available for that day
        last_day_purchase = self.daily_purchase_count.get(end_date, {})
            
        # Sorting the popular items
        most_popular_items = sorted(last_day_purchase, key=last_day_purchase.get, reverse=True)[:10]

        # Getting recommended items based on trending percentages for the second last day if data is available for that day
        second_last_day = end_date - timedelta(days=1)
        recommended_items = set(trending_items.get(second_last_day, {})).intersection(most_popular_items)
        
        # Creating lists to store brands, product_ids, and trending percentages for recommended items
        brands = []
        product_ids = []
        trending_percentages = []

        for item in recommended_items:
            trending_percentage = trending_items.get(second_last_day, {}).get(item, 0)
            brand = nov_df_filtered[nov_df_filtered["product_id"] == item]["brand"].iloc[0]

            if trending_percentage >= min_threshold:
                product_ids.append(item)
                brands.append(brand)
                trending_percentages.append(trending_percentage)

        # Returning trending percentages along with products
        if trending_percentages != [] and product_ids != []:
            return trending_percentages, product_ids
        
        else:
            print("Could not complete the trending logic !")
            return None



class SpeedLayer:
    """
    Class represents a Speed layer which receives and evaluates real time user's activity
    Returns user's most 50 recent activity of brands
    """
    
    def __init__(self, broker: KafkaBroker):
        self.broker = broker
        self.queue = Queue()
        self.thread = None
        self.brands_activity = {} # Contains users ids and last 50 brands 

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    # Consuming messages from the broker
    def run(self):
        for message in self.broker.consume_messages("speed"):
            self.queue.put(message)

    
    def poll(self):
        """
        Method pulls messages from the queue and fills the brands activity dictionary for 
        user's most recent 50 brands. 
        Displays the total received messages at the end.
        """
        counter = 0
        
        while True:
            if self.queue.empty():
                print(f"Accepted a total of {counter} messages for speed layer!")
                break

            while not self.queue.empty():
                
                try:
                    message = self.queue.get(timeout=1)
                    print(f"[SPEED LAYER] Received a message\n{message}")
                    print("")
            
                    if message is None:
                        return None
            
                    else:
                        counter += 1
                        user_id = message.user_id_getter()
                        brand = message.brand_getter()

                        # Storing into the dictionary
                        if user_id not in self.brands_activity.keys():
                            self.brands_activity[user_id] = []
                            self.brands_activity[user_id].append(brand)
                
                        else:
                            if len(self.brands_activity[user_id]) >= 50:
                                # In case if the list reaches the 50 max capacity, we can remove the first element
                                self.brands_activity[user_id].pop(0)
                        
                            self.brands_activity[user_id].append(brand)
            
                except:
                    return None
            
            time.sleep(1)

    
    
    # Returning chosen user's activity of last 50 brands
    def return_user_activity(self, requested_user_id):
        return self.brands_activity[requested_user_id]

          
class ServingLayer:
    """
    Class represents a container for Batch layer's results
    """

    def __init__(self):
        self.batch_precomputed_results = {}

    def storing_results(self, product_ids, trending_percentages):
        
        for product_id, trending_percentage in zip(product_ids, trending_percentages):
            self.batch_precomputed_results[product_id] = trending_percentage

        print("[RESULTS FROM THE BATCH LAYER SUCCESSFULLY STORED INTO THE SERVING LAYER]\n")

    def batch_results_getter(self, start_date, end_date):
        results = []

        print(f"Over {(end_date2 - start_date2).days} days trending analysis\n")
        
        for product, trending_percentage_increase in self.batch_precomputed_results.items():
            result = f"Product {product} - Increase of % {trending_percentage_increase}"
            results.append(result)

        return results


if __name__ == "__main__":
    
    broker = KafkaBroker()
    
    batch_topic = 'batch_topic'
    speed_topic = 'speed_topic'

    # Creating the producer and starting it in a thread
    producer = KafkaProducer(broker, batch_topic, speed_topic)
    producer.start()
    time.sleep(1)

    batch_consumer = BatchLayer(broker)
    speed_consumer = SpeedLayer(broker)
    serving_layer = ServingLayer()

    batch_consumer.start()
    time.sleep(1)
    speed_consumer.start()
    time.sleep(1)

    batch_consumer_thread = threading.Thread(target=batch_consumer.accepting_messages)
    speed_consumer_thread = threading.Thread(target=speed_consumer.poll)
    batch_consumer_thread.start()
    speed_consumer_thread.start()

    # From this point the code will be waiting until both batch and speed layer threads finish
    batch_consumer_thread.join()
    speed_consumer_thread.join()
    time.sleep(1)

    
    start_date2 = datetime(2019, 11, 1)
    end_date2 = datetime(2019, 11, 8)

    # Calling the trending logic method for a specified date (Can be any optional date from withinh the dataframe period)
    trending_result = batch_consumer.trending_logic(start_date2, end_date2)
    trending_percentages, product_ids = trending_result

    # Storing results into serving layer and displaying those
    serving_layer.storing_results(product_ids, trending_percentages)
    batch_results = serving_layer.batch_results_getter(start_date2, end_date2)

    
    for product_increase in batch_results:
        print(product_increase)
    
    print("")

    # Displaying Serving layer's results
    # For displaying purposes we have the below 7 different user ids
    try:
        print(f"Last possible 50 brands for user 389123726\n{speed_consumer.return_user_activity(389123726)}")
        print(f"Last possible 50 brands for user 568749567\n{speed_consumer.return_user_activity(568749567)}")
        print(f"Last possible 50 brands for user 555444394\n{speed_consumer.return_user_activity(555444394)}")
        print(f"Last possible 50 brands for user 308571663\n{speed_consumer.return_user_activity(308571663)}")
        print(f"Last possible 50 brands for user 564451209\n{speed_consumer.return_user_activity(564451209)}")
        print(f"Last possible 50 brands for user 466373304\n{speed_consumer.return_user_activity(466373304)}")
        print(f"Last possible 50 brands for user 558282780\n{speed_consumer.return_user_activity(558282780)}")

    except KeyError:
        print("Unable to find one or more of the requested users!")


    # Extracting the user activity data
    user_activities = {
        389123726: speed_consumer.return_user_activity(389123726),
        568749567: speed_consumer.return_user_activity(568749567),
        555444394: speed_consumer.return_user_activity(555444394),
        308571663: speed_consumer.return_user_activity(308571663),
        564451209: speed_consumer.return_user_activity(564451209),
        466373304: speed_consumer.return_user_activity(466373304),
        558282780: speed_consumer.return_user_activity(558282780),
    }

    # Getting the target brand
    def get_brand_from_product_id(product_id):
        analysis_passed_products = nov_df_filtered[nov_df_filtered["product_id"] == product_id] 
        target_brand = analysis_passed_products['brand'].iloc[0]
    
        return target_brand
    
    # Iterating through user activities and finding recommendations
    for user_id, user_brands in user_activities.items():
        recommended_products = set()

        for brand in user_brands:
            for product_id in product_ids:
                product_brand = get_brand_from_product_id(product_id)

                if brand == product_brand:
                    recommended_products.add(product_id)

        if recommended_products:
            print(f"\nFor user {user_id}:")
            for product_id in recommended_products:
                increase_percentage = serving_layer.batch_precomputed_results.get(product_id, "N/A")
                print(f"Recommended product {product_id} - Increase of {increase_percentage}%")
        else:
            print(f"\nNo recommendations for user {user_id}\n")



[BATCH LAYER] Received message
{'event_time': '2019-11-01 00:04:33', 'event_type': 'purchase', 'product_id': 5766980, 'category_id': 1487580013053083824, 'brand': 'italwax', 'price': 1.98, 'user_id': 564451209, 'user_session': '861ab2f1-b2e5-886f-a93b-5b067eff081f'}
[SPEED LAYER] Received a message
{'event_time': '2019-11-01 00:04:33', 'event_type': 'purchase', 'product_id': 5766980, 'category_id': 1487580013053083824, 'brand': 'italwax', 'price': 1.98, 'user_id': 564451209, 'user_session': '861ab2f1-b2e5-886f-a93b-5b067eff081f'}

[SPEED LAYER] Received a message
{'event_time': '2019-11-01 00:04:33', 'event_type': 'purchase', 'product_id': 5767494, 'category_id': 1487580013053083824, 'brand': 'italwax', 'price': 2.14, 'user_id': 564451209, 'user_session': '861ab2f1-b2e5-886f-a93b-5b067eff081f'}

[SPEED LAYER] Received a message
{'event_time': '2019-11-01 00:04:33', 'event_type': 'purchase', 'product_id': 5775813, 'category_id': 1487580013053083824, 'brand': 'italwax', 'price': 1.98, 'u

In [None]:
# Add comments
# Good ReadMe    

In [None]:
import threading
import time
from queue import Queue, Empty
import os


"""
Class is represents an individual action from the user in a message object way
"""
class Message:


    """
    :param event_time: Represent the time of the event
    :param event_type: Represent the type of the event
    :param product_id: Represent the the id of a specific product
    :param category_id: Represent the id of a category
    :param brand: identifies product's brand
    :param price: price of the product
    :param user_id: User id
    :param user_session: Defines the user_session reference
    """
    def __init__(self, event_time, event_type, product_id, category_id, brand, price, user_id, user_session):
        self.event_time = event_time
        self.event_type = event_type
        self.product_id = product_id
        self.category_id = category_id
        self.brand = brand
        self.price = price
        self.user_id = user_id
        self.user_session = user_session
        

    # Getters for parameters
    def event_time_getter(self):
        return self.event_time

    def event_type_getter(self):
        return self.event_type
        
    def product_id_getter(self):
        return self.product_id

    def brand_getter(self):
        return self.brand

    def price_getter(self):
        return self.price

    def user_id_getter(self):
        return self.user_id

    def to_dict(self):
        return {
            "event_time": self.event_time,
            "event_type": self.event_type,
            "product_id": self.product_id,
            "category_id": self.category_id,
            "brand": self.brand,
            "price": self.price,
            "user_id": self.user_id,
            "user_session": self.user_session
        }

    def __str__(self):
        return str(self.to_dict())


"""
Class mimics a Kafka broker, holds 2 queues for speed and batch layer.
"""
class KafkaBroker:

    """
    Constructor holds batch and speed queues, each one is responsible for producing and consuming messages
    according to the specified queue.
    """
    def __init__(self):
        self.message_queues_batch = {}
        self.message_queues_speed = {}

        
    # Producing messages
    def produce_message(self, message: Message, topic: str) -> None:
        
        if topic == "batch_topic":
            if topic not in self.message_queues_batch:
                self.message_queues_batch[topic] = Queue()
            
            self.message_queues_batch[topic].put(message)

        if topic == "speed_topic":
            if topic not in self.message_queues_speed:
                self.message_queues_speed[topic] = Queue()
            
            self.message_queues_speed[topic].put(message)

    # Consuming messages
    def consume_messages(self, caller : str, interval: float = 1.0):
        
        if caller == "batch":
            while True:
                for topic, message in self.message_queues_batch.items():
                    try:
                        message = self.message_queues_batch[topic].get_nowait()
                        yield message
                    
                    except Empty:
                        pass
        
        if caller == "speed":
            while True:
                for topic, message in self.message_queues_speed.items():
                    try:
                        message = self.message_queues_speed[topic].get_nowait()
                        yield message
                    
                    except Empty:
                        pass


"""
Class represents a Kafka producer which is responsible to produce messages 
to a Kafka Broker.
"""
class KafkaProducer:

    """
    :param broker: Represents a kafka broker
    :param batch_topic: Defines the batch layer's topic name
    :param speed_topic: Defines the speed layer's topic name
    """
    def __init__(self, broker: KafkaBroker, batch_topic: str, speed_topic: str):
        self.broker = broker
        self.batch_topic = batch_topic
        self.speed_topic = speed_topic
        self.thread = None

    def start(self):
        self.thread = threading.Thread(target=self.produce_messages)
        self.thread.start()

    """
    Method validates the batch and speed topics after which
    a message object is being send to Kafka broker
    :param message: Represents the message object from produce_messages method
    """
    def send(self, message: Message):
        if self.batch_topic:
            self.broker.produce_message(message, self.batch_topic)
        if self.speed_topic:
            self.broker.produce_message(message, self.speed_topic)

    
    """
    Method creates message objects and sends to Kafka broker
    """
    def produce_messages(self):
        for index, row in nov_df_filtered.iterrows():
            event_time = row["event_time"]
            event_type = row["event_type"]
            product_id = row["product_id"]
            category_id = row["category_id"]
            brand = row["brand"]
            price = row["price"]
            user_id = row["user_id"]
            user_session = row["user_session"]

            # Creating message object
            message_obj = Message(event_time, event_type, product_id, category_id, brand, price, user_id, user_session)

            # Sending it to KafkaBroker
            self.send(message_obj)
            time.sleep(0.1)


class BatchLayer:

    """
    Class represents a Batch layer which receives and keeps user acitivity for a long term.
    User activity is being pre-computed on a daily basis with help of the trending_logic method.
    Results are being passed into Serving layer.
    """
    
    def __init__(self, broker: KafkaBroker, topic: str):
        """
        :param broker (KafkaBroker): Defines the Kafka broker
        :param topic (str): Defines the topic of 
        """
        self.broker = broker
        self.topic = topic
        self.queue = Queue()
        self.thread = None
        self.product_popularity = {}
        self.daily_purchase_count = {}

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def run(self):
        for message in self.broker.consume_messages("batch"):
            if message is not None:
                self.queue.put(message)
                
    def poll(self, timeout=1):
        try:
            return self.queue.get(timeout=timeout) # Waiting 1 second before raising an exception
        except Empty:
            return None
        
    
    def accepting_messages(self):
        counter = 0
        
        while True:
            
            if self.queue.empty():
                print(f"Accepted a total of {counter} messages for batch layer!\n")
                break
                
            while not self.queue.empty():
                message = self.poll()
                
                if message is None:
                    break
                    
                print(f"[BATCH LAYER] Received message\n{message}\n")
                counter += 1
            
                event_time = message.event_time_getter()
                event_time = datetime.strptime(event_time, "%Y-%m-%d %H:%M:%S")
                event_type = message.event_type_getter()
                product_id = message.product_id_getter()
                user_id = message.user_id_getter()
                brand = message.brand_getter()
            

                if event_type == "purchase" or event_type == "cart":
                    date_key = event_time.date()

                    # Adding product id into product_popularity dict, and incrementing the value by 1 each time seen
                    self.product_popularity[product_id] = self.product_popularity.get(product_id, 0) + 1

                    # Getting the daily purchase count for the current date
                    self.daily_purchase_count[date_key] = self.daily_purchase_count.get(date_key, {})
                    self.daily_purchase_count[date_key][product_id] = self.daily_purchase_count[date_key].get(product_id, 0) + 1
            
            time.sleep(1)

    
    def trending_logic(self, start_date, end_date):
        # Converting dates into date format so it will match with dataframes date type
        start_date = start_date.date()
        end_date = end_date.date()
        
        min_threshold = 10
        trending_items = {} # Storing trending percentages on daily basis

        # Calculating daily trending percentages
        for date_key in sorted(self.daily_purchase_count.keys()):
            
            current_day_purchase = self.daily_purchase_count[date_key]
            next_day_purchase = self.daily_purchase_count.get(date_key + timedelta(days=1), {})
            
            trending_items[date_key] = {
                # Calculating the increase over past day
                product: ((next_day_purchase.get(product, 0) - current_day_purchase.get(product, 0)) / current_day_purchase.get(product, 1)) * 100
                
                for product in set(list(current_day_purchase.keys()) + list(next_day_purchase.keys()))
                if current_day_purchase.get(product, 0) > 0
            }

        
        # Getting the most popular items for the last day if data is available for that day
        last_day_purchase = self.daily_purchase_count.get(end_date, {})
            
        # Sorting the popular items
        most_popular_items = sorted(last_day_purchase, key=last_day_purchase.get, reverse=True)[:10]

        # Getting recommended items based on trending percentages for the second last day if data is available for that day
        second_last_day = end_date - timedelta(days=1)
        recommended_items = set(trending_items.get(second_last_day, {})).intersection(most_popular_items)
        
        # Creating lists to store product_ids, brands, and trending percentages for recommended items
        product_ids = []

        # Creating the list for brands
        brands = []
        
        # This list stores actual trending percentages
        trending_percentages = []

        for item in recommended_items:
            trending_percentage = trending_items.get(second_last_day, {}).get(item, 0)
            brand = nov_df_filtered[nov_df_filtered["product_id"] == item]["brand"].iloc[0]

            if trending_percentage >= min_threshold:
                product_ids.append(item)
                brands.append(brand)
                trending_percentages.append(trending_percentage)

        # Returning trending percentages along with products
        if trending_percentages != [] and product_ids != []:
            return trending_percentages, product_ids
        
        else:
            print("Could not complete the trending logic !")
            return None



class SpeedLayer:
    
    def __init__(self, broker: KafkaBroker, topic: str):
        self.broker = broker
        self.topic = topic
        self.queue = Queue()
        self.thread = None
        self.brands_activity = {}

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def run(self):
        for message in self.broker.consume_messages("speed"):
            self.queue.put(message)

    
    def poll(self):
        counter = 0
        
        while True:
            if self.queue.empty():
                print(f"Accepted a total of {counter} messages for speed layer!\n")
                break

            while not self.queue.empty():
                
                try:
                    message = self.queue.get(timeout=1)
                    print(f"[SPEED LAYER] Received a message\n{message}\n")
            
                    if message is None:
                        return None
            
                    else:
                        counter += 1
                        user_id = message.user_id_getter()
                        brand = message.brand_getter()

                        # Storing into the dictionary
                        if user_id not in self.brands_activity.keys():
                            self.brands_activity[user_id] = []
                            self.brands_activity[user_id].append(brand)
                
                        else:
                            if len(self.brands_activity[user_id]) >= 50:
                                # In case if the list reaches the 50 max capacity, we can remove the first element
                                self.brands_activity[user_id].pop(0)
                        
                            self.brands_activity[user_id].append(brand)
            
                except:
                    return None
            
            time.sleep(1)

    
    """
    Returning chosen user's activity of last 50 brands
    """
    def return_user_activity(self, requested_user_id):
        
        return self.brands_activity[requested_user_id]

            
class ServingLayer:

    def __init__(self):
        self.batch_precomputed_results = {}

    def storing_results(self, product_ids, trending_percentages):
        
        for product_id, trending_percentage in zip(product_ids, trending_percentages):
            self.batch_precomputed_results[product_id] = trending_percentage

        print("[RESULTS FROM THE BATCH LAYER SUCCESSFULLY STORED INTO THE SERVING LAYER]\n")

    def batch_results_getter(self, start_date, end_date):
        results = []

        print(f"Over {(end_date2 - start_date2).days} days trending analysis\n")
        
        for product, trending_percentage_increase in self.batch_precomputed_results.items():
            result = f"Product {product} - Increase of % {trending_percentage_increase}"
            results.append(result)

        return results

if __name__ == "__main__":
    
    broker = KafkaBroker()
    
    batch_topic = 'batch_topic'
    speed_topic = 'speed_topic'

    # Creating the producer and starting it in a thread
    producer = KafkaProducer(broker, batch_topic, speed_topic)
    producer.start()
    time.sleep(1)

    batch_consumer = BatchLayer(broker, batch_topic)
    speed_consumer = SpeedLayer(broker, speed_topic)
    serving_layer = ServingLayer()

    batch_consumer.start()
    time.sleep(1)
    speed_consumer.start()
    time.sleep(1)

    batch_consumer_thread = threading.Thread(target=batch_consumer.accepting_messages)
    speed_consumer_thread = threading.Thread(target=speed_consumer.poll)
    batch_consumer_thread.start()
    speed_consumer_thread.start()

    batch_consumer_thread.join()
    speed_consumer_thread.join()

    time.sleep(1)

    start_date2 = datetime(2019, 11, 1)
    end_date2 = datetime(2019, 11, 8)
    
    trending_result = batch_consumer.trending_logic(start_date2, end_date2)
    trending_percentages, product_ids = trending_result
    
    serving_layer.storing_results(product_ids, trending_percentages)
    batch_results = serving_layer.batch_results_getter(start_date2, end_date2)

    
    for product_increase in batch_results:
        print(product_increase)
    
    print("")
    
    try:
        print(f"Last possible 50 brands for user 389123726\n{speed_consumer.return_user_activity(389123726)}")
        print(f"Last possible 50 brands for user 566943543\n{speed_consumer.return_user_activity(568749567)}")
        print(f"Last possible 50 brands for user 566943992\n{speed_consumer.return_user_activity(555444394)}")
        print(f"Last possible 50 brands for user 566898073\n{speed_consumer.return_user_activity(308571663)}")
        print(f"Last possible 50 brands for user 548498785\n{speed_consumer.return_user_activity(564451209)}")
        print(f"Last possible 50 brands for user 548498785\n{speed_consumer.return_user_activity(466373304)}")
        print(f"Last possible 50 brands for user 548498785\n{speed_consumer.return_user_activity(558282780)}")

    except KeyError:
        print("Unable to find one or more of the requested users!")


    # Extracting the user activity data
    user_activities = {
        389123726: speed_consumer.return_user_activity(389123726),
        568749567: speed_consumer.return_user_activity(568749567),
        555444394: speed_consumer.return_user_activity(555444394),
        308571663: speed_consumer.return_user_activity(308571663),
        564451209: speed_consumer.return_user_activity(564451209),
        466373304: speed_consumer.return_user_activity(466373304),
        558282780: speed_consumer.return_user_activity(558282780),
    }

    def get_brand_from_product_id(product_id):
        analysis_passed_products = nov_df_filtered[nov_df_filtered["product_id"] == product_id] 
        target_brand = analysis_passed_products['brand'].iloc[0]
    
        return target_brand
    
    # Iterate through user activities and find recommendations
    for user_id, user_brands in user_activities.items():
        recommended_products = set()

        for brand in user_brands:
            for product_id in product_ids:
                product_brand = get_brand_from_product_id(product_id)

                if brand == product_brand:
                    recommended_products.add(product_id)

        if recommended_products:
            print(f"\nFor user {user_id}:")
            for product_id in recommended_products:
                increase_percentage = serving_layer.batch_precomputed_results.get(product_id, "N/A")
                print(f"Recommended product {product_id} - Increase of {increase_percentage}%")
        else:
            print(f"\nNo recommendations for user {user_id}\n")

