# In [None]:
import json
import pandas as pd
from tqdm import tqdm
from kafka import KafkaProducer
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from features.user import generate_users
from features.item import get_items
from features.feedback import generate_feedbacks

def _to_json_bytes(payload):
    """Encode a message payload as UTF-8 JSON bytes."""
    return json.dumps(payload).encode("utf-8")


# Elasticsearch client pointed at http://localhost:9200 with basic auth;
# certificate verification and the related TLS warnings are switched off.
es_client = Elasticsearch(
    "http://localhost:9200",
    ssl_show_warn=False,
    verify_certs=False,
    basic_auth=("elastic", "password"),
)

# Kafka producer bound to localhost:9092; every message value is passed
# through the JSON serializer above before being sent.
producer = KafkaProducer(
    value_serializer=_to_json_bytes,
    bootstrap_servers="localhost:9092",
)

def generate_index(index_name, df):
    """Yield one Elasticsearch bulk action per row of ``df``.

    Each action targets ``index_name``, uses the row's index label as the
    document ``_id`` and the row's column/value mapping as ``_source``.

    Rows are converted with ``DataFrame.to_dict(orient="records")`` rather
    than ``iterrows()``: ``iterrows()`` builds one Series per row, which
    upcasts values to a common dtype (e.g. integer ids become floats when the
    frame also has float columns). Converting per column keeps every value in
    its own column's type.

    Args:
        index_name: Name of the target index, copied into every action.
        df: DataFrame whose rows become documents.

    Yields:
        dict: An action with the keys ``_index``, ``_id`` and ``_source``.
    """
    records = df.to_dict(orient="records")
    for doc_id, source in zip(df.index, records):
        yield {
            "_index": index_name,
            "_id": doc_id,
            "_source": source,
        }

def simulate_interactions(n_users=1000, n_items=1000, n_feedbacks=10000):
    """Generate synthetic data, index it into Elasticsearch and return feedback.

    Users, items and feedback events are produced by ``generate_users``,
    ``get_items`` and ``generate_feedbacks`` and bulk-indexed into the
    ``user``, ``item`` and ``interaction`` indices through ``es_client``.
    A one-line summary of the indexed counts is printed.

    Args:
        n_users: Count passed to ``generate_users`` (default 1000).
        n_items: Count passed to ``get_items`` (default 1000).
        n_feedbacks: Count passed to ``generate_feedbacks`` (default 10000).

    Returns:
        list: One JSON-decoded object per feedback row, in row order.
    """
    user_data = generate_users(n_users)
    user_df = pd.DataFrame(user_data)
    item_data = get_items(n_items)
    item_df = pd.DataFrame(item_data)
    feedbacks = generate_feedbacks(n_feedbacks, user_data, item_data)
    feedbacks_df = pd.DataFrame(feedbacks)

    # bulk() returns (number of successful actions, errors); only the
    # success counts are reported here.
    users_ok, _ = bulk(es_client, generate_index("user", user_df))
    items_ok, _ = bulk(es_client, generate_index("item", item_df))
    interactions_ok, _ = bulk(es_client, generate_index("interaction", feedbacks_df))
    # Each count is reported against its own index (the item count used to
    # repeat the user count).
    print(f"Successfully indexed {users_ok} users,  {items_ok} items,  {interactions_ok} interactions")

    # Round-trip every row through JSON so the returned payloads contain only
    # plain JSON-compatible values; the frame itself is left untouched.
    serialized_rows = feedbacks_df.apply(lambda row: row.to_json(), axis=1)

    return [json.loads(payload) for payload in serialized_rows.values]

# Build the data set, index it, then publish every feedback event to the
# "feedback_update" Kafka topic.
try:
    interactions_data = simulate_interactions()

    for interaction in tqdm(interactions_data, desc="sending feedback data"):
        # send() only enqueues the record; the producer batches in the
        # background, so there is no need to block on every message.
        producer.send("feedback_update", interaction)

    # Block once until everything that was enqueued has been sent.
    producer.flush()
finally:
    # Always release the producer, even if generation or sending failed.
    producer.close()