# Utils: General Shared Functions
**Description:** Notebook that provides commonly used helper functions for Databricks.
___
> **FUNCTION: createTableFromDF(dfUtil,database,tableName,area,subject,dropTable,writeMode):** <BR>
Creates a Delta Table from a Spark Dataframe in one of the existing Databases in a dynamic way.
The Table Schema must already be defined in the Dataframe passed as a parameter, so that the Table's Columns are created with the appropriate data types.
>>**Parameters Needed:**
 * **dfUtil:** (Spark DF) The Dataframe containing the data to write to the Delta Table.
 * **database:** (String) Use the 'sandbox' value to create the table in the sandbox database.
 * **tableName:** (String) The name of the Table you want to create in Databricks.
 * **area:** (String) The name of your area in the Sandbox (e.g. 'credit-risk', 'data-engineers', 'data-scientists').
 * **subject:** (String) The subject/project/username that you are creating this table for. It can be your username in the Sandbox.
 * **dropTable:** (Boolean) Whether to drop the table from the database, and delete its existing files, before creating it (True/False).
 * **writeMode:** (String) The write mode used to record the data in the table: 'append' always appends the data to the table; 'overwrite' always overwrites the data in the table.

>>**Example Code:** <BR>
 createTableFromDF( dftst , 'sandbox' , 'test_table_name' , 'data-engineers' , 'fimazu' , False , 'append' ) <BR>
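
The storage location and the table registration both derive from the four string parameters. The plain-Python sketch below mirrors how the function's implementation in this notebook composes them; the helper names `delta_table_location` and `create_table_statement` are hypothetical and exist only for illustration, and nothing here touches Spark or DBFS.

```python
def delta_table_location(database, area, subject, table_name):
    """Return the mount path where the Delta files for the table are stored."""
    return "/mnt/" + "/".join([database, area, subject, table_name])


def create_table_statement(database, area, subject, table_name):
    """Return the statement that registers the Delta location as a table."""
    location = delta_table_location(database, area, subject, table_name)
    return (
        f"CREATE TABLE IF NOT EXISTS {database}.{table_name} "
        f"USING DELTA LOCATION '{location}'"
    )


# Same arguments as the example call above (hypothetical table and user names).
location = delta_table_location("sandbox", "data-engineers", "fimazu", "test_table_name")
statement = create_table_statement("sandbox", "data-engineers", "fimazu", "test_table_name")
print(location)
print(statement)
```

Because the location includes `area` and `subject`, two tables with the same name in different areas are written to different folders, but they would still share the same `database.tableName` entry.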

> <br> **FUNCTION: moveFileFromDBFSToSandbox(area,sourceFile,targetFile):** <BR>
Moves files that were uploaded manually into Databricks DBFS to the target area in the Sandbox. The source file no longer exists in DBFS after the move. <BR>
  
>>**Parameters Needed:**
 * **area:** (String) The name of your area in the Sandbox (e.g. 'credit-risk', 'data-engineers', 'data-scientists').
 * **sourceFile:** (String) The name of the source file uploaded into DBFS.
 * **targetFile:** (String) The name of the target file in the area's Sandbox folder. The file can be renamed during the move.
  
>>**Example Code:** <BR>
 moveFileFromDBFSToSandbox('data-engineers','testSourceFile.csv','testDestinationFile.csv')
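
The function expects the uploaded file to sit in an area-specific folder under `FileStore/tables`. The plain-Python sketch below mirrors how the implementation in this notebook builds the source and target paths; the helper name `sandbox_move_paths` is hypothetical and the snippet does not call `dbutils`.

```python
def sandbox_move_paths(area, source_file, target_file):
    """Return the (source, target) paths used for the move."""
    source = "dbfs:/FileStore/tables/" + area + "/" + source_file
    target = "/mnt/sandbox/" + area + "/" + target_file
    return source, target


source, target = sandbox_move_paths(
    "data-engineers", "testSourceFile.csv", "testDestinationFile.csv"
)
print(source)
print(target)
```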

> <br> **FUNCTION: flatten_structs(nested_df):** <BR>
Flattens nested struct columns in a Dataframe (Struct type columns). Each nested field becomes a top-level column named after its path, joined with underscores. Just pass the Dataframe containing the nested columns as the parameter. <BR>
  
>>**Parameters Needed:**
 * **nested_df:** (Spark DF) The Dataframe containing the columns that you need un-nested. A new Dataframe is returned.
  
>>**Example Code:** <BR>
 df_Final = flatten_structs( df_StructColumns )  
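
To illustrate the column naming that results from flattening, here is a minimal plain-Python sketch that applies the same rule (parent and child names joined with underscores) to a nested dictionary. It is an analogy, not the Spark implementation: `flatten_record` and the sample record are hypothetical, and the column order produced by `flatten_structs` may differ.

```python
def flatten_record(record, parents=()):
    """Flatten nested dicts into one level, joining key paths with underscores."""
    flat = {}
    for key, value in record.items():
        path = parents + (key,)
        if isinstance(value, dict):
            # A nested dict plays the role of a struct column: recurse into it.
            flat.update(flatten_record(value, path))
        else:
            flat["_".join(path)] = value
    return flat


nested = {
    "id": 1,
    "customer": {"name": "Ana", "address": {"city": "Lisbon", "zip": "1000"}},
}
flat = flatten_record(nested)
print(flat)
```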
  
> <br> **FUNCTION: flatten_array_struct_df(df):** <BR>
Flattens nested columns in a Dataframe (Array and Struct type columns). Each Array column is exploded into one row per element, and the resulting Struct columns are flattened with flatten_structs. Just pass the Dataframe containing the nested columns as the parameter. <BR>
  
>>**Parameters Needed:**
 * **df:** (Spark DF) The Dataframe containing the columns that you need un-nested. A new Dataframe is returned.
  
>>**Example Code:** <BR>
 df_Final = flatten_array_struct_df( df_NestedColumns )
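
Exploding an array column changes the number of rows, which is easy to overlook. The plain-Python sketch below imitates that step on a list of dictionaries; `explode_rows` and the sample rows are hypothetical, and the snippet only approximates the behavior of Spark's `explode`, which likewise drops rows whose array is null or empty.

```python
def explode_rows(rows, array_key):
    """Return one output row per element of the list stored under array_key."""
    exploded = []
    for row in rows:
        # A missing (None) or empty list yields no output rows at all.
        for element in row[array_key] or []:
            new_row = dict(row)
            new_row[array_key] = element
            exploded.append(new_row)
    return exploded


rows = [
    {"order": 1, "items": [{"sku": "A"}, {"sku": "B"}]},
    {"order": 2, "items": []},
    {"order": 3, "items": None},
]
exploded = explode_rows(rows, "items")
print(exploded)
```

Orders 2 and 3 disappear from the result, so counts taken before and after flattening are not directly comparable. When several array columns are exploded, the row count grows with the product of their lengths.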

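> <br> **FUNCTION: parseJSONColumns(df, flatten_struct_cols, sanitize=True):** <BR>
Deserializes JSON columns that are stored as strings into Struct type columns and, optionally, flattens these nested columns into regular Dataframe columns.
Just pass the Dataframe containing the serialized columns as the parameter. <BR>
  
>>**Parameters Needed:**
 * **df:** (Spark DF) The Dataframe containing the columns that you need deserialized and un-nested. A new Dataframe is returned.
 * **flatten_struct_cols:** (Boolean) Whether to call flatten_structs on the result to turn the JSON attributes into actual Dataframe columns (True/False).
 * **sanitize:** (Boolean, default True) Whether to wrap each JSON value in a `{"data": ...}` object before its schema is inferred, and unwrap it afterwards.
  
>>**Example Code:** <BR>
 df_Final = parseJSONColumns( df_SerializedColumns , True )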
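
The implementation in this notebook first checks whether a sampled value parses as JSON and, when its `sanitize` option is enabled, wraps the value in a `{"data": ...}` object before parsing. The sketch below reproduces both steps with the standard `json` module only; `parse_wrapped` is a hypothetical helper, and the Spark schema inference is deliberately left out.

```python
import json


def is_json(text):
    """Return True when text is a serialized JSON value."""
    try:
        json.loads(text)
    except ValueError:
        return False
    return True


def parse_wrapped(text):
    """Wrap the value in an object, parse it, then unwrap the 'data' field."""
    wrapped = '{"data": ' + text + "}"
    return json.loads(wrapped)["data"]


raw = '[{"id": 1}, {"id": 2}]'
print(is_json(raw), is_json("not json"))
print(parse_wrapped(raw))
```

Wrapping guarantees that the parsed top-level value is always an object with a single `data` field, whatever the original value was (object, array, or scalar).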

In [0]:
#general configurations and imports for the notebook run
#SET spark.databricks.delta.schema.autoMerge.enabled=True
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.shuffle.partitions", "20")

from pyspark.sql.types import *
import os, shutil, glob, re, json, requests, time
from pyspark.sql.functions import * #col, lit, udf, substring
from pyspark.sql.window import Window
from datetime import datetime, date, timedelta
from pyspark.sql import functions as f

In [0]:
# FUNCTION: createTableFromDF(dfUtil,database,tableName,area,subject,dropTable,writeMode):
#   Used to Create a Delta Table from a Spark Dataframe in one of the existing Databases in a dynamic way. The Table Schema has to be already defined in the parameter DataFrame so the Table's Columns are created with the appropriate data formats.
#
# Parameters Needed:
#  - dfUtil: (Spark DF) The Dataframe containing the data to write to the Delta Table.
#  - database: (String) use the 'sandbox' value to include the table in the sandbox database.
#  - tableName: (String) the name of the Table you want to create in Databricks.
#  - area: (String) the name of your area in the Sandbox (e.g. 'credit-risk','data-engineers','data-scientists').
#  - subject: (String) the subject/project/username that you are creating this table for. Can be your username in the Sandbox.
#  - dropTable: (Boolean) if you want to Drop the table from the database before creating it. (eg: True/False)
#  - writeMode: (String) The write mode for you to record the data in the table. (eg: 'append': always appends the data in the table / 'overwrite': always overwrites the data in the table.)
#
# Example Code:
#   createTableFromDF( dftst , 'sandbox' , 'test_table_name' , 'data-engineers' , 'fimazu' , False , 'append' )
def createTableFromDF(dfUtil,database,tableName,area,subject,dropTable,writeMode):
  #writeMode = 'append'/'overwrite'
  #Storage location of the Delta files that back the table
  mntUtil = '/mnt/'+database+'/'+area+'/'+subject+'/'+tableName
  if dropTable:
    #Remove the existing Delta files and the table definition before recreating them
    dbutils.fs.rm(mntUtil,True)
    spark.sql(f"DROP TABLE IF EXISTS {database}.{tableName}")

  #Write the Dataframe as Delta files, merging any new columns into the existing schema
  dfUtil.write.format('delta').option('mergeSchema', 'true').mode(writeMode).save(mntUtil)
  #Register the table in the database if it does not exist yet
  spark.sql(f"""
      CREATE TABLE IF NOT EXISTS {database}.{tableName}
      USING DELTA LOCATION '{mntUtil}'
      """)

In [0]:
# FUNCTION: moveFileFromDBFSToSandbox(area,sourceFile,targetFile):
#   Used to move files uploaded manually into Databricks DBFS to the target area Sandbox.
# Parameters Needed:
#  - area: (String) the name of your area in the Sandbox (e.g. 'credit-risk','data-engineers','data-scientists').
#  - sourceFile: (String) the name of the source file uploaded into DBFS.
#  - targetFile: (String) the name of the target file in the area's Sandbox folder. The file can be renamed during the move.
# Example Code:
#   moveFileFromDBFSToSandbox('data-engineers','testSourceFile.csv','testDestinationFile.csv')
def moveFileFromDBFSToSandbox(area,sourceFile,targetFile):
  mntFrom = 'dbfs:/FileStore/tables/' + area + '/' + sourceFile
  mntTo   = '/mnt/sandbox/' + area + '/' + targetFile
  #move the file to a sandbox folder or other "mounted" folder in Databricks; the source file no longer exists afterwards
  dbutils.fs.mv(mntFrom,mntTo)

In [0]:
# FUNCTION: flatten_structs(nested_df):
#   Used to flatten nested structured columns in a Dataframe (eg: Struct type columns). Just pass the dataframe containing the Nested Columns as parameter.
# Parameters Needed:
#  - nested_df: (Spark DF) The Dataframe containing the desired columns that you need un-nested. A new Dataframe will be returned.
# Example Code:
#   df_Final = flatten_structs( df_StructColumns )
def flatten_structs(nested_df):
    # Each stack entry pairs the path of parent struct names with the Dataframe projected at that level
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:

        parents, df = stack.pop()

        # Non-struct columns are selected by their dotted path and aliased with underscores
        flat_cols = [
            f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        # Struct columns are expanded one level and visited on a later iteration
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

In [0]:
# FUNCTION: flatten_array_struct_df(nested_df):
#   Used to flatten nested columns in a Dataframe (eg: Struct or Array type columns). Just pass the dataframe containing the Nested Columns as parameter.
# Parameters Needed:
#  - df: (Spark DF) The Dataframe containing the desired columns that you need un-nested. A new Dataframe will be returned.
# Example Code:
#   df_Final = flatten_array_struct_df( df_NestedColumns )
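def flatten_array_struct_df(df):
    # Names of the columns that currently hold arrays
    array_cols = [
        c[0]
        for c in df.dtypes
        if c[1][:5] == "array"
    ]

    # Repeat until no array column is left, since exploding and flattening can expose nested arrays
    while len(array_cols) > 0:

        for array_col in array_cols:
            # One output row per array element; explode drops rows whose array is null or empty
            df = df.withColumn(array_col, f.explode(f.col(array_col)))

        # Turn the structs produced by the explode into top-level columns
        df = flatten_structs(df)

        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df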
# flatten_array_struct_df works on whole Dataframes, so call it directly from Python; a SQL UDF only receives column values.

In [0]:
#Function that checks whether a String is a serialized Json string. Returns Boolean(True/False)
def is_json(myjson):
  try:
    json.loads(myjson)
  except ValueError:
    return False
  return True

# FUNCTION: parseJSONColumns(df, flatten_struct_cols, sanitize=True):
#   Deserializes Json Columns that are stored as strings into Struct type columns and, optionally, flattens these nested columns in a Dataframe. Just pass the dataframe containing the serialized Columns as parameter.
# Parameters Needed:
#  - df: (Spark DF) The Dataframe containing the desired columns that you need deserialized and un-nested. A new Dataframe will be returned.
#  - flatten_struct_cols (True/False): Whether to call the "flatten_structs" Function to transform Json attributes into actual DF Columns
#  - sanitize (True/False, default True): Whether to wrap each Json value in a {"data": ...} object before its schema is inferred, and unwrap it afterwards
# Example Code:
#   df_Final = parseJSONColumns( df_SerializedColumns , True )
def parseJSONColumns(df, flatten_struct_cols, sanitize=True):
    # samples a single row once; its values are used to detect which Columns hold Json strings
    sample_row = df.first()
    # iterates through all DF columns
    for i in df.columns:
        # skips the Column when there is no data or the sampled value is not a de-serializable Json
        if sample_row is None or not is_json(str(sample_row[i])):
            continue
        # sanitize if requested: wrap the value so that it always parses as a field of an object
        if sanitize:
            df = df.withColumn(i, concat(lit('{"data": '), col(i), lit('}')))
        # infer schema and apply it
        schema = spark.read.json(df.rdd.map(lambda x: x[i])).schema
        df = df.withColumn(i, from_json(col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            df = df.withColumn(i, col(i).getField('data'))
    # calls the flatten_structs Function to transform Json attributes into DF Columns if parameter is True
    if flatten_struct_cols:
        df = flatten_structs(df)
    return df