In [None]:
%%file stream.py
import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

# Kafka connection settings and pacing of the generator.
KAFKA_TOPIC = "urzedy"
SERVER = "broker:9092"
LAG = 3  # value handed to sleep() between consecutive messages

# Warsaw districts mapped to the streets used for the simulated offices.
warszawa_districts_and_streets = {
    "Śródmieście": ["Marszałkowska", "Nowy Świat"],
    "Wola": ["Towarowa", "Żelazna"],
    "Mokotów": ["Puławska", "Racławicka"],
    "Praga-Południe": ["Grochowska", "Waszyngtona"],
    "Ursynów": ["al. KEN", "Wynalazek"],
}

# Offices (case types) mapped to their queues. An empty list means the office
# has no separate queues; the office name itself is then used as the queue
# name when the combinations are built below.
office_types = {
    "Ewidencja ludności": ["Dowody osobiste", "Wybory"],
    "Rejestracja pojazdów": [],
    "Prawa jazdy": ["Wnioski o prawo jazdy", "Odbiór prawa jazdy"],
    "Architektura i infrastruktura": [],
    "Działalność gospodarcza": [],
    "Cudzoziemcy": [],
    "Podatki": [],
}

# Flat list of every (district, street) pair, in dictionary order.
district_street_combinations = [
    (district_name, street_name)
    for district_name, street_names in warszawa_districts_and_streets.items()
    for street_name in street_names
]
# Cursor into district_street_combinations, starting at the first pair.
district_street_index = 0

# Flat list of every (office, queue) pair; offices without queues contribute
# a single (office, office) pair.
office_types_combinations = [
    (office_name, queue_name)
    for office_name, queue_names in office_types.items()
    for queue_name in (queue_names or [office_name])
]
# Cursor into office_types_combinations, starting at the first pair.
office_types_index = 0

def build_message(district: str, office: str, position: str, *, minutes_per_person: int = 2) -> dict:
    """Build one simulated queue-status record for a Warsaw office.

    The timestamp is the current local time moved back by a random 0-15
    seconds, the queue length is a random integer in [0, 50], and the
    estimated wait is the queue length multiplied by ``minutes_per_person``.

    Args:
        district: Value stored under "Dzielnica".
        office: Value stored under "Rodzaj_sprawy".
        position: Value stored under "Stanowisko".
        minutes_per_person: Minutes assumed per queued person (default 2).

    Returns:
        A dict of strings and ints, ready to be JSON-serialized.
    """
    timestamp = datetime.now() + timedelta(seconds=random.randint(-15, 0))
    queue_length = random.randint(0, 50)
    return {
        "Godzina": str(timestamp),
        "Miasto": "Warszawa",
        "Dzielnica": district,
        "Rodzaj_sprawy": office,
        "Stanowisko": position,
        "Długość_kolejki": queue_length,
        # Estimated wait time in minutes.
        "Czas_oczekiwania": queue_length * minutes_per_person,
    }


if __name__ == "__main__":

    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        # ensure_ascii=False keeps Polish characters unescaped in the JSON.
        value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode("utf-8"),
        api_version=(3, 7, 0),
    )

    try:
        while True:
            # Step through the (district, street) pairs round-robin.
            # NOTE(review): the street is selected but never put into the
            # message; confirm whether the payload should carry it.
            district, _street = district_street_combinations[district_street_index]
            district_street_index = (district_street_index + 1) % len(district_street_combinations)

            # Step through the (office, queue) pairs round-robin. The local
            # is named `position` so the builtin `type` is not rebound.
            office, position = office_types_combinations[office_types_index]
            office_types_index = (office_types_index + 1) % len(office_types_combinations)

            producer.send(KAFKA_TOPIC, value=build_message(district, office, position))

            sleep(LAG)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the stream; exit without a traceback.
        pass
    finally:
        # Close the producer on every exit path, not only on Ctrl+C, so an
        # unexpected error in the loop does not leave the connection open.
        producer.close()
