In [None]:
from datetime import datetime, timedelta
from pyspark.sql.functions import lit, from_utc_timestamp, col, when, to_date, explode_outer, unbase64, from_json, from_unixtime, udf, trim as spark_trim
from uuid import uuid4
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, ArrayType, StringType, NullType

import sempy.fabric as fabric
import pandas as pd
import requests
import os
import json as json_lib
import logging
import re
import time
import notebookutils

def trim(df):
    """Strip leading and trailing whitespace from every top-level string column.

    Args:
        df: Spark DataFrame to clean.

    Returns:
        A DataFrame in which each top-level column whose schema type is
        ``StringType`` has been passed through Spark's ``trim``. All other
        columns are left as they are.
    """
    # Use isinstance rather than comparing str(dataType) to "StringType":
    # on PySpark >= 3.3 the repr is "StringType()", so the string comparison
    # never matched and no column was trimmed.
    string_columns = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
    ]
    for name in string_columns:
        df = df.withColumn(name, spark_trim(col(name)))
    return df

def deDuplicate(df, subset=None):
    """Return ``df`` with duplicate rows removed.

    Args:
        df: DataFrame exposing ``dropDuplicates``.
        subset: Optional column names forwarded unchanged to
            ``dropDuplicates``; defaults to ``None``.

    Returns:
        Whatever ``df.dropDuplicates(subset)`` returns.
    """
    return df.dropDuplicates(subset)

def replaceNull(df, value, subset=None):
    """Replace nulls in ``df`` with ``value``.

    If ``value`` is a string in ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SS``
    format it is treated as a date or timestamp respectively. In that case
    only nullable columns whose schema type is exactly that type are filled,
    using ``value`` cast to the type. Any other value is delegated to
    ``df.fillna``.

    Args:
        df: Spark DataFrame to fill.
        value: Replacement for nulls.
        subset: Optional column name, or list/tuple of column names, limiting
            which columns are considered. ``None`` means all columns.

    Returns:
        A DataFrame with the applicable nulls replaced.
    """
    from datetime import datetime

    if subset is None:
        columns_to_process = df.columns
    elif isinstance(subset, str):
        # A bare name must become a one-element list: ``name in "a_string"``
        # is a substring test and would select unrelated columns
        # (e.g. "date" in "created_date").
        columns_to_process = [subset]
    else:
        columns_to_process = subset

    # Detect whether a string value is really a date or timestamp literal.
    target_type = None
    if isinstance(value, str):
        known_formats = (
            ("%Y-%m-%d", "date"),
            ("%Y-%m-%dT%H:%M:%S", "timestamp"),
        )
        for pattern, spark_type in known_formats:
            try:
                datetime.strptime(value, pattern)
            except ValueError:
                continue
            target_type = spark_type
            break

    if target_type is None:
        return df.fillna(value, subset=columns_to_process)

    # fillna would treat the value as a plain string and skip date/timestamp
    # columns, so those columns are rewritten explicitly with a typed literal.
    cols_to_fix = {
        f.name
        for f in df.schema
        if f.dataType.simpleString() == target_type
        and f.nullable
        and f.name in columns_to_process
    }
    replacement = lit(value).cast(target_type)
    update_exprs = [
        when(col(c).isNull(), replacement).otherwise(col(c)).alias(c)
        if c in cols_to_fix
        else col(c)
        for c in df.columns
    ]
    return df.select(*update_exprs)

def drop_selected_columns(df, columns_to_drop):
    """Return ``df`` without the listed columns.

    Args:
        df: DataFrame exposing ``drop``.
        columns_to_drop: Iterable of column names; each element is passed to
            ``df.drop`` as a separate positional argument.

    Returns:
        Whatever ``df.drop`` returns for those names.
    """
    remaining = df.drop(*columns_to_drop)
    return remaining

def use_selected_columns(df, columns_to_select):
    """Return ``df`` projected onto the listed columns.

    Args:
        df: DataFrame exposing ``select``.
        columns_to_select: Iterable of columns; each element is passed to
            ``df.select`` as a separate positional argument, in order.

    Returns:
        Whatever ``df.select`` returns for those columns.
    """
    projected = df.select(*columns_to_select)
    return projected
    
def build_select_string_from_schema(df) -> str:
    """Generate a ``df = df.select(...)`` statement mirroring ``df``'s schema.

    Each top-level field becomes one
    ``col("<name>").cast("<type>").alias("<name>")`` line, where ``<type>`` is
    the field's ``dataType.simpleString()``. The statement is printed so it
    can be copied from notebook output.

    Args:
        df: DataFrame whose ``schema.fields`` are rendered.

    Returns:
        The generated statement. Previously the function only printed it and
        returned ``None`` despite the ``-> str`` annotation.
    """
    lines = [
        f'col("{f.name}").cast("{f.dataType.simpleString()}").alias("{f.name}")'
        for f in df.schema.fields
    ]
    body = ",\n".join(lines)
    statement = f"df = df.select(\n{body}\n)"
    print(statement)
    return statement