<a href="https://colab.research.google.com/github/ferygood/LLM_behavior_prediction/blob/main/02_real_time_user_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install kafka-python

First, install and start a Kafka broker (see the [Quick Start](https://kafka.apache.org/quickstart)).


1.   Kafka has a reset mechanism that prevents data loss when sending and receiving messages.
2.   Kafka uses partitions to handle large volumes of data.
3.   Kafka retains data for some time to prevent data loss.



In [None]:
from kafka import KafkaProducer
import json
import time
import numpy as np
import pandas as pd

# Kafka producer setup: connect to the local broker and ship every payload
# as UTF-8 encoded JSON.
def _encode_json(payload):
    """Serialize `payload` to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=_encode_json,
)

# simulate sending data to Kafka Producer
def generate_and_send_data(max_iterations=None, interval=1, kafka_producer=None):
    """Simulate user activity and publish it to Kafka.

    Every iteration builds one web-visit, one purchase and one
    social-interaction event with random field values and sends them to
    ``web_visit_topic``, ``purchase_topic`` and ``social_interaction_topic``
    respectively, then sleeps for ``interval`` seconds.

    Args:
        max_iterations: Number of iterations to run. ``None`` (the default)
            keeps generating until the call is interrupted.
        interval: Seconds to sleep after each iteration.
        kafka_producer: Object exposing ``send(topic, value)`` and
            ``flush()``. Defaults to the module-level ``producer``.
    """
    if kafka_producer is None:
        kafka_producer = producer

    completed = 0
    try:
        while max_iterations is None or completed < max_iterations:
            # Cast every random value to a built-in Python type so the payload
            # is plain JSON data regardless of the serializer in use.
            web_visit = {
                'user_id': int(np.random.randint(1, 1000)),
                'visit_time': pd.Timestamp.now().isoformat(),
                'page_url': str(np.random.choice(['home', 'product', 'cart', 'checkout'])),
                'referrer_url': str(np.random.choice(['google', 'facebook', 'twitter', 'direct'])),
            }

            purchase = {
                'user_id': int(np.random.randint(1, 1000)),
                'purchase_time': pd.Timestamp.now().isoformat(),
                'product_id': int(np.random.randint(1, 100)),
                'amount': float(np.random.uniform(10, 500)),
            }

            social_interaction = {
                'user_id': int(np.random.randint(1, 1000)),
                'interaction_time': pd.Timestamp.now().isoformat(),
                'platform': str(np.random.choice(['facebook', 'twitter', 'instagram'])),
                'action': str(np.random.choice(['like', 'share', 'comment'])),
            }

            kafka_producer.send('web_visit_topic', web_visit)
            kafka_producer.send('purchase_topic', purchase)
            kafka_producer.send('social_interaction_topic', social_interaction)

            completed += 1
            time.sleep(interval)  # give a timeframe for generating data
    finally:
        # send() only queues messages; flush so nothing buffered is lost when
        # the loop finishes or is interrupted.
        kafka_producer.flush()

# generate and send data
# NOTE: called without arguments the generator loops indefinitely, so this
# cell blocks until the kernel is interrupted.
generate_and_send_data()


Receive data from Kafka and pre-process it.

In [None]:
from kafka import KafkaConsumer

# Kafka consumers: one per topic, all pointed at the local broker and all
# decoding message values from UTF-8 JSON.
def _json_consumer(topic):
    """Create a consumer for `topic` whose message values are parsed from JSON."""
    return KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    )


consumer_web_visit = _json_consumer('web_visit_topic')
consumer_purchase = _json_consumer('purchase_topic')
consumer_social_interaction = _json_consumer('social_interaction_topic')

# pre-process the data
def _add_time_features(records, time_column, prefix):
    """Build a de-duplicated DataFrame from `records` with date/hour columns.

    `time_column` is parsed to datetimes and used to derive the
    ``<prefix>_date`` and ``<prefix>_hour`` feature columns.
    """
    df = pd.DataFrame(records)
    df.drop_duplicates(inplace=True)
    df[time_column] = pd.to_datetime(df[time_column])
    df[f'{prefix}_date'] = df[time_column].dt.date
    df[f'{prefix}_hour'] = df[time_column].dt.hour
    return df


def process_data(batch_size=10, poll_timeout_ms=1000, max_polls=None, streams=None):
    """Consume the event topics and print pre-processed batches.

    The consumers are polled in turn instead of being iterated one after
    another: iterating a single consumer blocks waiting for new messages, so
    any consumer handled after the first one would never be read.

    Whenever a stream has buffered ``batch_size`` messages, they are turned
    into a DataFrame (duplicates dropped, timestamp parsed, date and hour
    columns added), the first rows are printed, and the buffer is reset.

    Args:
        batch_size: Number of messages to buffer per stream before processing.
        poll_timeout_ms: Maximum time in milliseconds to wait in each
            ``poll()`` call.
        max_polls: Number of polling rounds over all streams. ``None`` (the
            default) keeps polling until the call is interrupted.
        streams: Iterable of ``(consumer, time_column, prefix, header)``
            tuples. ``consumer`` must expose ``poll(timeout_ms=...)``
            returning a mapping of partitions to lists of records that have a
            ``value`` attribute. Defaults to the three module-level consumers.
    """
    if streams is None:
        streams = [
            (consumer_web_visit, 'visit_time', 'visit',
             "Processed web visit data:\n"),
            (consumer_purchase, 'purchase_time', 'purchase',
             "Processed purchase data:\n"),
            (consumer_social_interaction, 'interaction_time', 'interaction',
             "Processed social interaction data:\n"),
        ]
    else:
        streams = list(streams)

    # One independent buffer per stream.
    buffers = [[] for _ in streams]

    rounds = 0
    while max_polls is None or rounds < max_polls:
        for buffer, (consumer, time_column, prefix, header) in zip(buffers, streams):
            polled = consumer.poll(timeout_ms=poll_timeout_ms)
            for partition_records in polled.values():
                for record in partition_records:
                    buffer.append(record.value)

                    # create time feature columns once a full batch is buffered
                    if len(buffer) >= batch_size:
                        df = _add_time_features(buffer, time_column, prefix)
                        print(header, df.head())
                        buffer.clear()
        rounds += 1

# pre-process data
# NOTE: no consumer timeout is configured, so this call is expected to keep
# waiting for new messages until the kernel is interrupted.
process_data()
