# Bronze: Ingest data
The bronze layer of the [medallion architecture](https://www.databricks.com/glossary/medallion-architecture) is mainly responsible for ingesting raw data from different sources.
The logic below reads streaming data from a Kafka topic.

## Query the API and publish to the topic
Start the **weather_producer**, which queries the open API at open-meteo.com and publishes new messages to the given Kafka topic.

**NOTE**: Ensure that the topic **weather-data-pipeline** exists before starting the producer and the consumer below!
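
The **weather_producer** itself is not shown here. To illustrate the contract between producer and consumer, the hypothetical helper `build_weather_message` below turns one weather reading into the JSON message that the consumer's `weather_schema` expects: a string `timestamp` plus numeric `temperature`, `humidity` and `wind_speed`. The input keys (`time`, `temperature_2m`, `relative_humidity_2m`, `wind_speed_10m`) are an assumption modeled on Open-Meteo variable names; check them against the real producer before relying on this sketch.

```python
import json


def build_weather_message(current: dict) -> bytes:
    """Map one reading to the consumer's JSON payload.

    Hypothetical helper: the input keys are assumed Open-Meteo-style names,
    the real weather_producer may use different ones.
    """
    payload = {
        "timestamp": current["time"],  # kept as a string; the consumer casts it later
        "temperature": float(current["temperature_2m"]),
        "humidity": float(current["relative_humidity_2m"]),
        "wind_speed": float(current["wind_speed_10m"]),
    }
    # Kafka message values are raw bytes, so encode the JSON document as UTF-8
    return json.dumps(payload).encode("utf-8")


if __name__ == "__main__":
    sample = {  # made-up example values, not real measurements
        "time": "2025-05-30T17:00",
        "temperature_2m": 21.4,
        "relative_humidity_2m": 55,
        "wind_speed_10m": 12.3,
    }
    print(build_weather_message(sample))
```

The bytes returned here would be what a Kafka producer sends as the message value; the consumer below casts that value to a string and parses it with `from_json`.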

## Consume from Kafka topic
The approach is to subscribe ("listen") to the topic and store the incoming data as [Parquet](https://parquet.apache.org/) files on the storage layer (MinIO).
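
Roughly speaking, on every trigger Spark Structured Streaming determines the latest available offsets, processes everything between the end of the previous micro-batch and those offsets as the next micro-batch, and records its progress. The toy class below (`MicroBatchReader` is a hypothetical name, not a Spark API) simulates that bookkeeping for a single partition held in a Python list. It ignores multiple partitions, retention and checkpoint files, but it shows why `startingOffsets="earliest"` makes the first batch pick up the whole backlog already in the topic, and why, in this sketch, a trigger without new data produces no batch at all.

```python
class MicroBatchReader:
    """Toy model of offset tracking for one Kafka partition (hypothetical, not Spark's API)."""

    def __init__(self, log: list, starting_offset: str = "earliest"):
        self.log = log  # stands in for the messages stored in the topic partition
        # "earliest" starts at offset 0, "latest" skips everything already in the log
        self.next_offset = 0 if starting_offset == "earliest" else len(log)
        self.batch_id = 0

    def next_batch(self):
        """Return (batch_id, messages) for offsets [next_offset, end of log), or None."""
        end = len(self.log)
        if end == self.next_offset:
            return None  # nothing new since the last trigger: no batch is planned
        result = (self.batch_id, self.log[self.next_offset:end])
        # Record the new position so the same offsets are not read again
        self.next_offset = end
        self.batch_id += 1
        return result


topic = [f"msg-{i}" for i in range(3)]  # backlog published before the consumer starts
reader = MicroBatchReader(topic, "earliest")
print(reader.next_batch())  # (0, ['msg-0', 'msg-1', 'msg-2'])
topic.append("msg-3")       # a new message arrives before the next trigger
print(reader.next_batch())  # (1, ['msg-3'])
print(reader.next_batch())  # None
```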

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
import boto3
from io import BytesIO
import datetime

# Kafka & MinIO definitions
# -------------------------------------------------------------------------------------------------
KAFKA_BROKER = "kafka:9092"
TOPIC = "weather-data-pipeline"

MINIO_ENDPOINT = "http://minio:9000"
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"
BUCKET_NAME = "weather-data"

# Initialize Spark Session
# Note: the Kafka source is not bundled with Spark; the spark-sql-kafka-0-10 connector
# must be on the classpath (e.g. via spark.jars.packages or spark-submit --packages).
spark = SparkSession.builder \
    .appName("KafkaWeatherToMinIOOptimized") \
    .getOrCreate()

# Initialize MinIO client (MinIO exposes an S3-compatible API, so the boto3 S3 client works)
s3_client = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY
)

# Create the bucket if it doesn't exist yet (safe to run repeatedly)
def init_bucket(bucket_name):
    buckets = [b['Name'] for b in s3_client.list_buckets().get('Buckets', [])]
    if bucket_name not in buckets:
        s3_client.create_bucket(Bucket=bucket_name)

init_bucket(BUCKET_NAME)
```

```python
# Define schema for the JSON payload in the Kafka 'value' field
weather_schema = StructType() \
    .add("timestamp", StringType()) \
    .add("temperature", DoubleType()) \
    .add("humidity", DoubleType()) \
    .add("wind_speed", DoubleType())

# Read stream from Kafka; startingOffsets only applies when the query starts
# without an existing checkpoint (a resumed query continues where it left off)
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

# Deserialize the binary Kafka 'value' into typed columns;
# missing or unparseable fields end up as null
df_parsed = df_raw.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), weather_schema).alias("data")) \
    .select(
        col("data.timestamp"),
        col("data.temperature"),
        col("data.humidity"),
        col("data.wind_speed")
    )
```
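
To make the deserialization step concrete, here is a plain-Python analogue of what the `from_json` projection does to one Kafka `value`: decode the bytes, parse the JSON, keep only the four schema fields, and yield `None` (Spark's null) for fields that are missing or wrongly typed, or for every field when the payload is not valid JSON. `parse_weather_value` is a hypothetical helper for illustration only; Spark's own JSON parser has additional rules (parse modes, special values, type coercion) that this sketch does not reproduce.

```python
import json

# Field name -> expected Python type, mirroring weather_schema (StringType / DoubleType)
WEATHER_FIELDS = {
    "timestamp": str,
    "temperature": float,
    "humidity": float,
    "wind_speed": float,
}


def parse_weather_value(value: bytes) -> dict:
    """Rough stdlib analogue of CAST(value AS STRING) + from_json(..., weather_schema)."""
    try:
        doc = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        doc = None
    if not isinstance(doc, dict):
        # Unparseable payload: every column becomes null
        return {name: None for name in WEATHER_FIELDS}

    row = {}
    for name, expected in WEATHER_FIELDS.items():
        raw = doc.get(name)
        if expected is float and isinstance(raw, (int, float)) and not isinstance(raw, bool):
            row[name] = float(raw)  # this sketch accepts JSON integers for double fields
        elif expected is str and isinstance(raw, str):
            row[name] = raw
        else:
            row[name] = None  # missing or wrongly typed field -> null
    return row  # extra keys in the JSON document are simply ignored
```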

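```python
# Write each micro-batch to MinIO as a single Parquet object (called once per batch)
def write_to_minio(batch_df, batch_id):
    # Skip empty micro-batches so no empty files are written
    if batch_df.isEmpty():
        return

    # Convert the timestamp string into a proper timestamp column
    batch_df = batch_df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

    # Collect the micro-batch on the driver as a pandas DataFrame (fine for small batches)
    pdf = batch_df.toPandas()

    # Group objects by UTC ingestion date: bronze/YYYY-MM-DD/weather_<epoch seconds>.parquet
    now = datetime.datetime.now(datetime.timezone.utc)
    current_date = now.strftime("%Y-%m-%d")
    parquet_key = f"bronze/{current_date}/weather_{now.timestamp()}.parquet"

    # Serialize to Parquet in memory (requires pyarrow) and upload the bytes
    buffer = BytesIO()
    pdf.to_parquet(buffer, engine="pyarrow", index=False)
    s3_client.put_object(Bucket=BUCKET_NAME, Key=parquet_key, Body=buffer.getvalue())

    print(f"[{datetime.datetime.now()}] Stored {len(pdf)} records to MinIO at: {parquet_key}")

# Start the streaming query with a 5-second processing-time trigger
query = df_parsed.writeStream \
    .foreachBatch(write_to_minio) \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()

# Block for up to 30 seconds; returns False if the query is still running afterwards
query.awaitTermination(30)
```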

```text
[2025-05-30 17:12:44.282315] Stored 96 records to MinIO at: bronze/2025-05-30/weather_1748625164.205021.parquet

False
```
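
The object keys written by `write_to_minio` follow the pattern `bronze/<UTC date>/weather_<epoch seconds>.parquet`, so each day's bronze files land under their own prefix. Factoring that logic into a small pure function makes it easy to check without Spark or MinIO. The name `bronze_object_key` is hypothetical, and passing the clock value in explicitly is a design choice for testability, not something the notebook does.

```python
import datetime


def bronze_object_key(now: datetime.datetime) -> str:
    """Build the key layout used by write_to_minio (hypothetical helper)."""
    if now.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    now = now.astimezone(datetime.timezone.utc)
    # Date prefix groups objects per UTC ingestion day; the epoch suffix makes
    # collisions between batches unlikely (but not impossible)
    return f"bronze/{now.strftime('%Y-%m-%d')}/weather_{now.timestamp()}.parquet"


# Instant encoded in the key from the log output above
ts = datetime.datetime(2025, 5, 30, 17, 12, 44, 205021, tzinfo=datetime.timezone.utc)
print(bronze_object_key(ts))  # bronze/2025-05-30/weather_1748625164.205021.parquet
```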

## Shut down the query
You can monitor the Spark jobs of the streaming query in the Spark UI: [http://localhost:4040](http://localhost:4040)

```python
print("Is active:", query.isActive)
# Stop the streaming query; afterwards isActive reports False
query.stop()
print("Is active:", query.isActive)
```

```text
Is active: True
Is active: False
```
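
Stopping the query raises the question of what a restart does. The query above sets no `checkpointLocation` option and no query name, so a restarted query does not resume from the previous run's offsets; with `startingOffsets` set to `earliest` it reads the topic from the beginning again. Even with a checkpoint, `foreachBatch` only guarantees at-least-once writes: after a failure a batch can be re-run with the same `batch_id`. The stdlib simulation below (a dict stands in for the bucket; `run_batches`, `timestamp_key` and `batch_id_key` are hypothetical names) contrasts timestamp-based keys, which turn a replay into a duplicate object, with keys derived from `batch_id`, which make the replay overwrite the same object.

```python
import itertools


def run_batches(batches, make_key):
    """Write each (batch_id, records) pair into a dict that stands in for the bucket."""
    bucket = {}
    for batch_id, records in batches:
        bucket[make_key(batch_id)] = list(records)  # like put_object: same key overwrites
    return bucket


_fake_clock = itertools.count(1)  # fake, monotonically increasing "epoch seconds"


def timestamp_key(batch_id):
    # Ignores batch_id, similar in spirit to the notebook's timestamp-based key
    return f"bronze/weather_{next(_fake_clock)}.parquet"


def batch_id_key(batch_id):
    # Same batch_id -> same key, so a replayed batch overwrites its earlier output
    return f"bronze/weather_batch_{batch_id}.parquet"


# Batch 1 is written, then re-run with the same batch_id (at-least-once delivery)
replayed = [(0, ["r1", "r2"]), (1, ["r3"]), (1, ["r3"])]

print(len(run_batches(replayed, timestamp_key)))  # 3 objects: r3 is stored twice
print(len(run_batches(replayed, batch_id_key)))   # 2 objects: the replay overwrote batch 1
```

This is only one possible design, and it relies on stable batch ids: without a checkpoint location, batch ids start again at 0 after a restart, so `batch_id`-only keys would overwrite earlier objects. In practice such keys would need a checkpointed query or an additional run-specific prefix.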
