

## Glue Job Preprocessing with PySpark: Kueski Challenge

AWS Glue supports an extension of the PySpark Python dialect for scripting extract, transform, and load (ETL) jobs. This notebook uses that dialect to build the ETL script that a Glue job runs: it loads the credit risk dataset, derives loan-history features per customer, and writes the result to S3 as Parquet.

<a id='contents' />

## Table of contents

1. [Loading libraries](#loading)
2. [Feature Engineering](#etl)
3. [Writing results to S3](#s3)

<a id='loading' />

## 1. Loading libraries
[(back to top)](#contents)

In [1]:
print('Loading the required libraries')
# Standard library
import ast
import gc
import json
import sys
from datetime import datetime, timedelta

# Third-party
import boto3
import pandas as pd
from dateutil.relativedelta import relativedelta

# AWS Glue
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions

# PySpark. The wildcard imports supply col, when, to_date, IntegerType, etc. used below;
# note that pyspark.sql.functions shadows built-ins such as sum, min, and max.
import pyspark.sql.functions as func
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

kms_key_id = '36f1041a-656a-4df4-954a-49b6a39e4b54'

spark_conf = SparkConf().setAll([
    ("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
    ("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id),
    ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
    ("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"),
    ("spark.sql.catalogImplementation", "hive"),
    ("hive.exec.dynamic.partition.mode", "nonstrict")
])
sc = SparkContext.getOrCreate(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()

Starting Spark application


SparkSession available as 'spark'.


Loading the required libraries

In [2]:
# Keep the processing_job variables in the process.
# Define the updated_at variable (it could serve as a partition, so that only the latest partitions are kept).
# The primary key should be loan_id, with an index on the customer id.
today            = datetime.strptime("2021-10-21", '%Y-%m-%d').date()
updated_at       = today.strftime('%Y-%m-%d')
source_path      = "s3://arcosmtk-working-directory/credits_scoring_test/dataset_credit_risk.csv.gz"
destination_path = "s3://arcosmtk-working-directory/credits_scoring_test/offline_serving/"


In [12]:
df = glueContext.read.csv(source_path,header=True)

In [13]:
# Cast ids, amounts, and dates. QA is still pending, e.g. capping job dates at the current date or another strategy.
df = df.withColumn("id", col('id').cast(IntegerType()))
df = df.withColumn("loan_id", col('loan_id').cast(IntegerType()))
df = df.withColumn("loan_amount", col('loan_amount').cast(DoubleType()))
df = df.withColumn("loan_date", to_date('loan_date', 'yyyy-MM-dd'))
df = df.withColumn("job_start_date", to_date('job_start_date', 'yyyy-MM-dd'))
# Cap future job start dates at the current date to avoid the out-of-bounds error from to_timestamp. Pending QA.
df = df.withColumn("job_start_date", when(col("job_start_date") > current_date(), current_date()).otherwise(col('job_start_date')))
df = df.withColumn("birthday", to_date('birthday', 'yyyy-MM-dd'))
# Cap future birthdays at the current date for the same reason. Pending QA.
df = df.withColumn("birthday", when(col("birthday") > current_date(), current_date()).otherwise(col('birthday')))
# Convert loan_date to a long (epoch seconds) so it can be used in a range-based window.
df = df.withColumn("loan_date_ts", unix_timestamp(to_timestamp(col('loan_date'), 'yyyy-MM-dd')))
# Record the processing date.
df = df.withColumn("processing_date", lit(updated_at))
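
The capping rule and the epoch conversion in the cell above can be checked outside Spark. The following is a minimal plain-Python sketch, not the job's code: `cap_future_date` and `to_epoch_seconds` are hypothetical helper names, `run_date` is an assumed fixed reference date, and the conversion assumes UTC, whereas Spark's `unix_timestamp` uses the session time zone.

```python
from datetime import date, datetime, timezone
from typing import Optional


def cap_future_date(value: Optional[date], today: date) -> Optional[date]:
    """Return `today` when `value` lies in the future, else `value` (None stays None)."""
    if value is None:
        return None
    return today if value > today else value


def to_epoch_seconds(value: date) -> int:
    """Seconds since 1970-01-01 for midnight UTC on `value` (UTC is an assumption)."""
    midnight = datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
    return int(midnight.timestamp())


run_date = date(2021, 10, 21)  # hypothetical fixed reference date
print(cap_future_date(date(2030, 1, 1), run_date))   # 2021-10-21
print(cap_future_date(date(2015, 6, 30), run_date))  # 2015-06-30
print(to_epoch_seconds(date(1970, 1, 2)))            # 86400
```

Passing the reference date in explicitly, rather than reading the clock inside the helper, keeps the check reproducible across runs.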

<a id='etl' />

## 2. Feature Engineering
[(back to top)](#contents)

In [14]:
# Feature nb_previous_loans
df = df.withColumn("nb_previous_loans", dense_rank().over(Window.partitionBy("id").orderBy(asc("loan_date")))-1)

# avg_amount_loans_previous
w = Window.partitionBy('id')\
                 .orderBy(asc('loan_date_ts'))\
                 .rangeBetween(Window.unboundedPreceding, -1)
df = df.withColumn('avg_amount_loans_previous', mean('loan_amount').over(w))

# Reproduce the age variable. TODO: apply quality control to this column.
df = df.withColumn("age", (datediff(current_date(), col('birthday')) / 365.25).cast(IntegerType()))

# Ask Data Science for a better way to impute incorrect data, e.g. a date of birth that implies an age over 100 years.
df = df.withColumn("years_on_the_job", (datediff(current_date(), col('job_start_date')) / 365.25).cast(IntegerType()))

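# Again, there is no quality control on the input data. A one-hot encoder would be a better fit.
# Encode 'N' as "0" and anything else (including nulls) as "1" with a native expression instead of a Python UDF.
# The values stay strings, as the original UDF declared; cast to IntegerType() if a numeric flag is needed.
df = df.withColumn("flag_own_car", when(col("flag_own_car") == "N", "0").otherwise("1"))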
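
To sanity-check the two window features on a small sample, the same logic can be written in plain Python. This is a sketch under assumptions: the toy `loans` rows and the helper `previous_loan_features` are made up for illustration, and dates are compared as ISO strings rather than epoch seconds.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical toy rows: (customer id, loan_date as ISO string, loan_amount)
loans = [
    (1, "2021-01-10", 100.0),
    (1, "2021-02-15", 300.0),
    (1, "2021-02-15", 500.0),
    (1, "2021-03-01", 200.0),
    (2, "2021-01-05", 50.0),
]


def previous_loan_features(rows):
    """Return (id, loan_date, nb_previous_loans, avg_amount_loans_previous) per input row."""
    by_customer = defaultdict(list)
    for customer_id, loan_date, amount in rows:
        by_customer[customer_id].append((loan_date, amount))

    features = []
    for customer_id, history in by_customer.items():
        distinct_dates = sorted({d for d, _ in history})
        for loan_date, _ in sorted(history):
            # dense_rank() - 1: number of distinct earlier loan dates
            nb_previous = distinct_dates.index(loan_date)
            # Range window ending before the current date: strictly earlier loans only
            earlier = [a for d, a in history if d < loan_date]
            avg_previous = mean(earlier) if earlier else None
            features.append((customer_id, loan_date, nb_previous, avg_previous))
    return features


for row in previous_loan_features(loans):
    print(row)
```

In this model, two loans on the same date share one rank, so `nb_previous_loans` counts distinct earlier loan dates rather than earlier loans, and same-day loans are excluded from each other's average. A first loan has no history, so its average is empty (`None` here).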



In [15]:
df.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- code_gender: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- flag_own_realty: string (nullable = true)
 |-- cnt_children: string (nullable = true)
 |-- amt_income_total: string (nullable = true)
 |-- name_income_type: string (nullable = true)
 |-- name_education_type: string (nullable = true)
 |-- name_family_status: string (nullable = true)
 |-- name_housing_type: string (nullable = true)
 |-- days_birth: string (nullable = true)
 |-- days_employed: string (nullable = true)
 |-- flag_mobil: string (nullable = true)
 |-- flag_work_phone: string (nullable = true)
 |-- flag_phone: string (nullable = true)
 |-- flag_email: string (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- cnt_fam_members: string (nullable = true)
 |-- status: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- job_start_date: date (nullable = true)
 |-- loan_date: date (

In [16]:
df2 = df.toPandas()

In [17]:
df2.job_start_date.min(),df2.job_start_date.max()

(datetime.date(1978, 9, 10), datetime.date(2021, 12, 22))
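
The range check above inspects `job_start_date`, which feeds `years_on_the_job`. Both that feature and `age` divide a day count by 365.25 and truncate, measured against `current_date()`, so the values depend on the day the job runs. The sketch below compares that rule with a calendar-based count; `approx_years`, `calendar_years`, and the fixed `run_date` are illustrative names, not part of the notebook.

```python
from datetime import date


def approx_years(start: date, today: date) -> int:
    """Whole years using the notebook's rule: day difference / 365.25, truncated."""
    return int((today - start).days / 365.25)


def calendar_years(start: date, today: date) -> int:
    """Whole years by calendar: subtract one if the anniversary has not happened yet."""
    had_anniversary = (today.month, today.day) >= (start.month, start.day)
    return today.year - start.year - (0 if had_anniversary else 1)


run_date = date(2021, 10, 21)  # hypothetical fixed reference date
birthday = date(2000, 10, 21)  # exactly 21 calendar years before run_date
print(approx_years(birthday, run_date), calendar_years(birthday, run_date))  # 20 21
```

The two rules can disagree on or near an anniversary, which matters if a model uses an age threshold. Measuring against a fixed processing date instead of the wall clock would also make reruns reproducible.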

<a id='s3' />

## 3. Writing results to S3
[(back to top)](#contents)

In [18]:
# Save as Parquet (offline feature store). The partitioning strategy is still open for discussion:
# it must account for refreshing old partitions in the production environment.
# Be careful when overwriting partitions; updating the Parquet data requires upserts.
# TODO: switch to the online feature store (SQL database) and upsert records instead.
df = df.repartition("loan_date")
df.write.mode('overwrite')\
        .format('parquet')\
        .partitionBy('loan_date')\
        .save(destination_path)
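
The session sets `spark.sql.sources.partitionOverwriteMode` to `dynamic`, so this write replaces only the `loan_date` partitions present in `df`. The plain-Python model below illustrates that behaviour next to the upsert the comments call for. It is a sketch only: `overwrite`, `upsert`, and the toy partitions are hypothetical, and real Spark operates on files rather than dictionaries.

```python
from typing import Dict, List

Partitions = Dict[str, List[dict]]


def overwrite(existing: Partitions, incoming: Partitions, mode: str) -> Partitions:
    """Model of a partitioned overwrite.

    `static` drops every existing partition first;
    `dynamic` replaces only the partitions that appear in `incoming`.
    """
    if mode == "static":
        return dict(incoming)
    if mode == "dynamic":
        return {**existing, **incoming}
    raise ValueError(f"unknown mode: {mode}")


def upsert(existing: List[dict], incoming: List[dict], key: str = "loan_id") -> List[dict]:
    """Merge rows by primary key; an incoming row replaces the stored row with the same key."""
    merged = {row[key]: row for row in existing}
    merged.update({row[key]: row for row in incoming})
    return list(merged.values())


stored = {
    "loan_date=2021-10-19": [{"loan_id": 1, "loan_amount": 100.0}],
    "loan_date=2021-10-20": [{"loan_id": 2, "loan_amount": 250.0}],
}
batch = {"loan_date=2021-10-20": [{"loan_id": 2, "loan_amount": 275.0}]}

print(sorted(overwrite(stored, batch, "static")))   # only the 2021-10-20 partition survives
print(sorted(overwrite(stored, batch, "dynamic")))  # both partitions remain
print(upsert(stored["loan_date=2021-10-20"], batch["loan_date=2021-10-20"]))
```

Even in dynamic mode a partition is replaced as a whole: a rerun that carries only some of a day's loans would drop the rest of that day. That is the case an upsert keyed on `loan_id` is meant to cover.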

In [19]:
# Online feature store replication
# TODO
