In [0]:
# Assumptions being made:
# All data comes from RDS, and the staging data is read from S3.
# Every staging table has the same columns as its RDS table, plus an added staging timestamp.

# ** The source for OLAP is S3 and the target is Hive tables.
# ** Ignoring record_active_flag, start_date and end_date (or their equivalents), every source and target table has the same number of columns, and the column names must match exactly.
# ** Stage tables must be joined before being fed to any framework.

In [0]:
import urllib
import sys
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
import datetime

MOUNT_NAME = "/mnt/election-project-stage"
op_MOUNT_NAME = "/mnt/election-project-olap"
# Today's date as YYYY-MM-DD; selects the as_on_dt=<date> partition of the stage data.
date_today = datetime.date.today().isoformat()

# CSV export of the AWS access key pair, uploaded to DBFS.
AWS_KEYS_PATH = "dbfs:/FileStore/yahoo_rama_accessKeys.csv"


def _mount_s3_bucket(bucket, mount_point):
    """Mount the S3 ``bucket`` at ``mount_point`` unless it can already be listed.

    A failing ``dbutils.fs.ls`` is taken to mean "not mounted yet". The key
    pair is read only when a mount is actually performed, and it is kept in
    local variables so the secret is not left behind in notebook globals.
    """
    import urllib.parse  # `import urllib` alone does not guarantee the submodule is loaded

    try:
        dbutils.fs.ls(mount_point)
        return
    except Exception:  # narrower than a bare except, so Ctrl-C still interrupts
        pass

    keys = (
        spark.read.format("csv")
        .option("header", "true")
        .option("sep", ",")
        .load(AWS_KEYS_PATH)
        .select("Access key ID", "Secret access key")
        .collect()[0]
    )
    encoded_secret = urllib.parse.quote(string=keys["Secret access key"], safe="")
    # NOTE(review): s3n:// is the legacy S3 connector; newer runtimes expect s3a:// -- confirm before changing.
    dbutils.fs.mount(f"s3n://{keys['Access key ID']}:{encoded_secret}@{bucket}", mount_point)


_mount_s3_bucket("election-project-stage", MOUNT_NAME)
_mount_s3_bucket("election-project-olap", op_MOUNT_NAME)



In [0]:
def get_data(input_, scd_type=2):
    """Load today's stage data and the matching OLAP target for one table.

    Parameters
    ----------
    input_ : str
        Table name. Whitespace-separated words are joined with "_" and the
        result is lower-cased. Any name ending in "fact" or "result", or
        starting with "election", selects the election_result fact table.
    scd_type : int, default 2
        Which target to read for dimension tables: 1 and 3 read Delta tables
        from the Hive warehouse, 2 reads parquet from the OLAP mount. It is
        ignored for the fact table.

    Returns
    -------
    str
        The normalized table name used by the downstream steps
        ("candidates" is returned as "candidate").

    Side effects
    ------------
    Sets the module globals ``table``, ``table_old`` (the name before the
    singular/plural adjustment), ``df`` (stage DataFrame) and ``df_dim``
    (target DataFrame). For dimension tables every target column is renamed
    to ``"dim_" + name.lower()`` so it cannot clash with stage columns after
    a join. The fact target keeps its original column names.

    Prints a message and calls ``sys.exit()`` when ``scd_type`` is not 1, 2
    or 3. This happens after the stage data has been read.
    """
    global table, df, df_dim, table_old

    def read_parquet(path):
        # Parquet files carry their own schema, so no CSV-style reader options are needed.
        return spark.read.format("parquet").load(path)

    table = "_".join(input_.lower().split())

    if table.endswith(("fact", "result")) or table.startswith("election"):
        table = "election_result"
        table_old = table  # keep the global in step with `table` instead of leaving a stale value
        df = read_parquet(
            f"{MOUNT_NAME}/counting_stage/as_on_dt={date_today}/counting_stage_parquet/"
        )
        df_dim = read_parquet(
            f"{op_MOUNT_NAME}/election-project-olap/election_result_fact_olap"
        )
        return table

    df = read_parquet(
        f"{MOUNT_NAME}/{table}_stage/as_on_dt={date_today}/{table}_stage_parquet/"
    )

    if scd_type == 1:
        df_dim = spark.read.format("delta").load(
            f"dbfs:/user/hive/warehouse/election_olap_scd_type_one.db/{table}_olap_scd_1"
        )
    elif scd_type == 2:
        df_dim = read_parquet(f"{op_MOUNT_NAME}/election-project-olap/{table}_olap/")
    elif scd_type == 3:
        df_dim = spark.read.format("delta").load(
            f"dbfs:/user/hive/warehouse/election_olap_scd_type_three.db/{table}_olap_scd_3"
        )
    else:
        print("Wrong SCD Type entered")
        sys.exit()

    # Prefix every target column so it stays distinguishable from stage columns after joins.
    df_dim = df_dim.toDF(*["dim_" + name.lower() for name in df_dim.columns])

    # The candidates data is stored under the plural name; downstream steps use "candidate".
    table_old = table
    if table == "candidates":
        table = "candidate"
    return table

In [0]:
def col_compare(src_col1 , tgt_col2):
    """Compare two values case-insensitively and return a change flag.

    Returns "R" when the values are equal ignoring case (no change) and "U"
    when they differ (a change). Two ``None`` values count as equal; ``None``
    versus a string counts as a change. Before this handling, a ``None``
    input raised AttributeError.
    """
    if src_col1 is None or tgt_col2 is None:
        return "R" if src_col1 is None and tgt_col2 is None else "U"
    return "R" if src_col1.lower() == tgt_col2.lower() else "U"

# Spark UDF wrapper around col_compare; yields the "R"/"U" change flag as a string column.
cand_udf = F.udf(col_compare, T.StringType())

def erase_dim_from_col(df):
    """Strip the "dim_" prefix from every column name that carries it.

    The match is case-insensitive. Columns without the prefix keep their
    names unchanged instead of losing their first four characters.

    Parameters
    ----------
    df : DataFrame-like
        Any object exposing ``columns`` and ``withColumnRenamed``.

    Returns
    -------
    The frame with the prefix removed from matching column names.
    """
    prefix = "dim_"
    for column in df.columns:
        if column.lower().startswith(prefix):
            df = df.withColumnRenamed(column, column[len(prefix):])
    return df

In [0]:
def olap_insert(df , df_dim):
    """Intended to select the stage rows that have no counterpart in the target.

    The body left-joins ``df`` (stage) to ``df_dim`` (target, "dim_"-prefixed
    columns) on the first column of each whose name ends in "id". It keeps
    the rows whose ``dim_<table>_record_key`` is null, projects back to the
    stage columns, and renames ``<table>_timestamp`` to
    ``<table>_oltp_timestamp``. It relies on the module global ``table``
    (set by get_data).

    NOTE(review): the filtered frame is assigned to ``insert`` and then
    discarded; the function returns ``df`` unchanged. Callers in this
    notebook may depend on receiving every stage row, so audit them before
    switching the return value to ``insert``.
    """
    
    insert = df. \
    join(df_dim , 
         F.col([value 
                for value 
                in df.columns 
                if value[-2:] == "id"][0]) == F.col([value 
                                                     for value 
                                                     in df_dim.columns 
                                                     if value[-2:] == "id"][0]) , "left"). \
    filter(F.isnull(F.col(f"dim_{table}_record_key"))). \
    select(df.columns). \
    withColumnRenamed(f"{table}_timestamp" , f"{table}_oltp_timestamp")
    # NOTE(review): returns the unfiltered input, not `insert` (see docstring).
    return df

def olap_update(df , df_dim):
    """Pair each stage row with its matching target row(s) and flag changes.

    Joins ``df`` (stage) to ``df_dim`` (target, "dim_"-prefixed columns) on
    the first column of each whose name ends in "id". It keeps only the
    pairs where a target row exists (non-null ``dim_<table>_record_key``) and
    adds three columns:

    * ``update_stage_parameter``: the stage columns whose last "_" token is
      not "timestamp", joined with "_";
    * ``update_dim_parameter``: the target columns whose last "_" token is
      not timestamp/key/flag/date, joined with "_";
    * ``update_flag``: "R" when the two strings are equal ignoring case,
      otherwise "U".

    Relies on the module global ``table`` (set by get_data) for the
    record-key column name. The comparison is positional, so both frames
    must list their compared columns in the same order.
    """
    stage_key = [value for value in df.columns if value[-2:] == "id"][0]
    dim_key = [value for value in df_dim.columns if value[-2:] == "id"][0]

    stage_values = [value for value in df.columns if value.split("_")[-1] != "timestamp"]
    skipped_dim_suffixes = ("timestamp", "key", "flag", "date")
    dim_values = [
        value for value in df_dim.columns
        if value.lower().split("_")[-1] not in skipped_dim_suffixes
    ]

    stage_param = F.col("update_stage_parameter")
    dim_param = F.col("update_dim_parameter")
    return (
        df.join(df_dim, F.col(stage_key) == F.col(dim_key), "left")
        .filter(~F.isnull(F.col(f"dim_{table}_record_key")))
        .withColumn("update_stage_parameter", F.concat_ws("_", *stage_values))
        .withColumn("update_dim_parameter", F.concat_ws("_", *dim_values))
        # A native case-insensitive comparison, equivalent to the col_compare UDF
        # without shipping every row to a Python worker. Spark's lower() and
        # Python's str.lower() agree for ordinary text.
        .withColumn(
            "update_flag",
            F.when(F.lower(stage_param) == F.lower(dim_param), "R").otherwise("U"),
        )
    )

In [0]:
def _read_stage_table(name):
    """Read today's partition of the ``<name>_stage`` parquet data from the stage mount."""
    return spark.read.format("parquet").load(
        f"{MOUNT_NAME}/{name}_stage/as_on_dt={date_today}/{name}_stage.parquet/"
    )


def _join_candidate_stage(candidates):
    """Attach crime counts and the highest education level to the candidate stage rows."""
    crimes = _read_stage_table("candidates_has_crime_record")
    education = _read_stage_table("candidate_education_level")

    crime_counts = (
        crimes.groupby("candidates_candidate_id")
        .agg(F.count("crime_record_crime_id").alias("candidate_no_of_crimes"))
    )
    return (
        candidates
        # Left join: a candidate with no row in the crime bridge table would be
        # dropped by an inner join. Here it is kept, with a count of 0.
        .join(crime_counts, crime_counts.candidates_candidate_id == candidates.candidate_id, "left")
        .withColumn(
            "candidate_no_of_crimes",
            F.coalesce(F.col("candidate_no_of_crimes"), F.lit(0)),
        )
        .join(
            education,
            education.education_level == candidates.education_level_candidate_education_level,
            "inner",
        )
        .select("candidate_id", "candidate_name", "candidate_spouse",
                "candidate_no_of_crimes", "candidate_dob",
                "candidate_net_worth", "highest_attained_education_level",
                "candidates_timestamp", "candidates_stage_timestamp")
        .withColumnRenamed("candidates_timestamp", "candidate_oltp_timestamp")
        .withColumnRenamed("highest_attained_education_level", "candidate_highest_education_level")
    )


def _join_constituency_stage(constituency):
    """Attach the risk status to the constituency stage rows."""
    risk = _read_stage_table("risk")
    return (
        constituency
        .join(risk, F.col("risk_level") == F.col("constituency_risk_level"), "inner")
        .select("constituency_id", "constituency_name", "district_of_constituency",
                "average_literacy_rate_in_constituency", "risk_status",
                "constituency_timestamp", "constituency_stage_timestamp")
        .withColumnRenamed("average_literacy_rate_in_constituency", "average_constituency_literacy_rate")
        .withColumnRenamed("risk_status", "constituency_risk")
        .withColumnRenamed("constituency_timestamp", "constituency_oltp_timestamp")
    )


def _join_election_result_stage(results, df_dim):
    """Build fact rows from counting, voter and candidate stage data in the target's layout."""
    cand_const = _read_stage_table("candidates_has_constiuncy")
    voters = _read_stage_table("voters")
    candidates = _read_stage_table("candidates")

    voters_per_station = (
        voters.select("voter_id", "polling_station_voter_polling_station_ID")
        .groupby("polling_station_voter_polling_station_ID")
        .agg(F.count("voter_id").alias("polling_station_no_of_voters"))
    )
    fact_columns = [value for value in df_dim.columns if value.lower().split("_")[-1] != "key"]
    return (
        voters_per_station
        .join(results, results.polling_station_polling_station_id == voters.polling_station_voter_polling_station_ID)
        .join(cand_const, results.candidate_candidate_id == cand_const.candidates_candidate_id)
        .join(candidates, candidates.candidate_id == results.candidate_candidate_id)
        .withColumnRenamed("election_id", "polling_station_election_id")
        .withColumnRenamed("election_date", "polling_station_election_date")
        .withColumnRenamed("vote_count", "polling_station_vote_count")
        .select(fact_columns)
    )


def scd_general_joiner(table , df , df_dim , scd_type = 2):
    """Shape today's stage data for one table into its OLAP target's column layout.

    * candidate: adds ``candidate_no_of_crimes`` (0 when the candidate has no
      crime rows) and the highest education level. It renames
      ``candidates_timestamp`` to ``candidate_oltp_timestamp``.
    * constituency: adds the risk status.
    * election_result: builds fact rows from voters, candidate/constituency
      links and candidates, projected onto the target's non-key columns.
    * polling_station: drops ``constituency_constituency_id``.

    For constituency, polling_station and every other dimension table,
    ``<table>_timestamp`` is renamed to ``<table>_oltp_timestamp``.

    Also sets the module global ``records_select``: the target's non-key
    column names with their first four characters (the "dim_" prefix) removed.
    ``scd_type`` is accepted for a uniform call signature and is not used.
    """
    global records_select
    records_select = [value[4:] for value in df_dim.columns if value.lower().split("_")[-1] != "key"]

    if table == "candidate":
        return _join_candidate_stage(df)
    if table == "constituency":
        return _join_constituency_stage(df)
    if table == "election_result":
        return _join_election_result_stage(df, df_dim)
    if table == "polling_station":
        df = df.drop("constituency_constituency_id")
    return df.withColumnRenamed(f"{table}_timestamp", f"{table}_oltp_timestamp")

In [0]:
def scd_type_one_frame_work(table , df , df_dim):
    """Build the SCD type-1 output (latest values only).

    election_result: registers ``df``/``df_dim`` as the temp views df_part_1 /
    df_part_2. It returns the stage rows with a new ``election_result_key``
    that continues from the highest key already in the target (0 when the
    target is empty).

    Dimension tables: every stage row is emitted once with a fresh
    ``<table>_olap_timestamp``. Rows whose values differ from the target
    (``olap_update`` flag "U") come from the update path; all other stage rows
    come straight from ``df``. Changed ids are anti-joined out of the
    pass-through rows so they are not emitted twice. Record keys are
    reassigned 1..n ordered by ``<table>_id``. This assumes ids are unique in
    both stage and target.
    """
    if table == "election_result":
        df.createOrReplaceTempView("df_part_1")
        df_dim.createOrReplaceTempView("df_part_2")
        return spark.sql("""SELECT (
        SELECT CASE 
        WHEN 
        ISNULL((SELECT MAX(election_result_key) AS max_ FROM df_part_2))
        THEN 0
        ELSE 
        (SELECT MAX(election_result_key) AS max_ FROM df_part_2)
        END) + ROW_NUMBER() OVER(ORDER BY polling_station_election_id, 
        polling_station_polling_station_id , constituency_constituency_id , 
        candidates_candidate_id) 
        AS election_result_key, * 
        FROM df_part_1""")

    id_col = f"{table}_id"
    olap_ts = f"{table}_olap_timestamp"

    changed = (
        olap_update(df, df_dim)
        .filter(F.col("update_flag") == "U")
        .select(df.columns)
        .withColumn(olap_ts, F.current_timestamp())
    )
    # Aliasing the key keeps the anti-join unambiguous and preserves df's column order.
    changed_ids = changed.select(F.col(id_col).alias("_changed_id"))
    untouched = (
        df.withColumn(olap_ts, F.current_timestamp())
        .join(changed_ids, F.col(id_col) == F.col("_changed_id"), "left_anti")
    )

    merged = untouched.unionByName(changed)
    return merged.select(
        F.row_number().over(Window.orderBy(id_col)).alias(f"{table}_record_key"), "*"
    )

In [0]:
def scd_type_two_frame_work(table , df , df_dim):
    """Build the SCD type-2 output (history rows plus new active versions).

    election_result: registers ``df``/``df_dim`` as the temp views df_part_1 /
    df_part_2. It returns the stage rows prefixed with ``election_result_key``,
    equal to the highest key already in the target (0 if none) plus
    ROW_NUMBER over the listed id columns.

    Dimension tables (``df_dim`` columns carry the "dim_" prefix):
      * target ``*_dob`` columns are cast to DateType before comparing;
      * stage rows with no target match become new rows;
      * target rows that ``olap_update`` flags "U" are closed
        (record_Active_Flag 0, ``<table>_record_end_date`` = now), and the
        stage version of that id is added as a new row;
      * target rows flagged "R" are kept as they are;
      * new rows get record_Active_Flag 1, start date now and end date
        9999-12-31. They are projected with the global ``records_select``
        (set by scd_general_joiner) and receive record keys continuing from
        the highest key among the kept and closed target rows.

    NOTE(review): the comparison joins each stage row to *every* target row
    with the same id, including rows already closed. This has three effects:
      * a closed row that differs from the stage is flagged "U" again, and
        its end date is overwritten with the current timestamp;
      * if the stage value matches an old closed row, the id counts as
        unchanged and no new active row is created;
      * an id with several target rows yields several copies of the stage row
        in ``update_insert``.
    Restricting the comparison to active rows is probably intended -- TODO confirm.

    NOTE(review): target rows whose id is absent from today's stage data are
    not part of the join, and therefore not part of the result.
    """
    if table == "election_result":
        df.createOrReplaceTempView("df_part_1")
        df_dim.createOrReplaceTempView("df_part_2")

        return spark.sql(f"""SELECT (
        SELECT CASE 
        WHEN 
        ISNULL((SELECT MAX(election_result_key) AS max_ FROM df_part_2))
        THEN 0
        ELSE 
        (SELECT MAX(election_result_key) AS max_ FROM df_part_2)
        END) + ROW_NUMBER() OVER(ORDER BY polling_station_election_id, polling_station_polling_station_id , constituency_constituency_id , candidates_candidate_id , candidates_candidate_id) AS election_result_key, * 
        FROM df_part_1""")

    else:
        # Cast target "*_dob" columns to dates before comparison (presumably so they
        # render like the stage values inside concat_ws -- confirm).
        alter_dob = [value for value in df_dim.columns if value.lower().split("_")[-1] == "dob"]
        if alter_dob != []:
            for value in alter_dob:
                df_dim = df_dim.withColumn(value , F.col(value).astype(T.DateType()))
        
        # Stage rows with no matching target row (null dim record key): brand-new ids.
        insert = df. \
    join(df_dim , 
         F.col([value 
                for value 
                in df.columns 
                if value[-2:] == "id"][0]) == F.col([value 
                                                     for value 
                                                     in df_dim.columns 
                                                     if value[-2:] == "id"][0]) , "left"). \
    filter(F.isnull(F.col(f"dim_{table}_record_key"))). \
    select(df.columns). \
    withColumnRenamed(f"{table}_timestamp" , f"{table}_oltp_timestamp")
        # Stage rows that match at least one target row (one output row per matching target row).
        update_insert = df. \
        join(df_dim , F.col([value 
                             for value 
                             in df.columns 
                             if value[-2:] == "id"][0]) == F.col([value 
                                                                  for value 
                                                                  in df_dim.columns 
                                                                  if value[-2:] == "id"][0]) , "left"). \
        filter(~F.isnull(F.col(f"dim_{table}_record_key"))). \
        select(df.columns). \
        withColumnRenamed(f"{table}_timestamp" , f"{table}_oltp_timestamp")

        # Stage/target pairs flagged "R" (no change) or "U" (changed).
        update_df = olap_update(df , df_dim)

        # Changed target rows are closed: active flag 0 and end date = now.
        update_update = update_df. \
        filter(F.col("update_flag") == "U"). \
        select(df_dim.columns). \
        withColumn("dim_record_Active_Flag" , F.lit(0)). \
        withColumn(f"dim_{table}_record_end_date" , F.current_timestamp())

        # Unchanged target rows are carried over as they are.
        op = update_df.filter(F.col("update_flag") == "R").select(df_dim.columns)
        
        # Drop the "dim_" prefix so these rows line up with the stage-derived rows.
        update_update = erase_dim_from_col(update_update)
        op = erase_dim_from_col(op)

        # Despite the name, these are the ids whose target row is UNCHANGED; they are
        # excluded from the new rows below. Collected to the driver for isin().
        records_to_update = list(map(lambda x : x[0] , op.select(f"{table}_id").collect()))

        # New active rows: brand-new ids plus new versions of changed ids.
        df_part_1 = insert. \
        union(update_insert. \
              filter((~F.col(f"{table}_id").isin(records_to_update)))). \
        withColumn("record_Active_Flag" , F.lit(1)). \
        withColumn(f"{table}_record_start_date" , F.current_timestamp()). \
        withColumn(f"{table}_record_end_date" , F.to_date(F.lit("9999-12-31 00:00:00") , "yyyy-MM-dd HH:mm:ss")). \
        select(records_select)
        df_part_1.createOrReplaceTempView("df_part_1")

        # Existing target rows (kept unchanged or just closed) keep their record keys.
        df_part_2 = op.unionByName(update_update)
        df_part_2.createOrReplaceTempView("df_part_2")

        # New rows are numbered after the current maximum key in df_part_2 (0 when it is empty).
        final_op = spark.sql("""SELECT * FROM df_part_2"""). \
        unionByName(spark.sql(f"""SELECT (
        SELECT CASE WHEN ISNULL((
        SELECT MAX({table}_record_key) AS max_ FROM df_part_2)) 
        THEN 0 
        ELSE (
        SELECT MAX({table}_record_key) AS max_ 
        FROM df_part_2) END) + ROW_NUMBER() OVER(
        ORDER BY {table}_id) AS {table}_record_key, * 
        FROM df_part_1"""))
        return final_op

In [0]:
def scd_type_three_frame_work(table , df , df_dim):
    """Build the SCD type-3 output: current values plus the previous OLAP timestamp.

    election_result: registers ``df``/``df_dim`` as the temp views df_part_1 /
    df_part_2. It returns the stage rows with ``election_result_key``
    continuing from the target's maximum key (0 when empty).

    Dimension tables:
      * ids whose values changed (``olap_update`` flag "U") take the stage
        values. Their ``<table>_oalp_old_timestamp`` is set to the target's
        previous ``<table>_oalp_timestamp``, and they get a new
        ``<table>_oalp_timestamp``;
      * every other stage row (new or unchanged id) gets a new
        ``<table>_oalp_timestamp`` and a NULL ``<table>_oalp_old_timestamp``;
      * record keys are reassigned 1..n ordered by ``<table>_id``.

    The "oalp" spelling is deliberate: it is the column naming this function
    writes, so existing type-3 tables are expected to use it.

    NOTE(review): unchanged rows lose their stored old timestamp (it is reset
    to NULL) -- confirm this is intended.
    """
    if table == "election_result":
        df.createOrReplaceTempView("df_part_1")
        df_dim.createOrReplaceTempView("df_part_2")
        return spark.sql("""SELECT (
        SELECT CASE 
        WHEN 
        ISNULL((SELECT MAX(election_result_key) AS max_ FROM df_part_2))
        THEN 0
        ELSE 
        (SELECT MAX(election_result_key) AS max_ FROM df_part_2)
        END) + ROW_NUMBER() OVER(ORDER BY 
        polling_station_election_id, polling_station_polling_station_id , 
        constituency_constituency_id , candidates_candidate_id) 
        AS election_result_key, * FROM df_part_1""")

    id_col = f"{table}_id"
    new_ts = f"{table}_oalp_timestamp"
    old_ts = f"{table}_oalp_old_timestamp"
    # Stage values under the target's column names, minus surrogate keys and flags.
    kept_columns = [
        value[4:] for value in df_dim.columns
        if value.split("_")[-1] not in ("key", "flag")
    ]

    changed = (
        olap_update(df, df_dim)
        .filter(F.col("update_flag") == "U")
        .withColumn(old_ts, F.col(f"dim_{table}_oalp_timestamp"))
        .withColumn(new_ts, F.current_timestamp())
        .select(kept_columns)
    )

    # An anti-join on an aliased key replaces collecting ids to the driver; it
    # preserves df's column order.
    changed_ids = changed.select(F.col(id_col).alias("_changed_id"))
    untouched = (
        df.withColumn(new_ts, F.current_timestamp())
        .withColumn(old_ts, F.lit(None))
        .join(changed_ids, F.col(id_col) == F.col("_changed_id"), "left_anti")
    )

    merged = untouched.unionByName(changed)
    return merged.select(
        F.row_number().over(Window.orderBy(id_col)).alias(f"{table}_record_key"), "*"
    )

In [0]:
def scd_over_write_to_hive(mode  , location , name , final_op):
    """Overwrite the target table stored at ``location/name`` with ``final_op``.

    Parameters
    ----------
    mode : str
        Spark data-source *format* passed to ``DataFrameWriter.format``
        (e.g. "delta"), despite the parameter name. The save mode is always
        "overwrite".
    location : str
        Parent directory of the table.
    name : str
        Table directory name. If its first "_"-separated token is "candidate",
        an "s" is appended; "election_result_olap" is mapped to
        "election_result_fact".
    final_op : DataFrame
        Data to write. Every long/bigint column is cast to int first.

    Returns
    -------
    None, after printing "Done".
    """
    if name.split("_")[0].lower() == "candidate":
        name = name + "s"
    elif name == "election_result_olap":
        name = "election_result_fact"

    # NOTE(review): bigint -> int is a narrowing cast; values outside the 32-bit
    # range will overflow or fail, depending on Spark's ANSI setting -- confirm.
    long_columns = [col_name for col_name, dtype in final_op.dtypes if dtype in ("long", "bigint")]
    for col_name in long_columns:
        final_op = final_op.withColumn(col_name, F.col(col_name).astype(T.IntegerType()))

    final_op.write.format(mode).mode("overwrite").save(location + "/" + name)
    print("Done")
    return None

In [0]:
# dbutils.widgets.combobox(name = "star_selector" , label="Choose the OLAP Table" , defaultValue="" , choices=["candidates" , "party" ,
#                                                                              "constituency" , "polling_station", 
#                                                                             "election_result_fact"])

# dbutils.widgets.combobox(name = "scd_selector" , label="Choose the SCD Type" , defaultValue="" , choices=["1","2","3"])


In [0]:
def scd_job_run(scd_type , table_name):
    """Run the full SCD pipeline for one table and return the framework output.

    Steps: ``get_data`` (which publishes the loaded frames as the globals
    ``df`` and ``df_dim``), then ``scd_general_joiner``, then the framework
    that matches ``scd_type`` (1, 2 or 3).

    Returns
    -------
    The framework's DataFrame, or ``None`` when any step raises an
    ``Exception``. In that case the error and "There was an Error while
    Executing" are printed. KeyboardInterrupt and SystemExit are not caught.

    For any other ``scd_type`` it prints "Define the SCD Type" and calls
    ``sys.exit()``.
    """
    if scd_type not in (1, 2, 3):
        print("Define the SCD Type")
        sys.exit()

    try:
        table = get_data(table_name , scd_type)
        # `df` and `df_dim` are the globals that get_data just assigned.
        joined = scd_general_joiner(table , df , df_dim , scd_type)
        if scd_type == 1:
            frame_work = scd_type_one_frame_work
        elif scd_type == 2:
            frame_work = scd_type_two_frame_work
        else:
            frame_work = scd_type_three_frame_work
        return frame_work(table , joined , df_dim)
    except Exception as ex:
        print(f"Error has Occurred , {ex}")
        print("There was an Error while Executing")
        return None

In [0]:
def run_job(list_ = ("election_result_fact" , "candidates" , "party" , "constituency" , "polling_station"), scd_type = 2):
    """Run ``scd_job_run`` for each table and overwrite its OLAP parquet output.

    Parameters
    ----------
    list_ : iterable of str
        Table names passed to ``scd_job_run``. Each result is written to
        ``<op_MOUNT_NAME>/election-project-olap/<name>_olap/``.
    scd_type : int, default 2
        SCD type forwarded to ``scd_job_run``.

    Raises
    ------
    RuntimeError
        When ``scd_job_run`` returns ``None`` (its failure result) for a
        table. Without this check, the failure would surface later as an
        opaque AttributeError on ``.write``.

    Prints a line per finished table and a final summary; returns None.
    """
    for value in list_:
        output = scd_job_run(scd_type, value)
        if output is None:
            raise RuntimeError(f"SCD job for table {value!r} failed; see the error printed above")
        # NOTE(review): this is also the path get_data reads the target from (SCD type 2 and the
        # fact table); Spark may refuse to overwrite a path it is still reading -- confirm on this runtime.
        output.write.mode("overwrite").parquet(f"{op_MOUNT_NAME}/election-project-olap/{value}_olap/")
        print(f"{value} Done")
    print("All Tables Done ,  Check S3 for Data")
    return None

In [0]:
# Run the SCD type-2 job for every default table; each result overwrites that table's OLAP parquet output.
run_job()

polling_station Done
All Tables Done ,  Check S3 for Data
