In [11]:
from urllib.request import urlretrieve
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder.appName("project2 preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


file_path=f"../data/landing/pop-income.csv"
output_path=f"../data/raw/test.csv"

df=spark.read.csv(file_path, header=True, inferSchema=True)

# Columns that should be included in every dataset
common_columns = ['FID', 'sa2_maincode_2016', 'geometry', 'sa2_name_2016', 'yr']
other_columns = "estmts_prsnl_incme_yr_endd_30_jne_emplye_ernrs_nm|estmts_prsnl_incme_yr_endd_30_jne_emplye_ernrs_mdn_age_yrs|estimates_personal_income_year_ended_30_june_total_employee_m|estimates_personal_income_year_ended_30_june_median_employee|estimates_personal_income_year_ended_30_june_mean_employee|estmts_prsnl_incme_yr_endd_30_jne_emplye_mn_srce_of_pc|estmts_prsnl_incme_yr_endd_30_jne_unncrprtd_bsnss_ernrs_nm|estm_prsnl_incme_yr_end_30_jne_uncrptd_bsn_erns_mdn_age_yrs|estmts_prsnl_incme_yr_endd_30_jne_ttl_unncrprtd_bsnss_m|estmts_prsnl_incme_yr_endd_30_jne_mdn_unncrprtd_bsnss|estmts_prsnl_incme_yr_endd_30_jne_mn_unncrprtd_bsnss|estmts_prsnl_incme_yr_endd_30_jne_unncrprtd_bsnss_mn_srce_of_pc|estmts_prsnl_incme_yr_endd_30_jne_invstmnt_ernrs_nm|estmts_prsnl_incme_yr_endd_30_jne_invstmnt_ernrs_mdn_age_yrs|estimates_personal_income_year_ended_30_june_total_investment_m|estimates_personal_income_year_ended_30_june_median_investment|estimates_personal_income_year_ended_30_june_mean_investment|estmts_prsnl_incme_yr_endd_30_jne_invstmnt_mn_srce_of_pc|estmts_prsnl_incme_yr_endd_30_jne_sprnntn_annty_ernrs_nm|estm_prsnl_incme_yr_end_30_jne_sprnt_anty_erns_mdn_age_yrs|estmts_prsnl_incme_yr_endd_30_jne_ttl_sprnntn_annty_m|estmts_prsnl_incme_yr_endd_30_jne_mdn_sprnntn_annty|estmts_prsnl_incme_yr_endd_30_jne_mn_sprnntn_annty|estmts_prsnl_incme_yr_endd_30_jne_sprnntn_annty_mn_srce_of_pc|estm_prsnl_incme_yr_end_30_jne_tl_erns_excl_gvrnmt_pns_alwncs_n|estm_prsnl_incme_yr_end_30_jne_tl_erns_excl_gvrnmt_pns_alwncs_m|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_m|estm_prsnl_incme_yr_end_30_jne_mdn_tl_excl_gvrnmt_pns_alwncs|estm_prsnl_incme_yr_end_30_jne_mn_tl_excl_gvrnmt_pns_alwncs|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_p802_r|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_p805_r|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_p205_r|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_p105_r|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_gni_cf|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_shre_o|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_shre_0|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_shre_1|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_lwst_q|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_scnd_q|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_thrd_q|estm_prsnl_incme_yr_end_30_jne_tl_excl_gvrnmt_pns_alwncs_hgst_q|grss_cptl_gns_rprtd_txpyrs_yr_endd_30_jne_by_prsns_nm|grss_cptl_gns_rprtd_txpyrs_yr_endd_30_jne_by_amnt_m|grss_cptl_gns_rprtd_txpyrs_yr_endd_30_jne_by_mdn|grss_cptl_gns_rprtd_txpyrs_yr_endd_30_jne_by_mn|slctd_gvrnmnt_pnsns_allwncs_30_jne_age_pnsn_cntrlnk_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_crr_pymnt_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_crr_allwnce_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_dsblty_spprt_pnsn_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_nwstrt_allwnce_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_prntng_pymnt_sngle_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_prntng_pymnt_prtnrd_nm|slctd_gvrnmt_pns_alwncs_30_jne_yth_alwnce_fl_tme_stdnprc_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_yth_allwnce_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_fmly_tx_bnft_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_fmly_tx_bnft_b_nm|slctd_gvrnmnt_pnsns_allwncs_30_jne_cmmnwlth_rnt_assstnce_nm|exprmntl_hshld_wlth_estmts_mdlld_prvte_dwllngs_mn_nt_wrth|ttl_prsnl_incme_wkly_prsns_agd_15_yrs_cnss_ernng_1_499_pr_wk_pc|tl_prsnl_incme_wkly_prsn_agd_15_yrs_cns_erng_500_999_pr_wk_pc|tl_prsnl_incme_wkly_prsn_agd_15_yrs_cns_erng_1000_1999_pr_wk_pc|tl_prsnl_incme_wkly_prsn_agd_15_yrs_cns_erng_2000_2999_pr_wk_pc|ttl_prsnl_incme_wkly_prsns_agd_15_yrs_cnss_ernng_3000_pr_wk_pc|ttl_prsnl_incme_wkly_prsns_agd_15_yrs_cnss_ernng_nl_pc|ttl_prsnl_incme_wkly_prsns_agd_15_yrs_cnss_ngtve_pc|tl_prsnl_incme_wkly_prsn_agd_15_yrs_cns_indqtly_dscrb_std_pc|equivalised_total_household_income_census_median_weekly".split("|")

# Number of columns in the DataFrame
total_columns = len(df.columns)

# Threshold: Keep rows with at least half of the columns non-null
thresh_value = total_columns // 2 + 1  # Half of columns, rounded up

# Drop rows that have less than thresh_value non-null values
df_cleaned = df.dropna(thresh=thresh_value)

                                                                                

In [12]:
columns_with_null=[]

for column in df.columns:
    # Filter rows where a column has null values
    df_null = df_cleaned.filter(df_cleaned[column].isNull())

# Count rows with null values
    null_count = df_null.count()

    if (null_count>0):
        columns_with_null.append(column)


                                                                                

In [13]:
from pyspark.sql.functions import col, when, avg
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, lag, lead


def null_filler(df, column_name):
# Create a window specification partitioned by sa2_maincode_2016 and ordered by yr
    window_spec = Window.partitionBy('sa2_maincode_2016').orderBy('yr')

# Use lag (previous year) and lead (next year) functions
    df_with_adjacent_values = df.withColumn('prev_year_value', lag(column_name).over(window_spec)) \
                            .withColumn('next_year_value', lead(column_name).over(window_spec))


# Calculate the average of prev_year_value and next_year_value, ignoring nulls
    df_filled = df_with_adjacent_values.withColumn('filled_value',
        when(
            col(column_name).isNull(), 
            (coalesce(col('prev_year_value'), col('next_year_value')) + coalesce(col('next_year_value'), col('prev_year_value'))) / 2
        ).otherwise(col(column_name))
    )

# Handle cases where both prev_year_value and next_year_value are null
    df_filled = df_filled.withColumn('filled_value',
        when(
            col('filled_value').isNull(), 
            lag('prev_year_value', 2).over(window_spec)  # or lead for next 2 years
        ).otherwise(col('filled_value'))
    )

    df_final = df_filled.withColumn(column_name, col('filled_value')).drop('prev_year_value', 'next_year_value', 'filled_value')
    return(df_final)


In [15]:
def null_filler_multi(df, columns_list):
    for column in columns_list:
        df=null_filler(df, column)
    return df

df_filled=null_filler_multi(df_cleaned, columns_with_null)

In [16]:
spark = (
    SparkSession.builder.appName("project2 preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

output_path=f"../data/raw/income.csv"
df_filled=df_filled.drop("geometry")
df_filled.write.csv(output_path, header=True, mode="overwrite")

24/09/18 12:19:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
                                                                                