# In [0]:


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Obtain the Spark session used by everything below (created if none is active yet)
spark = (
    SparkSession.builder
    .appName("InfaToDatabricks")
    .getOrCreate()
)


def read_srm_resources():
    """Return the ``metlife_gto.SRM_RESOURCES`` table as a DataFrame.

    Uses the module-level ``spark`` session; takes no arguments.
    """
    return spark.read.table("metlife_gto.SRM_RESOURCES")


def read_prj_resources():
    """Return the ``metlife_gto.PRJ_RESOURCES`` table as a DataFrame.

    Uses the module-level ``spark`` session; takes no arguments.
    """
    return spark.read.table("metlife_gto.PRJ_RESOURCES")


def read_odf_ca_resource():
    """Return the ``metlife_gto.ODF_CA_RESOURCE`` table as a DataFrame.

    Uses the module-level ``spark`` session; takes no arguments.
    """
    return spark.read.table("metlife_gto.ODF_CA_RESOURCE")


def read_lkp_manager_id():
    """Return the manager-id lookup DataFrame.

    Selects two columns from ``metlife_gto.SRM_RESOURCES``: ``ID`` exposed as
    ``IN_ID`` and ``USER_ID`` exposed as ``lkp_USER_ID``. Runs through the
    module-level ``spark`` session.
    """
    # Adjacent literals concatenate into the single-line statement sent to Spark.
    lookup_sql = (
        "SELECT srm.ID as IN_ID, srm.USER_ID as lkp_USER_ID "
        "FROM metlife_gto.SRM_RESOURCES srm"
    )
    return spark.sql(lookup_sql)


def read_lkup_clrty_lst_login_ts():
    """Return the ``metlife_gto.CMN_SEC_USERS`` table as a DataFrame.

    Uses the module-level ``spark`` session; takes no arguments.
    """
    return spark.read.table("metlife_gto.CMN_SEC_USERS")


def read_lkup_srm_resources():
    """Return the ``metlife_gto.SRM_RESOURCES`` table as a lookup DataFrame.

    Reads the same table as ``read_srm_resources``; uses the module-level
    ``spark`` session and takes no arguments.
    """
    return spark.read.table("metlife_gto.SRM_RESOURCES")


def read_lkp_cmn_lookups_v():
    """Return the ``metlife_gto.lkp_cmn_lookups_v`` lookup DataFrame.

    Every selected column is re-exposed with a ``cmn_`` prefix
    (``cmn_LAST_UPDATED_DATE``, ``cmn_NAME``, ``cmn_ID``,
    ``cmn_LANGUAGE_CODE``, ``cmn_LOOKUP_TYPE``). Runs through the
    module-level ``spark`` session.
    """
    # Adjacent literals concatenate into the single-line statement sent to Spark.
    lookup_sql = (
        "SELECT LAST_UPDATED_DATE as cmn_LAST_UPDATED_DATE, "
        "NAME as cmn_NAME, "
        "ID as cmn_ID, "
        "LANGUAGE_CODE as cmn_LANGUAGE_CODE, "
        "LOOKUP_TYPE as cmn_LOOKUP_TYPE "
        "FROM metlife_gto.lkp_cmn_lookups_v"
    )
    return spark.sql(lookup_sql)


def read_lkup_prtrackmode():
    """Return the ``metlife_gto.lkup_pr_track_mode`` lookup DataFrame.

    Every selected column is re-exposed with a ``prtrack_`` prefix
    (``prtrack_ID``, ``prtrack_LAST_UPDATED_DATE``, ``prtrack_NAME``,
    ``prtrack_LOOKUP_TYPE``, ``prtrack_LOOKUP_CODE``,
    ``prtrack_LANGUAGE_CODE``). Runs through the module-level ``spark``
    session.
    """
    # Adjacent literals concatenate into the single-line statement sent to Spark.
    lookup_sql = (
        "SELECT ID as prtrack_ID, "
        "LAST_UPDATED_DATE as prtrack_LAST_UPDATED_DATE, "
        "NAME as prtrack_NAME, "
        "LOOKUP_TYPE as prtrack_LOOKUP_TYPE, "
        "LOOKUP_CODE as prtrack_LOOKUP_CODE, "
        "LANGUAGE_CODE as prtrack_LANGUAGE_CODE "
        "FROM metlife_gto.lkup_pr_track_mode"
    )
    return spark.sql(lookup_sql)


# ---------------------------------------------------------------------------
# Pipeline: start from SRM_RESOURCES, left-join PRJ_RESOURCES and
# ODF_CA_RESOURCE onto it, enrich the result through three left-join lookups,
# then overwrite metlife_gto.T_STG_RSRC_PER with the final DataFrame.
# ---------------------------------------------------------------------------

# Read SQ_SRM_RESOURCES
df_read_srm_resources = read_srm_resources()

# Read SQ_PRJ_RESOURCES
df_read_prj_resources = read_prj_resources()

# EXPTRANS2 - cast PRID to int (it is the join key below) and rename
# LAST_UPDATED_DATE so it stays distinguishable after the joins.
df_exptrans2 = (
    df_read_prj_resources.alias("prj")
    .withColumn("PRID", col("prj.PRID").cast("int"))
    .withColumnRenamed("LAST_UPDATED_DATE", "PRJ_RESOURCES_LAST_UPDATED_DATE")
)

# JNRTRANS - Join between SQ_SRM_RESOURCES and SQ_PRJ_RESOURCES.
# Left join: every SRM_RESOURCES row is kept even without a PRJ_RESOURCES match.
df_jnrtrans = df_read_srm_resources.join(
    df_exptrans2, df_read_srm_resources.ID == df_exptrans2.PRID, "left"
)

# Read SQ_ODF_CA_RESOURCE
df_read_odf_ca_resource = read_odf_ca_resource()

# EXPTRANS3 - rename ID and LAST_UPDATED_DATE so they do not collide with the
# identically named columns already present on the SRM_RESOURCES side.
df_exptrans3 = df_read_odf_ca_resource.withColumnRenamed(
    "ID", "ODF_CA_RESOURCE_ID"
).withColumnRenamed("LAST_UPDATED_DATE", "ODF_CA_RESOURCE_LAST_UPDATED_DATE")

# JNRTRANS1 - Join between JNRTRANS and EXPTRANS3 (left join on the resource id)
# NOTE(review): the three source tables may share further column names that are
# not renamed here; verify the joined schema has no ambiguous duplicates.
df_jnrtrans1 = df_jnrtrans.join(
    df_exptrans3, df_jnrtrans.ID == df_exptrans3.ODF_CA_RESOURCE_ID, "left"
)

# EXPTRANS - Setting defaults.
# The out_* columns are constant literals used purely as join keys against the
# lookup frames below (lookup type and language code).
# NOTE(review): the "char(10)" cast is kept as-is; presumably Spark treats it as
# a plain string cast without padding - confirm against the target column type.
df_exptrans = (
    df_jnrtrans1.withColumn("PRTRACKMODE", df_jnrtrans1.PRTRACKMODE.cast("char(10)"))
    .withColumn("out_SRM_RESOURCE_TYPE", lit("SRM_RESOURCE_TYPE"))
    .withColumn("out_prTrackMode", lit("prTrackMode"))
    .withColumn("out_en", lit("en"))
    .withColumnRenamed("LAST_UPDATED_DATE", "SRM_RESOURCES_LAST_UPDATED_DATE")
)

# Lkp_MANAGER_ID - Lookup to get manager id.
# Matches MANAGER_ID against the lookup's USER_ID and appends only IN_ID.
# NOTE(review): a left join yields one output row per matching lookup row; if
# the lookup can contain several rows per key the row count will grow - verify.
df_lkp_manager_id_out = df_exptrans.join(
    read_lkp_manager_id().alias("mgr"),
    col("MANAGER_ID") == col("mgr.lkp_USER_ID"),
    "left",
).select(df_exptrans["*"], col("mgr.IN_ID"))

# lkp_CMN_LOOKUPS_V - Lookup to get SRM_RSRC_TYPE_NAME and LAST_UPDATED_DATE
df_lkp_cmn_lookups_v_out = df_lkp_manager_id_out.join(
    read_lkp_cmn_lookups_v(),
    (col("PERSON_TYPE") == col("cmn_ID"))
    & (col("out_SRM_RESOURCE_TYPE") == col("cmn_LOOKUP_TYPE"))
    & (col("out_en") == col("cmn_LANGUAGE_CODE")),
    "left",
).select(
    # Keep every column carried so far, then add only the two lookup outputs.
    *[col(c) for c in df_lkp_manager_id_out.columns],
    col("cmn_NAME").alias("in_SRM_RSRC_TYPE_NAME"),
    col("cmn_LAST_UPDATED_DATE").alias("in_SRM_RSRC_TYPE_LAST_UPDATED_DATE"),
)

# lkup_prTrackMode - Lookup to get the prTrackMode_NAME and LAST_UPDATED_DATE
df_lkup_prtrackmode_out = df_lkp_cmn_lookups_v_out.join(
    read_lkup_prtrackmode(),
    (col("out_prTrackMode") == col("prtrack_LOOKUP_TYPE"))
    & (col("PRTRACKMODE") == col("prtrack_LOOKUP_CODE"))
    & (col("out_en") == col("prtrack_LANGUAGE_CODE")),
    "left",
).select(
    *[col(c) for c in df_lkp_cmn_lookups_v_out.columns],
    # Use the exact alias from the lookup query (prtrack_NAME) so the reference
    # also resolves when spark.sql.caseSensitive is enabled.
    col("prtrack_NAME").alias("in_prTrackMode_NAME"),
    col("prtrack_LAST_UPDATED_DATE").alias("in_prTrackMode_LAST_UPDATED_DATE"),
)

# EXPTRANS1 - currently a pass-through of the lookup output.
# NOTE(review): the mapping step is described as date and null handling, but no
# such transformation is applied here - confirm whether logic is missing.
df_exptrans1 = df_lkup_prtrackmode_out

# Notebook preview of the final frame (evaluated separately from the write below).
df_exptrans1.display()

# T_STG_RSRC_PER_with_PreSQL - Writing to table (replaces existing contents)
df_exptrans1.write.mode("overwrite").saveAsTable("metlife_gto.T_STG_RSRC_PER")

# The Spark session is intentionally NOT stopped here. It was obtained with
# getOrCreate(), so in a notebook/cluster environment it is the shared session;
# calling spark.stop() would tear it down for anything that runs afterwards.
     
