In [20]:
pip install kafka-python

Collecting kafka-python
  Using cached kafka_python-2.2.3-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.2.3-py2.py3-none-any.whl (307 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.2.3
Note: you may need to restart the kernel to use updated packages.


In [22]:
!pip install kafka-python



In [28]:
import kafka
print(kafka.__version__)

2.2.3


In [30]:
pip install --user kafka-python

Note: you may need to restart the kernel to use updated packages.


In [38]:
import json
import requests
import sqlite3
import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime

# Konfiguracja - dostosuj jeśli Kafka nie działa na localhost
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
HYDRO_TOPIC = 'imgw-hydro-data'
DATABASE_NAME = 'imgw_hydro_data.db'
API_URL = 'https://danepubliczne.imgw.pl/api/data/hydro2/'

def wait_for_kafka(max_retries=5, delay=5):
    """Czeka na dostępność brokera Kafka"""
    for i in range(max_retries):
        try:
            producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
            producer.close()
            return True
        except:
            print(f"Próba {i+1}/{max_retries} - Broker niedostępny, czekam {delay}s...")
            time.sleep(delay)
    return False

def create_database():
    """Tworzy bazę danych SQLite i tabelę dla danych hydrologicznych"""
    conn = sqlite3.connect(DATABASE_NAME)
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS hydro_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        station_id TEXT,
        station_name TEXT,
        river TEXT,
        water_level REAL,
        water_status TEXT,
        measurement_date TEXT,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    conn.commit()
    conn.close()

def fetch_hydro_data():
    """Pobiera dane hydrologiczne z API IMGW"""
    try:
        response = requests.get(API_URL)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Błąd podczas pobierania danych: {e}")
        return None

def process_and_save_data(data):
    """Przetwarza i zapisuje dane do bazy danych"""
    if not data:
        return
    
    conn = sqlite3.connect(DATABASE_NAME)
    cursor = conn.cursor()
    
    for record in data:
        try:
            cursor.execute('''
            INSERT INTO hydro_data 
            (station_id, station_name, river, water_level, water_status, measurement_date)
            VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                record.get('id_stacji'),
                record.get('stacja'),
                record.get('rzeka'),
                record.get('stan_wody'),
                record.get('stan_wody_status'),
                record.get('data_pomiaru')
            ))
        except Exception as e:
            print(f"Błąd podczas przetwarzania rekordu: {e}")
    
    conn.commit()
    conn.close()
    print(f"Zapisano {len(data)} rekordów do bazy danych")

def kafka_producer():
    """Producent Kafka wysyłający dane hydrologiczne"""
    if not wait_for_kafka():
        print("Nie można połączyć się z brokerem Kafka")
        return
    
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    data = fetch_hydro_data()
    if data:
        producer.send(HYDRO_TOPIC, value=data)
        producer.flush()
        print("Dane wysłane do Kafki")

def kafka_consumer():
    """Konsument Kafka odbierający i zapisujący dane"""
    if not wait_for_kafka():
        print("Nie można połączyć się z brokerem Kafka")
        return
    
    consumer = KafkaConsumer(
        HYDRO_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    
    print("Konsument uruchomiony, oczekiwanie na dane...")
    for message in consumer:
        data = message.value
        print(f"Odebrano {len(data)} rekordów z Kafki")
        process_and_save_data(data)

if __name__ == '__main__':
    create_database()
    
    # Wybierz tryb działania: 'producer' lub 'consumer'
    mode = 'consumer'  # Zmień na 'producer' aby wysyłać dane
    
    if mode == 'producer':
        kafka_producer()
    elif mode == 'consumer':
        kafka_consumer()

Próba 1/5 - Broker niedostępny, czekam 5s...
Próba 2/5 - Broker niedostępny, czekam 5s...
Próba 3/5 - Broker niedostępny, czekam 5s...
Próba 4/5 - Broker niedostępny, czekam 5s...
Próba 5/5 - Broker niedostępny, czekam 5s...
Nie można połączyć się z brokerem Kafka
