# In [0]:
import json
import ast
import sys
from aurora_config import *
from log import syslog, sysError
from spark_conf import sparkConfig
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

def main():
  #reading the input params from configuration file
  spark = sparkConfig()

  input_param = ast.literal_eval(sys.argv[1])
  hostname=input_param['ENV']
  outputbucket=input_param['S3BUCKET']
  tableconfig=input_param['INGESTIONFILE']
  driver=input_param['DRIVER']
  secrets=get_username_password(hostname,tableconfig)
  username=secrets[0]
  p=secrets[2]
  jdbc_url=secrets[1]
  connectionProperties = {
    "user" : username,
    "password" : p,
    "driver":driver
  }

  #reading the tables
  wc_vcats_data_e = spark.read.jdbc(url=jdbc_url, table='wc_vcats_data_e', properties=connectionProperties)
  wc_vcats_data_e = wc_vcats_data_e.select(col("*"), when(wc_vcats_data_e.plan_sub_type == "Fine Arts/Specialized Organiza", "Fine Arts/Specialized Organization").otherwise(
    wc_vcats_data_e.plan_sub_type).alias("plan_sub_type1")).drop(col("plan_sub_type")).withColumnRenamed("plan_sub_type1","plan_sub_type")
  wc_vcats_data_busln = spark.read.jdbc(url=jdbc_url, table='wc_vcats_data_busln', properties=connectionProperties)
  wc_vcats_data_n = spark.read.jdbc(url=jdbc_url, table='wc_vcats_data_n', properties=connectionProperties)

  #This function uses to process the fact tables by taking parameters  as table name and column from the configuration file
  def fact_table(table, col1):
    try:
      df_wc_vcats_data_fact = spark.read.jdbc(url=jdbc_url, table=table, properties=connectionProperties)
      if table == "wc_vcats_data_e":
        df_wc_vcats_data_fact = df_wc_vcats_data_fact.select(col("*"), when(df_wc_vcats_data_fact.plan_sub_type == "Fine Arts/Specialized Organiza", "Fine Arts/Specialized Organization").otherwise(
                                df_wc_vcats_data_fact.plan_sub_type).alias("plan_sub_type1")).drop(col("plan_sub_type")).withColumnRenamed("plan_sub_type1","plan_sub_type")

      bus_line = ["Institutional Investor Group", "FAS"]
      df_wc_vcats_data_fact_filtered = wc_vcats_data_busln.select("dim_key").distinct().where(col("bus_line_roll_up1").isin(bus_line))
      df_data_e_filtered = df_wc_vcats_data_fact.join(df_wc_vcats_data_fact_filtered, df_wc_vcats_data_fact[col1] == df_wc_vcats_data_fact_filtered["dim_key"], "inner").drop("dim_key")
      df_fact = df_data_e_filtered.coalesce(1).write.mode("overwrite").option("path", outputbucket + "/" + table).saveAsTable(
        "inst_dni_fin_mstr." + table)
      syslog('Total Number of records got ingested for {0}={1}'.format(table, df_data_e_filtered.count()))
      return df_fact
    except Exception as ex:
      sysError('Unable to ingest the data for Fact table ', ex)

  #This function uses to process the Dimension tables by taking parameters as table name and column names from the configuration file
  def dimensions_table(table, col1, col2):
    try:
      df_wc_vcats_dimension = spark.read.jdbc(url=jdbc_url, table=table, properties=connectionProperties)
      bus_line = ["Institutional Investor Group", "FAS"]
      df_wc_vcats_data_busln_filtered = wc_vcats_data_busln.select("dim_key").distinct().where(col('bus_line_roll_up1').isin(bus_line))
      df_data_e_filtered = wc_vcats_data_e.join(df_wc_vcats_data_busln_filtered,
                                                  wc_vcats_data_e["business_sub_line"] ==
                                                  df_wc_vcats_data_busln_filtered["dim_key"], "inner").select(col2)
      df_data_n_filtered = wc_vcats_data_n.join(df_wc_vcats_data_busln_filtered,
                                                  wc_vcats_data_n["business_sub_line"] ==
                                                  df_wc_vcats_data_busln_filtered["dim_key"], "inner").select(col2)
      df_data_dimension_filtered = df_data_e_filtered.union(df_data_n_filtered).distinct()
      df_data_dimension_finaljoin = df_wc_vcats_dimension.join(df_data_dimension_filtered,
                                                            df_wc_vcats_dimension[col1] == df_data_dimension_filtered[
                                                              col2], "inner").drop(df_data_dimension_filtered[col2])
      df_dimension = df_data_dimension_finaljoin.coalesce(1).write.mode('overwrite').option("path",outputbucket+"/"+table).saveAsTable("inst_dni_fin_mstr."+table)
      syslog('Total Number of records got ingested for  {0}={1}'.format(table,df_data_dimension_finaljoin.count()))
      return df_dimension
    except Exception as ex:
      sysError('Unable to ingest the data for Dimension table ', ex)

  # This function uses to process the wc_vcats_data_busln table by taking parameters as table name and column names from the configuration file
  def dwc_vcats_data_busln(table, col1):
    try:
      data_busln = spark.read.jdbc(url=jdbc_url, table=table, properties=connectionProperties)
      bus_line = ["Institutional Investor Group", "FAS"]
      df = data_busln.where(col(col1).isin(bus_line))
      df_busln = df.coalesce(1).write.mode('overwrite').option("path",outputbucket+"/"+table).saveAsTable("inst_dni_fin_mstr."+table)
      syslog('Total Number of records got ingested for {0}={1}'.format(table,df.count()))
      return df_busln
    except Exception as ex:
      sysError('Unable to ingest the data for Businessline table ', ex)
 # this function is used to process the wc_vcats_fund_ds table
  def dwc_vcats_fund_ds(table):
    try:
      fund_fs=spark.read.jdbc(url=jdbc_url, table=table, properties=connectionProperties)
      final_fund_fs=fund_fs.coalesce(1).write.mode('overwrite').option("path",outputbucket+"/"+table).saveAsTable("inst_dni_fin_mstr."+table)
      syslog('Total Number of records got ingested for   {0}={1}'.format(table, fund_fs.count()))
      return final_fund_fs
    except Exception as ex:
      sysError('Unable to ingest the data for fund table ', ex)
# this function is used to process the wc_vcats_cashflow_f and wc_vcats_siebel_pln_ds to get the vcats_plan_company_name table
  def vcats_plan_company_name(table1,table2):
    try:
      df_wc_vcats_cashflow_f=spark.read.jdbc(url=jdbc_url, table=table1, properties=connectionProperties)
      df_wc_vcats_siebel_pln_ds = spark.read.jdbc(url=jdbc_url, table=table2, properties=connectionProperties)
      df_wc_vcats_cashflow_startdate = df_wc_vcats_cashflow_f.withColumn("Date", trunc("AS_OF_DATE", "month"))
      df_wc_vcats_cashflow_col = (df_wc_vcats_cashflow_startdate.where(col("PLAN_NUM_WO_PREFIX") != "52686").
                                  select(col("Date"), col("PLAN_NUM_WO_PREFIX").alias("Plan__")))
      df_wc_vcats_siebel_pln_ds_col = (
        df_wc_vcats_siebel_pln_ds.where(col("BUS_LINE").isin("IAM", "IRPS", "Small Markets")).
        select(col("PLAN_NUM"), col("BUS_LINE").alias("Business_Line"), col("CO_NAME").alias("Company_Name"),
               col("PLAN_NAME").alias("Plan_Name")))
      df_joined = df_wc_vcats_cashflow_col.join(df_wc_vcats_siebel_pln_ds_col,df_wc_vcats_cashflow_col.Plan__ == df_wc_vcats_siebel_pln_ds_col.PLAN_NUM).drop("PLAN_NUM")
      df_distinct=df_joined.distinct()
      df_vcats_plan_company_name = df_distinct.coalesce(1).write.mode("overwrite").option("path",outputbucket+"/"+"vcats_plan_company_name").saveAsTable("inst_dni_fin_mstr."+"vcats_plan_company_name")
      syslog('Total Number of records got ingested for  {0}={1}'.format("vcats_Plan_Company_Name", df_distinct.count()))
      return df_vcats_plan_company_name

    except Exception as ex:
      sysError('Unable to ingest the data for vcats_plan_company_name table ', ex)

  # below code is used for parsing configuration json file to get the table names and column names
  with open(tableconfig) as json_file:
    data = json.load(json_file)
    table_list = data['tables']

  tabdict = {}
  for table in table_list:
    for key, value in table.items():
      tabdict[key] = value
  table_names = list(tabdict.keys())

  for i in range(len(table_names)):
    tablename = table_names[i]
    if tablename in tabdict.keys() and tabdict[tablename]['type'] == "fact":
      table = tabdict[tablename]['table']
      col1 = tabdict[tablename]['column'][0]
      fact_table(table, col1)
    elif tablename in tabdict.keys() and tabdict[tablename]['type'] == "Dimension" and tablename != "wc_vcats_data_busln" and tablename !="wc_vcats_fund_ds":
      table = tabdict[tablename]['table']
      col1 = tabdict[tablename]['column'][0]
      col2 = tabdict[tablename]['column'][1]
      dimensions_table(table, col1, col2)
    elif tablename in tabdict.keys() and tabdict[tablename]['type'] == "Dimension" and tablename == "wc_vcats_data_busln":
      table = tabdict[tablename]['table']
      col1 = tabdict[tablename]['column']
      dwc_vcats_data_busln(table, col1)
    elif tablename in tabdict.keys() and tabdict[tablename]['type']=="Dimension" and tablename=="wc_vcats_fund_ds":
      table=tabdict[tablename]['table']
      dwc_vcats_fund_ds(table)
    elif tablename in tabdict.keys() and tabdict[tablename]['type']=='Derived':
      table1 = tabdict[tablename]['table'][0]
      table2 = tabdict[tablename]['table'][1]
      vcats_plan_company_name(table1,table2)
    else:
      print("enter the correct table name")
    i += 1

if __name__ == "__main__":
  # Only run the ingestion when executed as a script, not when imported.
  main()