In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Function to get or create SparkSession safely
def get_spark_session(app_name="Credit Card Approval Preprocessing"):
    """Return a SparkSession registered under ``app_name``.

    The session is obtained through ``SparkSession.builder.getOrCreate()``,
    so an existing session is handed back instead of a second one being
    started. A one-line status message is printed on success and on failure.

    Args:
        app_name: Application name passed to the session builder.

    Returns:
        The object returned by ``getOrCreate()``.

    Raises:
        Exception: Any error raised while building the session is reported
            with ``print`` and then re-raised unchanged.
    """
    try:
        builder = SparkSession.builder.appName(app_name)
        session = builder.getOrCreate()
        print(f"SparkSession {app_name} started successfully.")
        return session
    except Exception as err:
        # Report the failure in the cell output, then let it propagate so the
        # notebook run stops here instead of continuing without a session.
        print(f"Error initializing SparkSession: {err}")
        raise err

# Obtain the session handle used by the rest of the notebook; the CSV-loading
# cells below read their input through `spark`. The default application name
# of get_spark_session() is used.
spark = get_spark_session()

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 3, Finished, Available, Finished)

SparkSession Credit Card Approval Preprocessing started successfully.


In [3]:
# Load the raw applicant records. The first line of the CSV supplies the
# column names; no schema is given here, and the numeric columns are cast
# explicitly in the data-cleaning cell.
application_reader = spark.read.option("header", "true")
application_df = application_reader.csv("Files/application_record.csv")
display(application_df)


StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 27eda847-2362-4960-9767-6e008ebdcbc7)

In [4]:
# Load the raw credit records. The first line of the CSV supplies the column
# names; no schema is given here, and the columns used later are cast
# explicitly in the feature-engineering cell.
credit_reader = spark.read.option("header", "true")
credit_df = credit_reader.csv("Files/credit_record.csv")
display(credit_df)

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 79ef6c78-d098-41e9-844e-4e493fbd754e)

In [5]:
# 1. Data Cleaning
# Give the numeric fields explicit types. The casts are applied one column at
# a time, in the order listed.
# NOTE(review): "float" is Spark's 32-bit type; consider "double" if income
# values ever need more precision — confirm before changing.
numeric_casts = {
    "CNT_CHILDREN": "integer",
    "AMT_INCOME_TOTAL": "float",
    "DAYS_BIRTH": "integer",
    "DAYS_EMPLOYED": "integer",
    "CNT_FAM_MEMBERS": "integer",
}
for column_name, spark_type in numeric_casts.items():
    application_df = application_df.withColumn(
        column_name, application_df[column_name].cast(spark_type)
    )

# A missing occupation is replaced by the explicit label 'Unknown' first, so
# those rows are not lost in the next step.
application_df = application_df.fillna({"OCCUPATION_TYPE": "Unknown"})

# Any row that still holds a null in any column is discarded.
application_df = application_df.na.drop()

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 7, Finished, Available, Finished)

In [6]:
# 2. Feature Engineering
from pyspark.sql.functions import expr

# Convert DAYS_BIRTH to age in years. abs() keeps the result non-negative
# whatever the sign of the stored day offset.
application_df = application_df.withColumn('AGE_YEARS', expr("abs(DAYS_BIRTH) / 365"))

# Convert DAYS_EMPLOYED to years employed.
# NOTE(review): the published data dictionary for this dataset states that
# DAYS_EMPLOYED counts backwards from day 0 and that a positive value marks an
# applicant who is currently unemployed (presumably a large placeholder).
# Taking abs() of such a value would report an absurdly long employment, so
# positive values are mapped to 0 years instead. Confirm against the data
# source. Non-positive values are converted exactly as before.
application_df = application_df.withColumn(
    'YEARS_EMPLOYED',
    expr(
        "CASE WHEN DAYS_EMPLOYED > 0 THEN CAST(0 AS DOUBLE) "
        "ELSE abs(DAYS_EMPLOYED) / 365 END"
    ),
)

# Drop the raw day offsets now that the year-based columns exist, together
# with FLAG_MOBIL, which the author judged not to be relevant.
application_df = application_df.drop('FLAG_MOBIL', 'DAYS_BIRTH', 'DAYS_EMPLOYED')

# Process Credit Records
# Give the two credit columns handled in this step explicit types.
for column_name, spark_type in (("MONTHS_BALANCE", "integer"), ("STATUS", "string")):
    credit_df = credit_df.withColumn(column_name, credit_df[column_name].cast(spark_type))

# Keep only rows whose status is neither 'X' nor 'C' (per the original
# author's note: 'X' means no loan, 'C' means closed loans).
excluded_statuses = ("X", "C")
credit_df = credit_df.where(~credit_df["STATUS"].isin(*excluded_statuses))

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 8, Finished, Available, Finished)

In [7]:
# 3. Data Joining
# Inner join on 'ID': only applicants that have at least one remaining credit
# row are kept.
# NOTE(review): credit_df presumably holds several rows per ID (it carries a
# MONTHS_BALANCE column); if so, each applicant's attributes repeat once per
# matching credit row — confirm that this is the intended grain.
joined_df = application_df.join(credit_df, "ID", "inner")

# Columns carried forward for further analysis.
selected_columns = [
    'ID',
    'CODE_GENDER',
    'FLAG_OWN_CAR',
    'FLAG_OWN_REALTY',
    'CNT_CHILDREN',
    'AMT_INCOME_TOTAL',
    'NAME_INCOME_TYPE',
    'NAME_EDUCATION_TYPE',
    'NAME_FAMILY_STATUS',
    'NAME_HOUSING_TYPE',
    'AGE_YEARS',
    'YEARS_EMPLOYED',
    'OCCUPATION_TYPE',
    'CNT_FAM_MEMBERS',
    'STATUS',
]
final_df = joined_df.select(*selected_columns)

# Preview the final dataset
display(final_df)

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1c264afe-3aa4-47fa-821d-cc1ed788e1fa)

In [8]:
# 4. Save the processed data as CSV
# Spark writes a directory of part files at this path, not a single CSV file.
output_path = "Files/processed_credit_data.csv"

# The default save mode raises an error when the target path already exists,
# so a second run of this cell (or a Restart & Run All) would fail. Overwrite
# the previous output instead, so the notebook can be re-run.
final_df.write.csv(output_path, header=True, mode="overwrite")

print(f"Processed data saved successfully to {output_path}")

StatementMeta(, 3d074d4f-b9be-48da-8c16-060c92adb3de, 10, Finished, Available, Finished)

Processed data saved successfully to Files/processed_credit_data.csv
