# In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
import datetime
import json 
from delta.tables import DeltaTable

def load_config(path):
    """Read and parse a JSON configuration file.

    Args:
        path: Filesystem path (str or path-like) of the JSON file.

    Returns:
        The parsed JSON document (a dict when the file holds a JSON object).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file content is not valid JSON.
    """
    # Pin the encoding instead of relying on the platform/locale default, so a
    # UTF-8 config with non-ASCII text reads the same everywhere. 'utf-8-sig'
    # also strips a leading UTF-8 BOM, which json would otherwise reject.
    with open(path, 'r', encoding='utf-8-sig') as f:
        return json.load(f)

def selectschema(df: DataFrame, newschema: list) -> DataFrame:
    """Project ``df`` onto the columns listed in ``newschema``.

    The whole list is handed to ``DataFrame.select`` as a single argument, so
    its entries may be column names or Column expressions. The output column
    order follows the list.
    """
    return df.select(newschema)

def dropcolumns(df: DataFrame, dropcolumns: list) -> DataFrame:
    """Return ``df`` without the columns named in ``dropcolumns``.

    Each list entry is unpacked into a separate positional argument of
    ``DataFrame.drop``.
    """
    return df.drop(*dropcolumns)

def renamecolumns(df: DataFrame, columnrenamedict: dict) -> DataFrame:
    """Rename columns of ``df`` using an ``{old_name: new_name}`` mapping.

    Renames are applied one after another in the mapping's iteration order,
    so a later entry operates on the result of the earlier renames.
    """
    renamed = df
    for pair in columnrenamedict.items():
        # pair is (old_name, new_name), matching withColumnRenamed's arguments.
        renamed = renamed.withColumnRenamed(*pair)
    return renamed

def dropduplicates(df: DataFrame) -> DataFrame:
    """Collapse rows of ``df`` that are identical across every column."""
    return df.dropDuplicates()

def notnullwithor(df: DataFrame, notnullcolumns: list) -> DataFrame:
    """Keep only rows where at least one of ``notnullcolumns`` is non-null.

    ``notnullcolumns`` must name at least one column; an empty sequence
    raises IndexError.
    """
    keep = col(notnullcolumns[0]).isNotNull()
    for name in notnullcolumns[1:]:
        keep |= col(name).isNotNull()
    return df.filter(keep)

def notnullwithand(df: DataFrame, notnullcolumns: list) -> DataFrame:
    """Keep only rows where every column in ``notnullcolumns`` is non-null.

    ``notnullcolumns`` must name at least one column; an empty sequence
    raises IndexError.
    """
    first = notnullcolumns[0]
    remaining = notnullcolumns[1:]
    predicate = col(first).isNotNull()
    for name in remaining:
        predicate &= col(name).isNotNull()
    return df.filter(predicate)

def picklatestrecords(df:DataFrame,partitioncolumns: list,reliablecolumns: list) -> DataFrame:
    """Keep exactly one row per ``partitioncolumns`` key.

    Within each partition, rows are ordered by ``reliablecolumns`` descending
    (``Column.desc()``, which places nulls after non-null values). The first
    row in that order is kept and the rest are discarded.

    Args:
        df: Input DataFrame.
        partitioncolumns: Column names that define a group/key.
        reliablecolumns: Column names to order by, most significant first.
            Spark's ``row_number`` requires an ordered window, so this should
            not be empty.

    Returns:
        A DataFrame with the same columns as ``df`` and one row per key.

    Note:
        If several rows tie on every ``reliablecolumns`` value, which one is
        kept is not deterministic.
    """
    # Use a helper column name that cannot clobber a real column. The previous
    # hard-coded "rank" overwrote, and then dropped, any existing "rank" column.
    # Compare lower-cased because Spark resolves column names case-insensitively
    # unless spark.sql.caseSensitive is enabled.
    existing = {name.lower() for name in df.columns}
    rownumcolumn = "_picklatestrecords_rownum"
    while rownumcolumn.lower() in existing:
        rownumcolumn = "_" + rownumcolumn

    window = Window.partitionBy(*partitioncolumns).orderBy(
        *[col(column).desc() for column in reliablecolumns]
    )
    return (
        df.withColumn(rownumcolumn, row_number().over(window))
        .filter(col(rownumcolumn) == 1)
        .drop(rownumcolumn)
    )

def schemaenforcemnet(df: DataFrame, castcolumns: dict) -> DataFrame:
    """Cast each column named in ``castcolumns`` to its mapped type.

    ``castcolumns`` maps a column name to anything ``Column.cast`` accepts
    (e.g. a type-name string or a DataType instance). Casts are applied in
    the mapping's iteration order; columns absent from the mapping are left
    as they are.
    """
    result = df
    for name in castcolumns:
        target_type = castcolumns[name]
        result = result.withColumn(name, col(name).cast(target_type))
    return result

def concatcolumns(df: DataFrame, concatcolumns: list, newcolumnname: str) -> DataFrame:
    """Add ``newcolumnname`` holding the concatenation of ``concatcolumns``.

    Uses Spark's ``concat``: inputs are joined with no separator, and the
    result is null for a row if any input is null there (nulls can be
    pre-filled with ``makenullstoemptystrings``).
    """
    joined = concat(*concatcolumns)
    return df.withColumn(newcolumnname, joined)

def makenullstoemptystrings(df: DataFrame, emptystringcolumns: list) -> DataFrame:
    """Replace nulls with the empty string in each listed column.

    Intended for string columns. For other column types, Spark's type
    coercion of ``coalesce(column, '')`` decides the result type (TODO
    confirm callers only pass string columns).
    """
    out = df
    for name in emptystringcolumns:
        filled = coalesce(col(name), lit(''))
        out = out.withColumn(name, filled)
    return out

def addauditcolumns(df: DataFrame) -> DataFrame:
    """Append the audit columns ``createddate`` and ``createdby`` to ``df``.

    ``createddate`` is a literal holding the Python process's local time (a
    naive ``datetime``), captured once when this function is called rather
    than when the query runs, so every row gets the same value.
    ``createdby`` is the constant ``'silverorchestrator'``.
    """
    stamp = lit(datetime.datetime.now())
    author = lit('silverorchestrator')
    with_date = df.withColumn('createddate', stamp)
    return with_date.withColumn('createdby', author)

def mergetotarget(df:DataFrame, keycolumnsformerge:list,table):
    """Upsert ``df`` into the Delta table ``table``, matching on key columns.

    Source rows whose key columns all equal those of a target row update every
    column of that row (``whenMatchedUpdateAll``). Source rows with no match
    are inserted (``whenNotMatchedInsertAll``).

    Args:
        df: Source rows to merge.
        keycolumnsformerge: Key column name(s) used in the match condition.
            A single column may be given as a plain string.
        table: Table name passed to ``DeltaTable.forName``.

    Raises:
        ValueError: if no key columns are given. Previously this built an empty
            merge condition and failed later inside Delta.

    Notes:
        Keys are compared with SQL ``=``, so a source row with a null key
        never matches and is always inserted.
        Uses a session object named ``spark`` from the enclosing (notebook)
        scope.
    """
    # Treat a bare string as one key; iterating it would split it into characters.
    if isinstance(keycolumnsformerge, str):
        keys = [keycolumnsformerge]
    else:
        keys = list(keycolumnsformerge)
    if not keys:
        raise ValueError("keycolumnsformerge must name at least one key column")

    # 'key' rather than 'col' so the imported pyspark `col` is not shadowed.
    merge_condition = " AND ".join(f"target.{key} = source.{key}" for key in keys)
    targettable = DeltaTable.forName(spark, table)
    (
        targettable.alias("target")
        .merge(df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

