In [0]:
# AIOps Util notebook containing common functions used in different notebooks and scripts.
import json
import adal
import base64
from pyspark.sql.functions import to_timestamp,lit
from azure.storage.blob import BlockBlobService
from dateutil.parser import parse
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime
import pandas as pd

In [0]:
# Load the ICM configuration notebook that matches the current customer.
# The customer is taken from the CUSTOMER environment variable:
#   - os.environ['CUSTOMER'] raises KeyError when the variable is not set;
#   - a value other than the three handled below matches no branch, so no
#     ICM configuration notebook is run at all.
# NOTE(review): the config notebooks presumably define names such as `icm_modelDBconfig`
# and `general_config` that functions in this notebook read — confirm.
# Previous behaviour (single relative config notebook), kept for reference:
#%run ./icm_config.ipynb
import os
if os.environ['CUSTOMER'] == 'CSIO':
    %run /home/jovyan/work/DAGS_Airflow/icm_config_csio.ipynb
elif os.environ['CUSTOMER'] == 'globelife':
    %run /home/jovyan/work/DAGS_Airflow/icm_config.ipynb
elif os.environ['CUSTOMER'] == 'regeneron':
    %run /home/jovyan/work/DAGS_Airflow/icm_config_regeneron.ipynb

In [0]:
# Run the RCA configuration notebook so the names it defines become available in this namespace.
# NOTE(review): its contents are not visible from this notebook — confirm which settings it provides.
%run /home/jovyan/work/DAGS_Airflow/rca_config.ipynb

In [0]:
# Shared logger factory for all the notebooks and scripts
def get_logger(name, sc):
  """
  Return the log4j logger registered under `name`.

  Input: logger name, Spark context (its JVM gateway `sc._jvm` is used to reach log4j)
  Output: the log4j logger; one INFO line is written on it to record the initialisation
  """
  script_logger = sc._jvm.org.apache.log4j.LogManager.getLogger(name)
  script_logger.info("pyspark script logger initialized for {}".format(name))
  return script_logger

In [0]:
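# Initialize the logger.
# NOTE: the call below is disabled, yet the functions in this notebook use a global `logger`.
# The notebook that runs this one must therefore create it before calling them. get_logger
# also needs the Spark context as its second argument, e.g. logger = get_logger("AIOps_Util", sc).
#logger = get_logger("AIOps_Util")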

In [0]:
# This function reads data from Azure Blob Storage. It loads either every file modified since the last run or only the latest file, depending on the arguments passed.
def load_blob_data_df(source_config, schema, last_run, load_only_latest_file):
  """
  Read CSV blobs into a Spark dataframe and return it together with a last-modified marker.

  Input:
    source_config         - object exposing storage_account_name, container_name and data_path
    schema                - Spark schema applied to the CSV files and to the empty fallback dataframe
    last_run              - timestamp of the last file read, in any form accepted by dateutil's parse;
                            when it cannot be parsed, a date 30*365 days in the past is used instead
    load_only_latest_file - truthy: load only the most recently modified blob of the container
                            falsy:  load every blob whose name contains data_path and that was
                                    modified after last_run
  Output: (dataframe, last_modified)
    dataframe     - data of the selected files, or an empty dataframe with `schema` when no file
                    was selected or the blob listing failed
    last_modified - delta mode: "%Y-%m-%d %H:%M:%S" string of the newest selected blob, or the
                    normalised last_run when nothing was selected or the listing failed;
                    latest-file mode: the blob's last_modified value as returned by the SDK
  Errors raised while Spark reads the selected files are not caught here.
  """
  # Imported locally so the fallback below works whether the notebook-level name `datetime`
  # is bound to the class or to the module (the namespace is shared with %run notebooks).
  from datetime import datetime as _datetime, timedelta as _timedelta

  # Load the blob location details
  storage_account_name = source_config.storage_account_name
  container_name = source_config.container_name
  data_path = source_config.data_path

  # Normalise last_run to a fixed-width string; such strings compare correctly as plain text
  date_time_format = "%Y-%m-%d %H:%M:%S"
  try:
    last_run_timestamp = parse(last_run).strftime(date_time_format)
  except Exception:
    # No usable previous run (None, empty or malformed value): go far enough back to pick up every file
    last_run_timestamp = (_datetime.now() - _timedelta(days=30*365)).strftime(date_time_format)

  # Fetch blob credentials from dbutils secret store and generate the connection string
  access_key = dbutils.secrets.get(scope = "azure-blob-access-key", key ="access-token")
  azure_blob_base_path = 'wasbs://{}@{}.blob.core.windows.net/'.format(container_name, storage_account_name)
  spark.conf.set("fs.azure.account.key."+storage_account_name+".blob.core.windows.net", access_key)

  # Created before the listing so the failure path below always has a dataframe to return
  blob_df = spark.createDataFrame([], schema= schema)
  latest_files = []
  last_modified = last_run_timestamp

  try:
    # Establish blob connection and select the files to load
    block_blob_service = BlockBlobService(account_name=storage_account_name, account_key=access_key)
    generator = block_blob_service.list_blobs(container_name)
    if not load_only_latest_file:
      # Delta load: every file under data_path modified after the last run
      for blob in generator:
        blob_last_modified_date = blob.properties.last_modified.strftime(date_time_format)
        if blob_last_modified_date > last_run_timestamp and data_path in blob.name:
          last_modified = max(last_modified, blob_last_modified_date)
          latest_files.append(azure_blob_base_path + blob.name)
    else:
      # NOTE(review): this mode ignores data_path and hands back last_modified unformatted;
      # both are kept as they were for existing callers — confirm that this is intended.
      newest = max(((blob.name, blob.properties.last_modified) for blob in generator),
                   key=lambda item: item[1], default=None)
      if newest is not None:
        latest_file, last_modified = newest
        latest_files.append(azure_blob_base_path + latest_file)
  except Exception:
    # Best effort: a failed listing yields no data. The marker falls back to the last run so a
    # partially scanned listing cannot advance it past files that were never loaded.
    return blob_df, last_run_timestamp

  if not latest_files:
    return blob_df, last_modified

  # The DROPMALFORMED option drops all the data rows which do not adhere to the configured schema.
  blob_df = spark.read.format("csv")\
                 .schema(schema)\
                 .option("header", "True")\
                 .option("mode", "DROPMALFORMED")\
                 .load(latest_files)

  return blob_df, last_modified


In [0]:
# Run the supplied query against the given database server / database and return the result as a DataFrame.
# Authentication is done with an access token acquired through the Azure Service Principal credentials.
def load_data_df(db_server,port, db_name, query):
  """
  Query a SQL Server database over JDBC using access-token authentication.

  Input: database server host, port, database name, SQL query text
  Output: dataframe produced by the Spark JDBC reader for `query`
  Relies on the globals `logger`, `dbutils`, `spark`, `adal` and `general_config`.
  """
  logger.info("Performing data load from {} database".format(db_name))

  # Service principal credentials are kept in the secret store
  secret_scope = "aiops-secret-scope"
  principal_id = dbutils.secrets.get(scope = secret_scope, key ="service_principal_id")
  principal_secret = dbutils.secrets.get(scope = secret_scope, key ="service_principal_secret")
  tenant = dbutils.secrets.get(scope = secret_scope, key ="tenant_id")

  # JDBC url built from the server name, port and database
  jdbc_url = "jdbc:sqlserver://{}:{};database={};".format(db_server,port, db_name)

  # The authority (per tenant) and resource URLs come from general_config
  auth_context = adal.AuthenticationContext(general_config['authority_url'] + tenant)
  token_response = auth_context.acquire_token_with_client_credentials(
      general_config['resource_app_url'], principal_id, principal_secret)

  # NOTE(review): trustServerCertificate=true is passed together with encrypt=true;
  # confirm that skipping server certificate validation is intended here.
  reader_options = [
    ("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
    ("url", jdbc_url),
    ("encrypt", "true"),
    ("databaseName", db_name),
    ("hostNameInCertificate", "*.database.windows.net"),
    ("query", query),
    ("accessToken", token_response["accessToken"]),
    ("trustServerCertificate" , "true"),
  ]
  reader = spark.read.format("jdbc")
  for option_name, option_value in reader_options:
    reader = reader.option(option_name, option_value)
  result_df = reader.load()

  logger.info("Returning dataframe for data load from {}".format(db_name))
  return result_df

In [0]:
# Run the supplied query against the given ODS database server / database and return the result as a DataFrame.
# Authentication is done with the database user name and password kept in the secret store.
def load_data_df_ods(db_server,port, db_name, query):
  """
  Query the ODS SQL Server database over JDBC using user/password authentication.

  Input: database server host, port, database name, SQL query text
  Output: dataframe produced by the Spark JDBC reader for `query`
  Relies on the globals `logger`, `dbutils` and `spark`.
  """
  logger.info("Performing data load from {} database".format(db_name))

  # Database credentials come from the secret store
  secret_scope = "aiops-secret-scope"
  user_name = dbutils.secrets.get(scope = secret_scope, key ="ods_user_name")
  user_password = dbutils.secrets.get(scope = secret_scope, key ="ods_user_password")

  # JDBC url built from the server name, port and database
  jdbc_url = "jdbc:sqlserver://{}:{};database={};".format(db_server,port, db_name)

  reader_options = [
    ("driver", 'com.microsoft.sqlserver.jdbc.SQLServerDriver'),
    ("url", jdbc_url),
    ("query", query),
    ("user", user_name),
    ("password", user_password),
    ("serverTimezone", "UTC"),
  ]
  reader = spark.read.format("jdbc")
  for option_name, option_value in reader_options:
    reader = reader.option(option_name, option_value)
  result_df = reader.load()

  logger.info("Returning dataframe for data load from {}".format(db_name))
  return result_df

In [0]:
# Load data from multiple queries into a single dataframe
def load_data_df_multi_query(db_server,port, db_name, *query, join_on=('CHECK_TIME', 'DEVICE')):
  """
  Run every query through load_data_df and outer-join the results into one dataframe.

  Input:
    db_server, port, db_name - passed unchanged to load_data_df for each query
    *query                   - one or more SQL query strings; results are joined left to right
    join_on                  - keyword-only; column name or sequence of column names used as the
                               join key (defaults to CHECK_TIME and DEVICE)
  Output: the first query's dataframe outer-joined with each following query's dataframe
  Raises: ValueError when no query is supplied
  """
  if not query:
    raise ValueError("load_data_df_multi_query requires at least one query")

  # A single column name is accepted as well as a sequence of names
  join_columns = [join_on] if isinstance(join_on, str) else list(join_on)

  df = load_data_df(db_server, port, db_name, query[0])
  for extra_query in query[1:]:
    df = df.join(load_data_df(db_server, port, db_name, extra_query), on=join_columns, how='outer')
  logger.info("Returning combined dataframe for data load from {}".format(db_name))
  return df

In [0]:
# Write the supplied dataframe into Modeling DB
def write_df(df,table_name,option):
  """
  Persist a Spark dataframe into `table_name` of the Modeling DB over JDBC.

  Input:
    df         - Spark dataframe to persist
    table_name - target table
    option     - JDBC save mode passed to the writer (e.g. 'append' or 'overwrite')
  Output: None

  Connection details are read from the global `icm_modelDBconfig`; authentication uses an
  access token obtained with the service principal credentials from the secret store.

  For 'overwrite', a failed write is not re-raised. The error is logged, then the dataframe
  (with an extra `exception_on` column) is saved to a CSV file under /dbfs/backup_files/ and
  appended to `<table_name>_backup`. For any other mode a failed write propagates to the caller.
  """
  # Imported locally so the backup timestamp does not depend on what the shared notebook
  # namespace currently binds the name `datetime` to.
  from datetime import datetime as _datetime

  logger.info("Started persisting the dataframe into {} in Modeling DB".format(table_name))
  db_server=icm_modelDBconfig['db_server_name']
  db_port=icm_modelDBconfig['db_port']
  db_name=icm_modelDBconfig['database_name']

  service_principal_id = dbutils.secrets.get(scope = "aiops-secret-scope", key ="service_principal_id")
  service_principal_secret = dbutils.secrets.get(scope = "aiops-secret-scope", key ="service_principal_secret")
  tenant_id = dbutils.secrets.get(scope = "aiops-secret-scope", key ="tenant_id")

  # Located in App Registrations from Azure Portal
  authority = general_config['authority_url'] + tenant_id
  context = adal.AuthenticationContext(authority)
  resource_app_id_url = general_config['resource_app_url']
  token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
  access_token = token["accessToken"]

  url = "jdbc:sqlserver://{}:{};".format(db_server,db_port)
  connectionProperties = {
    "database":db_name,
    "driver":"com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "accessToken":access_token,
    "encrypt":"true",
    "hostNameInCertificate":"*.database.windows.net"
  }

  if option != 'overwrite':
    df.write.option("truncate", "true").jdbc(url, table_name, option, connectionProperties)
    logger.info("Successfully persisted dataframe into {} in Modeling DB".format(table_name))
    return

  # An overwrite is guarded: on failure the data is kept in a backup file and backup table
  try:
    df.write.option("truncate", "true").jdbc(url, table_name, option, connectionProperties)
  except Exception as write_error:
    # Log the cause first, so it is recorded even if the backup below fails as well
    logger.error("Error while overwriting the {} table: {}".format(table_name, write_error))

    created_on = _datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    df=df.withColumn('exception_on',lit(created_on).cast(StringType()))

    backup_file_location= '/backup_files/'
    # creating directory if not present
    dbutils.fs.mkdirs(backup_file_location)
    filepath = '/dbfs'+backup_file_location+table_name+'_'+created_on+'.csv'
    # Storing the data to a CSV file, as the update of the DB table failed
    df.toPandas().to_csv(filepath)

    df.write.option("truncate", "true").jdbc(url, table_name+'_backup', 'append', connectionProperties)
    logger.info("Stored the data for {} in {} and in the {}_backup table".format(table_name, filepath, table_name))
  else:
    logger.info("Successfully persisted dataframe into {} in Modeling DB".format(table_name))
  

In [0]:
#Module will return Custom or generic cost sheet name w.r.t each customer
def get_cost_sheet_name(customer,cloud_name):
  """
  Return the name of the pricing table to use for a customer and cloud.

  Input: customer name, cloud name
  Output: '<customer>_<cloud_name>_pricing' when a table of that name is listed by
          'show tables', otherwise the generic '<cloud_name>_pricing_csv'

  The lookup ignores letter case, so a mixed-case customer name still finds a table whose
  name the catalog reports in lower case.
  """
  custom_sheet = customer + '_'+ cloud_name + '_pricing'
  wanted = custom_sheet.lower()

  # The table listing is small, so it is compared on the driver instead of running a count job
  table_rows = spark.sql('show tables').select('tableName').collect()
  if any(row['tableName'].lower() == wanted for row in table_rows):
    return custom_sheet
  return cloud_name + '_pricing_csv'

In [0]:
def write_data_to_db(pandas_df,table,write_type):
  ''' This function takes the pandas dataframe, converts it into a spark dataframe and writes it into the DB with exception handling.
    Parameters:
      a) pandas_df - Pandas Dataframe
      b) table - DB table name
      c) write_type - Option to append or overwrite the table
    Returns None. An empty dataframe is only logged. Any error raised during the conversion or
    the write is logged at error level together with its cause and is not re-raised. '''
  try:
    if pandas_df.empty:
      logger.info(table + " - No Data")
      return

    # Normalise every missing value to NaN (float('nan') is used because pd.np is no longer
    # available in current pandas), then turn NaN into NULL on the Spark side.
    pandas_df = pandas_df.fillna(value=float('nan'))
    spark_df = spark.createDataFrame(pandas_df)
    spark_df = spark_df.replace(float('nan'), None)
    write_df(spark_df , table, write_type)
    logger.info(table + " - Table stored to database")
  except Exception as write_error:
    logger.error(table + " - Error writing data to database: {}".format(write_error))