# Advanced Data Pipelines & Real-Time Analytics

This overview shows the key building blocks of a scalable, modern data infrastructure covering streaming, batch, lakehouse, and self-service analytics.


In [None]:
# Create Kafka topics for raw and aggregated order data
kafka-topics.sh --create --topic orders_raw --partitions 12 --replication-factor 3 --bootstrap-server kafka:9092
kafka-topics.sh --create --topic orders_aggregates --partitions 6 --replication-factor 3 --bootstrap-server kafka:9092
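The topic definitions do not fix a message format. The following is a minimal sketch that assumes JSON values keyed by `userId`. With a fixed partition count, the default partitioner sends every event of one user to the same partition, which preserves per-user ordering. The field names mirror the `OrderEvent` case class of the Flink job, and `ts` is assumed to be epoch milliseconds. `to_kafka_record` and `from_kafka_value` are hypothetical helpers, not part of any Kafka client.

```python
import json
from dataclasses import asdict, dataclass
from typing import Tuple


@dataclass(frozen=True)
class OrderEvent:
    # Field names mirror the Flink case class; ts is assumed to be epoch milliseconds
    orderId: str
    userId: str
    amount: float
    ts: int


def to_kafka_record(event: OrderEvent) -> Tuple[bytes, bytes]:
    """Hypothetical helper: (key, value) bytes for orders_raw, keyed by userId."""
    key = event.userId.encode("utf-8")
    value = json.dumps(asdict(event), separators=(",", ":")).encode("utf-8")
    return key, value


def from_kafka_value(value: bytes) -> OrderEvent:
    """Parse a JSON value back into an OrderEvent (inverse of to_kafka_record)."""
    return OrderEvent(**json.loads(value.decode("utf-8")))


if __name__ == "__main__":
    key, value = to_kafka_record(OrderEvent("o-1", "u-42", 19.99, 1_719_705_600_000))
    print(key, value)
```

The resulting bytes could be handed to a Kafka client's produce call as key and value, for example `key=`/`value=` in confluent-kafka's `Producer.produce`.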

In [None]:
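// Flink streaming job: Order Amount Aggregator (sum of order amounts per user per minute)
import java.time.Duration
import java.util.Properties
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

case class OrderEvent(orderId: String, userId: String, amount: Double, ts: Long) // ts: epoch millis (assumed)

object OrderAmountJob {
  // Placeholder (de)serializers: supply a JSON implementation for OrderEvent
  // (e.g. Jackson with jackson-module-scala). `???` throws at runtime until replaced.
  def orderDeserializer: DeserializationSchema[OrderEvent] = ???
  def orderSerializer: SerializationSchema[OrderEvent] = ???

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("group.id", "order-amount-aggregator")

    // Event-time windows only fire once watermarks advance; tolerate 5 s of out-of-order events
    val watermarks = WatermarkStrategy
      .forBoundedOutOfOrderness[OrderEvent](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {
        override def extractTimestamp(e: OrderEvent, recordTs: Long): Long = e.ts
      })

    val consumer = new FlinkKafkaConsumer[OrderEvent]("orders_raw", orderDeserializer, props)
    val stream = env.addSource(consumer)
      .assignTimestampsAndWatermarks(watermarks)
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum("amount")
    stream.addSink(new FlinkKafkaProducer[OrderEvent]("orders_aggregates", orderSerializer, props))
    env.execute("Order Amount Aggregator")
  }
}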
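The window semantics of the Flink job can be illustrated without a cluster. The pure-Python sketch below makes three assumptions: `ts` is epoch milliseconds, windows are aligned to the epoch (Flink's default offset of 0), and watermarks follow a bounded-out-of-orderness strategy as in Flink's `WatermarkStrategy.forBoundedOutOfOrderness`. `tumbling_sums` and `window_fires` are hypothetical names, not Flink APIs.

```python
from collections import defaultdict
from typing import Dict, Iterable, NamedTuple, Tuple

WINDOW_MS = 60_000  # 1-minute tumbling windows


class OrderEvent(NamedTuple):
    order_id: str
    user_id: str
    amount: float
    ts: int  # event time in epoch milliseconds (assumption)


def window_start(ts: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the tumbling window containing ts, aligned to the epoch."""
    return ts - ts % size_ms


def tumbling_sums(events: Iterable[OrderEvent], size_ms: int = WINDOW_MS) -> Dict[Tuple[str, int], float]:
    """keyBy(userId) + tumbling window + sum(amount): totals per (user, window start)."""
    totals: Dict[Tuple[str, int], float] = defaultdict(float)
    for e in events:
        totals[(e.user_id, window_start(e.ts, size_ms))] += e.amount
    return dict(totals)


def window_fires(start: int, max_seen_ts: int, out_of_orderness_ms: int,
                 size_ms: int = WINDOW_MS) -> bool:
    """True once the watermark reaches the window's last millisecond.

    A bounded-out-of-orderness watermark trails the largest timestamp seen by
    out_of_orderness_ms + 1 ms; window [start, start + size) fires when
    watermark >= start + size - 1.
    """
    watermark = max_seen_ts - out_of_orderness_ms - 1
    return watermark >= start + size_ms - 1
```

Note that without any watermark assignment, `window_fires` never becomes true, so an event-time window job produces no output. This is why the timestamp/watermark step matters.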

In [None]:
# Airflow DAG for the daily Delta Lake batch job
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {'retries': 1, 'retry_delay': timedelta(minutes=10)}

with DAG(
    'delta_batch',
    start_date=datetime(2025, 6, 30),
    schedule='@daily',  # Airflow >= 2.4; older releases use schedule_interval
    default_args=default_args,
    catchup=False,
) as dag:
    run_spark = SparkSubmitOperator(
        task_id='run_delta_job',
        application='/opt/airflow/scripts/run_delta_job.py',
        # Master URL and deploy mode are read from this Airflow connection; note that
        # spark-submit rejects a local[*] master combined with cluster deploy mode.
        conn_id='spark_default',
    )
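With `@daily` and `catchup=False`, Airflow 2.x creates a run for a day only after that day's data interval has ended, and only for the most recent interval instead of backfilling every day since `start_date`. The sketch below models that rule, assuming Airflow 2.x's default data-interval behavior for cron presets and UTC midnight boundaries. `latest_daily_interval` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta, timezone


def latest_daily_interval(start_date: datetime, now: datetime):
    """Most recent complete [start, end) daily interval at `now`, or None.

    Models @daily with catchup=False: a run covers one full day and is created
    only after that day has ended; earlier missed days are not backfilled.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    interval_start = midnight - timedelta(days=1)
    if interval_start < start_date:
        return None  # no complete interval since start_date yet
    return interval_start, midnight


if __name__ == "__main__":
    start = datetime(2025, 6, 30, tzinfo=timezone.utc)
    print(latest_daily_interval(start, datetime(2025, 7, 3, 9, 30, tzinfo=timezone.utc)))
```

In this model the first run covers 2025-06-30 and is triggered shortly after midnight on 2025-07-01, not on 2025-06-30 itself.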

In [None]:
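# Delta Lake Spark job: total order amount per user
from pyspark.sql import SparkSession, functions as F

# Register Delta Lake's SQL extension and catalog (delta-spark must be on the classpath)
spark = (SparkSession.builder.appName("delta_batch")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())
df = spark.read.format("delta").load("s3://data/raw/orders/")
agg = df.groupBy("userId").agg(F.sum("amount").alias("total_amount"))
agg.write.format("delta").mode("overwrite").save("s3://data/processed/orders/")
spark.stop()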
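The job recomputes the totals from the full raw table and writes them with `mode("overwrite")`. A rerun, for example an Airflow retry, therefore replaces the output instead of adding to it. The toy in-memory model below shows that difference. It is not Delta Lake's API, and `aggregate` and `write` are hypothetical names.

```python
from collections import defaultdict


def aggregate(rows):
    """Plain-Python equivalent of groupBy("userId").agg(sum("amount") as total_amount)."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["userId"]] += row["amount"]
    return [{"userId": u, "total_amount": t} for u, t in sorted(totals.items())]


def write(table, new_rows, mode):
    """Toy table write: 'overwrite' replaces all rows, 'append' adds to them."""
    if mode == "overwrite":
        return list(new_rows)
    if mode == "append":
        return table + list(new_rows)
    raise ValueError(f"unsupported mode: {mode}")


if __name__ == "__main__":
    raw = [{"userId": "u1", "amount": 10.0}, {"userId": "u2", "amount": 4.0},
           {"userId": "u1", "amount": 5.0}]
    table = []
    for _ in range(2):  # simulate the original run plus one retry
        table = write(table, aggregate(raw), "overwrite")
    print(table)
```

With `"append"` instead, the retry would duplicate every user's row. That is why a full-recompute job pairs naturally with overwrite.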

In [None]:
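-- dbt model: orders_summary
-- delta.`<path>` is Spark SQL syntax, so this assumes a Spark-based adapter (e.g. dbt-spark or dbt-databricks)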
with raw as (
  select * from delta.`s3://data/processed/orders/`
)
select
  userId,
  total_amount,
  rank() over (order by total_amount desc) as user_rank
from raw
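`rank()` gives tied `total_amount` values the same rank and skips the following ranks, unlike `dense_rank()`. The plain-Python sketch below implements both rules so you can check expectations about `user_rank`. The function names are hypothetical, and NULL ordering is ignored.

```python
def sql_rank(values):
    """SQL rank() over (order by value desc): ties share a rank, gaps follow ties."""
    # rank = 1 + number of values strictly greater than this one
    return [1 + sum(1 for other in values if other > v) for v in values]


def sql_dense_rank(values):
    """SQL dense_rank() over (order by value desc): ties share a rank, no gaps."""
    distinct_desc = sorted(set(values), reverse=True)
    position = {v: i + 1 for i, v in enumerate(distinct_desc)}
    return [position[v] for v in values]


if __name__ == "__main__":
    totals = [100.0, 50.0, 50.0, 20.0]
    print(sql_rank(totals), sql_dense_rank(totals))
```

For totals of 100, 50, 50 and 20, `rank()` yields 1, 2, 2, 4, while `dense_rank()` yields 1, 2, 2, 3.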

## Notes on Serving, Monitoring & Data Quality

- **Serving Layer:** Grafana visualizes real-time data from Kafka/InfluxDB; Superset and Jupyter support ad-hoc analysis on Delta Lake via Trino.
- **Monitoring:** Monitor Flink, Airflow, and batch jobs with Prometheus/Grafana, and alert on lag (e.g. Kafka consumer lag) or job failures.
- **Data Quality:** Integrate Great Expectations checks into the Airflow/batch jobs and add dbt tests for every model.
- **Self-Service:** Provide dbt, Superset, and JupyterHub for exploratory analysis.
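dbt's built-in generic tests `not_null` and `unique` are natural first checks for `orders_summary.userId`. The plain-Python sketch below shows what those two tests assert, assuming rows arrive as dicts. It is neither the dbt nor the Great Expectations API, and `check_not_null` and `check_unique` are hypothetical names. As in dbt, a test passes when it returns no failing rows.

```python
from collections import Counter


def check_not_null(rows, column):
    """Rows where `column` is missing or None (dbt's not_null passes if this is empty)."""
    return [r for r in rows if r.get(column) is None]


def check_unique(rows, column):
    """Non-null values of `column` that occur more than once (unique passes if empty)."""
    counts = Counter(r.get(column) for r in rows if r.get(column) is not None)
    return sorted(v for v, n in counts.items() if n > 1)


if __name__ == "__main__":
    summary = [{"userId": "u1", "total_amount": 15.0},
               {"userId": None, "total_amount": 3.0},
               {"userId": "u1", "total_amount": 2.0}]
    print({"not_null": check_not_null(summary, "userId"),
           "unique": check_unique(summary, "userId")})
```

In dbt itself, the same checks are declared on the column in the model's YAML properties file, under `tests:` (`data_tests:` in newer dbt versions).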