In [0]:
# -----------------------------
# CONFIGURATION (NO HARD-CODING)
# -----------------------------
# Notebook parameters are exposed as Databricks text widgets so a job run can
# override them; the second argument of dbutils.widgets.text is the default
# used when no value is supplied.
dbutils.widgets.text("input_path", "/databricks-datasets/retail-org/customers/customers.csv")
dbutils.widgets.text("target_table", "default.customers_clean")
# Source CSV location consumed by load_data() in the pipeline below.
input_path = dbutils.widgets.get("input_path")
# Name of the Delta table written by write_data_merge() in the pipeline below.
target_table = dbutils.widgets.get("target_table")

# -----------------------------
# IMPORTY I LOGOWANIE
# -----------------------------
import logging
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.utils import AnalysisException

# Named logger shared by every pipeline stage in this notebook.
logger = logging.getLogger(name="PipelineLogger")
# Ensure INFO-level records are emitted via a root handler (no-op if the root
# logger already has handlers configured).
logging.basicConfig(level=logging.INFO)

# -----------------------------
# 1. MODUŁ - Wczytanie danych
# -----------------------------
def load_data(path, schema=None):
    """Read a CSV file with a header row into a Spark DataFrame and log its row count.

    Args:
        path: Location of the CSV file (anything ``spark.read.csv`` accepts).
        schema: Optional explicit schema (a ``StructType`` or a DDL string).
            When omitted, column types are inferred from the data, as before.
            Supplying a fixed schema avoids the extra pass over the file that
            inference needs and keeps column types stable between runs, so
            repeated MERGEs into the same table see consistent types.

    Returns:
        The loaded DataFrame.
    """
    reader = spark.read.option("header", "true")
    if schema is None:
        reader = reader.option("inferSchema", "true")
    else:
        reader = reader.schema(schema)
    df = reader.csv(path)
    # count() triggers a full Spark job; it is kept because the row count is
    # part of the log line.
    logger.info("Dane wczytane z %s, liczba rekordów: %d", path, df.count())
    return df

# -----------------------------
# 2. MODUŁ - Walidacja danych
# -----------------------------
class DataValidationError(ValueError, AssertionError):
    """Raised when an input DataFrame violates a pipeline validation rule.

    Also subclasses AssertionError so code that caught the AssertionError
    raised by the previous ``assert``-based checks keeps working.
    """


def validate_data(df):
    """Check that *df* satisfies the pipeline's input rules and return it unchanged.

    Rules:
        * column ``customer_id`` must be present;
        * column ``loyalty_segment`` must be present and contain no nulls.

    Explicit ``raise`` statements are used instead of ``assert`` so the checks
    still run when Python is started with ``-O``.

    Raises:
        DataValidationError: if any rule is violated.
    """
    if "customer_id" not in df.columns:
        raise DataValidationError("Brakuje kolumny customer_id")
    # Check presence first so a missing column yields a clear message instead of
    # an analysis error from the filter below.
    if "loyalty_segment" not in df.columns:
        raise DataValidationError("Brakuje kolumny loyalty_segment")
    # NOTE(review): rows with a null customer_id are not rejected here; they can
    # never match in the MERGE key condition and would be re-inserted on every
    # run — consider enforcing non-null keys once the data is confirmed clean.
    null_segments = df.filter(col("loyalty_segment").isNull()).count()
    if null_segments:
        raise DataValidationError("Brakuje danych w loyalty_segment")
    return df

# -----------------------------
# 3. MODUŁ - Transformacja
# -----------------------------
def transform_data(df):
    """Return a copy of *df* with an added ``processed_at`` column.

    The column holds Spark's ``current_timestamp()`` at query evaluation time,
    recording when the row passed through this pipeline.
    """
    audit_column = "processed_at"
    stamped_df = df.withColumn(audit_column, current_timestamp())
    return stamped_df

# -----------------------------
# 4. MODUŁ - Zapis idempotentny (MERGE)
# -----------------------------
def write_data_merge(df, target_table, key_column="customer_id"):
    """Upsert *df* into the Delta table *target_table*, matching rows on *key_column*.

    Behaviour:
        * table missing (``DeltaTable.forName`` raises AnalysisException):
          create it from *df*;
        * table column names differ from *df*'s: replace the table's contents
          and schema with *df* in a single overwrite (``overwriteSchema``)
          rather than DROP + CREATE, which could leave no table behind if the
          write failed after the drop;
        * otherwise: MERGE — update matched rows, insert unmatched ones.

    Args:
        df: Source DataFrame. It should be unique on *key_column*: Delta MERGE
            fails when several source rows match the same target row.
        target_table: Table name as accepted by ``saveAsTable``.
        key_column: Join key for the MERGE condition; defaults to the key this
            function previously hard-coded.

    NOTE(review): only column *names* are compared, so a type change under the
    same names still goes through MERGE — confirm that is acceptable.
    """
    from delta.tables import DeltaTable

    # Keep the try narrow: only the table lookup signals "table does not exist".
    # Errors raised later by MERGE or the overwrite must surface, not be
    # misread as a missing table.
    try:
        delta_table = DeltaTable.forName(spark, target_table)
    except AnalysisException:
        df.write.format("delta").saveAsTable(target_table)
        logger.info("Utworzono nową tabelę %s", target_table)
        return

    existing_columns = set(delta_table.toDF().columns)
    incoming_columns = set(df.columns)

    if existing_columns != incoming_columns:
        logger.warning("Schemat tabeli %s nie pasuje. Nadpisuję tabelę.", target_table)
        (df.write.format("delta")
           .mode("overwrite")
           .option("overwriteSchema", "true")
           .saveAsTable(target_table))
        return

    # Backticks quote the key so unusual column names still parse.
    merge_condition = f"t.`{key_column}` = s.`{key_column}`"
    (delta_table.alias("t")
        .merge(df.alias("s"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
    logger.info("Wykonano MERGE do tabeli %s", target_table)

def deduplicate_on_key(df, key_column):
    """Return *df* keeping a single row per distinct value of *key_column*.

    Which of the duplicate rows survives is chosen by Spark and is not
    guaranteed to be stable between runs.
    """
    key_subset = [key_column]
    return df.dropDuplicates(subset=key_subset)




# -----------------------------
# PIPELINE
# -----------------------------
# Run the stages in order: read CSV -> validate -> add processed_at ->
# drop duplicate customer_id rows -> upsert into the target Delta table.
raw_df = load_data(input_path)
valid_df = validate_data(raw_df)
transformed_df = transform_data(valid_df)
# Deduplicate before writing: Delta MERGE rejects a source in which several
# rows match the same target row, and the merge condition is on customer_id.
dedup_df = deduplicate_on_key(transformed_df, "customer_id")
write_data_merge(dedup_df, target_table)


INFO:PipelineLogger:Dane wczytane z /databricks-datasets/retail-org/customers/customers.csv, liczba rekordów: 28813


Wykonano MERGE do tabeli default.customers_clean
