### Initializing

In [None]:
from pyspark.sql import SparkSession
# Build (or reuse) a Spark session with case-sensitive column-name resolution.
# Presumably needed because later cells treat the `ID` and `id` columns as distinct - confirm.
spark = (
    SparkSession.builder
    .appName("NewSparkSession")
    .config("spark.sql.caseSensitive", "true")
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import *
from pyspark import StorageLevel

In [None]:
# Cosmos DB connection settings used by the reads below.
# NOTE(review): the endpoint and key are "###" placeholders here; supply the real values
# from a secret store or environment variable rather than committing them to the notebook.
cosmos_endpoint = "###"
cosmos_primary_key = "###"
cosmos_database_name = "Capstone"
# Two containers are read from the same database, one per dataset.
cosmos_container_name = "Application"
cosmos_container_name2 = "Credit"

In [None]:
# Configuring Spark for Cosmos DB access (session-level settings).
# NOTE(review): the reads in this notebook pass "spark.cosmos.*" options explicitly;
# confirm that these "spark.azure.cosmos.*" keys are actually consumed by the connector in use.
spark.conf.set("spark.azure.cosmos.accountEndpoint", cosmos_endpoint)
spark.conf.set("spark.azure.cosmos.accountKey", cosmos_primary_key)
spark.conf.set("spark.azure.cosmos.database", cosmos_database_name)
# A config key holds a single value. This key used to be set to the Application container
# and then immediately overwritten with the Credit container, which made the first call
# dead code. Only the effective (last) value is kept; each read selects its own container
# through .option("spark.cosmos.container", ...).
spark.conf.set("spark.azure.cosmos.container", cosmos_container_name2)
spark.conf.set("spark.azure.cosmos.read.inferSchema.enabled", "true")  # Optional: Infer schema automatically

In [None]:
# Load the Application container from Cosmos DB (OLTP connector) into application_df.
# Connection details are passed per-read so this cell does not depend on session-level config.
application_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint)
    .option("spark.cosmos.accountKey", cosmos_primary_key)
    .option("spark.cosmos.database", cosmos_database_name)
    .option("spark.cosmos.container", cosmos_container_name)
    .load()
)

In [None]:
# Load the Credit container from Cosmos DB (OLTP connector) into credit_df.
# Same database and credentials as the Application read; only the container differs.
credit_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint)
    .option("spark.cosmos.accountKey", cosmos_primary_key)
    .option("spark.cosmos.database", cosmos_database_name)
    .option("spark.cosmos.container", cosmos_container_name2)
    .load()
)

### Optimising for better querying

In [None]:
# Mark both frames for caching so the repeated actions below can reuse the loaded rows
# instead of reading from Cosmos DB again. Caching is lazy: it is populated by the first action.
application_df, credit_df = application_df.cache(), credit_df.cache()

### Shape

In [None]:
# Shape of the Application frame: row count (triggers a Spark job) and column count.
application_row_count = application_df.count()
application_column_count = len(application_df.columns)
print(f"No. of records in ApplicationDF: {application_row_count}")
print(f"No. of columns in ApplicationDF: {application_column_count}")

In [None]:
# Shape of the Credit frame: row count (triggers a Spark job) and column count.
print(f"No. of records in CreditDF: {credit_df.count()}")
# The second line reports the number of columns; it was previously mislabelled as "records".
print(f"No. of columns in CreditDF: {len(credit_df.columns)}")

### Type Casting 

In [None]:
# Target Spark SQL type for each Application column; columns not listed keep their type.
application_cast_plan = {
    "ID": "int",
    "CNT_CHILDREN": "int",
    "AMT_INCOME_TOTAL": "float",
    "DAYS_BIRTH": "int",
    "DAYS_EMPLOYED": "int",
    "FLAG_MOBIL": "int",
    "FLAG_WORK_PHONE": "int",
    "FLAG_PHONE": "int",
    "FLAG_EMAIL": "int",
    "CNT_FAM_MEMBERS": "float",
}
# Replace each listed column in place with its cast version (same name, same position).
for source_column, target_type in application_cast_plan.items():
    application_df = application_df.withColumn(source_column, col(source_column).cast(target_type))

application_df.printSchema()

# Target Spark SQL type for each Credit column.
credit_cast_plan = {
    "ID": "int",
    "MONTHS_BALANCE": "int",
    "STATUS": "string",
}
for source_column, target_type in credit_cast_plan.items():
    credit_df = credit_df.withColumn(source_column, col(source_column).cast(target_type))

credit_df.printSchema()

### Renaming Column name to standard form

In [None]:
# Source column name -> readable name used for the rest of the notebook.
application_renames = {
    "CODE_GENDER": "Gender",
    "FLAG_OWN_CAR": "Car",
    "FLAG_OWN_REALTY": "property",
    "CNT_CHILDREN": "Children_Count",
    "AMT_INCOME_TOTAL": "Annual_Income",
    "NAME_INCOME_TYPE": "Income_Category",
    "NAME_EDUCATION_TYPE": "Education",
    "NAME_FAMILY_STATUS": "Marital_Status",
    "NAME_HOUSING_TYPE": "Housing_type",
    "DAYS_BIRTH": "BirthDate",
    "DAYS_EMPLOYED": "EmpDays",
    "FLAG_MOBIL": "Mobile",
    "FLAG_WORK_PHONE": "Work_Phone",
    "FLAG_PHONE": "Phone",
    "FLAG_EMAIL": "Email",
    "OCCUPATION_TYPE": "Occupation",
    "CNT_FAM_MEMBERS": "Family_size",
}
# withColumnRenamed is a no-op for a name that is not present, so a missing source
# column is skipped silently rather than raising.
for original_name, readable_name in application_renames.items():
    application_df = application_df.withColumnRenamed(original_name, readable_name)

In [None]:
# Print the schema to check the column names and types after the renaming step.
application_df.printSchema()

In [None]:
# Discard the lowercase `id` column (presumably the Cosmos DB document id - confirm), then
# remove rows that are identical across all remaining columns.
application_df = application_df.drop("id").dropDuplicates()

In [None]:
# Discard the lowercase `id` column (presumably the Cosmos DB document id - confirm), then
# remove rows that are identical across all remaining columns.
credit_df = credit_df.drop("id").dropDuplicates()

### Finding and dropping repeated IDs

In [None]:
# Keep a single row per applicant ID.
# A preceding orderBy('ID', ascending=False) was removed: every row in a duplicate group
# has the same ID, so sorting by ID cannot influence which row survives, and dropDuplicates
# does not promise any output ordering. The sort only added a full shuffle.
# NOTE(review): which of the conflicting rows is kept for a repeated ID is arbitrary.
# If a specific row must win, rank rows within each ID by an explicit tie-breaker column.
application_df = application_df.dropDuplicates(subset=['ID'])

### Finding the null values


In [None]:
# One aggregate per column: count the rows whose value is NULL or NaN.
# when(...) yields a non-null literal only for matching rows, and count() ignores nulls.
null_or_nan_counts = [
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in application_df.columns
]
application_df.select(null_or_nan_counts).show()

As we can see, Occupation has the highest number of null values (134190), while no other column has any nulls. Since 134190 is a large share of the data,
dropping those rows would significantly change the dataset, so we assign "Unknown" as the occupation instead.

In [None]:
# Replace missing occupations with a sentinel label instead of dropping those rows.
occupation_fill_values = {'Occupation': 'Unknown'}
application_df = application_df.fillna(occupation_fill_values)

### Finding count of unique values of all columns so that columns with 1 unique value can be dropped

In [None]:
# One aggregate per column: number of distinct values it holds.
distinct_value_counts = [
    countDistinct(col(c)).alias(c)
    for c in application_df.columns
]
application_df.select(distinct_value_counts).show()

Dropping the Mobile column since it has only one distinct value.

In [None]:
# 'Mobile' has a single distinct value (per the distinct-count check), so it carries no information.
application_df = application_df.drop('Mobile')

### Converting columns into much more standard form

In [None]:
# Shorten verbose category labels. Each entry is (column, verbose label, short label);
# any other value in the column is left unchanged.
label_simplifications = [
    ('Education', 'Secondary / secondary special', 'Secondary'),
    ('Marital_Status', 'Single / not married', 'Single'),
    ('Housing_type', 'House / apartment', 'House'),
]
for target_column, verbose_label, short_label in label_simplifications:
    application_df = application_df.withColumn(
        target_column,
        when(col(target_column) == verbose_label, short_label).otherwise(col(target_column)),
    )

### Converting Birthdate into age

In [None]:
# BirthDate holds a day offset that is negated here (presumably days counted backwards
# from a reference date - confirm). Dividing by 365 approximates years; the cast to int
# truncates the fractional part.
age_in_years = (col("BirthDate") / 365) * -1
application_df = application_df.withColumn("Age", age_in_years.cast("int"))

### Converting EmpDays to work experience (years)

In [None]:
# Negative EmpDays values are converted to whole years of experience (negated, divided by 365);
# zero or positive values are mapped to 0 years. The int cast applies to the whole
# when/otherwise expression.
years_employed = (col('EmpDays') / 365) * -1
experience_expr = when(col('EmpDays') < 0, years_employed).otherwise(0)
application_df = application_df.withColumn('experience_year', experience_expr.cast("int"))

Dropping the BirthDate and EmpDays columns, since Age and experience_year have been calculated from them.

In [None]:
# The raw day-offset columns are no longer needed once Age and experience_year exist.
superseded_day_columns = ('BirthDate', 'EmpDays')
application_df = application_df.drop(*superseded_day_columns)

In [None]:
# Inner join on ID: keeps only applicants that have at least one credit record, and yields
# one output row per matching credit record. Cached because it is used by several actions below.
joined_df = application_df.join(credit_df, on="ID", how="inner").cache()
joined_df.show()

In [None]:
# Row count of the joined frame (displayed as the cell output).
joined_df.count()

In [None]:
# Collapse to a single partition, presumably so the write produces one output part file.
# NOTE(review): repartition(1) sends every row through a single task; acceptable only
# while the joined data is small enough for one executor.
write_df = joined_df.repartition(1)

### Writing back to storage

In [None]:
import os

storage_account_name = "newtcapstonestorage"
# SECURITY: the storage account key must never be committed in the notebook. It is read
# from the AZURE_STORAGE_KEY environment variable; a missing variable raises KeyError
# here instead of failing later with an opaque authentication error.
# NOTE(review): a key was previously hardcoded in this cell - rotate it in Azure, since it
# remains in version-control history.
storage_account_key = os.environ["AZURE_STORAGE_KEY"]
container_name = "transformeddata"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
# mode("overwrite") replaces whatever already exists at output_path (the container root).
# The former .option("header", "true") was removed: it is a CSV option, and Parquet files
# carry their schema themselves.
write_df.write.mode("overwrite").parquet(output_path)