# Assignment: Spark SQL and DataFrames

Dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page (February 2021)

Tech Stack:

1.   PySpark
2.   Google BigQuery



In [1]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

import pandas as pd
import numpy as np

In [2]:
# Obtain a Spark session for the "spark-cleansing" application via getOrCreate().
session_builder = SparkSession.builder.appName("spark-cleansing")
spark = session_builder.getOrCreate()

# Keep a handle on the underlying SparkContext and set its log level to WARN.
sc = spark.sparkContext
sc.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/25 14:31:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [4]:
# Load the application-record CSV: comma-separated, first row used as the header.
# No schema is supplied and schema inference is not enabled on the reader.
source_csv = '/home/Archie/final-project/spark/resources/data/application_record.csv'
csv_reader = spark.read.format("csv").options(sep=",", header=True)
df = csv_reader.load(source_csv)

In [5]:
# Size of the raw frame: number of rows (a Spark action) and number of columns.
rows, cols = df.count(), len(df.columns)

print(
    f'Dimensions of Data: {(rows,cols)}',
    f'Rows of Data: {rows}',
    f'Columns of Data: {cols}',
    sep='\n',
)

Dimensions of Data: (438557, 18)
Rows of Data: 438557
Columns of Data: 18


In [6]:
# Preview the first five rows of the raw frame.
df.show(n=5)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

In [7]:
# Derive whole-year columns from the day-count columns, then drop the day counts.
#
# The Spark functions are qualified through `F` so this cell does not rely on the
# wildcard `from pyspark.sql.functions import *`, which shadows Python's builtin
# `abs` with the Spark column function of the same name.
#
# NOTE(review): abs() discards the sign, so a positive DAYS_EMPLOYED value is
# treated exactly like a negative one — confirm that positive values are not a
# sentinel that should be handled separately.
DAYS_PER_YEAR = 365.25  # divisor used to turn a day count into years

df_transform1 = (
    df
    # Magnitude of the day offset, converted to years and rounded down.
    .withColumn("YEARS_BIRTH", F.floor(F.abs(F.col("DAYS_BIRTH") / DAYS_PER_YEAR)))
    .withColumn("YEARS_EMPLOYED", F.floor(F.abs(F.col("DAYS_EMPLOYED") / DAYS_PER_YEAR)))
    # The source day-count columns are no longer needed once the year columns exist.
    .drop("DAYS_BIRTH", "DAYS_EMPLOYED")
)

In [8]:
# Preview the first five rows after the day-to-year conversion.
df_transform1.show(n=5)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+--------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|YEARS_BIRTH|YEARS_EMPLOYED|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+--------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|         1|              1|  

In [9]:
# Encode the three categorical flag columns as integers, replacing them in place:
#   CODE_GENDER      -> 1 when "F", otherwise 0
#   FLAG_OWN_CAR     -> 1 when "Y", otherwise 0
#   FLAG_OWN_REALTY  -> 1 when "Y", otherwise 0
#
# Columns are resolved by name against the frame being transformed (F.col) rather
# than through the ancestor frame `df`, and `when` is qualified through `F` so the
# cell does not depend on the wildcard functions import.
df_transform2 = (
    df_transform1
    .withColumn("CODE_GENDER", F.when(F.col("CODE_GENDER") == "F", 1).otherwise(0))
    .withColumn("FLAG_OWN_CAR", F.when(F.col("FLAG_OWN_CAR") == "Y", 1).otherwise(0))
    .withColumn("FLAG_OWN_REALTY", F.when(F.col("FLAG_OWN_REALTY") == "Y", 1).otherwise(0))
)

In [10]:
# Preview the first five rows after the flag columns were encoded as 0/1.
df_transform2.show(n=5)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+--------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|YEARS_BIRTH|YEARS_EMPLOYED|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+--------------+
|5008804|          0|           1|              1|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|         1|              1|  

In [11]:
# Remove only those rows in which every column is null ("all" mode).
df_transform3 = df_transform2.dropna(how="all")

In [12]:
# Size of the cleaned frame: number of rows (a Spark action) and number of columns.
rows, cols = df_transform3.count(), len(df_transform3.columns)

print(
    f'Dimensions of Data: {(rows,cols)}',
    f'Rows of Data: {rows}',
    f'Columns of Data: {cols}',
    sep='\n',
)

Dimensions of Data: (438557, 18)
Rows of Data: 438557
Columns of Data: 18


                                                                                

In [13]:
# Write the cleaned frame as a comma-separated CSV with a header row.
# coalesce(1) collapses the frame to one partition before writing, and
# mode("overwrite") replaces whatever already exists at the target path.
single_partition = df_transform3.coalesce(1)
(
    single_partition.write
    .options(header="true", sep=",")
    .mode("overwrite")
    .csv("/home/Archie/final-project/spark/resources/data/spark_output")
)

                                                                                

In [14]:
# Convert the cleaned frame to pandas and write it as one CSV file without the
# pandas index column.
# NOTE(review): the target file sits inside the `spark_output` directory that the
# Spark CSV writer targets in "overwrite" mode — confirm the intended run order,
# since re-running that writer presumably clears this file until this cell runs again.
full_csv_path = "/home/Archie/final-project/spark/resources/data/spark_output/applicant_record-full.csv"
df_transform3.toPandas().to_csv(full_csv_path, index=False)

                                                                                