# Bronze: Ingest data
The bronze layer of the [medallion architecture](https://www.databricks.com/glossary/medallion-architecture) is mainly responsible for ingesting data from different sources.
The logic below reads "streaming" data via a Kafka Topic.

In [1]:
# Kafka definitions
# -------------------------------------------------------------------------------------------------
KAFKA_BROKER = "kafka:9092"
TOPIC = "weather-data"
# The consumer below subscribes with 'auto.offset.reset': 'earliest'.
GROUP_ID = "weather-data-bronze-loop" # change the group-id to ingest the whole data again

# MinIO Configuration
# -------------------------------------------------------------------------------------------------
MINIO_ENDPOINT = "http://minio:9000"
# Danger-Zone (https://www.youtube.com/watch?v=siwpn14IE7E)
# Credentials are hard-coded here for the demo only.
# Typically a kind of vault would be used e.g. https://azure.microsoft.com/en-us/products/key-vault, ...
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"
BUCKET_NAME = "weather-data"

# Weather API
# -------------------------------------------------------------------------------------------------
# Sent to the weather API as the 'forecast_days' query parameter.
FORECAST_DAYS=4

## Query the API and publish to the topic

In [None]:
from confluent_kafka import Producer
import json
import time
import requests

# Initialize Kafka producer: only the broker address is configured explicitly
producer_config = {'bootstrap.servers': KAFKA_BROKER}
producer = Producer(producer_config)

# Callback to confirm delivery
def delivery_report(err, msg):
    """Print the outcome of a single produce request.

    Args:
        err: The delivery error, or ``None`` when the message was delivered.
        msg: The produced message; on success its value is decoded as UTF-8
            and echoed.
    """
    if err is None:
        print(f"Sent: {msg.value().decode('utf-8')}")
        return
    print(f"Delivery failed: {err}")

# Function to fetch weather data
def fetch_weather_data(timeout=10):
    """Fetch the hourly weather forecast for Puch Urstein from Open-Meteo.

    Requests temperature, relative humidity and wind speed as hourly series
    in UTC for the next ``FORECAST_DAYS`` days.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        requests.exceptions.HTTPError: If the API answers with a 4xx/5xx
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = 'https://api.open-meteo.com/v1/forecast'
    params = {
        'latitude': 47.72,  # Puch Urstein!
        'longitude': 13.09, # Puch Urstein!
        'hourly': 'temperature_2m,relative_humidity_2m,wind_speed_10m',
        'timezone': 'UTC',
        'forecast_days': FORECAST_DAYS # look into the future
    }
    response = requests.get(url, params=params, timeout=timeout)
    # Fail here with a clear HTTP error instead of handing an error payload
    # to the caller.
    response.raise_for_status()
    return response.json()

# Fetch the forecast once, then publish it to Kafka one hourly record at a time
weather_data = fetch_weather_data()
hourly_data = weather_data['hourly']

# Record field name -> name of the hourly series in the API response
record_fields = {
    'temperature': 'temperature_2m',
    'humidity': 'relative_humidity_2m',
    'wind_speed': 'wind_speed_10m',
}

for i, timestamp in enumerate(hourly_data['time']):
    # The series are read by position: entry i of each belongs to timestamp i
    record = {'timestamp': timestamp}
    for field, series in record_fields.items():
        record[field] = hourly_data[series][i]

    payload = json.dumps(record).encode('utf-8')
    producer.produce(TOPIC, value=payload, callback=delivery_report)
    producer.poll(0)  # Trigger delivery report callbacks
    time.sleep(1)  # Simulate real-time data streaming

# Wait for outstanding deliveries before the cell finishes
producer.flush()

## Consume from Kafka topic
The approach is to "listen" for data in the queue, collect it in batches, and store each batch as a [parquet](https://parquet.apache.org/) file on the storage layer.

In [None]:
from confluent_kafka import Consumer
import pandas as pd
import json
from io import BytesIO
import datetime
import boto3


# Initialize Kafka Consumer and subscribe it to the weather topic
consumer_config = {
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': GROUP_ID,
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_config)
consumer.subscribe([TOPIC])

# Initialize MinIO Client
# Both boto3 handles share the same endpoint and credentials.
minio_connection = {
    "endpoint_url": MINIO_ENDPOINT,
    "aws_access_key_id": MINIO_ACCESS_KEY,
    "aws_secret_access_key": MINIO_SECRET_KEY,
}
s3_client = boto3.client("s3", **minio_connection)
s3_resource = boto3.resource('s3', **minio_connection)

# Continuous ingestion loop
print("Listening for messages...")
batch = []  # records collected since the last write to storage

# ensure storage bucket is available
def init_bucket(bucket_name):
    """Create ``bucket_name`` unless a bucket with that name is already listed.

    Args:
        bucket_name: Name of the bucket that must exist afterwards.
    """
    existing_names = {bucket.name for bucket in s3_resource.buckets.all()}
    if bucket_name not in existing_names:
        s3_client.create_bucket(Bucket=bucket_name)


# Number of records written per parquet file
BATCH_SIZE = 24


def _store_batch(records):
    """Write ``records`` as one parquet object below ``bronze/`` in the bucket.

    Args:
        records: Non-empty list of decoded message dicts, each carrying a
            ``timestamp`` entry.
    """
    df = pd.DataFrame(records)
    df["timestamp"] = pd.to_datetime(df["timestamp"], format='mixed', utc=True)

    # Partition by current UTC date (ingestion time, not the record timestamp)
    now = datetime.datetime.now(datetime.timezone.utc)
    current_date = now.strftime("%Y-%m-%d")
    parquet_key = f"bronze/{current_date}/weather_{now.timestamp()}.parquet"

    # Serialize in memory and upload to MinIO in a single request
    buffer = BytesIO()
    df.to_parquet(buffer, engine="pyarrow", index=False)

    s3_client.put_object(Bucket=BUCKET_NAME, Key=parquet_key, Body=buffer.getvalue())
    print(f"Stored {len(records)} records to MinIO at: {parquet_key}")


init_bucket(BUCKET_NAME)

# NOTE(review): the consumer appears to rely on the client's default offset
# auto-commit, so offsets may be committed before a batch is persisted --
# confirm, and consider committing manually after a successful upload.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the poll timeout
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        record = json.loads(msg.value().decode("utf-8"))
        print(f"Received: {record}")
        batch.append(record)

        # Flush batch every BATCH_SIZE records
        if len(batch) >= BATCH_SIZE:
            _store_batch(batch)
            batch = []

except KeyboardInterrupt:
    print("Stopping consumer...")

finally:
    try:
        # Persist the trailing partial batch instead of silently dropping it
        if batch:
            _store_batch(batch)
            batch = []
    finally:
        # Close the consumer even if the final upload fails
        consumer.close()
