In [1]:
from datetime import datetime
from pyspark.sql.functions import col, sum, avg

In [2]:

### 0. GLOBAL VARIABLES
# Many dynamic-SQL functions iterate over these lists to generate column expressions by name.
# NOTE: score_cols and weightage_cols are parallel lists -- entry i of one corresponds to entry i of the other.

# The '28 data elements' that can receive scores for external corrosion risk
score_cols = ["CP_Score", "CIS_Readings_Score", "Joint_Coating_Type_Score", "Joint_Coating_Install_Date_Score", "Joint_Coating_Condition_Score", "Mainline_Coating_Type_Score", "Mainline_Coating_Condition_SME_Score",
  "Mainline_Coating_Install_Date_Score", "Active_Anomaly_Counts_Score", "ILI_Survey_Date_Score", "Stray_Current_Interference_Score", "Soil_Type_Score", "Casing_Inhibitor_Score", "Casing_Shorted_Score",
  "Casing_Type_Score", "Land_Use_Score", "Exposed_Pipe_Score", "Pipe_Installation_Date_Score", "Pipe_Wall_Thickness_Score", "Maximum_Operating_Pressure_Score", "Pipe_Outside_Diameter_Score",
  "Nominal_Material_Yield_Strength_Score", "Remaining_HalfLife_ECDA_Score", "EC_ECDA_Anomaly_Count_Score", "Year_ECDA_Score", "Hydrostatic_Retest_Date_Score", "EC_Reassessment_Interval_Score",
  "External_Corrosion_Failure_History_Score"]

# The 28 names in the element weightage reference table corresponding (by position) to each of the above data elements.
# Names may repeat: e.g. both CP_Score and CIS_Readings_Score map to "cathodic_protection".
weightage_cols = ["cathodic_protection", "cathodic_protection", "joint_coating_type", "joint_coating_install_date", "joint_coating_condition", "mainline_coating", "mcc",
  "age", "active_anomaly_counts", "ili_survey_date", "stray_current_interference", "soil_type", "casing_inhibitor", "casing_is_shorted",
  "casing_type", "land_use", "exposed_pipe", "pipe_installation_date", "pipe_wall_thickness_base", "maximum_operating_pressure_base", "pipe_outside_diameter_base",
  "nominal_material_yield_strength", "ecda_half_life", "ec_ecda_anomaly_counts", "year_of_ecda", "evaluation_date", "ec_reassessment_interval",
  "external_corrosion_failure_history"]

# The 8 data groups into which the 28 data elements 'roll up'
group_cols = ["Assessment_Mitigation_Group_Score", "Stray_Current_Interference_Group_Score", "ECDA_Feature_Group_Score", "Joint_Coating_Group_Score",
  "ILI_Group_Score", "Failure_History_Group_Score", "Baseline_Susceptibility_Group_Score", "Pipe_Specifications_Group_Score"]


In [3]:
### 1. SETUP FUNCTIONS
def get_secret(key, scope = "Azure-KeyVault-Scope"):
  """Fetch a secret string from a Databricks secret scope.

  key   -- name of the secret inside the scope
  scope -- secret scope to read from (defaults to the key-vault scope)
  """
  secret_value = dbutils.secrets.get(scope=scope, key=key)
  return secret_value

def get_configs():
  """Build the OAuth client-credentials settings used to mount ADLS storage.

  The client id, client secret and tenant id come from the secret scope. The returned
  dict is passed as ``extra_configs`` to dbutils.fs.mount.
  """
  client_id = get_secret("APR-TURING-ClientId")
  client_secret = get_secret("APR-TURING-secret")
  tenant_id = get_secret("APR-TURING-TenantId")
  token_endpoint = "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token"

  return {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": token_endpoint,
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
  }

def get_widget_argument(arg):
  """Return the value of the notebook parameter (widget) named ``arg``.

  A text widget with an empty default value and empty label is declared first, so the
  widget exists even when the parameter is not supplied (e.g. an interactive run).
  The value is read with dbutils.widgets.get, the supported replacement for the
  deprecated getArgument(); the original also called widgets.get but discarded the result.
  """
  dbutils.widgets.text(arg, "", "")
  return dbutils.widgets.get(arg)

def mount_storage_with_configs(configs):
  """Mount the abfss:// location named by the "ADLS-GTM-connstr" secret at /mnt/turingdata.

  configs -- extra Hadoop settings for the mount (e.g. the dict returned by get_configs())
  """
  storage_uri = "abfss://" + get_secret("ADLS-GTM-connstr") + "/"
  dbutils.fs.mount(source=storage_uri,
                   mount_point="/mnt/turingdata",
                   extra_configs=configs)

def mount_storage():
  """Attach blob storage at /mnt/turingdata unless something is already mounted there."""
  already_mounted = any(m.mountPoint == "/mnt/turingdata" for m in dbutils.fs.mounts())
  if not already_mounted:
    mount_storage_with_configs(get_configs())

def unmount_storage():
  """Detach blob storage from /mnt/turingdata; does nothing when it is not mounted."""
  mount_points = [m.mountPoint for m in dbutils.fs.mounts()]
  if "/mnt/turingdata" in mount_points:
    dbutils.fs.unmount("/mnt/turingdata")

def get_date_from_slicestart():
  """Parse the 'SliceStart' notebook parameter (format MM-DD-YYYY) into date strings.

  Returns a tuple (year, month, day) of strings; month and day are zero-padded to two digits.
  Raises ValueError (from strptime) if the parameter does not match '%m-%d-%Y'.
  """
  parsed = datetime.strptime(get_widget_argument("SliceStart"), '%m-%d-%Y')
  year_str = str(parsed.year)
  month_str = str(parsed.month).zfill(2)
  day_str = str(parsed.day).zfill(2)
  return year_str, month_str, day_str


In [4]:
### 2. EXTERNAL READ/WRITE FUNCTIONS
def read_from_json(filename):
  """Read a JSON settings file into a Spark DataFrame.

  filename -- path relative to /mnt/turingdata/raw/solutions/turing/data/
  """
  # TODO: the original author flagged this loader as unfinished ("finish this!!").
  settings_dir = "/mnt/turingdata/raw/solutions/turing/data/"
  return spark.read.json(settings_dir + filename)

def read_from_parquet(filename, source, year, month, day):
  """Read one day's raw Parquet file into a Spark DataFrame.

  Path read: /mnt/turingdata/raw/global/pipeline_integrity/<source>/<filename>/<year>/<month>/<filename>_<day>.parquet
  All arguments must be strings (e.g. the tuple returned by get_date_from_slicestart()).
  """
  partition_dir = "/".join([source, filename, year, month])
  parquet_path = ("/mnt/turingdata/raw/global/pipeline_integrity/" + partition_dir
                  + "/" + filename + "_" + day + ".parquet")
  return spark.read.parquet(parquet_path)

def read_from_SQL(table_name):
  """Read ``table_name`` from the TURING SQL Server database into a Spark DataFrame via JDBC.

  Server URL, user and password are read from the secret scope.
  """
  server = get_secret("SDB-SERVER-TURING-url")
  user = get_secret("SDB-DATABRICKS-TURING-userstr")
  password = get_secret("SDB-DATABRICKS-TURING-pwd")

  # NOTE(review): credentials are embedded in the URL; JDBC `properties` would keep them out of it.
  url_parts = ["jdbc:" + server,
               "database=TURING",
               "user=" + user,
               "password=" + password,
               "encrypt=true",
               "trustServerCertificate=false",
               "hostNameInCertificate=*.database.windows.net",
               "loginTimeout=30"]
  jdbc_url = ";".join(url_parts) + ";"

  return spark.read.jdbc(url=jdbc_url, table=table_name)

def write_to_blob_storage(val, location):
  """Write the string ``val`` to the DBFS path ``location``, replacing any existing file.

  Prints an INFO line when an existing file is overwritten. A failure while overwriting
  is reported with an ERROR line that includes the exception, instead of being silently
  swallowed by a bare ``except``. Failures when creating a new file propagate, as before.
  """
  parent_dir, _, file_name = location.rpartition('/')

  try:
    file_exists = file_name in [f.name for f in dbutils.fs.ls(parent_dir)]
  except Exception:
    # dbutils.fs.ls can raise when the parent directory does not exist yet;
    # treat that as "no existing file" and let put() report any real problem.
    file_exists = False

  if not file_exists:
    dbutils.fs.put(location, val)
    return

  print("INFO: File already exists, over-writing file " + location)
  try:
    # overwrite=True avoids the explicit rm() that could leave the old file deleted
    # if the subsequent put() failed.
    dbutils.fs.put(location, val, overwrite=True)
  except Exception as err:
    print("ERROR: Something went wrong while writing file " + location + ": " + str(err))

def write_df_to_blob_storage(df, location):
  """Serialize a Spark DataFrame to one CSV string and write it to ``location``.

  The header row comes from ``df.columns``. Previously it referenced an undefined name,
  ``schemaPeople``, and raised NameError. Rows are CSV-encoded with the stdlib csv writer.
  Every row is collected to the driver, so this is only suitable for small frames.
  An empty DataFrame produces a header-only file. Previously reduce() failed on an empty RDD.
  """
  import csv
  from io import StringIO

  def row2csv(row):
    # Encode one row as a CSV line. Only the writer's line terminator is removed;
    # the old .strip() also dropped meaningful leading/trailing whitespace in field values.
    buffer = StringIO()
    csv.writer(buffer).writerow(list(row))
    return buffer.getvalue().rstrip("\r\n")

  header = row2csv(df.columns)
  # collect() preserves partition order and returns [] for an empty frame.
  body = "\n".join(df.rdd.map(row2csv).collect())

  write_to_blob_storage(header + "\n" + body, location)

def write_to_SQL(df, table_name, write_mode = "overwrite"):
  """Write a Spark DataFrame to ``table_name`` in the TURING SQL Server database via JDBC.

  write_mode -- Spark save mode (e.g. "overwrite", "append"). It was previously ignored,
                and "overwrite" was always used. The default is unchanged.
  The "truncate" option is set so that an overwrite truncates the existing table
  rather than dropping and recreating it.
  """
  jdbc_address = get_secret("SDB-SERVER-TURING-url")
  jdbc_username = get_secret("SDB-DATABRICKS-TURING-userstr")
  jdbc_password = get_secret("SDB-DATABRICKS-TURING-pwd")

  jdbc_url = "jdbc:" + jdbc_address + ";database=TURING;user=" + jdbc_username + ";password=" + jdbc_password + ";encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

  df.write.option("truncate", "true").jdbc(url = jdbc_url, table = table_name, mode = write_mode)


In [5]:
### 3. LOOKUP FUNCTIONS
def get_color_attribute(table, attribute, color_category):
  """Return column ``attribute`` of the first row of ``table`` whose color_category matches.

  Raises IndexError when no row matches.
  """
  query = ("SELECT " + attribute + " FROM " + table
           + " WHERE color_category = '" + str(color_category) + "'")
  first_row = spark.sql(query).collect()[0]
  return first_row[attribute]

def get_implemented(table, issue, element):
  """Return the ``<issue>_implemented`` flag of data element ``element`` in ``table``.

  Raises IndexError when ``element`` is not found.
  """
  flag_col = issue + "_implemented"
  element_rows = spark.sql("SELECT * FROM " + table).filter(col("element") == element)
  return element_rows.select(flag_col).collect()[0][flag_col]

def get_multiplier(table, issue, element):
  """Return the ``<issue>_multiplier`` value for the data group named ``element``.

  Rows are matched on the ``group`` column of ``table``. Per the original author, this
  multiplier increases the weight of a group that contains out-of-scope data elements.
  Raises IndexError when the group is not found.
  """
  multiplier_col = issue + "_multiplier"
  group_rows = spark.sql("SELECT * FROM " + table).filter(col("group") == element)
  return group_rows.select(multiplier_col).collect()[0][multiplier_col]

def get_scope(table, issue, element):
  """Return the ``<issue>_scope`` flag of data element ``element`` in ``table``.

  Raises IndexError when ``element`` is not found.
  """
  scope_col = issue + "_scope"
  element_rows = spark.sql("SELECT * FROM " + table).filter(col("element") == element)
  return element_rows.select(scope_col).collect()[0][scope_col]

def get_weight(table, element):
  """Return the ``weightage`` value of data element ``element`` in ``table``.

  Raises IndexError when ``element`` is not found.
  """
  element_rows = spark.sql("SELECT * FROM " + table).filter(col("element") == element)
  return element_rows.select("weightage").collect()[0]["weightage"]

def get_group_weight(group, method):
  """Return the relative weight stored in column ``group`` of groupWeightage for ``method``.

  Raises IndexError when no row has that method.
  """
  method_rows = spark.sql("SELECT * FROM groupWeightage").filter(col("method") == method)
  return method_rows.select(group).collect()[0][group]

In [6]:
### 4. DASHBOARD FUNCTIONS
def get_miles(table, color):
  """Return, as a string, the miles of pipe (SUM(Length_Ft) / 5280) in ``table`` with Color_Code ``color``.

  Returns '0' when the sum is NULL (e.g. no rows match).
  """
  query = ("SELECT SUM(Length_Ft) / 5280 AS miles FROM " + table
           + " WHERE Color_Code = '" + str(color) + "'")
  miles = spark.sql(query).collect()[0]["miles"]
  return '0' if miles is None else str(miles)

def get_pct(table, color):
  """Return, as a string, the percentage of total Length_Ft in ``table`` whose Color_Code is ``color``.

  Returns '0' when no rows match, and also when the table's total length is NULL or 0.
  Previously an empty table spliced "/ None" into the SQL and the query failed.
  """
  total = spark.sql("SELECT SUM(Length_Ft) AS total FROM " + table).collect()[0]["total"]
  if total is None or total == 0:
    return '0'

  result = spark.sql("SELECT (SUM(Length_Ft) / " + str(total) + ") * 100 AS pct FROM " + table + " WHERE Color_Code = '" + str(color) + "'").collect()[0]["pct"]
  if result is None:
    return '0'
  return str(result)

def get_kilometers(table, color):
  """Return, as a string, the kilometers of pipe (SUM(Length_Ft) / 3280.84) in ``table`` with Color_Code ``color``.

  Returns '0' when the sum is NULL (e.g. no rows match).
  """
  query = ("SELECT SUM(Length_Ft) / 3280.84 AS kilometers FROM " + table
           + " WHERE Color_Code = '" + str(color) + "'")
  kilometers = spark.sql(query).collect()[0]["kilometers"]
  if kilometers is None:
    return '0'
  return str(kilometers)

def get_num_affected(table, color):
  """Count the pipe units (routes, valve sections, etc.) in ``table`` whose Mileage_<color> is > 0.

  ``color`` must be a string; it is appended to "Mileage_" to form the column name.
  Returns the count as a string.
  """
  mileage_col = "Mileage_" + color
  query = "SELECT COUNT(*) AS num_affected FROM " + table + " WHERE " + mileage_col + " > 0"
  affected = spark.sql(query).collect()[0]["num_affected"]
  return '0' if affected is None else str(affected)

In [7]:
### 5. DYNAMIC SQL FUNCTIONS
def get_data_elements_sql(prefix = "", issue = None, multiplier = 100):
  """Build a comma-separated SQL select list with one expression per data element in score_cols.

  prefix     -- table alias prepended to each score column (e.g. "x.")
  issue      -- None: each element is emitted as ``<prefix><col> * <multiplier> AS <col>``.
                Otherwise each element becomes a CASE expression that yields the scaled score
                when the element is in scope AND implemented for ``issue``, -2 when it is in
                scope but not implemented, and -1 otherwise (out of scope).
  multiplier -- factor applied to each score (default 100)

  The elementWeightage table is cached while the lookups run. It is always uncached
  afterwards, even if a lookup raises.
  """
  spark.catalog.cacheTable("elementWeightage")
  try:
    expressions = []
    for i, score_col in enumerate(score_cols):
      weightage_col = weightage_cols[i]
      scaled = prefix + score_col + " * " + str(multiplier)

      if issue is None:
        expressions.append(scaled + " AS " + score_col)
        continue

      # Query each flag once per element (previously each was queried twice).
      in_scope = str(get_scope("elementweightage", issue, weightage_col))
      implemented = str(get_implemented("elementweightage", issue, weightage_col))
      expressions.append("""
      CASE 
        WHEN """ + in_scope + """ = 1 AND """ + implemented + """ = 1
          THEN """ + scaled + """
        WHEN """ + in_scope + """ = 1 AND """ + implemented + """ = 0
          THEN -2
        ELSE -1
      END AS """ + score_col)
  finally:
    spark.catalog.uncacheTable("elementWeightage")

  return ", ".join(expressions)

def get_data_elements_product_with_rounding_sql(table1_prefix, table2_prefix, table3_prefix, round_to, multiplier = 100):
  """Build a select list combining each data element's score from three tables (overall DC rollup).

  For every column c in score_cols, with a = table1.c, b = table2.c and c3 = table3.c:
    - b and c3 both missing -> ROUND(a * multiplier, round_to)
    - only b missing        -> ROUND(a * c3 * multiplier, round_to)
    - only c3 missing       -> ROUND(a * b * multiplier, round_to)
    - otherwise             -> ROUND((a*b + a*c3 - a*b*c3) * multiplier, round_to)
  A value counts as "missing" when it IS NULL or its table's nullmarker = 1.

  round_to may be a str or an int (it previously had to be a str, or a TypeError was raised).
  """
  round_to = str(round_to)
  scale = str(multiplier)
  clauses = []

  for score_col in score_cols:
    t1 = table1_prefix + score_col
    t2 = table2_prefix + score_col
    t3 = table3_prefix + score_col
    t2_missing = "(" + t2 + " IS NULL OR " + table2_prefix + "nullmarker = 1)"
    t3_missing = "(" + t3 + " IS NULL OR " + table3_prefix + "nullmarker = 1)"
    t2_present = "(" + t2 + " IS NOT NULL AND " + table2_prefix + "nullmarker = 0)"
    t3_present = "(" + t3 + " IS NOT NULL AND " + table3_prefix + "nullmarker = 0)"

    clauses.append("""
    CASE
      WHEN """ + t2_missing + " AND " + t3_missing + """
        THEN ROUND(""" + t1 + " * " + scale + ", " + round_to + """)

      WHEN """ + t2_missing + " AND " + t3_present + """
        THEN ROUND((""" + t1 + " * " + t3 + ") * " + scale + ", " + round_to + """)

      WHEN """ + t2_present + " AND " + t3_missing + """
        THEN ROUND((""" + t1 + " * " + t2 + ") * " + scale + ", " + round_to + """)

      ELSE ROUND(((""" + t1 + " * " + t2 + ") + (" + t1 + " * " + t3 + ") - (" + t1 + " * " + t2 + " * " + t3 + ")) * " + scale + ", " + round_to + """)
    END AS """ + score_col)

  return ", ".join(clauses)

def get_data_groups_sql(prefix = "", multiplier = 100):
  """Build a comma-separated select list scaling every data group column in group_cols.

  Each entry is ``<prefix><group> * <multiplier> AS <group>``. ``prefix`` was previously
  accepted but ignored; it is now applied, matching get_data_elements_sql.
  """
  scale = str(multiplier)
  return ", ".join(prefix + group_col + " * " + scale + " AS " + group_col
                   for group_col in group_cols)

def get_data_groups_product_with_rounding_sql(table1_prefix, table2_prefix, table3_prefix, round_to, multiplier = 100):
  """Build a select list combining each data group's score from three tables (overall DC rollup).

  For every column g in group_cols, with a = table1.g, b = table2.g and c = table3.g:
    - b and c both missing -> ROUND(a * multiplier, round_to)
    - only b missing       -> ROUND(a * c * multiplier, round_to)
    - only c missing       -> ROUND(a * b * multiplier, round_to)
    - otherwise            -> ROUND((a*b + a*c - a*b*c) * multiplier, round_to)
  A value counts as "missing" when it IS NULL or its table's nullmarker = 1.

  round_to (str or int) is now honoured. It was previously ignored, and 2 was always used.
  """
  round_to = str(round_to)
  scale = str(multiplier)
  clauses = []

  for group_col in group_cols:
    t1 = table1_prefix + group_col
    t2 = table2_prefix + group_col
    t3 = table3_prefix + group_col
    t2_missing = "(" + t2 + " IS NULL OR " + table2_prefix + "nullmarker = 1)"
    t3_missing = "(" + t3 + " IS NULL OR " + table3_prefix + "nullmarker = 1)"
    t2_present = "(" + t2 + " IS NOT NULL AND " + table2_prefix + "nullmarker = 0)"
    t3_present = "(" + t3 + " IS NOT NULL AND " + table3_prefix + "nullmarker = 0)"

    clauses.append("""
    CASE
      WHEN """ + t2_missing + " AND " + t3_missing + """
        THEN ROUND(""" + t1 + " * " + scale + ", " + round_to + """)

      WHEN """ + t2_missing + " AND " + t3_present + """
        THEN ROUND((""" + t1 + " * " + t3 + ") * " + scale + ", " + round_to + """)

      WHEN """ + t2_present + " AND " + t3_missing + """
        THEN ROUND((""" + t1 + " * " + t2 + ") * " + scale + ", " + round_to + """)

      ELSE ROUND(((""" + t1 + " * " + t2 + ") + (" + t1 + " * " + t3 + ") - (" + t1 + " * " + t2 + " * " + t3 + ")) * " + scale + ", " + round_to + """)
    END AS """ + group_col)

  return ", ".join(clauses)

def get_dc_data_elements_rollup_sql(prefix = ""):
  """Build a select list rolling every data element in score_cols up as a length-weighted average.

  prefix -- table alias passed through to get_rollup_sql
  """
  rollups = [get_rollup_sql(score_col, prefix) + " AS " + score_col for score_col in score_cols]
  return ", ".join(rollups)

def get_dc_data_groups_rollup_sql(prefix = ""):
  """Build a select list rolling every data group in group_cols up as a length-weighted average.

  prefix -- table alias passed through to get_rollup_sql
  """
  rollups = [get_rollup_sql(group_col, prefix) + " AS " + group_col for group_col in group_cols]
  return ", ".join(rollups)

def get_all_group_weight_sql(prefix, table_type):
  """Build the SQL for an overall confidence score: the sum of each group's method-weighted score.

  prefix     -- table alias for the *_group_score columns (e.g. "x.")
  table_type -- unused; kept for interface compatibility
  Each term comes from get_group_weight_sql.
  """
  group_names = ("assessment_mitigation", "stray_current_interference", "ecda_feature",
                 "joint_coating", "ili", "failure_history", "baseline_susceptibility",
                 "pipe_specifications")
  terms = [get_group_weight_sql(prefix, group_name) for group_name in group_names]
  # The result keeps the single trailing space the previous implementation produced.
  return " + ".join(terms) + " "

def get_group_weight_sql(prefix, group_prefix):
  """Return SQL weighting one data group's score by the share of each calculation method.

  Produces ((G * method.1_pct * W1) + (G * method.2_pct * W2)), where G is
  <prefix><group_prefix>_group_score and W1/W2 are that group's weights for methods 1 and 2
  (from get_group_weight). The surrounding query must expose a relation aliased ``method``
  with 1_pct and 2_pct columns.
  """
  score_col = prefix + group_prefix + "_group_score"
  weight_key = group_prefix + "_score"
  method1_weight = str(get_group_weight(weight_key, 1))
  method2_weight = str(get_group_weight(weight_key, 2))
  method1_term = "(" + score_col + " * method.1_pct * " + method1_weight + ")"
  method2_term = "(" + score_col + " * method.2_pct * " + method2_weight + ")"
  return "(" + method1_term + " + " + method2_term + ")"

def get_null_marker_sql(issue):
  """Build a SQL CASE expression that is 1 when every in-scope data element is NULL, else 0.

  An element is in scope when get_scope("ElementWeightage", issue, <weightage name>) == 1.
  Raises ValueError when no element is in scope for ``issue``. Previously this case produced
  the malformed SQL "CASE W THEN 1 ELSE 0 END".
  """
  in_scope_cols = [score_cols[i] for i in range(len(weightage_cols))
                   if get_scope("ElementWeightage", issue, weightage_cols[i]) == 1]

  if not in_scope_cols:
    raise ValueError("No data elements are in scope for issue '" + str(issue) + "'")

  all_null = " AND ".join(score_col + " IS NULL" for score_col in in_scope_cols)
  return "CASE WHEN " + all_null + " THEN 1 ELSE 0 END"

def get_rollup_sql(column_name, prefix = ""):
  """Return a SQL aggregate computing the Length_Ft-weighted average of ``column_name``.

  ``prefix`` (a table alias such as "x.") is applied to Length_Ft only. ``column_name`` is
  used verbatim inside parentheses, so it may be an expression. The result ends with a space.
  """
  length_col = prefix + "Length_Ft"
  weighted_total = "SUM((" + column_name + ") * " + length_col + ")"
  length_total = "SUM(" + length_col + ")"
  return weighted_total + " / " + length_total + " "

def get_scope_sql(issue):
  """Build a select list that keeps in-scope elements and NULLs out out-of-scope ones.

  For each element: CASE WHEN <scope flag> = 1 THEN COALESCE(<col>, 0) ELSE NULL END AS <col>,
  with the scope flag looked up via get_scope("ElementWeightage", issue, ...).
  """
  pieces = []
  for idx, score_col in enumerate(score_cols):
    scope_flag = str(get_scope("ElementWeightage", issue, weightage_cols[idx]))
    pieces.append("CASE WHEN " + scope_flag + " = 1 THEN COALESCE(" + score_col
                  + ", 0) ELSE NULL END AS " + score_col)
  return ", ".join(pieces)

In [8]:
### 6. SQL EXECUTION FUNCTIONS
def add_route_information_to_table(base_table, series_join_column, result_table):
  """Add route columns to a supplementary table so it can join with route-level data.

  ``base_table`` (alias x) is inner-joined to StationSeries on series_id = x.<series_join_column>,
  then to LineLoop and PipelineSystem. This adds Route_Section, Route and Company_Code.
  Rows without a match are dropped by the inner joins. The result is registered as the
  temp view ``result_table``.
  """
  query = """
    SELECT
      x.*,
      l.lineloopid AS Route_Section,
      l.routedescription AS Route, 
      p.OperatingUnitCode AS Company_Code
    FROM 
      """ + base_table + """ AS x
      INNER JOIN StationSeries AS ss ON ss.series_id = x.""" + series_join_column + """
      INNER JOIN LineLoop AS l ON l.lineloopid = ss.LineLoopId 
      INNER JOIN PipelineSystem AS p on p.pipelineid = l.pipelineid
  """
  spark.sql(query).createOrReplaceTempView(result_table)

def add_route_information_to_two_tables(base_table, second_table, series_join_column, first_join_column, second_join_column, result_table):
  """Join two supplementary tables and add route columns so they can join with route-level data.

  ``base_table`` (x) is inner-joined to ``second_table`` (y) on x.<first_join_column> =
  y.<second_join_column>, then to StationSeries / LineLoop / PipelineSystem via
  x.<series_join_column>. This adds Route_Section, Route and Company_Code. The result is
  registered as the temp view ``result_table``.
  NOTE(review): x.* and y.* may yield duplicate column names if the tables share any.
  """
  query = """
    SELECT
      x.*,
      y.*,
      l.lineloopid AS Route_Section,
      l.routedescription AS Route, 
      p.OperatingUnitCode AS Company_Code
    FROM 
      """ + base_table + """ AS x
      INNER JOIN """ + second_table + """ AS y ON x.""" + first_join_column + """ = y.""" + second_join_column + """
      INNER JOIN StationSeries AS ss ON ss.series_id = x.""" + series_join_column + """
      INNER JOIN LineLoop AS l ON l.lineloopid = ss.LineLoopId 
      INNER JOIN PipelineSystem AS p on p.pipelineid = l.pipelineid
  """
  spark.sql(query).createOrReplaceTempView(result_table)

def assign_color_codes(table_name, conf_column, risk_column):
  """Return ``table_name`` plus a Color_Code column derived from the ColorGuide thresholds.

  Colors are tested in priority order red, maroon, blue. Each color's ColorGuide row supplies
  a data-confidence operator/threshold (applied to ``conf_column``) and a risk
  operator/threshold (applied to ``risk_column``). The first color whose two conditions both
  hold supplies its color_code. Rows that match none get gray's color_code.
  """
  def attr(attribute, color):
    # One ColorGuide lookup, rendered as text for splicing into the SQL.
    return str(get_color_attribute("ColorGuide", attribute, color))

  when_clauses = ""
  for color in ('red', 'maroon', 'blue'):
    when_clauses += """
        WHEN """ + conf_column + """ """ + attr("dc_operator", color) + attr("dc_threshold", color) + """ AND 
          """ + risk_column + attr("risk_operator", color) + attr("risk_threshold", color) + """ 
          THEN '""" + attr("color_code", color) + """'"""

  return spark.sql("""
    SELECT
      *,
      CASE """ + when_clauses + """
        ELSE '""" + attr("color_code", 'gray') + """'
      END AS Color_Code 
    FROM """ + table_name)

def get_straight_avg_by_route(input_table, result_table, issue):
  """Roll <issue>_score up to an unweighted (straight) average per Company_Code and Route.

  Registers the temp view ``result_table`` with columns Company_Code, Route and <issue>_Score.
  """
  score_col = issue + "_score"
  query = """
    SELECT
      Company_Code,
      Route,
      AVG(""" + score_col + """) AS """ + issue + """_Score
    FROM
      """ + input_table + """
    GROUP BY Company_Code, Route
  """
  spark.sql(query).createOrReplaceTempView(result_table)
  
def get_weighted_avg_by_route(input_table, result_table, issue):
  """Roll <issue>_score up to a length-weighted average per Company_Code and Route.

  Computes SUM(length * <issue>_score) / SUM(length) and registers the temp view
  ``result_table`` with columns Company_Code, Route and <issue>_Score.
  NOTE(review): weights by a column named ``length``, whereas other helpers use Length_Ft;
  confirm that input_table provides it.
  """
  score_col = issue + "_score"
  query = """
    SELECT
      Company_Code,
      Route,
      SUM(length * """ + score_col + """) / SUM(length) AS """ + issue + """_Score
    FROM
      """ + input_table + """
    GROUP BY Company_Code, Route
  """
  spark.sql(query).createOrReplaceTempView(result_table)

# roll up data elements into data groups for a given issue
def execute_full_group_rollup_sql(input_table,  result_table, issue):
  """Roll the data-element scores of ``input_table`` up into the eight data-group scores.

  Selects every input column plus one *_group_score column per group. Each group score is a
  weighted sum of its element scores. The weights come from get_weight on ElementWeightage,
  and each sum is scaled by the group's multiplier for ``issue`` (get_multiplier). The result
  is registered as the temp view ``result_table``.

  Most element scores are COALESCEd to 0. The GREATEST(...) terms take the larger of two
  alternative scores. The assessment-mitigation GREATEST and the cathodic-protection GREATEST
  are NOT COALESCEd, so (assuming Spark's GREATEST returns NULL only when all arguments are
  NULL -- confirm) a row where both arguments are NULL gets a NULL group score.

  NOTE(review): the pipe-specifications group uses *_spec weight names, which do not appear
  in weightage_cols (that list has the *_base names); confirm they exist in ElementWeightage.
  NOTE(review): every get_weight / get_multiplier call runs its own Spark query.
  """
  spark.sql("""
  SELECT
    *,
    
    ((GREATEST(hydrostatic_retest_date_score, Year_ECDA_Score) * 
      """ + str(get_weight("ElementWeightage", "evaluation_date")) + """) + 
    (COALESCE(ec_reassessment_interval_score, 0) * 
      """ + str(get_weight("ElementWeightage", "ec_reassessment_interval")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'assessment mitigation')) + """
      AS assessment_mitigation_group_score,
      
    (COALESCE(stray_current_interference_score, 0) *
      """ + str(get_weight("ElementWeightage", "stray_current_interference")) + """)
      * """ + str(get_multiplier('ElementWeightage', issue, 'stray current interference')) + """ AS stray_current_interference_group_score,
    
    ((COALESCE(Remaining_HalfLife_ECDA_Score, 0) * 
      """ + str(get_weight("ElementWeightage", "ecda_half_life")) + """) + 
      (COALESCE(ec_ecda_anomaly_count_score, 0) * 
      """ + str(get_weight("ElementWeightage", "ec_ecda_anomaly_counts")) + """) + 
      (COALESCE(Year_ECDA_Score, 0) * 
      """ + str(get_weight("ElementWeightage", "year_of_ecda")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'ecda')) + """ AS ecda_feature_group_score,
      
    ((COALESCE(joint_coating_type_score, 0) * 
      """ + str(get_weight("ElementWeightage", "joint_coating_type")) + """) + 
      (COALESCE(joint_coating_install_date_score, 0) * 
      """ + str(get_weight("ElementWeightage", "joint_coating_install_date")) + """) + 
      (COALESCE(joint_coating_condition_score, 0) * 
      """ + str(get_weight("ElementWeightage", "joint_coating_condition")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'joint coating')) + """ AS joint_coating_group_score,
     
    ((COALESCE(active_anomaly_counts_score, 0) * 
      """ + str(get_weight("ElementWeightage", "active_anomaly_counts")) + """) +
      (COALESCE(ili_survey_date_score, 0) * 
      """ + str(get_weight("ElementWeightage", "ili_survey_date")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'ili'))  + """ AS ili_group_score,
      
    (COALESCE(external_corrosion_failure_history_score, 0) *
      """ + str(get_weight("ElementWeightage", "external_corrosion_failure_history")) + """)
      * """ + str(get_multiplier('ElementWeightage', issue, 'failure history')) + """ AS failure_history_group_score,
      
    ((GREATEST(cp_score, cis_readings_score) *
      """ + str(get_weight("ElementWeightage", "cathodic_protection")) + """) +
      (GREATEST(COALESCE(mainline_coating_install_date_score, 0), COALESCE(pipe_installation_date_score, 0)) *
      """ + str(get_weight("ElementWeightage", "age")) + """) + 
      (COALESCE(mainline_coating_type_score, 0) * 
      """ + str(get_weight("ElementWeightage", "mainline_coating")) + """) +
      (COALESCE(mainline_coating_condition_SME_score, 0) * 
      """ + str(get_weight("ElementWeightage", "mcc")) + """) +
      (COALESCE(pipe_wall_thickness_score, 0) * 
      """ + str(get_weight("ElementWeightage", "pipe_wall_thickness_base")) + """) +
      (COALESCE(maximum_operating_pressure_score, 0) * 
      """ + str(get_weight("ElementWeightage", "maximum_operating_pressure_base")) + """) +
      (COALESCE(pipe_outside_diameter_score, 0) * 
      """ + str(get_weight("ElementWeightage", "pipe_outside_diameter_base")) + """) +
      (COALESCE(nominal_material_yield_strength_score, 0) * 
      """ + str(get_weight("ElementWeightage", "nominal_material_yield_strength")) + """) +
      (COALESCE(soil_type_score, 0) * 
      """ + str(get_weight("ElementWeightage", "soil_type")) + """) + 
      (COALESCE(casing_inhibitor_score, 0) * 
      """ + str(get_weight("ElementWeightage", "casing_inhibitor")) + """) +
      (COALESCE(Casing_Shorted_Score, 0) * 
      """ + str(get_weight("ElementWeightage", "casing_is_shorted")) + """) + 
      (COALESCE(casing_type_score, 0) * 
      """ + str(get_weight("ElementWeightage", "casing_type")) + """) + 
      (COALESCE(land_use_score, 0) * 
      """ + str(get_weight("ElementWeightage", "land_use")) + """) + 
      (COALESCE(exposed_pipe_score, 0) * 
      """ + str(get_weight("ElementWeightage", "exposed_pipe")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'baseline susceptibility')) + """ AS baseline_susceptibility_group_score,
       
    ((COALESCE(pipe_installation_date_score, 0) * 
      """ + str(get_weight("ElementWeightage", "pipe_installation_date")) + """) + 
      (COALESCE(pipe_wall_thickness_score, 0) * 
      """ + str(get_weight("ElementWeightage", "pipe_wall_thickness_spec")) + """) +
      (COALESCE(maximum_operating_pressure_score, 0) * 
      """ + str(get_weight("ElementWeightage", "maximum_operating_pressure_spec")) + """) + 
      (COALESCE(pipe_outside_diameter_score, 0) * 
      """ + str(get_weight("ElementWeightage", "pipe_outside_diameter_spec")) + """))
      * """ + str(get_multiplier('ElementWeightage', issue, 'pipe specification')) + """ AS pipe_specifications_group_score
      
  FROM 
  """ + input_table).createOrReplaceTempView(result_table)

def execute_full_score_rollup_sql(input_table, result_table, issue):
  """Roll data-group scores up into an overall per-route score for ``issue``.

  ``input_table`` (alias x) is inner-joined on Company_Code and Route to a subquery over
  Availability_Route_Score. In that subquery, 2_Pct = Data_Confidence_Calculation_Method and
  1_Pct = 1 - Data_Confidence_Calculation_Method, consumed by get_all_group_weight_sql
  through the ``method`` alias. Output columns are Company_Code, Route, Route_<issue>_Score,
  every group and element score (multiplier 1), and x.nullmarker. The result is registered
  as the temp view ``result_table``.
  """
  # Generate the dynamic fragments in the same order as the original single expression.
  overall_score_sql = get_all_group_weight_sql("x.", issue)
  group_scores_sql = get_data_groups_sql("x.", 1)
  element_scores_sql = get_data_elements_sql("x.", None, 1)

  query = """
  SELECT
    x.Company_Code AS Company_Code,
    x.Route AS Route,
    (
      """ + overall_score_sql + """
    )  AS Route_""" + issue + """_Score, 
    """ + group_scores_sql + """,
    """ + element_scores_sql + """,
    x.nullmarker
  FROM 
    """ + input_table + """ x
    INNER JOIN (
      SELECT 
        Company_Code,
        Route,
        Data_Confidence_Calculation_Method AS 2_Pct,
        1 - Data_Confidence_Calculation_Method AS 1_Pct
      FROM
         Availability_Route_Score
    ) AS Method ON Method.Company_Code = x.Company_Code AND
        Method.Route = x.Route
"""
  spark.sql(query).createOrReplaceTempView(result_table)

In [9]:
### 7. HELPER FUNCTIONS
def see(query, n = 200):
  """Run a Spark SQL query and print up to ``n`` rows without truncating cell values."""
  result_df = spark.sql(query)
  result_df.show(n, truncate=False)
  
def list_to_string(x):
  """Join the strings in ``x`` with commas, with no trailing comma ("" for an empty input)."""
  return ",".join(x)

def drop_temp_tables(list_of_tables):
  """Drop every temporary view named in ``list_of_tables``, in order."""
  for view_name in list_of_tables:
    spark.catalog.dropTempView(view_name)

def distinct_values(table, column):
  """Print how many distinct values ``column`` takes in ``table`` (column distinct_<column>s).

  Returns None; the count is only displayed via DataFrame.show().
  """
  alias = "distinct_" + column + "s"
  spark.sql("SELECT COUNT(DISTINCT " + column + ") AS " + alias + " FROM " + table).show()

def sum_col(df, col):
  """Return the Spark SUM of column ``col`` in DataFrame ``df`` as a Python value.

  ``sum`` here is pyspark.sql.functions.sum (imported at the top, shadowing the builtin).
  """
  result_rows = df.select(sum(col)).collect()
  return result_rows[0][0]
  
def avg_col(df, col):
  """Return the Spark AVG of column ``col`` in DataFrame ``df`` as a Python value."""
  result_rows = df.select(avg(col)).collect()
  return result_rows[0][0]
  
def get_weighted_avg(table, col):
  """Return the Length_Ft-weighted average of ``col`` over ``table``."""
  query = "SELECT SUM(" + col + " * Length_Ft) / SUM(Length_Ft) AS weighted_avg FROM " + table
  first_row = spark.sql(query).collect()[0]
  return first_row["weighted_avg"]

def get_max_date(table, col):
  """Return the maximum value of ``col`` in ``table``."""
  query = "SELECT MAX(" + col + ") AS " + col + " FROM " + table
  first_row = spark.sql(query).collect()[0]
  return first_row[0]

In [10]:
### DEPRECATED FUNCTIONS
# Design changes rendered these obsolete, but the thing about design changes is that they are often reversed

def get_dc_data_groups_product(table1_prefix, table2_prefix):
  """(Deprecated) Build a select list multiplying each group score from two tables, divided by 100.

  NULL scores are treated as 1 via COALESCE.
  """
  terms = ["COALESCE(" + table1_prefix + group_col + ", 1) * COALESCE(" + table2_prefix
           + group_col + ", 1) / 100 AS " + group_col for group_col in group_cols]
  return ", ".join(terms)

def get_dc_data_elements_product(table1_prefix, table2_prefix):
  """(Deprecated) Build a select list multiplying each element score from two tables.

  NULL scores are treated as 1 via COALESCE.
  """
  terms = ["COALESCE(" + table1_prefix + score_col + ", 1) * COALESCE(" + table2_prefix
           + score_col + ", 1) AS " + score_col for score_col in score_cols]
  return ", ".join(terms)

def assign_color_numbers(table_name, conf_column, risk_column):
  """(Deprecated) Return ``table_name`` plus a Color_Number column from the ColorGuide thresholds.

  Colors are tested in order red (3), maroon (2), blue (0) using each color's dc and risk
  operator/threshold from ColorGuide. The first match wins; rows matching none get 1.
  """
  def attr(attribute, color):
    # One ColorGuide lookup, rendered as text for splicing into the SQL.
    return str(get_color_attribute("ColorGuide", attribute, color))

  when_clauses = ""
  for color, color_number in (('red', 3), ('maroon', 2), ('blue', 0)):
    when_clauses += """
        WHEN """ + conf_column + """ """ + attr("dc_operator", color) + attr("dc_threshold", color) + """ AND 
          """ + risk_column + attr("risk_operator", color) + attr("risk_threshold", color) + """ 
          THEN """ + str(color_number)

  return spark.sql("""
    SELECT
      *,
      CASE """ + when_clauses + """
        ELSE 1
      END AS Color_Number
    FROM """ + table_name)