In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import explode_outer


# Compare a DataFrame against the expected schema

def validate_schema(df, exp_schema: StructType):
    """Check that ``df`` has every field of ``exp_schema`` with the same data type.

    Only field names and data types are compared. Fields that exist in ``df``
    but not in ``exp_schema`` are tolerated and merely reported via ``print``.

    Args:
        df: Object exposing ``schema.fields`` whose items have ``name`` and
            ``dataType`` attributes (e.g. a Spark DataFrame).
        exp_schema: Expected schema; its ``fields`` are read the same way.

    Raises:
        ValueError: If an expected field is absent from ``df``, or is present
            under the same name with a different data type. The message lists
            absent fields and type mismatches separately.
    """
    # Index actual fields by name so a wrong-typed column can be told apart
    # from a column that is not there at all.
    actual_types = {f.name: f.dataType for f in df.schema.fields}
    expected_names = {f.name for f in exp_schema.fields}

    # Lists (not sets) keep schema order, so messages are deterministic.
    missing = [
        (f.name, f.dataType)
        for f in exp_schema.fields
        if f.name not in actual_types
    ]
    mismatched = [
        (f.name, f.dataType, actual_types[f.name])
        for f in exp_schema.fields
        if f.name in actual_types and actual_types[f.name] != f.dataType
    ]
    extra = [
        (f.name, f.dataType)
        for f in df.schema.fields
        if f.name not in expected_names
    ]

    problems = []
    if missing:
        problems.append(f"missing: {missing}")
    if mismatched:
        problems.append(f"type mismatch (name, expected, actual): {mismatched}")
    if problems:
        raise ValueError("; ".join(problems))

    if extra:
        print(f"extra fields: {extra}")

In [0]:
# Explode with a safety check
def safe_explode(df, col_name):
    """Replace column ``col_name`` with the result of ``explode_outer`` on it.

    Args:
        df: DataFrame to transform.
        col_name: Name of the column to explode; must be listed in ``df.columns``.

    Returns:
        A new DataFrame in which ``col_name`` holds the exploded values.

    Raises:
        ValueError: If ``col_name`` is not one of ``df.columns``.
    """
    if col_name not in df.columns:
        raise ValueError(f"Column {col_name} not in DF")
    # Take the Column from the DataFrame itself so the function does not rely
    # on `col`, which the notebook's import cell does not bring into scope.
    return df.withColumn(col_name, explode_outer(df[col_name]))

In [0]:
# Quick check that the key columns contain no nulls
def check_nulls_in_key_columns(df, key_columns):
    """Fail if any of the given columns contains a null value.

    Columns are checked one at a time, in the order given; the first column
    found to contain nulls aborts the check.

    Args:
        df: DataFrame to inspect.
        key_columns: Iterable of column names that must not contain nulls.

    Raises:
        ValueError: Naming the first column with nulls and how many rows
            are affected.
    """
    for col_name in key_columns:
        # Take the Column from the DataFrame itself so the function does not
        # rely on `col`, which the notebook's import cell does not provide.
        null_count = df.filter(df[col_name].isNull()).count()
        if null_count > 0:
            raise ValueError(f"Nulls in: {col_name} ({null_count} rows)")


In [0]:
# Uniqueness check
def check_uniqueness(df, col_name):
    """Fail unless the number of distinct ``col_name`` values equals the row count.

    Args:
        df: DataFrame to inspect.
        col_name: Column selector passed to ``df.select``.

    Raises:
        ValueError: If the distinct count differs from the total row count;
            the message carries both numbers.
    """
    row_total = df.count()
    projected = df.select(col_name)
    unique_total = projected.distinct().count()

    # Guard clause: equal counts mean every row has its own value.
    if row_total == unique_total:
        return
    raise ValueError(
        f" {col_name} not uniq -  distincit: {unique_total}  total: {row_total}"
    )


In [0]:
# Row limit, so that we do not load too much data
def check_row_limit(df, max_rows=100000):
    """Fail if ``df`` has more than ``max_rows`` rows.

    Args:
        df: DataFrame whose rows are counted via ``df.count()``.
        max_rows: Largest acceptable row count (inclusive).

    Raises:
        RuntimeError: If the row count exceeds ``max_rows``.
    """
    n_rows = df.count()
    # Guard clause: being exactly at the limit is still acceptable.
    if n_rows <= max_rows:
        return
    raise RuntimeError(f"too many rows: {n_rows} > {max_rows}")
