# AMEX: Credit Default Predictor  

Description, Business Problem, Data Explanation and ETL




- Business Problem:
The business problem is credit default prediction, which is essential for managing risk in consumer lending businesses. Lenders need accurate models to assess the creditworthiness of potential borrowers and make informed lending decisions.


- Business Context:
American Express, the largest payment card issuer globally, is hosting a competition to explore improved credit default prediction models. They aim to enhance the customer experience by making it easier for cardholders to be approved for a credit card. By leveraging machine learning techniques and an industrial-scale dataset, participants will challenge the existing credit default prediction model used by American Express.

- Main Goal:
Develop a machine learning model that surpasses the current credit default prediction model used by American Express, improving accuracy and performance metrics to enhance the customer experience and optimize lending decisions.


## The Data

### Extract, Transform, Load


The datasets can be found on Kaggle, and we will utilize the Kaggle API to download them.

There are four distinct datasets available: train data, test data, train labels, and submission data.

Before delving into the project, it is worth reviewing the size of each dataset to understand the scale of the data we are working with.


- Submission data: 61.95 MB
- Train labels data: 30.75 MB
- Train data: 16.39 GB
- Test data: 32.82 GB



Upon reviewing the sizes of the datasets, we discovered that two of them are quite large, approximately 16 and 32 GB in size. This indicates the potential need for techniques and technologies capable of handling large datasets. Whether it's during the ETL (Extract, Transform, Load) process, data analysis, or model training, the large dataset size can pose challenges.
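
The sizes above can also be checked locally once the files are on disk. The following is a minimal sketch, assuming decimal units (1 GB = 1000^3 bytes) and assuming the submission file is named `sample_submission.csv`; both are assumptions, and `human_size` and `report_sizes` are hypothetical helper names.

```python
import os

def human_size(num_bytes: int) -> str:
    """Format a byte count using decimal units (1 GB = 1000**3 bytes)."""
    size = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1000:
            return f"{size:.2f} {unit}"
        size /= 1000
    return f"{size:.2f} TB"

def report_sizes(paths):
    """Return {path: formatted size} for the files that exist locally."""
    return {p: human_size(os.path.getsize(p)) for p in paths if os.path.exists(p)}

# File names are assumptions; adjust them to match the downloaded files.
print(report_sizes(["train_data.csv", "test_data.csv",
                    "train_labels.csv", "sample_submission.csv"]))
```

Files that are not present are simply skipped, so the call is safe to run before the download has finished.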

For the current phase, our plan is to work with JupyterLab locally on a MacBook Pro equipped with an M1 Pro chip.

If more computing power turns out to be necessary, the alternative will be explained later.


Let's start with the code to extract the data locally.



In [1]:
import os
import zipfile
from kaggle.api.kaggle_api_extended import KaggleApi

def kaggle_downloads(competition, final_path):
    # Please create a Kaggle account and download the kaggle.json with your credentials.
    api = KaggleApi()
    api.authenticate()

    # Download all competition files as a single zip archive
    api.competition_download_files(competition, path=final_path, quiet=False)

    # Unzip the archive and remove it afterwards
    zip_path = os.path.join(final_path, f"{competition}.zip")
    if os.path.exists(zip_path):
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(final_path)
        os.remove(zip_path)

    print("File downloaded correctly.")

# Use the function
file_name = "amex-default-prediction"
path = "/Users/arielsolis"

kaggle_downloads(file_name, path)



amex-default-prediction.zip: Skipping, found more recently modified local copy (use --force to force download)


Note that on macOS the Kaggle API looks for the "kaggle.json" file in the **".kaggle"** folder of your home directory, that is, **"~/.kaggle/kaggle.json"**. Saving the file there ensures that the Kaggle API can access the credentials it needs to download the datasets.
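
Before calling the API, it can help to confirm where the credentials file is expected. This is a small sketch of the Kaggle client's default lookup as I understand it (the `KAGGLE_CONFIG_DIR` environment variable if set, otherwise `~/.kaggle`); `kaggle_config_path` is a hypothetical helper, not part of the Kaggle package, and newer client versions may check additional locations.

```python
import os
from pathlib import Path

def kaggle_config_path(env=None, home=None) -> Path:
    """Return where kaggle.json is expected under the assumed default lookup."""
    env = os.environ if env is None else env
    home = Path.home() if home is None else Path(home)
    config_dir = env.get("KAGGLE_CONFIG_DIR")
    base = Path(config_dir) if config_dir else home / ".kaggle"
    return base / "kaggle.json"

path = kaggle_config_path()
print(f"{path} exists: {path.exists()}")
```

If the file is reported as missing, move `kaggle.json` to the printed location before running the download cell.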


Because the datasets are large, downloading and extracting them may take a considerable amount of time. Once they are available locally, we can use specific packages or libraries to handle the data efficiently.


Let's start by using pandas for the train labels.

In [2]:
import pandas as pd 

# Load the train labels dataset
train_labels = pd.read_csv('train_labels.csv')


pandas handles a file of this size (about 30 MB) without any trouble.


When local memory and computational efficiency become limiting factors, it is a wise choice to use Apache Spark for the train data and test data instead of relying solely on pandas.

Apache Spark is a distributed computing framework designed to process and analyze large-scale datasets efficiently. Here's why using Spark (through PySpark) is beneficial in such scenarios:

- Distributed Processing: PySpark splits the data into partitions and processes them in parallel across multiple machines or cores. This distributed processing capability significantly enhances the performance and scalability of data operations, making it ideal for handling large datasets.

- Fault Tolerance: PySpark incorporates fault tolerance mechanisms, ensuring that data processing tasks can recover from failures. It keeps track of how each partition was computed and re-executes failed tasks, ensuring the reliability and stability of data processing workflows.

- Scalability: As the dataset size increases, PySpark can easily scale up by adding more computational resources. It can efficiently handle datasets that surpass the memory limits of a single machine by leveraging distributed computing resources.

- Built-in Data Processing Functions: PySpark provides a wide range of built-in functions for data processing, transformation, and analysis. It offers a familiar DataFrame API similar to Pandas, making it convenient for data manipulation tasks.

- Compatibility: PySpark integrates well with other big data technologies and platforms, such as Hadoop. It supports various data sources, including CSV, Parquet, and more, allowing seamless integration with different data formats.

By using PySpark locally, we can process the data partition by partition across the available cores, which makes it possible to handle the train and test data efficiently even when they do not fit in the memory of a single machine.
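
The idea behind partitioned processing can be illustrated in plain Python. The sketch below is not how Spark is implemented; it only shows, under simplified assumptions, how a CSV can be streamed in fixed-size chunks so that only one chunk is held in memory at a time. The sample data and the `value` column are made up for the illustration.

```python
import csv
import io
from itertools import islice

def chunked_rows(handle, chunk_size):
    """Yield lists of at most chunk_size rows (as dicts) from an open CSV file."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk

def count_rows_per_key(handle, key, chunk_size=100_000):
    """Count rows per key while holding only one chunk in memory at a time."""
    totals = {}
    for chunk in chunked_rows(handle, chunk_size):
        for row in chunk:
            totals[row[key]] = totals.get(row[key], 0) + 1
    return totals

# Tiny in-memory stand-in for a large CSV file.
sample = io.StringIO("customer_ID,value\na,0.9\na,0.8\nb,0.7\n")
print(count_rows_per_key(sample, "customer_ID", chunk_size=2))  # {'a': 2, 'b': 1}
```

Spark applies the same principle at a larger scale, and additionally runs the per-partition work in parallel.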


In [6]:
# Build a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("My Spark Application") \
    .config("spark.sql.debug.maxToStringFields", "100")\
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/23 19:44:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/23 19:44:55 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
spark_df = spark.read.csv('train_data.csv', header=True, inferSchema=True)


# Specify the output path for the Parquet file
output_path = 'train_data.parquet'

# Write the DataFrame to Parquet format
# (Spark writes a folder containing one file per partition)
spark_df.write.parquet(output_path)

# Stop the SparkSession
spark.stop()

23/05/23 19:14:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

There are two notable aspects in the previous code. First, we are converting the CSV file to the Parquet format; second, the resulting output is a folder containing the dataset divided into partitions. Let's look at the reasons behind the conversion and then concatenate the partitions into a single dataset.

Conversion to Parquet:

- Improved Performance: Parquet is a columnar storage format that offers efficient compression and encoding techniques. By converting the dataset to Parquet, we can optimize storage and achieve faster query performance due to its columnar layout.
- Compatibility: Parquet is widely supported in various data processing frameworks, including Apache Spark and Pandas. It provides interoperability between different tools, allowing seamless integration and data sharing.


The result of this code will be a folder with the dataset in partitions. Let's try to concatenate all these partitions to have only one dataset.


In [3]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Folder written by Spark
folder = '/Users/arielsolis/train_data.parquet'

# List of part files, sorted so that the row order is reproducible
files_parquet = sorted(file for file in os.listdir(folder) if file.endswith('.parquet'))

# Read each part file into a pandas DataFrame
dfs = []
for file in files_parquet:
    file_path = os.path.join(folder, file)
    table = pq.read_table(file_path)
    df = table.to_pandas()
    dfs.append(df)


# Concatenate DataFrames
train = pd.concat(dfs, ignore_index=True)

train.to_parquet('/Users/arielsolis/train.parquet')

In [4]:
train = pd.read_parquet('train.parquet')
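
A Spark output folder usually contains marker and checksum files next to the part files. The helper below is a minimal sketch of selecting only the data files in a stable order; `list_part_files` is a hypothetical name, and the file names in the demo are simplified stand-ins for the ones Spark generates.

```python
import os
import tempfile

def list_part_files(folder):
    """Return the Parquet part files in a Spark output folder, sorted by name."""
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.endswith(".parquet") and not name.startswith((".", "_"))
    )

# Demo with a throwaway folder that mimics the layout of a Spark output folder.
with tempfile.TemporaryDirectory() as folder:
    for name in ("part-00001.snappy.parquet", "part-00000.snappy.parquet",
                 "_SUCCESS", ".part-00000.snappy.parquet.crc"):
        open(os.path.join(folder, name), "w").close()
    print([os.path.basename(p) for p in list_part_files(folder)])
```

The pandas documentation also describes passing a directory of Parquet files directly to `pd.read_parquet`, which may be a shorter alternative; either way, the full table has to fit in memory.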

Now that the train dataset is loaded, let's merge it with the target dataset. We don't know whether the target dataset is sorted in the same order as the train dataset, so it is safer to merge both on customer_ID with a left join.

In [5]:
# Merge the datasets
train_merge = train.merge(train_labels, how='left', on='customer_ID')

# This is an important step: a simple validation of the new dataset

# Validate same number of rows
if train.shape[0] == train_merge.shape[0]:
    print(f'Both datasets have {train_merge.shape[0]} records')
else:
    print('Not the same amount of records')

# Validate that the target has no null values and only two values [0, 1]

if train_merge.target.isna().sum() == 0:
    print('Target column without null values')
    print(f'the Target column has {train_merge.target.unique()} unique values')
else:
    print('Target with null values')

Both datasets have 5531451 records
Target column without null values
the Target column has [0 1] unique values
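
The row-count check works because a left join only preserves the number of rows when each key appears at most once on the right side. The following plain-Python sketch, with made-up keys, illustrates that reasoning; `left_join_row_count` and `duplicated_keys` are hypothetical helpers, not pandas functions.

```python
from collections import Counter

def duplicated_keys(keys):
    """Return the keys that occur more than once."""
    return sorted(k for k, n in Counter(keys).items() if n > 1)

def left_join_row_count(left_keys, right_keys):
    """Rows a left join would produce: unmatched left rows are kept once,
    matched rows are repeated once per matching right row."""
    right_counts = Counter(right_keys)
    return sum(max(right_counts[k], 1) for k in left_keys)

statements = ["a", "a", "b"]   # several statements per customer
labels_ok = ["a", "b"]         # one label per customer
labels_dup = ["a", "a", "b"]   # a duplicated label row

print(left_join_row_count(statements, labels_ok))   # 3, same as the left side
print(left_join_row_count(statements, labels_dup))  # 5, rows were duplicated
print(duplicated_keys(labels_dup))                  # ['a']
```

In pandas, passing `validate='many_to_one'` to `merge` performs a comparable check and raises an error if the keys in the right DataFrame are not unique.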


In [6]:
train_merge.to_parquet('train_dataset.parquet')


For the test dataset, let's apply the same preprocessing as for the train dataset.








In [7]:
# Requires an active SparkSession (re-run the session cell if it was stopped)
spark_df = spark.read.csv('test_data.csv', header=True, inferSchema=True)


# Specify the output path for the Parquet file
output_path = 'test_data.parquet'

# Write the DataFrame to Parquet format
# (Spark writes a folder containing one file per partition)
spark_df.write.parquet(output_path)

# Stop the SparkSession
spark.stop()


23/05/23 19:46:04 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [2]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Folder written by Spark
folder = '/Users/arielsolis/test_data.parquet'

# List of part files, sorted so that the row order is reproducible
files_parquet = sorted(file for file in os.listdir(folder) if file.endswith('.parquet'))

# Read each part file into a pandas DataFrame
dfs = []
for file in files_parquet:
    file_path = os.path.join(folder, file)
    table = pq.read_table(file_path)
    df = table.to_pandas()
    dfs.append(df)


# Concatenate DataFrames
test = pd.concat(dfs, ignore_index=True)


In [3]:
test.to_parquet('test.parquet')

Now, with all that preprocessing done, let's check the size of the datasets:

- Train data: 6.68 GB
- Test data: 13.7 GB

That is a size reduction of about 59% and 58%, respectively!
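
The reduction can be computed directly from the sizes reported in this section (16.39 GB and 32.82 GB as CSV, 6.68 GB and 13.7 GB as Parquet). A minimal sketch, where `percent_reduction` is just an illustrative helper:

```python
def percent_reduction(before: float, after: float) -> float:
    """Percentage by which `after` is smaller than `before`."""
    return (before - after) / before * 100

print(round(percent_reduction(16.39, 6.68), 1))  # train: 59.2
print(round(percent_reduction(32.82, 13.7), 1))  # test: 58.3
```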


Once the datasets are in the desired format, it is a good idea to store them somewhere other than the local environment. In this case, we will use Amazon S3 buckets from AWS as the storage solution. The choice of S3 is a simple preference; other cloud storage options or even personal drives could be used as well.

Here are the reasons why using AWS S3 buckets is a popular choice:

- Scalability: AWS S3 offers virtually unlimited scalability, allowing you to store and manage large volumes of data without worrying about storage capacity limitations. It can handle datasets of any size, providing a flexible and scalable storage solution.

- Durability and Reliability: S3 ensures high durability and reliability for your data. It replicates data across multiple availability zones, providing redundancy and protecting against data loss. It also offers versioning and backup features, ensuring the safety and integrity of your datasets.

- Accessibility and Availability: AWS S3 provides easy and secure access to your data from anywhere at any time. It offers fine-grained access controls, enabling you to manage permissions and share data with specific users or services. Additionally, S3 provides high availability, ensuring that your datasets are accessible without downtime or interruptions.

- Integration and Ecosystem: AWS S3 integrates seamlessly with a wide range of AWS services, such as data processing frameworks (e.g., Apache Spark), machine learning platforms (e.g., Amazon SageMaker), and analytics tools. It forms part of the robust AWS ecosystem, offering various data management and analysis capabilities.

While AWS S3 is chosen here as a preferred storage solution, it's important to note that other cloud storage options like Google Cloud Storage, Microsoft Azure Blob Storage, or personal drives such as Google Drive or Dropbox can also be used based on individual preferences, requirements, and existing infrastructure.








### AWS Storage (S3 buckets)


Before using AWS products and services, you need to create an AWS account. To do so:

- Visit the AWS website: Go to the official AWS website at https://aws.amazon.com/ and follow the sign-up process.

Once you have the account, we can start working with AWS services!


In [7]:

import boto3 # AWS package

# You need credentials to be able to use Python to create, load or download anything in AWS services.
# Never hard-code them in a notebook: boto3 reads them from the environment variables
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or from the ~/.aws/credentials file.

BUCKET_NAME = 'amex-dfs' # Define a bucket name
REGION = 'us-east-2'  # Specify the desired region for the bucket
location_constraint = REGION # Must be the same location as REGION
s3 = boto3.client('s3', region_name=REGION)
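
One way to keep credentials out of the notebook is to export them as the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, which boto3 also picks up on its own. The sketch below only checks that they are set before any AWS call is made; `aws_credentials_from_env` is a hypothetical helper, not part of boto3.

```python
import os

def aws_credentials_from_env(env=None):
    """Read AWS credentials from environment variables instead of source code."""
    env = os.environ if env is None else env
    missing = [name for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
               if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["AWS_ACCESS_KEY_ID"], env["AWS_SECRET_ACCESS_KEY"]

try:
    access_key, secret_key = aws_credentials_from_env()
    print("Credentials found in the environment.")
except RuntimeError as err:
    print(err)
```

Failing early with a clear message is easier to debug than an authentication error raised in the middle of a long upload.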



In [30]:


# Create the S3 bucket
s3.create_bucket(Bucket=BUCKET_NAME, CreateBucketConfiguration={'LocationConstraint': location_constraint})

{'ResponseMetadata': {'RequestId': 'M7EB2VA00GWKJA7Z',
  'HostId': 'mOjlUwxmz1++pqDbLz/P390907FafbRDi7AF/Hp3f+aBI5l0OlsGtK7RvDPZRLBj4OsjM59JocA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'mOjlUwxmz1++pqDbLz/P390907FafbRDi7AF/Hp3f+aBI5l0OlsGtK7RvDPZRLBj4OsjM59JocA=',
   'x-amz-request-id': 'M7EB2VA00GWKJA7Z',
   'date': 'Wed, 24 May 2023 00:37:34 GMT',
   'location': 'http://amex-dfs.s3.amazonaws.com/',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': 'http://amex-dfs.s3.amazonaws.com/'}

After creating the S3 bucket, we can upload the datasets to it. In case the test dataset is too large to work with as a single file, we are also going to add the folder with the partitions.

In [None]:
s3.upload_file('test.parquet', BUCKET_NAME, 'test_data.parquet')
s3.upload_file('train_dataset.parquet', BUCKET_NAME, 'train_data.parquet')

Be patient: depending on your internet speed, uploading the files to S3 could take some time!
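
Uploading the folder with the partitions means uploading each part file under a common key prefix. The following is a sketch of how the local files could be mapped to object keys, assuming a hypothetical prefix `test_partitions`; the actual upload call is left as a comment because it needs a configured boto3 client.

```python
import os
import tempfile

def partition_upload_plan(folder, key_prefix):
    """Map each Parquet part file in `folder` to an S3 object key."""
    plan = []
    for name in sorted(os.listdir(folder)):
        if name.endswith(".parquet"):
            plan.append((os.path.join(folder, name), f"{key_prefix}/{name}"))
    return plan

# Demo with a throwaway folder; the real part files come from the Spark output folder.
with tempfile.TemporaryDirectory() as folder:
    for name in ("part-00000.parquet", "part-00001.parquet", "_SUCCESS"):
        open(os.path.join(folder, name), "w").close()
    for local_path, key in partition_upload_plan(folder, "test_partitions"):
        print(key)
        # With a boto3 client: s3.upload_file(local_path, BUCKET_NAME, key)
```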

## SUMMARY

- To download the data locally, the Kaggle API is used, and it is crucial to save the "kaggle.json" file in the correct path.

- Once the datasets are downloaded, they can be loaded using Python with both Spark and Pandas. Spark is preferred for handling heavy files due to its distributed computing capabilities.

- The datasets can be transformed to the Parquet format using Spark, which provides efficient storage and optimized query performance. The goal is to create a single unpartitioned DataFrame for further processing.

- Next, the train dataset can be joined with the target dataset, ensuring that there are no errors or mismatches in the join operation. This step helps validate the integrity and consistency of the data.

- To store the transformed files remotely and access them without relying on local storage, an AWS account is created. The transformed datasets can be saved using AWS S3, which provides scalable and durable object storage in the cloud. This enables remote access to the datasets and reduces the reliance on local storage infrastructure.

Thanks,

Ariel Solis 