In [5]:
import getpass
import os

import mysql.connector
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, trim, upper, lower, round, datediff, current_date

In [1]:
# Build (or reuse) the Spark session used by every cell below.
# NOTE: if a session already exists, getOrCreate() hands it back and Spark
# logs that only runtime SQL configurations take effect, so the executor
# memory/cores values here may not be applied in that case.
spark = (
    SparkSession.builder
    .appName("CreditCardApproval_Cleaning")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)


25/03/22 04:14:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
!pip install mysql-connector

[0m

In [6]:
# --- Applicant records: CSV in the Dataproc staging bucket -------------------
gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
gcs_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/jupyter/Big Data Class Notebooks/Project/Project2/Data/application_record.csv"

# Header row supplies column names; column types are inferred from the data.
application_df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_data_path)

# --- Credit records: MySQL table ---------------------------------------------
# The password is never stored in the notebook: it is read from the
# MYSQL_PASSWORD environment variable, falling back to an interactive prompt.
mysql_config = {
    'host': '34.122.208.22',
    'user': 'root',
    'password': os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
    'database': 'loan_data',
    'table_name': 'credit_record'
}

mysql_conn = mysql.connector.connect(
    host=mysql_config["host"],
    user=mysql_config["user"],
    password=mysql_config["password"],
    database=mysql_config["database"]
)

# The table name comes from the config dict above, not from user input.
query = f"SELECT * FROM {mysql_config['table_name']}"

# Author's note: reading through Spark JDBC caused issues in this environment,
# so the table is pulled with pandas and converted to a Spark DataFrame below.
# Author's note: follow the same approach in the project; the SQL server can
# hold a file of up to 5 MB.
try:
    pandas_df = pd.read_sql(query, mysql_conn)
finally:
    # Always release the connection, even if the query fails.
    mysql_conn.close()

credit_df = spark.createDataFrame(pandas_df)

25/03/22 04:15:32 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.128.0.4:35164 is closed
25/03/22 04:15:32 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 0 from block manager BlockManagerId(1, my-cluster-w-0.us-central1-c.c.celtic-science-452211-u6.internal, 35911, None)
java.io.IOException: Connection from /10.128.0.4:35164 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147) ~[spark-network-common_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) ~[spark-network-common_2.12-3.3.2.jar:3.3.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transpor

In [7]:
# Preview the first five credit-record rows loaded from MySQL.
credit_df.show(n=5)

25/03/22 04:16:31 WARN TaskSetManager: Stage 4 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.
[Stage 4:>                                                          (0 + 1) / 1]

+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|            -3|     0|
|5001711|            -2|     0|
|5001711|            -1|     0|
|5001711|             0|     X|
|5001712|           -18|     0|
+-------+--------------+------+
only showing top 5 rows



                                                                                

In [8]:
# Preview the first five applicant rows loaded from the CSV.
application_df.show(n=5)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

In [9]:
# Handling Missing and Inconsistent Data

In [10]:

# Drop rows in which every column is null.
application_df = application_df.dropna(how="all")
credit_df = credit_df.dropna(how="all")

# Replace missing values with explicit placeholders.
application_df = application_df.fillna({"OCCUPATION_TYPE": "Unknown"})
credit_df = credit_df.fillna({"STATUS": "X"})

# Day counts arrive as negative numbers; convert them to positive.
# Only negative values are negated, so re-running this cell is safe and does
# not flip already-corrected values back to negative.
# NOTE(review): positive DAYS_EMPLOYED values pass through unchanged. If the
# dataset uses a large positive placeholder (e.g. for "not employed"), it will
# inflate EMPLOYMENT_YEARS downstream -- confirm against the data dictionary.
for days_col in ("DAYS_BIRTH", "DAYS_EMPLOYED"):
    application_df = application_df.withColumn(
        days_col,
        when(col(days_col) < 0, col(days_col) * -1).otherwise(col(days_col)),
    )


In [11]:
# Trim whitespace on string columns only.
# Applying trim() to every column would implicitly cast numeric columns to
# string and silently change the schema of the cleaned output.
application_df = application_df.select([
    trim(col(name)).alias(name) if dtype == "string" else col(name)
    for name, dtype in application_df.dtypes
])

# Standardize categorical values.
application_df = application_df.withColumn("CODE_GENDER", upper(col("CODE_GENDER")))
application_df = application_df.withColumn("NAME_FAMILY_STATUS", lower(col("NAME_FAMILY_STATUS")))

# Convert the amount field to double and round to 2 decimals.
# `round` here is pyspark.sql.functions.round (it shadows the Python builtin).
application_df = application_df.withColumn(
    "AMT_INCOME_TOTAL",
    round(col("AMT_INCOME_TOTAL").cast("double"), 2),
)

In [12]:
# Derive whole-year features from the day-count columns.
# `round` is the Spark column function imported at the top of the notebook.
days_per_year = 365

year_features = (
    ("AGE", "DAYS_BIRTH"),                  # age in years
    ("EMPLOYMENT_YEARS", "DAYS_EMPLOYED"),  # employment length in years
)
for target_col, source_col in year_features:
    application_df = application_df.withColumn(
        target_col,
        round(col(source_col) / days_per_year, 0),
    )

In [15]:
# Spot-check the derived AGE against its DAYS_BIRTH source (default 20 rows).
age_check_cols = ["AGE", "DAYS_BIRTH"]
application_df.select(*age_check_cols).show()

+----+----------+
| AGE|DAYS_BIRTH|
+----+----------+
|33.0|     12005|
|33.0|     12005|
|59.0|     21474|
|52.0|     19110|
|52.0|     19110|
|52.0|     19110|
|52.0|     19110|
|62.0|     22464|
|62.0|     22464|
|62.0|     22464|
|46.0|     16872|
|46.0|     16872|
|46.0|     16872|
|49.0|     17778|
|49.0|     17778|
|49.0|     17778|
|49.0|     17778|
|49.0|     17778|
|49.0|     17778|
|29.0|     10669|
+----+----------+
only showing top 20 rows



In [None]:
GCS/jupyter/Big Data Class Notebooks/Project/Project2/Project Part 2/Data Cleaning & Transformation.ipynb

In [None]:
GCS/jupyter/Big Data Class Notebooks/Project/Project2/Project Part 2/cleaned_data/untitled.txt

In [17]:
# Echo the bucket name to confirm the destination before writing outputs.
gcs_bucket

'dataproc-staging-us-central1-458263062208-tw36mmqt'

In [18]:
# Persist both DataFrames as parquet under the cleaned-data folder,
# replacing any output left by a previous run.
cleaned_gcs_path = f"gs://{gcs_bucket}/notebooks/jupyter/jupyter/Big Data Class Notebooks/Project/Project2/Project_5_steps/cleaned_data/"

cleaned_outputs = [
    (application_df, "application_record_cleaned.parquet"),
    (credit_df, "credit_record_cleaned.parquet"),
]
for frame, file_name in cleaned_outputs:
    frame.write.mode("overwrite").parquet(cleaned_gcs_path + file_name)

25/03/22 04:28:13 WARN TaskSetManager: Stage 12 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.
                                                                                