In [2]:
%run nb_logging

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 8, Finished, Available, Finished)

nb_config Loaded
nb_logging Loaded


In [3]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import re

# Reuse the active Spark session, or create one if none exists. The Delta helpers
# in this notebook (DeltaTable lookups, spark.sql) read this module-level `spark`.
spark = SparkSession.builder.getOrCreate()

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 9, Finished, Available, Finished)

# --- Temizlik ve Format Fonksiyonları ---

In [4]:
def clean_column_names(df: DataFrame) -> DataFrame:
    """Rename every column to a lowercase, underscore-separated identifier.

    Each run of characters outside ``[a-zA-Z0-9]`` is replaced by a single
    underscore, leading/trailing underscores are stripped and the result is
    lowercased. camelCase boundaries are NOT split (``firstName`` becomes
    ``firstname``).

    Names that would make the renamed frame unusable are repaired:

    * a name that cleans to the empty string (e.g. ``"###"``) becomes
      ``col_<position>`` using its 0-based position;
    * a name that collides with an already assigned name (e.g. ``"a-b"`` and
      ``"a_b"``) receives a numeric suffix: ``_2``, ``_3``, ...

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same columns in the same order under cleaned,
        unique names.
    """
    def to_snake_case(name: str) -> str:
        # One pass: any run of non-alphanumerics (underscores included) -> "_".
        name = re.sub(r"[^a-zA-Z0-9]+", "_", name)
        return name.strip("_").lower()

    new_columns = []
    taken = set()
    for position, original in enumerate(df.columns):
        base = to_snake_case(original) or f"col_{position}"
        candidate = base
        suffix = 2
        # Keep bumping the suffix until the name is unused, so the result never
        # contains duplicate column names.
        while candidate in taken:
            candidate = f"{base}_{suffix}"
            suffix += 1
        taken.add(candidate)
        new_columns.append(candidate)

    return df.toDF(*new_columns)

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 10, Finished, Available, Finished)

In [5]:
def set_datatype(data: DataFrame, datatype_mapping: dict) -> DataFrame:
    """Cast the listed columns to new data types.

    Args:
        data: DataFrame whose columns should be cast.
        datatype_mapping: Maps a column name to the type passed to
            ``Column.cast``. Entries naming a column that is not present in
            the frame are skipped.

    Returns:
        The DataFrame with every matching column replaced by its cast version.
    """
    converted = data
    for target_column in datatype_mapping:
        # Names missing from the frame are ignored rather than raising.
        if target_column not in converted.columns:
            continue
        target_type = datatype_mapping[target_column]
        converted = converted.withColumn(target_column, col(target_column).cast(target_type))
    return converted

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 11, Finished, Available, Finished)

# --- Explode ve Flatten İşlemleri ---

In [6]:
def explode_col(data: DataFrame, col_name: str, prefix: str = "", remove_original: bool = True):
    """Explode an array column so each element gets its own row.

    Args:
        data: Input DataFrame.
        col_name: Name of the column to explode. If it is not a column of
            ``data`` the frame is returned unchanged.
        prefix: Text prepended to ``col_name`` to form the output column name.
            With the default empty prefix the exploded values overwrite the
            original column.
        remove_original: Drop the source column when the output was written
            under a different (prefixed) name.

    Returns:
        The DataFrame with the exploded column.
    """
    if col_name in data.columns:
        target_name = f"{prefix}{col_name}"
        data = data.withColumn(target_name, explode(col_name))
        # When the names match the original was already replaced in place,
        # so there is nothing separate to drop.
        written_elsewhere = target_name != col_name
        if remove_original and written_elsewhere:
            data = data.drop(col_name)
    return data

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 12, Finished, Available, Finished)

In [7]:
def flatten_struct_col(data: DataFrame, col_name: str, prefix: str = "", remove_original: bool = True):
    """Promote the fields of a struct column to top-level columns.

    The operation is best-effort: if flattening fails for any reason the error
    is printed and the input frame is returned unchanged.

    Args:
        data: Input DataFrame.
        col_name: Name of the struct column. If it is not a column of ``data``
            the frame is returned unchanged.
        prefix: Text prepended to each struct field name to form the new
            column name.
        remove_original: Drop the struct column after its fields were copied.

    Returns:
        The DataFrame with one extra column per struct field (and without the
        struct column when ``remove_original`` is true), or the unchanged
        input when the column is missing or flattening failed.
    """
    if col_name not in data.columns:
        return data

    try:
        # Read the type from the existing schema; no extra projection needed.
        struct_type = data.schema[col_name].dataType
        if not isinstance(struct_type, StructType):
            # Report the real cause instead of an incidental "not iterable" error.
            raise TypeError(f"expected a struct column, found {struct_type.simpleString()}")

        # getField looks the field up by its literal name, so field names that
        # contain dots are not mis-parsed as nested paths.
        flatten_expression = [
            col(col_name).getField(field.name).alias(prefix + field.name)
            for field in struct_type.fields
        ]
        flattened = data.select(["*"] + flatten_expression)
        if remove_original:
            flattened = flattened.drop(col_name)
        data = flattened
    except Exception as e:
        print(f"Flatten hatası ({col_name}): {e}")

    return data

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 13, Finished, Available, Finished)

In [8]:
def zip_explode_and_flatten(data: DataFrame, parent_col: str, children_cols: list):
    """Zip parallel child arrays of a parent column and emit one row per element.

    Used for the Weather and AirQuality data. The child values are read back
    from the exploded struct by field name (not by numeric index), which is
    what avoids the [FIELD_NOT_FOUND] error.

    Args:
        data: Input DataFrame.
        parent_col: Column that holds the child arrays; it is dropped from
            the result.
        children_cols: Names of the array fields under ``parent_col``. Each
            becomes a top-level column of the same name.

    Returns:
        The DataFrame with one column per child and without ``parent_col`` or
        the temporary helper columns ``zipped_temp`` / ``exploded_temp``.
    """
    # Full paths of the child arrays, e.g. "<parent>.<child>".
    child_arrays = [col(f"{parent_col}.{child}") for child in children_cols]

    # Zip the arrays element-wise, then explode to one struct per row.
    result = (
        data
        .withColumn("zipped_temp", arrays_zip(*child_arrays))
        .withColumn("exploded_temp", explode("zipped_temp"))
    )

    # Lift every child out of the exploded struct under its own name.
    for child in children_cols:
        result = result.withColumn(child, col(f"exploded_temp.{child}"))

    # Remove the helper columns together with the now-redundant parent.
    return result.drop("zipped_temp", "exploded_temp", parent_col)

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 14, Finished, Available, Finished)

# --- Hash ve Key İşlemleri ---

In [9]:
def hash_column(objecttype_col, bron_col, lokaal_id_col):
    """Build the unique ID (primary key) expression for a row.

    The three inputs are joined with ``"||"`` and the joined string is hashed
    with MD5.

    NOTE(review): ``concat_ws`` skips null arguments, so two rows that differ
    only in which part is null can produce the same hash. Confirm the key
    parts are never null.

    Args:
        objecttype_col: First key part.
        bron_col: Second key part.
        lokaal_id_col: Third key part.

    Returns:
        The MD5 column expression over the joined key parts.
    """
    key_parts = (objecttype_col, bron_col, lokaal_id_col)
    joined_key = concat_ws("||", *key_parts)
    return md5(joined_key)

def select_columns_safe(df: DataFrame, columns: list) -> DataFrame:
    """Select only the requested columns that actually exist.

    Requested names that are not columns of ``df`` are skipped instead of
    raising. The order (and any repetition) of ``columns`` is preserved.

    Args:
        df: Input DataFrame.
        columns: Column names to keep.

    Returns:
        ``df`` projected onto the requested names that exist in it.
    """
    # Read the column list once; the original re-evaluated df.columns for
    # every requested name.
    available = set(df.columns)
    existing_cols = [name for name in columns if name in available]
    return df.select(*existing_cols)

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 15, Finished, Available, Finished)

# --- Tarih İşlemleri ---

In [10]:
def enrich_date_time_features(df: DataFrame, input_col: str, target_col: str = "date") -> DataFrame:
    """Convert a timestamp column to a date and add year/month/day columns.

    Args:
        df: Input DataFrame.
        input_col: Column passed to ``to_date``.
        target_col: Name of the date column that is written (default
            ``"date"``).

    Returns:
        The DataFrame with ``target_col`` plus the columns ``year``, ``month``
        and ``day`` derived from it.
    """
    date_value = col(target_col)
    return (
        df
        .withColumn(target_col, to_date(col(input_col)))
        .withColumn("year", year(date_value))
        .withColumn("month", month(date_value))
        .withColumn("day", dayofmonth(date_value))
    )

def format_decimal_col(df: DataFrame, col_name: str, scale: int = 2) -> DataFrame:
    """Round a column to a fixed number of decimal places.

    Args:
        df: Input DataFrame.
        col_name: Column to round. If it is not a column of ``df`` the frame
            is returned unchanged.
        scale: Number of decimal places to keep (default 2).

    Returns:
        The DataFrame with ``col_name`` replaced by its rounded values.
    """
    if col_name not in df.columns:
        return df

    # Import Spark's round explicitly: a bare `round` only resolves to the
    # Spark function while the notebook's star import shadows the builtin,
    # and the builtin cannot round a Column.
    from pyspark.sql.functions import round as spark_round

    return df.withColumn(col_name, spark_round(col(col_name), scale))

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 16, Finished, Available, Finished)

# --- Yazma ve Delta Lake İşlemleri ---

In [11]:
def load_data_into_delta_table(data: DataFrame, sink_path: str, full_load: bool, primary_col_name: str = None, table_name: str = None):
    """Write a DataFrame in Delta Lake format.

    Behaviour by argument combination:

    * ``table_name`` given: the managed table is overwritten (schema
      included). ``full_load`` is ignored on this route because no merge is
      implemented for managed tables, and ``sink_path`` is not used.
    * ``full_load=True``: the Delta data at ``sink_path`` is overwritten
      (schema included).
    * ``full_load=False``: upsert into the Delta table at ``sink_path``. Rows
      matching on ``primary_col_name`` are updated, the others inserted. If
      no Delta table exists at ``sink_path`` yet, it is created from ``data``.

    Args:
        data: DataFrame to write.
        sink_path: Target path for the path-based (Silver layer) route.
        full_load: Overwrite when true, merge when false (path route only).
        primary_col_name: Key column used in the merge condition. Required
            when a merge is actually performed.
        table_name: When set, write a managed table (Gold layer) instead of
            writing to ``sink_path``.

    Raises:
        ValueError: A merge is needed but ``primary_col_name`` was not given.
        Exception: Any failure is printed with an ``ERROR`` prefix and
            re-raised unchanged.
    """
    try:
        # Managed-table route (Gold layer): always a full overwrite.
        if table_name:
            data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)
            print(f"SUCCESS: Table '{table_name}' saved.")
            return

        # Path route (Silver layer).
        if full_load:
            data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(sink_path)
            print(f"SUCCESS: Written to {sink_path} (Overwrite)")
        elif DeltaTable.isDeltaTable(spark, sink_path):
            # Without a key the condition would read "target.None = updates.None"
            # and fail with an obscure analysis error; fail early and clearly.
            if not primary_col_name:
                raise ValueError(
                    "primary_col_name is required to merge into an existing Delta table (full_load=False)."
                )

            deltaTable = DeltaTable.forPath(spark, sink_path)

            # Upsert: match source ("updates") against target on the key column.
            deltaTable.alias("target").merge(
                data.alias("updates"),
                f"target.{primary_col_name} = updates.{primary_col_name}"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
            print(f"SUCCESS: Merged into {sink_path}")
        else:
            # Nothing to merge into yet: create the table on first load.
            data.write.format("delta").mode("overwrite").save(sink_path)
            print(f"SUCCESS: Created new Delta table at {sink_path}")

    except Exception as e:
        print(f"ERROR in load_data_into_delta_table: {e}")
        raise

def create_lakehouse_table(table_name: str, path: str):
    """Register the Delta files under ``path`` as a table (optional step).

    Runs ``CREATE TABLE IF NOT EXISTS ... USING DELTA LOCATION ...`` through
    the module-level ``spark`` session and prints a confirmation.

    Both arguments are interpolated into the SQL text as-is, so they must be
    trusted values.

    Args:
        table_name: Name of the table to create.
        path: Location of the existing Delta files.
    """
    ddl = f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{path}'"
    spark.sql(ddl)
    print(f"Table '{table_name}' registered from location '{path}'")

StatementMeta(, 7ed8ddf9-78a0-4692-98bf-78e268e79313, 17, Finished, Available, Finished)