In [1]:
# Imports
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import date
from pyspark.sql.types import LongType, IntegerType, DecimalType, BooleanType, DateType, StringType, StructField, StructType

In [15]:
# Inputs
# Notebook parameters. Every value defaults to an empty string here;
# presumably the orchestrating pipeline overrides them at run time — TODO confirm.
batch_id = ""      # written to every output row as the BatchId column
file_name = ""     # passed to file_load() together with `directory`
directory = ""     # passed to file_load() together with `file_name`
provider = ""      # written to every output row as the Provider column
data_source = ""   # written to every output row as the DataSource column

# Secrets handed to config_get(). Keep these defaults empty: never commit real
# credentials in this cell.
database_password = ""
blob_account_key = ""
blob_sas_token = ""

In [16]:
%run Library/functions

In [17]:
# Build the configuration object that every later helper call
# (file_load, sql_dataframe, sql_append) receives as its first argument.
configs = config_get(
    database_password,
    blob_account_key,
    blob_sas_token,
)

In [18]:
# Load the input file identified by `directory` / `file_name`.
# NOTE(review): "|" is presumably the field delimiter — confirm against
# file_load in Library/functions.
df = file_load(configs, directory, file_name, "|")

# Preview the loaded rows.
display(df)

In [19]:
# Lookup queries, one per store/reference table. Each selects the table's Id
# plus the code/number column that the file rows are matched on; both are
# renamed with a "Store" prefix so they cannot clash with the file's columns.
lookup_queries = [
    "(SELECT Id AS StoreFundId, FundCode AS StoreFundCode FROM store.entity_all_fund) fund",
    "(SELECT Id AS StorePolicyId, PolicyNumber AS StorePolicyNumber FROM store.entity_all_policy) policy",
    "(SELECT Id AS StoreProductId, ProductCode AS StoreProductCode FROM store.entity_all_product) product",
    "(SELECT Id AS StoreUnitHolderId, UnitHolderCode AS StoreUnitHolderCode FROM store.entity_all_unit_holder) unit_holder",
    "(SELECT Id AS StoreDistributionId, DistributionNumber AS StoreDistributionNumber FROM store.fact_fund_distribution) distribution",
    "(SELECT Id AS StoreDistMasterId, DistributionComponentCode AS StoreDistMasterCompCode FROM reference.distribution_component_master) distcompmast",
]

# Run the queries in the order listed above; `sql` is left holding the last
# query text once the loop finishes.
lookup_frames = []
for sql in lookup_queries:
    lookup_frames.append(sql_dataframe(configs, sql))

# Unpack in the same order as lookup_queries.
(
    df_fund,
    df_policy,
    df_products,
    df_unit_holder,
    df_distribution,
    df_dc_master,
) = lookup_frames

In [25]:
# Attach the Store* Id columns to the file rows.
# Each entry is (lookup frame, column on the file rows, matching column on the lookup).
lookup_joins = [
    (df_products, "ProductId", "StoreProductCode"),
    (df_policy, "PolicyNumber", "StorePolicyNumber"),
    (df_unit_holder, "UnitHolderId", "StoreUnitHolderCode"),
    (df_fund, "FundId", "StoreFundCode"),
    (df_distribution, "DistributionNumber", "StoreDistributionNumber"),
    (df_dc_master, "DistributionComponentCode", "StoreDistMasterCompCode"),
]

# Left joins keep every file row; rows with no match get nulls in the Store* columns.
# NOTE: `df` is rebound on each pass, so this cell is not safe to re-run
# without first re-running the file load cell.
for lookup_df, source_column, store_column in lookup_joins:
    df = df.join(lookup_df, df[source_column] == lookup_df[store_column], "left")

# display(df)

In [26]:
# Project the joined frame onto the column layout that is appended to
# stage.fact_policy_distribution_component. Column order is significant and
# matches the original select exactly.
df_mapped = df.select([
    # --- Row metadata -------------------------------------------------------
    # Constant 0 placeholder for Id, typed as a long.
    lit(0).cast(LongType()).alias("Id"),
    lit(batch_id).alias("BatchId"),
    # A row is flagged valid when both raw (pre-cast) values are present.
    # The expression is already a Column, so no lit() wrapper is needed.
    # NOTE(review): this looks at the raw file values, not the casted ones
    # further down — a present-but-unparseable value is not reflected here.
    (
        col("DistributionNumber").isNotNull()
        & col("DistributionDate").isNotNull()
    ).alias("IsValid"),
    lit(provider).alias("Provider"),
    lit(data_source).alias("DataSource"),

    # --- Ids resolved by the lookup joins (null when no match was found) ----
    # Cast first, then alias, so the output name is stated explicitly.
    col("StoreUnitHolderId").cast(LongType()).alias("UnitHolderId"),
    col("StoreProductId").cast(LongType()).alias("ProductId"),
    col("StoreFundId").cast(LongType()).alias("FundId"),
    col("StorePolicyId").cast(LongType()).alias("PolicyId"),
    col("StoreDistributionId").cast(LongType()).alias("DistributionId"),
    col("StoreDistMasterId").cast(LongType()).alias("DistributionComponentMasterId"),

    # --- Columns carried over from the input file ---------------------------
    col("PolicyNumber"),

    col("DistributionComponentCode"),
    col("DistributionComponentName"),
    col("DistributionNumber").cast(LongType()),
    col("DistributionDate").cast(DateType()),
    col("DistributionComponentAmount").cast(DecimalType(20, 5)),
    col("DistributionComponentCPU").cast(DecimalType(20, 5)),
    col("DistributionComponentUnits").cast(DecimalType(20, 5)),
    col("Currency"),
    col("ReversalIndicator"),
    # NOTE(review): cast to an integer type — confirm the source never
    # carries fractional values in ComponentTax.
    col("ComponentTax").cast(IntegerType()),
    col("PaymentNumber"),
    col("FactPolicyDistributionComponentTransactionKey").cast(LongType()),
    col("ProductKey").cast(LongType()),
    col("PolicyKey").cast(LongType()),
    col("UnitHolderKey").cast(LongType()),
    col("FundKey").cast(LongType()),
    col("DistributionComponentKey").cast(LongType()),
])

# display(df_mapped)

In [28]:
# Hand the mapped rows to the sql_append helper for the staging table.
# NOTE(review): presumably an append-style write, so re-running this cell
# would stage the same rows again — confirm against Library/functions.
sql_append(
    configs,
    df_mapped,
    "stage.fact_policy_distribution_component",
)