In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, datediff, lit, year, month, count
from datetime import date
import psycopg2 
import os

In [2]:
def ncc_difference(stgSubsidyReportFact_df):
    """Add subsidy timing and spending-gap columns to a subsidy report frame.

    Adds:
        recieve_days: days from ``start_subsidize`` to ``recieve_date`` (datediff).
        spend_diff:   ``actual_spending`` minus ``subsidy_money``.

    All other columns are passed through unchanged.
    """
    return (
        stgSubsidyReportFact_df
        .withColumn("recieve_days", datediff(col("recieve_date"), col("start_subsidize")))
        .withColumn("spend_diff", col("actual_spending") - col("subsidy_money"))
    )

In [3]:
def grade_difference(povertyfact_df):
    """Add b1_diff / b2_diff: change of each grade versus the previous row of the same family.

    Rows are partitioned by ``family_code`` and ordered by ``year`` (ties broken by
    ``b1_grade`` then ``b2_grade``). Each diff is the current grade minus the previous
    row's grade cast to integer; the first row of a family has no predecessor, so
    its diff is NULL.
    """
    family_window = (
        Window.partitionBy("family_code")
        .orderBy(["family_code", "year", "b1_grade", "b2_grade"])
    )

    result_df = povertyfact_df
    for grade_col, diff_col in (("b1_grade", "b1_diff"), ("b2_grade", "b2_diff")):
        previous_grade = lag(grade_col).over(family_window).cast("integer")
        result_df = result_df.withColumn(diff_col, col(grade_col) - previous_grade)
    return result_df

In [14]:
def count_member(povertyfact_df, member_df):
    """Set ``member_num`` to the number of ``member_df`` rows sharing each row's ``family_id``.

    The count is attached with a left join, so families with no rows in
    ``member_df`` get NULL. If ``povertyfact_df`` already has a ``member_num``
    column, it is replaced. The temporary ``count`` column is dropped.
    """
    members_per_family = member_df.groupBy("family_id").count()
    return (
        povertyfact_df
        .join(members_per_family, on="family_id", how="left")
        .withColumn("member_num", col("count"))
        .drop("count")
    )

In [5]:
def find_age_member(memberSurveyFact_df):
    """Add an ``age`` column equal to the current calendar year minus ``year_of_birth``.

    The current year is read once on the driver when this function is called.
    Whether the birthday has passed this year is not considered.
    """
    current_year = date.today().year
    age_expr = current_year - col('year_of_birth')
    return memberSurveyFact_df.withColumn('age', age_expr)

In [6]:
def join_ncc_hongheo(dimSurvey, dimFamilyMember, dimNCC, dimSubsidy):
    """Link current NCC subsidy records to household members and their poverty grades.

    Steps:
      1. Current DimSurvey rows are right-joined onto DimFamilyMember by ``family_id``.
         Every member is kept; grades are NULL when the family has no current survey.
      2. Current DimNCC rows are left-joined to current DimSubsidy rows by ``profile_code``.
      3. The NCC rows are left-joined to members where
         ``identity_card_number == identity_number``.

    Returns one row per NCC/subsidy (times matching members) with columns:
    profile_code, ncc_code, full_name (from DimNCC), identity_number, subsidy_code,
    year, subsidy_money, recieve_date, member_id, family_id, a_grade, b1_grade,
    b2_grade, final_result.
    """
    # Keep only the current version of each survey, as is done for DimNCC and
    # DimSubsidy below. Without this filter, every historical DimSurvey row
    # duplicates the member rows it joins to.
    dimSurvey_tmp = dimSurvey.where("rowiscurrent=True")
    hongheo_df = dimSurvey_tmp.join(dimFamilyMember, on="family_id", how="right") \
        .select('member_id', 'full_name', 'identity_card_number', 'family_id',
                'a_grade', 'b1_grade', 'b2_grade', 'final_result')

    dimNCC_tmp = dimNCC.where("rowiscurrent=True")
    dimSubsidy_tmp = dimSubsidy.where("rowiscurrent=True")
    # Qualify ncc_code/full_name with the NCC frame in case DimSubsidy has same-named columns.
    ncc_df = dimNCC_tmp.join(dimSubsidy_tmp, on="profile_code", how="left") \
        .select('profile_code', dimNCC_tmp.ncc_code, dimNCC_tmp.full_name, 'identity_number',
                'subsidy_code', 'year', 'spend_type', 'subsidy_name', 'subsidy_money',
                'submoney', 'recieve_date')

    final_df = ncc_df.join(hongheo_df, hongheo_df.identity_card_number == ncc_df.identity_number, how="left") \
        .select('profile_code', 'ncc_code', ncc_df.full_name, 'identity_number',
                'subsidy_code', 'year', 'subsidy_money', 'recieve_date',
                'member_id', 'family_id', 'a_grade', 'b1_grade', 'b2_grade', 'final_result')
    return final_df

In [7]:
def _read_nguoidan_table(spark, config, table):
    """Load one table from the NguoiDanDM PostgreSQL database over JDBC."""
    return spark.read.format("jdbc") \
        .option("url", f"{config['URL_BASE_LOCAL']}:{config['PORT']}/NguoiDanDM") \
        .option("driver", f"{config['DRIVER']}") \
        .option("dbtable", table) \
        .option("user", f"{config['USER']}") \
        .option("password", f"{config['PASSWORD']}") \
        .load()


def find_keys(spark, config, finalfact_df, businesskeys:dict):
    """Attach dimension surrogate keys and a yyyymm date key to fact rows.

    Args:
        spark: active SparkSession used for the JDBC reads.
        config: connection settings; reads URL_BASE_LOCAL, PORT, DRIVER, USER, PASSWORD.
        finalfact_df: fact rows with profile_code, member_id, family_id and recieve_date.
        businesskeys: dict whose 'ncc', 'member' and 'survey' entries list the
            business keys used to prefilter DimNCC, DimFamilyMember and DimSurvey.

    Returns:
        finalfact_df left-joined with profilekey, memberkey and surveykey (NULL when
        no match). The 'year' and 'month' columns are (re)set from recieve_date, and
        an integer 'datekey' is set to year * 100 + month (e.g. 202305).
    """
    dimMember = _read_nguoidan_table(spark, config, 'public."DimFamilyMember"')
    dimSurvey = _read_nguoidan_table(spark, config, 'public."DimSurvey"')
    dimNCC = _read_nguoidan_table(spark, config, 'public."DimNCC"')

    # Restrict each dimension to the requested business keys. DimNCC and DimSurvey
    # are additionally restricted to their current rows.
    # NOTE(review): DimFamilyMember is not filtered on rowiscurrent; confirm it is not SCD.
    key_dimNCC = dimNCC.where((dimNCC.profile_code.isin(businesskeys['ncc'])) & (dimNCC.rowiscurrent == True)) \
        .select('profile_code', 'profilekey')
    key_dimMember = dimMember.where(dimMember.member_id.isin(businesskeys['member'])) \
        .select('member_id', 'memberkey')
    key_dimSurvey = dimSurvey.where((dimSurvey.family_id.isin(businesskeys['survey'])) & (dimSurvey.rowiscurrent == True)) \
        .select('family_id', 'surveykey')

    joined_df = finalfact_df.join(key_dimNCC, on='profile_code', how='left')
    joined_df = joined_df.join(key_dimMember, on='member_id', how='left')
    joined_df = joined_df.join(key_dimSurvey, on='family_id', how='left')

    # Derive the period from the receive date. This overwrites any existing 'year' column.
    joined_df = joined_df.withColumn('year', year('recieve_date'))
    joined_df = joined_df.withColumn('month', month('recieve_date'))
    # Both operands are integers, so datekey stays an integer.
    # The previous form, int * 100 + month cast to string, was coerced to a non-integer type.
    final_df = joined_df.withColumn('datekey', col('year') * 100 + col('month'))

    return final_df

In [8]:
def value_of_subsidy(fact_df):
    """Add per-NCC monthly subsidy totals to the fact rows and drop duplicates.

    For every row, the totals are computed over the fact rows sharing its
    (identity_number, year, month):
        total_subsidy: number of such rows.
        total_money:   sum of their ``subsidy_money``.
    Rows whose group key contains NULL get NULL totals.

    Returns the distinct rows of: profilekey, memberkey, surveykey, datekey, year,
    month, profile_code, ncc_code, full_name, identity_number, member_id, family_id,
    a_grade, b1_grade, b2_grade, final_result, total_subsidy, total_money.
    """
    from pyspark.sql import functions as F

    group_keys = ['identity_number', 'year', 'month']
    output_cols = ['profilekey', 'memberkey', 'surveykey', 'datekey', 'year', 'month',
                   'profile_code', 'ncc_code', 'full_name', 'identity_number',
                   'member_id', 'family_id', 'a_grade', 'b1_grade', 'b2_grade', 'final_result']

    # Compute both aggregates in one pass and join them back on the group keys.
    # Earlier they were joined to each other on profilekey only. That attached
    # totals from other months to a row and dropped total_money when profilekey was NULL.
    totals_df = fact_df.groupBy(group_keys).agg(
        F.count(F.lit(1)).alias('total_subsidy'),
        F.sum('subsidy_money').alias('total_money'),
    )

    return (
        fact_df.select(output_cols)
        .join(totals_df, on=group_keys, how='left')
        .select(output_cols + ['total_subsidy', 'total_money'])
        .distinct()
    )

In [16]:
import json
from contextlib import closing

# Connection settings: URL_BASE_LOCAL, HOST_LOCAL, PORT, DRIVER, USER, PASSWORD.
with open("../../config.json", "r") as file:
    config = json.load(file)

spark = SparkSession.builder.appName("Test connect to Postgresql") \
    .config('spark.jars.packages', 'org.postgresql:postgresql:42.7.3') \
    .getOrCreate()


def _read_jdbc(database, table):
    """Load `table` from PostgreSQL database `database` using the shared config."""
    return spark.read.format("jdbc") \
        .option("url", f"{config['URL_BASE_LOCAL']}:{config['PORT']}/{database}") \
        .option("driver", f"{config['DRIVER']}") \
        .option("dbtable", table) \
        .option("user", f"{config['USER']}") \
        .option("password", f"{config['PASSWORD']}") \
        .load()


povertyfact_df = _read_jdbc("LdtbxhStage", 'public."stgPovertyStatusFact"')
member_df = _read_jdbc("hongheovna", 'public.family_member_info')

# Recompute b1_diff/b2_diff and member_num for every staging row.
temp_df = grade_difference(povertyfact_df)
finalfact_df = count_member(temp_df, member_df)

print("====================")
finalfact_df.show(5)  # show() prints the table itself and returns None
print("====================")

UPDATE_QUERY = """UPDATE public."stgPovertyStatusFact"
                  SET member_num=%s, b1_diff=%s, b2_diff=%s
                  WHERE family_id=%s"""
# NOTE(review): the UPDATE is keyed on family_id alone. If one family_id can occur
# in several staging rows (e.g. several years), all of them end up with the
# values of the last row written. Confirm that family_id is unique per row.
update_params = [
    (row['member_num'], row['b1_diff'], row['b2_diff'], row['family_id'])
    for row in finalfact_df.collect()
]

# psycopg2's `with conn:` only commits (or rolls back on error); it does not close
# the connection. closing() guarantees the connection is released.
with closing(psycopg2.connect(
        database="LdtbxhStage",
        user=f"{config['USER']}",
        password=f"{config['PASSWORD']}",
        host=f"{config['HOST_LOCAL']}",
        port=f"{config['PORT']}"
)) as conn:
    with conn:
        with conn.cursor() as cur:
            cur.executemany(UPDATE_QUERY, update_params)
                

+--------------------+----+-------------+-------------+-----------+----------+--------------------+--------------------+--------------------+----------+-------+--------+--------+-------+-------+------------+
|           family_id|year|province_name|district_name|family_code|owner_name|        hard_reasons|        get_policies|       need_policies|member_num|a_grade|b1_grade|b2_grade|b1_diff|b2_diff|final_result|
+--------------------+----+-------------+-------------+-----------+----------+--------------------+--------------------+--------------------+----------+-------+--------+--------+-------+-------+------------+
|c27fd2c3-42eb-45a...|2023|     Đắk Nông|       Cư Jút|       7003|  Độ A Đạt|[không có đất sản...|[hỗ trợ y tế, hỗ ...|[hỗ trợ nước sạch...|         1|   true|      30|      58|   NULL|   NULL|    hộ nghèo|
|643fd2d7-214b-479...|2023|     Đắk Nông|       Cư Jút|       7009|Phùng A Sử|[không có đất sản...|[hỗ trợ y tế, hỗ ...|[hỗ trợ nước sạch...|         2|   true|      45

In [10]:
# INSERT INTO "DimNCC"(profilekey) VALUES('00000000-0000-0000-0000-000000000000');
# INSERT INTO "DimSurvey"(surveykey) VALUES('00000000-0000-0000-0000-000000000000');
# INSERT INTO "DimFamilyMember"(memberkey) VALUES('00000000-0000-0000-0000-000000000000');

In [11]:
# Stop the SparkSession created in the setup cell so its SparkContext is released.
spark.stop()