#customized dbutils

- Provides common resources for standardizing code and functions.



##standard_debug()

- Function created for printing log information;



In [0]:
#Created: 2023.11.13 - Nelson Jr. ngoliveirajr@gmail.com

#_dbg_option: if True the message and the object are printed; any other value prints nothing
#_dbg_validation: object to be printed in the log (string, list or dataframe)
#_dbg_message: Message header to be displayed in the log
def standard_debug(_dbg_option: bool,_dbg_validation,_dbg_message: str):
    """Print a debug header followed by the object being inspected.

    Args:
        _dbg_option: Output is produced only when this equals ``True``.
        _dbg_validation: Object to show. Strings, lists and plain scalars
            (bool, int, float) are printed; a ``DataFrame`` is rendered
            through its ``display()`` method. Any other type is ignored
            and only the header is printed.
        _dbg_message: Header line printed before the object.

    Returns:
        None.
    """
    # Deliberately "== True" rather than plain truthiness: non-boolean
    # flags such as "Y"/"N" must keep producing no output.
    if _dbg_option != True:  # noqa: E712
        return

    print(_dbg_message)

    # Scalars are included because callers pass booleans and integers
    # (e.g. an incremental flag or the last technical key); they used to be
    # dropped silently. This check runs first, so the DataFrame name is only
    # needed when the object is not a plain Python value.
    if isinstance(_dbg_validation, (str, list, bool, int, float)):
        print(_dbg_validation)
    elif isinstance(_dbg_validation, DataFrame):
        _dbg_validation.display()
        

##standard_dim_fact_table()

- Customized Function for creating DIMENSION and FACT tables based on a transactional database;



In [0]:
#Created: 2023.11.13 - Nelson Oliveira nelson.oliveira@volvo.com




#_src_df: the main dataframe to be used in the transformation
#_col_keys: list of columns to be considered as key for the selection. Ex. ["name","id","brand"]
#_TK_name: name for the technical key column
#_incremental: if False the dimension and fact tables are rebuilt from scratch on every run.
#             if True the existing dimension and fact tables are kept and only
#                    the new key combinations are added, each with a new TK
#             @In any case, if the tables do not exist yet, they are created for the first time
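#_TK_consistence: (incremental mode only) if True, rows already stored in the fact table that match
#             incoming rows on every column except the technical key are replaced by the incoming
#             rows; if False the incoming rows are simply appended and de-duplicated.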
#_dim_database: the database name for the dimension table
#_dim_tablename: name for the dimension table
#_fact_database: the database name for the fact (target) table
#_fact_tablename: name for the fact (target) table
#_debug: if True the function will print and display in all steps
#


def standard_dim_fact_table(
    _src_df,
    _col_keys: list,
    _TK_name: str,
    _incremental: bool,
    _TK_consistence: bool,
    _dim_database: str,
    _dim_tablename: str,
    _fact_database: str,
    _fact_tablename: str,
    _debug: bool,
):
    """Split a source DataFrame into a dimension table and a fact table.

    The distinct combinations of ``_col_keys`` become the dimension rows,
    each one numbered with a technical key (``_TK_name``). The fact table
    keeps every non-key column of the source plus that technical key.
    Both results are handed to ``store_and_mount_table``.

    Relies on names defined outside this function: ``spark``, ``f``,
    ``struct``, ``row_number``, ``max``, ``Window``, ``set_database_name``,
    ``store_and_mount_table`` and the global ``file_location``.

    Args:
        _src_df: Source DataFrame to transform.
        _col_keys: Column names whose distinct combination defines one
            dimension row, e.g. ``["name", "id", "brand"]``.
        _TK_name: Name of the technical key column to create.
        _incremental: When ``True`` and the dimension table already exists,
            existing keys are kept and only unseen combinations receive new
            keys (continuing after the current maximum). Otherwise both
            tables are rebuilt from the source.
        _TK_consistence: Only used in incremental mode when the fact table
            exists. When ``True``, stored fact rows matching incoming rows
            on every column except the technical key are dropped before the
            incoming rows are appended.
        _dim_database: Database name of the dimension table (passed through
            ``set_database_name``).
        _dim_tablename: Dimension table name.
        _fact_database: Database name of the fact table (passed through
            ``set_database_name``).
        _fact_tablename: Fact table name.
        _debug: When ``True`` every step is reported via ``standard_debug``.

    Returns:
        None. The results are passed to ``store_and_mount_table``.
    """
    # Fully qualified table names.
    _dim_name = set_database_name(_dim_database) + "." + _dim_tablename
    standard_debug(_debug,_dim_name,"1.Creates the variable appointed to the DIMENSION table:")

    _fact_table = set_database_name(_fact_database) + "." + _fact_tablename
    standard_debug(_debug,_fact_table,"2.Creates the variable appointed to the FACT table:")

    # Incremental mode needs an existing dimension table to extend.
    if not spark.catalog._jcatalog.tableExists(f"{_dim_name}"):
        _incremental = False
    standard_debug(_debug,_incremental,"3.Check if the dimension table is present in the catalog. If table is not present then the append will not be considered:")

    # "== True" is kept on purpose so non-boolean flags behave as before.
    incremental = _incremental == True  # noqa: E712

    # One row per distinct key combination.
    distinct_keys = _src_df.select(*_col_keys).distinct()
    standard_debug(_debug,distinct_keys,"4.Creates the distinct combination for considering as the technical key (TK):")

    # Pack the key columns into one struct column used as join/order key.
    new_keys = distinct_keys.select(
        distinct_keys['*'],
        struct([f.col(c).alias(c) for c in distinct_keys.columns]).alias("array_col"),
    )
    standard_debug(_debug,new_keys,"5.Creates the array column for the technical key calculation:")

    last_key = 0
    existing_dim = None
    if incremental:
        standard_debug(_debug,"","5.0.>>Incremental selected.<<<")

        dim_current = spark.sql(f"select * from {_dim_name}")

        # Only the column names of this frame are used below.
        dim_key_cols = dim_current.select(*_col_keys).limit(1)
        standard_debug(_debug,dim_key_cols,"5.1.Collect the column names to filter:")

        # NOTE(review): `max` must be the Spark aggregate here, not the
        # Python builtin -- presumably imported elsewhere in the notebook.
        # An empty dimension table yields None; fall back to 0 so the new
        # keys are not computed as NULL.
        last_key = dim_current.agg(max(_TK_name)).collect()[0][0] or 0
        standard_debug(_debug,last_key,"5.2.Collect the last key in the dim table:")

        existing_dim = dim_current.select(
            f.col(_TK_name),
            *_col_keys,
            struct([f.col(c).alias(c) for c in dim_key_cols.columns]).alias("array_col"),
        )
        standard_debug(_debug,existing_dim,"5.3.Recreates the dimension table with the columns in array:")

        # Keep only the combinations that are not in the dimension yet.
        new_keys = new_keys.join(existing_dim,on='array_col',how='leftanti')
        standard_debug(_debug,new_keys,"5.4.Create a left anti join to check the new incoming figures to be added:")

    # Number the (new) combinations, continuing after the last stored key.
    # The window has no partitioning, so the ordering spans all rows.
    window = Window.orderBy(new_keys['array_col'])
    keyed = new_keys.withColumn(_TK_name,row_number().over(window)+last_key)
    standard_debug(_debug,keyed,"6.Creates the technical key column considering the last key as start point:")

    if incremental:
        dim_final = existing_dim.unionByName(keyed, allowMissingColumns=True)
    else:
        dim_final = keyed
    standard_debug(_debug,dim_final,"7.If append is Y the new data will be added in the dimension:")

    # Source rows: non-key columns plus the same struct key for the join.
    base_with_key = _src_df.select(
        *[f.col(column) for column in _src_df.columns if column not in _col_keys],
        struct([f.col(c).alias(c) for c in distinct_keys.columns]).alias("array_col"),
    )
    standard_debug(_debug,base_with_key,"8.Creates the array column with the columns in scope for the source base:")

    # Attach the technical key to every source row.
    joined = base_with_key.join(dim_final,on='array_col',how='left').drop("array_col")
    standard_debug(_debug,joined,"9.Add the technical key by joining the array column as the key:")

    # The fact table carries the technical key instead of the key columns.
    df_TK = joined.select(*[f.col(column) for column in joined.columns if column not in _col_keys])
    standard_debug(_debug,df_TK,"10.***Needs to repair the logic for selecting the right columns:")

    # Default: (re)create the fact table from the incoming rows only.
    fact_new = df_TK
    if incremental and spark.catalog._jcatalog.tableExists(f"{_fact_table}"):
        fact_current = spark.sql(f"select * from {_fact_table}")

        if _TK_consistence == True:  # noqa: E712
            # Match on every column except the technical key itself. An
            # exact name comparison is required: a substring test against
            # the key name would also drop unrelated short column names.
            join_cols = [column for column in fact_current.columns if column != _TK_name]

            # Drop stored rows that the incoming data is about to replace.
            fact_current = fact_current.join(df_TK,on=join_cols,how='leftanti')
            standard_debug(_debug,fact_current,"Check new database:")

        # distinct() removes rows duplicated by the append (can be slow).
        fact_new = fact_current.unionByName(df_TK, allowMissingColumns=True).distinct()

    standard_debug(_debug,"","11.If the table exists it will append the new data. Otherwise the table will be recreated:")

    standard_debug(_debug,fact_new,"12.Final Fact table:")
    store_and_mount_table(fact_new,_fact_tablename,_fact_database,file_location, True)

    # The helper struct column is not part of the stored dimension.
    dim_final = dim_final.drop('array_col')
    standard_debug(_debug,dim_final,"13.Final Dimension table:")

    store_and_mount_table(dim_final,_dim_tablename,_dim_database,file_location, True)


