In [1]:
from kafka import KafkaProducer
import json

In [2]:
# Kafka connection settings: broker address and the topic every message goes to.
KAFKA_SERVER = 'localhost:9092'
KAFKA_TOPIC = 'lastStream'


def _encode_value(payload):
    """Serialize a message value as UTF-8 encoded JSON bytes.

    Because the value goes through ``json.dumps``, a plain ``str`` value is
    sent as a quoted JSON string.
    """
    return json.dumps(payload).encode('utf-8')


def _encode_key(raw_key):
    """Serialize a message key as the UTF-8 bytes of its ``str()`` form."""
    return str(raw_key).encode('utf-8')


# Shared producer used by the cells below; keys and values are converted to
# bytes by the two helpers above before being handed to Kafka.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_SERVER,
    value_serializer=_encode_value,
    key_serializer=_encode_key,
)

In [3]:
# # Produce messages (disabled smoke test: sends ten sample messages, then closes the shared producer)
# try:
#     for i in range(10):
#         key = f"key-{i}"
#         value = {"id": i, "message": f"Message {i}"}
#         producer.send(KAFKA_TOPIC, key=key, value=value)
#         print(f"Produced message: {key} -> {value}")

#     # Ensure all messages are sent before closing
#     producer.flush()
# except Exception as e:
#     print(f"Error while producing messages: {e}")
# finally:
#     producer.close()

In [4]:
import time
import threading
import random
import os
from datetime import datetime, timedelta

# Value pools that generate_random_record() samples from.
# Element order is significant: random.choice() indexes into these lists, so
# reordering them would change the records produced under a fixed seed.

# Country names used for the "Country" field.
countries = [
    "USA", "India", "UK", "Canada", "Germany", "Australia",
    "France", "Brazil", "Japan", "South Korea", "Italy", "Mexico",
    "China", "Russia", "Spain", "South Africa", "Netherlands", "Sweden",
    "Turkey", "Argentina", "Egypt", "Saudi Arabia", "Nigeria", "Vietnam",
    "Thailand", "Malaysia", "Singapore", "Indonesia", "Philippines", "Poland",
    "Belgium", "Switzerland", "Denmark", "Norway", "Finland", "Greece",
    "Chile", "New Zealand", "Ireland", "Portugal", "Romania", "Ukraine",
]

# Video categories used for the "Category" field.
categories = [
    "Gaming", "Educational", "Entertainment", "Music", "Sports",
    "Travel", "Technology", "Food", "Health", "Fashion",
    "DIY", "Comedy", "Vlogs", "Movies", "News",
    "Science", "Politics", "Animals", "Nature", "Lifestyle",
    "Fitness", "Art", "Photography", "Crafts", "Books",
    "History", "Language Learning", "Architecture", "Business", "Finance",
    "Gaming Reviews", "Tech Reviews", "Food Reviews", "Productivity", "Parenting",
    "Relationships", "Mental Health", "Motivation", "Startup", "World Events",
]

# Channel names used for the "Channel Name" field.
channels = [
    "TechWorld", "GamingMaster", "ExploreLife", "FitnessGuru",
    "MusicVibes", "EduTutorials", "CreativeMinds", "SportPro",
    "TravelAdventures", "FoodieDelight", "ScienceExplained", "HistoryHub",
    "CookingJourney", "GamerZone", "TechSavvy", "VlogLife",
    "DailyFitness", "MusicBeats", "TravelVloggers", "HealthyLiving",
    "GamingElite", "ArtisticVision", "SportXtreme", "TechTales",
    "AdventureSeekers", "DigitalNomads", "GastronomyMaster", "LifestyleVibes",
    "GamerVision", "ExplorationStation", "FitnessNation", "MusicMasters",
    "EdutainmentWorld", "CraftingLife", "ViralVibes", "MindsetMotivation",
    "TheTechSpot", "HealthyBites", "TheFitnessFactor", "TheCookingCorner",
    "TravelEssentials", "AdventureWorld", "TechForBeginners", "GamingLegends",
]

# State shared between the data generator, the worker thread and the stdin watcher.
stop_event = threading.Event()  # set to ask process_chunks() to stop looping
chunks_lock = threading.Lock()  # held while `chunks` is read or rebound
chunks = []                     # backlog of generated records not yet processed

def generate_random_record():
    """Build one synthetic video-statistics record.

    The numeric fields are drawn so that they stay loosely consistent:

    * ``Likes``       -- uniform integer in [100, 50000]
    * ``Views``       -- between 2x and 50x the likes
    * ``Subscribers`` -- between a tenth of the views and twice the views
    * ``Comments``    -- between a tenth of the likes and the likes

    ``Date`` is a ``YYYY-MM-DD`` day within the last ``5 * 365`` days and
    ``Video ID`` is ``vid_`` followed by a zero-padded number in 1..1000, so
    IDs may repeat across records.

    Returns:
        dict: the record, keyed by the human-readable column names.
    """
    # The sequence of random draws is kept fixed (channel, country, category,
    # likes, views, subscribers, comments, date offset, video id) so that a
    # seeded `random` yields the same records.
    picked_channel = random.choice(channels)
    picked_country = random.choice(countries)
    picked_category = random.choice(categories)

    like_count = random.randint(100, 50000)
    view_count = random.randint(like_count * 2, like_count * 50)
    subscriber_count = random.randint(view_count // 10, view_count * 2)
    comment_count = random.randint(like_count // 10, like_count)

    # Pick a day uniformly from the five-year window that ends now.
    window_days = 5 * 365
    window_start = datetime.now() - timedelta(days=window_days)
    upload_day = window_start + timedelta(days=random.randint(0, window_days))
    upload_day_text = upload_day.strftime('%Y-%m-%d')

    video_id = f"vid_{random.randint(1, 1000):05d}"

    record = {
        "Video ID": video_id,
        "Channel Name": picked_channel,
        "Country": picked_country,
        "Category": picked_category,
        "Comments": comment_count,
        "Subscribers": subscriber_count,
        "Views": view_count,
        "Likes": like_count,
        "Date": upload_day_text,
    }
    return record

def generate_data(num_records):
    """Replace the shared ``chunks`` backlog with freshly generated records.

    Args:
        num_records: how many records to create with
            ``generate_random_record()``.

    The new list is built and bound to the module-level ``chunks`` while
    ``chunks_lock`` is held; any records previously in ``chunks`` are dropped.
    """
    global chunks
    with chunks_lock:
        fresh_records = []
        for _ in range(num_records):
            fresh_records.append(generate_random_record())
        chunks = fresh_records

def process_chunks(chunk_size=8, delay_seconds=2, output_path="processed_videos.txt"):
    """Drain the shared ``chunks`` backlog in batches, logging and publishing each record.

    Each iteration takes up to ``chunk_size`` records off the front of
    ``chunks`` (under ``chunks_lock``), shuffles them, appends one
    comma-separated line per record to ``output_path`` and sends the same line
    to ``KAFKA_TOPIC`` through the module-level ``producer``. The loop ends when
    the backlog is empty or ``stop_event`` is set.

    Args:
        chunk_size: maximum number of records handled per iteration
            (default 8, the original hard-coded value).
        delay_seconds: pause between iterations (default 2). The pause is
            cut short as soon as ``stop_event`` is set.
        output_path: text file the formatted lines are appended to.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1, which would otherwise
            never shrink the backlog and loop forever.
    """
    global chunks

    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    try:
        while not stop_event.is_set():
            # Only the slicing of the shared list happens under the lock; the
            # file and Kafka I/O below run on a private copy of the batch.
            with chunks_lock:
                if not chunks:
                    break

                current_chunk = chunks[:chunk_size]
                chunks = chunks[chunk_size:]

            random.shuffle(current_chunk)

            with open(output_path, "a") as file:
                # The message key is the record's position inside its batch,
                # so keys restart at 0 for every batch.
                for key, record in enumerate(current_chunk):
                    formatted_data = (
                        f"{record['Video ID']}, {record['Channel Name']}, {record['Country']}, {record['Category']}, "
                        f"{record['Comments']}, {record['Subscribers']}, {record['Views']}, {record['Likes']}, {record['Date']}"
                    )
                    file.write(formatted_data + "\n")
                    producer.send(KAFKA_TOPIC, key=key, value=formatted_data)

            # Throttle between batches, but wake up immediately on a stop
            # request instead of sleeping out the full delay.
            stop_event.wait(delay_seconds)
    finally:
        # send() only queues messages; block until they are actually
        # delivered so nothing is lost when this thread or the process ends.
        producer.flush()

def Q():
    """Watch standard input and request a stop when the user enters ``q``.

    Reads lines until one equals ``q`` (case-insensitive, surrounding
    whitespace ignored), then sets ``stop_event``, prints a confirmation and
    returns. Also returns quietly if standard input is closed, or if a stop
    was already requested elsewhere before the next read.
    """
    while not stop_event.is_set():
        try:
            user_input = input()
        except EOFError:
            # No more input can ever arrive; end the watcher instead of
            # letting the thread die with a traceback.
            return

        if user_input.strip().lower() == 'q':
            stop_event.set()
            print("Stopping processing")
            # The stop request has been delivered; nothing left to watch for.
            return

# Fill the shared backlog, then drain it in the background while a second
# thread watches stdin for the 'q' stop command.
generate_data(1000)

# Both threads are daemons, so neither keeps the process alive on its own.
worker_thread = threading.Thread(target=process_chunks, daemon=True)
input_thread = threading.Thread(target=Q, daemon=True)
worker_thread.start()
input_thread.start()

try:
    # Poll once per second until the worker has finished; a KeyboardInterrupt
    # raised while waiting is handled below.
    while True:
        if not worker_thread.is_alive():
            break
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping processing")
    stop_event.set()
    # Wait for the worker to notice the stop request and exit its loop.
    worker_thread.join()


In [None]:
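            # NOTE: scratch copy of the write/send loop in process_chunks(); this older draft omits the Date field.
            # key=0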
            # for record in current_chunk:
            #     formatted_data = (
            #         f"{record['Video ID']}, {record['Channel Name']}, {record['Country']}, {record['Category']}, "
            #         f"{record['Comments']}, {record['Subscribers']}, {record['Views']}, {record['Likes']}"
            #     )
            #     file.write(formatted_data + "\n")
            #     producer.send(KAFKA_TOPIC, key=key, value=formatted_data)
            #     key += 1

            #     # print(formatted_data)
