In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import col

db_path="s3a://tsel-bucket/data/warehouse/tablespace/managed/hive/"
spark = (SparkSession
    .builder
    .appName("homecredit-spark")
    .config("spark.sql.warehouse.dir", db_path)
    .config("spark.hadoop.fs.s2a.s3guard.ddb.region", "us-east-1")
#    .config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
    .config("spark.kerberos.access.hadoopFileSystem","s3a://tsel-bucket/")
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
    .master("local[5]") # should be possible to change this to SPARK on Yarn or SPARK on Kubernetes
    .getOrCreate())

homecredit_raw_df=spark.read.option("header", True) \
                    .option("inferSchema",True) \
                    .csv("/home/cdsw/04_RAW_TRAINING_DATA/processed_data.csv")

                                                                                

In [2]:
print((homecredit_raw_df.count(), len(homecredit_raw_df.columns)))

                                                                                

(356251, 722)


In [13]:
# exprs = [col(column).alias(column.replace(' ', '_')) for column in homecredit_df.columns]
# homecredit_formatted_df = homecredit_df.select(*exprs)

import unicodedata
import re
from pyspark.sql.functions import *
from pyspark.sql.window import *

def normalize(column: str) -> str:
    """
    Normalize column name by replacing invalid characters with underscore
    strips accents and make lowercase
    :param column: column name
    :return: normalized column name
    """
    n = re.sub(r"[ ,;:{}()/\n\t=\-\+]+", '_', column.lower())
    return unicodedata.normalize('NFKD', n).encode('ASCII', 'ignore').decode()


# using the function
homecredit_df = homecredit_raw_df.toDF(*map(normalize, homecredit_raw_df.columns))
homecredit_df=homecredit_df.drop(col("_c0"))


homecredit_df = homecredit_df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
window = Window.orderBy(col('monotonically_increasing_id'))
homecredit_df = homecredit_df.withColumn('increasing_id', row_number().over(window))

## cast to float
cast_cols =["new_phone_to_birth_ratio_employer", "prev_app_credit_perc_max", "refused_app_credit_perc_max", "instal_payment_perc_max", "instal_payment_perc_min" ]
for col_name in cast_cols:
    homecredit_df = homecredit_df.withColumn(col_name, col(col_name).cast('double'))


homecredit_df = homecredit_df.withColumn("created", current_timestamp())
homecredit_df = homecredit_df.withColumn("event_timestamp", current_timestamp() - expr("INTERVAL 1 seconds") * col("increasing_id"))
homecredit_df =homecredit_df.drop(col("monotonically_increasing_id"))
homecredit_df = homecredit_df.drop(col("increasing_id")) 

#homecredit_df=homecredit_df.na.drop() -- we just get 58 rows if we use drop nas


In [14]:
print((homecredit_df.count(), len(homecredit_df.columns)))

(356251, 723)


In [15]:

# Load the processed formatted files to hive

homecreditsample_df = homecredit_df.select(col( "event_timestamp" ), col("sk_id_curr"), col("flag_own_car"), col("flag_own_realty"),col("cnt_children"), col("created")).limit(20)
# homecreditsample_df = homecredit_df.limit(20)
                                           
homecredit_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .saveAsTable('homecredit_processed_data')

homecreditsample_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .saveAsTable('homecredit_processed_data_sample')

Hive Session ID = c5a12e6b-9edf-4b81-b2a9-bd83177e68b8
                                                                                

In [16]:
#generating the Feature Definition File FEAST Datatype Mappings
def spark_type_to_primitive_feast_value_type(
    name: str
) -> str:
    
    
    type_map = {
        "int": "Int32",
        "bigint": "Int64",
        "float": "Float32",
        "string": "String",
        "double": "Float32",
        "timestamp": "TIMESTAMP",

    }
    return type_map[name]

## Copy the output generated here into the feature specifications file in Feature Repo. This is needed because FEAST doesn't seem to infer features properly by itself. 

for i in homecredit_df.dtypes:
    name =i[0]
    pDtype= i[1]
    if name != "sk_id_curr" and name != "event_timestamp" and name != "created" :
        print(f"Field(name=\"{name}\" , dtype={spark_type_to_primitive_feast_value_type(pDtype)}),") 

Field(name="index" , dtype=Int32),
Field(name="target" , dtype=Float32),
Field(name="code_gender" , dtype=Int32),
Field(name="flag_own_car" , dtype=Int32),
Field(name="flag_own_realty" , dtype=Int32),
Field(name="cnt_children" , dtype=Int32),
Field(name="amt_income_total" , dtype=Float32),
Field(name="amt_credit" , dtype=Float32),
Field(name="amt_annuity" , dtype=Float32),
Field(name="amt_goods_price" , dtype=Float32),
Field(name="region_population_relative" , dtype=Float32),
Field(name="days_birth" , dtype=Int32),
Field(name="days_employed" , dtype=Float32),
Field(name="days_registration" , dtype=Float32),
Field(name="days_id_publish" , dtype=Int32),
Field(name="own_car_age" , dtype=Float32),
Field(name="flag_mobil" , dtype=Int32),
Field(name="flag_emp_phone" , dtype=Int32),
Field(name="flag_work_phone" , dtype=Int32),
Field(name="flag_cont_mobile" , dtype=Int32),
Field(name="flag_phone" , dtype=Int32),
Field(name="flag_email" , dtype=Int32),
Field(name="cnt_fam_members" , dtype=Float