In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DoubleType, DecimalType
from pyspark.sql.functions import when, lit, col

In [5]:
# Input file and output directory, both relative to the notebook's working directory.
raw_data_loc = "../data/raw/application/application_train.csv"
cleaned_data_loc = "../data/processed/"

# Reuse the active Spark session if there is one, otherwise start one named "ETL".
spark = SparkSession.builder.appName("ETL").getOrCreate()

# The first CSV row supplies the column names; Spark infers each column's type
# from the data.
df_raw = spark.read.csv(raw_data_loc, header=True, inferSchema=True)

In [6]:
#df_raw.show()
# df_raw.columns
# df_raw.printSchema()
# df_raw.describe()

In [7]:
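# Reference only (Scala API, not runnable in this Python notebook):
# cast a column by building a typed copy, dropping the original, and renaming the copy back.
# val dfNew = df.withColumn("newColName", df.originalColName.cast(IntegerType))
#     .drop("originalColName").withColumnRenamed("newColName", "originalColName")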

In [8]:
# Replace DAYS_BIRTH with AGE.
# Dividing by -365 converts days to years and flips the sign in one step; the
# integer cast then truncates the fractional part.
df_age = df_raw.withColumn(
    "AGE", (df_raw["DAYS_BIRTH"] / -365).cast(IntegerType())
).drop("DAYS_BIRTH")

# Replace DAYS_EMPLOYED with a temporary, sign-flipped "years" column. It still
# contains the anomalous rows, which show up as negative values after the flip.
df_employed = df_age.withColumn(
    "YEARS_EMPLOYED_with_anom",
    (df_age["DAYS_EMPLOYED"] / -365).cast(DecimalType(10, 4)),
).drop("DAYS_EMPLOYED")

# Split the temporary column into:
#   YEARS_EMPLOYED_anom - 1 where the value is negative, 0 otherwise
#                         (null when the value itself is null);
#   YEARS_EMPLOYED      - the value where it is >= 0, null otherwise.
# col() is an unbound reference that is resolved against the DataFrame each
# withColumn call runs on. The previous version reused column handles taken
# from df_employed on a different DataFrame, which withColumn does not
# guarantee to accept.
years_with_anom = col("YEARS_EMPLOYED_with_anom")
df_employed_anom = (
    df_employed
    .withColumn("YEARS_EMPLOYED_anom", (years_with_anom < 0).cast(IntegerType()))
    .withColumn("YEARS_EMPLOYED", when(years_with_anom >= 0, years_with_anom))
    .drop("YEARS_EMPLOYED_with_anom")
)

def replace_with_NA(col, to_replace):
    """Build a column expression that turns one placeholder value into null.

    Args:
        col: Column expression to clean. Inside this function the name shadows
            the ``col`` helper imported from ``pyspark.sql.functions``.
        to_replace: The placeholder value that should become null.

    Returns:
        A ``when`` expression without an ``otherwise`` branch: it yields
        ``col`` where ``col`` differs from ``to_replace`` and null in every
        other case (including when ``col`` is already null).
    """
    is_placeholder = col == to_replace
    return when(~is_placeholder, col)

# Null out the "Unknown" placeholder in NAME_FAMILY_STATUS (2 rows according to
# the original note). The rows themselves are kept; only the value becomes null.
df_fam_null = df_employed_anom.withColumn(
    "NAME_FAMILY_STATUS",
    replace_with_NA(df_employed_anom["NAME_FAMILY_STATUS"], "Unknown"),
)

# Null out the "XNA" placeholder in CODE_GENDER (4 rows according to the
# original note). The column handle is taken from df_fam_null, the DataFrame
# that withColumn is called on; the previous version passed a handle from
# df_employed_anom, a different DataFrame.
df_gender_null = df_fam_null.withColumn(
    "CODE_GENDER",
    replace_with_NA(df_fam_null["CODE_GENDER"], "XNA"),
)

# Sanity check: cleaning must not change the number of rows.
df_gender_null.count() == df_raw.count()

True

In [9]:
# Feature Engineering


def domain_features(df_in):
    """Append four ratio features to ``df_in`` and return the new DataFrame.

    Each feature is a plain quotient of two existing columns (it is not
    multiplied by 100, despite the ``_PERCENT`` suffix):

    * ``CREDIT_INCOME_PERCENT``  = ``AMT_CREDIT`` / ``AMT_INCOME_TOTAL``
    * ``ANNUITY_INCOME_PERCENT`` = ``AMT_ANNUITY`` / ``AMT_INCOME_TOTAL``
    * ``CREDIT_TERM``            = ``AMT_ANNUITY`` / ``AMT_CREDIT``
    * ``YEARS_EMPLOYED_PERCENT`` = ``YEARS_EMPLOYED`` / ``AGE``

    Args:
        df_in: DataFrame that provides all of the input columns listed above.

    Returns:
        The DataFrame produced by chaining one ``withColumn`` call per
        feature, in the order listed.
    """
    # (new column, numerator column, denominator column)
    ratio_specs = (
        ('CREDIT_INCOME_PERCENT', 'AMT_CREDIT', 'AMT_INCOME_TOTAL'),
        ('ANNUITY_INCOME_PERCENT', 'AMT_ANNUITY', 'AMT_INCOME_TOTAL'),
        ('CREDIT_TERM', 'AMT_ANNUITY', 'AMT_CREDIT'),
        ('YEARS_EMPLOYED_PERCENT', 'YEARS_EMPLOYED', 'AGE'),
    )

    enriched = df_in
    for feature_name, numerator, denominator in ratio_specs:
        enriched = enriched.withColumn(
            feature_name, enriched[numerator] / enriched[denominator]
        )
    return enriched

# Add the ratio features on top of the cleaned columns.
df_cleaned = domain_features(df_gender_null)
# Optional preview of one derived feature next to its inputs:
# df_cleaned.select(["YEARS_EMPLOYED", "AGE", "YEARS_EMPLOYED_PERCENT"]).show()

# Sanity check: feature engineering must not change the number of rows.
df_raw.count() == df_cleaned.count()

True

In [None]:
# Export distributed: Spark writes a directory of CSV part files, replacing any
# previous contents of that directory.
# header=True writes the column names as the first line of every part file.
# Without it the export carries no column names at all, unlike the single-file
# export, which does write a header row.
df_cleaned.write.mode("Overwrite").csv(
    cleaned_data_loc + "app_train_cleaned/", header=True
)

In [None]:
# Export single csv file.
# toPandas() collects the whole DataFrame onto the driver before pandas writes
# it out, so the full dataset has to fit in driver memory.
single_csv_path = cleaned_data_loc + "app_train_processed.csv"
df_cleaned.toPandas().to_csv(single_csv_path, index=False)