In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
def f_get_secret(key):
    """Look up a secret in the ``cryptoproject`` secret scope.

    Relies on a ``dbutils`` object being available in the notebook's global
    namespace (it is not defined in this file).

    Args:
        key: Name of the secret inside the ``cryptoproject`` scope.

    Returns:
        Whatever ``dbutils.secrets.get`` returns for that key.

    Raises:
        Exception: Any error raised by the lookup is propagated unchanged.
    """
    secret_value = dbutils.secrets.get(scope="cryptoproject", key=key)
    return secret_value

In [0]:
def f_add_input_file_name_loadtime(df,date_part=None,file_name=None):
    """Append source-file lineage and load-time columns to ``df``.

    Four columns are added on top of the existing ones:

    * ``input_file_name`` - path of the file each row was read from.
    * ``network`` - the path segment captured between ``unstructured_data/``
      and ``/txn_date``.
    * ``txn_date`` - the run of digits/hyphens that follows ``txn_date=`` in
      the path.
    * ``load_timestamp`` - the current timestamp.

    Args:
        df: Spark DataFrame that was read from files (so that
            ``input_file_name()`` is populated).
        date_part: Reserved. Must be ``None``; explicit overrides are not
            implemented.
        file_name: Reserved. Must be ``None``; explicit overrides are not
            implemented.

    Returns:
        A new DataFrame with the four extra columns.

    Raises:
        NotImplementedError: If ``date_part`` or ``file_name`` is supplied.
            Previously this case failed with an accidental
            ``UnboundLocalError`` because no branch produced a result.
    """
    # Fail fast with a meaningful message instead of falling through to an
    # unassigned local variable.
    if date_part is not None or file_name is not None:
        raise NotImplementedError(
            "f_add_input_file_name_loadtime only supports deriving metadata from "
            "the input file path; explicit date_part/file_name are not supported"
        )

    from pyspark.sql.functions import current_timestamp, input_file_name, regexp_extract

    source_path = input_file_name()
    return (
        df.select("*")
        .withColumn("input_file_name", source_path)
        .withColumn("network", regexp_extract(source_path, r"unstructured_data/(.*?)/txn_date", 1))
        .withColumn("txn_date", regexp_extract(source_path, r"txn_date=([0-9\-]+)", 1))
        .withColumn("load_timestamp", current_timestamp())
    )

In [0]:
def f_validate_schema(df,sink_schema):
    """Check ``df``'s schema against ``sink_schema`` and suggest an output file count.

    Args:
        df: DataFrame exposing ``dtypes`` and ``count()``.
        sink_schema: Expected schema in the same form as ``df.dtypes``
            (a list of ``(column_name, type_string)`` tuples); compared with
            plain ``==``, so column order matters.

    Returns:
        int: Suggested number of output files based on the row count:
        1 for up to 100,000 rows, 3 for up to 1,000,000 rows, 5 for up to
        10,000,000 rows. Above that, one file per 2,000,000 rows (rounded
        up), which continues the density of the largest fixed tier.

    Raises:
        Exception: ``"Schema is not matched"`` when the schemas differ.
    """
    # Compare schemas first: it needs no Spark job, so a mismatch is reported
    # without paying for a full count().
    source_schema = df.dtypes
    if source_schema != sink_schema:
        raise Exception("Schema is not matched")

    no_rows = df.count()
    if no_rows <= 100000:
        return 1
    if no_rows <= 1000000:
        return 3
    if no_rows <= 10000000:
        return 5

    # Previously no value was assigned for more than 10M rows, which raised
    # UnboundLocalError. Scale linearly instead (ceil division).
    rows_per_file = 2000000
    return (no_rows + rows_per_file - 1) // rows_per_file

In [0]:
def f_get_primary_key(df):
    """Build a ``tgt``/``src`` merge condition from ``df``'s composite-key candidates.

    The key columns come from ``mack.find_composite_key_candidates(df)``. Each
    column contributes ``tgt.<col>=src.<col>``; the terms are joined with
    ``" AND "`` and the result starts with a single leading space.

    Args:
        df: DataFrame handed to ``mack.find_composite_key_candidates``.

    Returns:
        str: The merge condition, e.g. ``" tgt.a=src.a AND tgt.b=src.b"``
        (just ``" "`` when no key columns are returned).

    Raises:
        Exception: Any failure is printed and then re-raised.
    """
    try:
        import mack as mk
        key_columns = mk.find_composite_key_candidates(df)
        equality_terms = ["tgt." + column + "=src." + column for column in key_columns]
        return " " + " AND ".join(equality_terms)
    except Exception as failure:
        print("Error occured ",str(failure))
        raise failure

In [0]:
def f_merge_delta_data(input_df, catalog_name, schema_name, table_name, merge_condition, partition_column):
  """Upsert ``input_df`` into the Delta table ``<catalog>.<schema>.<table>``.

  If the table already exists, rows are merged using ``merge_condition``
  (target aliased as ``tgt``, source as ``src``): matches are fully updated
  and non-matches are inserted. Otherwise the table is created from
  ``input_df`` as a Delta table partitioned by ``partition_column``.

  Args:
    input_df: Source DataFrame.
    catalog_name: Catalog part of the target table name.
    schema_name: Schema part of the target table name.
    table_name: Table part of the target table name.
    merge_condition: SQL join condition written against ``tgt`` and ``src``.
    partition_column: Partition column; only used when the table is created.

  Raises:
    Exception: Any failure inside the merge/create step is printed and re-raised.
  """
  # Original author's note: dynamic partition pruning matters when the merge
  # condition compares partition columns on both sides
  # (tgt.<partition> = src.<partition>); a condition against a literal
  # partition value does not need this setting.
  spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
  try:
    qualified_name = "{}.{}.{}".format(catalog_name, schema_name, table_name)
    from delta.tables import DeltaTable

    if not spark._jsparkSession.catalog().tableExists(qualified_name):
      # First load: create the partitioned Delta table from the input.
      (input_df.write
         .mode("overwrite")
         .partitionBy(partition_column)
         .format("delta")
         .saveAsTable(qualified_name))
      return

    existing_table = DeltaTable.forName(spark, qualified_name)
    (existing_table.alias("tgt")
       .merge(input_df.alias("src"), merge_condition)
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())
  except Exception as failure:
    print("Error occured ",str(failure))
    raise failure

In [0]:
def add_ingestion_date(input_df):
  """Return ``input_df`` with an ``ingestion_date`` column holding the current timestamp.

  Args:
    input_df: DataFrame to extend.

  Returns:
    The DataFrame produced by ``withColumn("ingestion_date", current_timestamp())``.
  """
  return input_df.withColumn("ingestion_date", current_timestamp())

In [0]:
def re_arrange_partition_column(input_df, partition_column):
  """Select ``input_df``'s columns with ``partition_column`` moved to the end.

  Args:
    input_df: DataFrame whose ``schema.names`` supplies the column order.
    partition_column: Column name to place last. It is appended even if it
      does not appear in ``schema.names``.

  Returns:
    The result of ``input_df.select`` on the reordered column list.
  """
  leading_columns = [name for name in input_df.schema.names if name != partition_column]
  return input_df.select(leading_columns + [partition_column])

In [0]:
def overwrite_partition(input_df, db_name, table_name, partition_column):
  """Write ``input_df`` into ``<db_name>.<table_name>``, overwriting by partition.

  The partition column is first moved to the last position and the session is
  switched to dynamic partition-overwrite mode. If the table exists the data
  is written with ``insertInto``; otherwise a parquet table partitioned by
  ``partition_column`` is created.

  Args:
    input_df: DataFrame to write.
    db_name: Database (schema) name of the target table.
    table_name: Target table name.
    partition_column: Name of the partition column.
  """
  # Partition column goes last - presumably because insertInto matches columns
  # by position rather than by name (verify against the table layout).
  reordered_df = re_arrange_partition_column(input_df, partition_column)
  spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

  qualified_name = f"{db_name}.{table_name}"
  if spark._jsparkSession.catalog().tableExists(qualified_name):
    reordered_df.write.mode("overwrite").insertInto(qualified_name)
    return

  (reordered_df.write
     .mode("overwrite")
     .partitionBy(partition_column)
     .format("parquet")
     .saveAsTable(qualified_name))

In [0]:
def df_column_to_list(input_df, column_name):
  """Collect the distinct values of one column into a Python list.

  Args:
    input_df: DataFrame to read from.
    column_name: Column whose distinct values are collected.

  Returns:
    list: One entry per distinct row, in the order ``collect()`` returns them.
  """
  distinct_rows = input_df.select(column_name).distinct().collect()

  values = []
  for row in distinct_rows:
    values.append(row[column_name])
  return values

In [0]:
def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
  """Upsert ``input_df`` into the Delta table ``<db_name>.<table_name>``.

  Existence is checked by table name, but an existing table is opened by
  path at ``<folder_path>/<table_name>``. Matches on ``merge_condition``
  (target aliased ``tgt``, source ``src``) are fully updated and non-matches
  inserted. If the table does not exist it is created from ``input_df`` as a
  Delta table partitioned by ``partition_column``.

  Args:
    input_df: Source DataFrame.
    db_name: Database (schema) name of the target table.
    table_name: Target table name; also the last path segment under ``folder_path``.
    folder_path: Directory that contains the table's Delta files.
    merge_condition: SQL join condition written against ``tgt`` and ``src``.
    partition_column: Partition column; only used when the table is created.
  """
  # Original author's note: dynamic partition pruning matters when the merge
  # condition compares partition columns on both sides
  # (tgt.<partition> = src.<partition>); a condition against a literal
  # partition value does not need this setting.
  spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
  from delta.tables import DeltaTable

  qualified_name = f"{db_name}.{table_name}"
  if not spark._jsparkSession.catalog().tableExists(qualified_name):
    # First load: create the partitioned Delta table from the input.
    (input_df.write
       .mode("overwrite")
       .partitionBy(partition_column)
       .format("delta")
       .saveAsTable(qualified_name))
    return

  existing_table = DeltaTable.forPath(spark, f"{folder_path}/{table_name}")
  (existing_table.alias("tgt")
     .merge(input_df.alias("src"), merge_condition)
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

In [0]:
import os
import requests
import json

def get_access_token(timeout=10):
    """Request a Lark tenant access token for the configured app.

    Credentials are read from the ``LARK_APP_ID`` and ``LARK_APP_SECRET``
    environment variables and POSTed as JSON to the Lark internal
    tenant-access-token endpoint.

    Args:
        timeout: Seconds to wait for the HTTP call before giving up. Without
            a timeout the request could block the notebook indefinitely.

    Returns:
        The ``tenant_access_token`` field of the JSON response, or ``None``
        if the response does not contain one.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    url = "https://open.larksuite.com/open-apis/auth/v3/tenant_access_token/internal"
    payload = json.dumps({
        "app_id": os.getenv("LARK_APP_ID"),
        "app_secret": os.getenv("LARK_APP_SECRET"),
    })
    headers = {
        'Content-Type': 'application/json'
    }

    response = requests.request("POST", url, headers=headers, data=payload, timeout=timeout)
    return response.json().get("tenant_access_token")

def f_push_alert_message(exception_message, context, timeout=10):
    """Send a notebook-failure alert to a Lark chat.

    The alert text contains the notebook path, a notebook link built from the
    notebook id, the job name and ``exception_message``. It is posted as a
    text message to the chat whose id is read from the ``LARK_RECIEVER_ID``
    environment variable, authenticated with a token from
    ``get_access_token()``.

    Args:
        exception_message: Text describing the failure; appended to the alert.
        context: Mapping that may contain ``extraContext`` (with
            ``notebook_path`` and ``notebook_id``) and ``tags`` (with
            ``jobName``). Missing sections or keys are rendered as ``None``
            instead of raising, so a sparse context cannot mask the original
            failure being reported.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        str: The raw response body returned by the Lark messages API.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    # Tolerate a missing/None context or sub-section; the original chained
    # .get(...).get(...) raised AttributeError in that case.
    context = context or {}
    extra_context = context.get("extraContext") or {}
    tags = context.get("tags") or {}

    notebook_path = extra_context.get("notebook_path")
    notebook_link = "dbc-b211122e-383d.cloud.databricks.com/editor/notebooks/" + str(extra_context.get("notebook_id"))
    workflow = tags.get("jobName")

    message = f'[ERROR] Notebook execution failure\nNotebook: {notebook_path}\nNotebook link: {notebook_link}\nWorkflow: {workflow}\n{exception_message}'

    url = "https://open.larksuite.com/open-apis/im/v1/messages?receive_id_type=chat_id"

    req_body = {
        "msg_type": "text",
        # NOTE: the variable name is misspelled ("RECIEVER"); kept as-is
        # because the environment presumably defines it under this name.
        "receive_id": os.getenv("LARK_RECIEVER_ID"),
        # The message content is itself sent as a JSON-encoded string.
        "content": json.dumps({"text": message}),
    }
    payload = json.dumps(req_body)

    token = get_access_token()
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload, timeout=timeout)
    return response.text


In [0]:
def f_get_configured_logger(context):
    """Return the log4j logger named ``CustomLogs`` and log a start-up line.

    The logger is obtained through the JVM gateway of the global ``spark``
    session (``spark.sparkContext._jvm.org.apache.log4j``).

    Args:
        context: Mapping that may contain ``extraContext`` with a
            ``notebook_path`` entry. A missing section or key is logged as
            ``None`` instead of raising ``AttributeError``.

    Returns:
        The log4j ``Logger`` object for ``"CustomLogs"``.
    """
    log4jLogger = spark.sparkContext._jvm.org.apache.log4j
    # Tolerate a missing/None context or extraContext section.
    extra_context = (context or {}).get("extraContext") or {}
    notebook_path = extra_context.get("notebook_path")

    logger = log4jLogger.LogManager.getLogger("CustomLogs")
    logger.info(f"Starting logging for notebook {notebook_path}")
    # Original author's note on where driver logs are expected (depends on the
    # cluster's log delivery configuration - TODO confirm):
    # dbfs:/cluster-logs/<cluster_id>/driver/log4j-active.log
    return logger