# **Mount Drive**

In [1]:
from google.colab import drive
import os
drive.mount('/content/drive')
os.chdir('/content/drive/My Drive/Colab Notebooks/Big Data Project')

Mounted at /content/drive


### In order for Python to find the Spark, download the findspark library and start it with findspark.init() function.

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6637ff8bde1010325faed3ce2fe3c9d155b1c4155f21b46a33a2166708d7325b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### In order to work with RDDs, we need to create a SparkContext.

In [134]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import col, countDistinct, isnan
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, IntegerType


In [68]:
import pandas as pd
import datetime
import csv
from tabulate import tabulate



## Since we write local [*] in the master, it will use all cores in our machine. If we said local [4] it will work with 4 cores.

## getOrCreate is used to create a SparkSession if not present.

In [5]:
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("LoanApproval")\
    .getOrCreate()

In [6]:
sc=spark.sparkContext

## Read Data - SBAnational.csv

In [14]:
data_path="SBAnational.csv"

In [41]:
loan_rdd=sc.textFile(data_path)

# EDA

In [42]:
loan_rdd.take(5)

['LoanNr_ChkDgt,Name,City,State,Zip,Bank,BankState,NAICS,ApprovalDate,ApprovalFY,Term,NoEmp,NewExist,CreateJob,RetainedJob,FranchiseCode,UrbanRural,RevLineCr,LowDoc,ChgOffDate,DisbursementDate,DisbursementGross,BalanceGross,MIS_Status,ChgOffPrinGr,GrAppv,SBA_Appv',
 '1000014003,ABC HOBBYCRAFT,EVANSVILLE,IN,47711,FIFTH THIRD BANK,OH,451120,28-Feb-97,1997,84,4,2,0,0,1,0,N,Y,,28-Feb-99,"$60,000.00 ",$0.00 ,P I F,$0.00 ,"$60,000.00 ","$48,000.00 "',
 '1000024006,LANDMARK BAR & GRILLE (THE),NEW PARIS,IN,46526,1ST SOURCE BANK,IN,722410,28-Feb-97,1997,60,2,2,0,0,1,0,N,Y,,31-May-97,"$40,000.00 ",$0.00 ,P I F,$0.00 ,"$40,000.00 ","$32,000.00 "',
 '1000034009,"WHITLOCK DDS, TODD M.",BLOOMINGTON,IN,47401,GRANT COUNTY STATE BANK,IN,621210,28-Feb-97,1997,180,7,1,0,0,1,0,N,N,,31-Dec-97,"$287,000.00 ",$0.00 ,P I F,$0.00 ,"$287,000.00 ","$215,250.00 "',
 '1000044001,"BIG BUCKS PAWN & JEWELRY, LLC",BROKEN ARROW,OK,74012,1ST NATL BK & TR CO OF BROKEN,OK,0,28-Feb-97,1997,60,2,1,0,0,1,0,N,Y,,30-Jun-97,"$3

In [91]:
def split_and_format(entry):
    csv_reader = csv.reader([entry])

    # Read the first (and only) row from the CSV reader
    parts = next(csv_reader)

    # Define a function to convert strings to appropriate types
    def convert_to_appropriate_type(value):
        # Convert empty strings to None
        if value == '':
            return None
        # Try converting to integer
        try:
            return int(value)
        except ValueError:
            pass
        # Try converting to float
        try:
            return float(value.replace('$', '').replace(',', ''))
        except ValueError:
            pass
        # Try converting to datetime
        try:
            return datetime.datetime.strptime(value, '%d-%b-%y').date()
        except ValueError:
            pass
        # Otherwise, return as string
        return value.strip()

    # Apply type conversion to each part of the entry
    return [tuple(convert_to_appropriate_type(part) for part in parts)]


In [92]:
loan_rdd=loan_rdd.flatMap(split_and_format)

In [93]:
loan_rdd.take(2)

[('LoanNr_ChkDgt',
  'Name',
  'City',
  'State',
  'Zip',
  'Bank',
  'BankState',
  'NAICS',
  'ApprovalDate',
  'ApprovalFY',
  'Term',
  'NoEmp',
  'NewExist',
  'CreateJob',
  'RetainedJob',
  'FranchiseCode',
  'UrbanRural',
  'RevLineCr',
  'LowDoc',
  'ChgOffDate',
  'DisbursementDate',
  'DisbursementGross',
  'BalanceGross',
  'MIS_Status',
  'ChgOffPrinGr',
  'GrAppv',
  'SBA_Appv'),
 (1000014003,
  'ABC HOBBYCRAFT',
  'EVANSVILLE',
  'IN',
  47711,
  'FIFTH THIRD BANK',
  'OH',
  451120,
  datetime.date(1997, 2, 28),
  1997,
  84,
  4,
  2,
  0,
  0,
  1,
  0,
  'N',
  'Y',
  None,
  datetime.date(1999, 2, 28),
  60000.0,
  0.0,
  'P I F',
  0.0,
  60000.0,
  48000.0),
 (1000024006,
  'LANDMARK BAR & GRILLE (THE)',
  'NEW PARIS',
  'IN',
  46526,
  '1ST SOURCE BANK',
  'IN',
  722410,
  datetime.date(1997, 2, 28),
  1997,
  60,
  2,
  2,
  0,
  0,
  1,
  0,
  'N',
  'Y',
  None,
  datetime.date(1997, 5, 31),
  40000.0,
  0.0,
  'P I F',
  0.0,
  40000.0,
  32000.0),
 (10000

In [94]:
loan_rdd.count()

899165

In [95]:
# Read the first line of the RDD to extract the header
header = loan_rdd.first()

# Remove the header from the RDD
loan_data_rdd = loan_rdd.filter(lambda x: x != header)

In [None]:
loan_data_rdd.count()

In [96]:
loan_data_rdd.take(2)

[(1000014003,
  'ABC HOBBYCRAFT',
  'EVANSVILLE',
  'IN',
  47711,
  'FIFTH THIRD BANK',
  'OH',
  451120,
  datetime.date(1997, 2, 28),
  1997,
  84,
  4,
  2,
  0,
  0,
  1,
  0,
  'N',
  'Y',
  None,
  datetime.date(1999, 2, 28),
  60000.0,
  0.0,
  'P I F',
  0.0,
  60000.0,
  48000.0),
 (1000024006,
  'LANDMARK BAR & GRILLE (THE)',
  'NEW PARIS',
  'IN',
  46526,
  '1ST SOURCE BANK',
  'IN',
  722410,
  datetime.date(1997, 2, 28),
  1997,
  60,
  2,
  2,
  0,
  0,
  1,
  0,
  'N',
  'Y',
  None,
  datetime.date(1997, 5, 31),
  40000.0,
  0.0,
  'P I F',
  0.0,
  40000.0,
  32000.0)]

# Report

In [165]:
# =========================================================================
# =========================================================================
# ============================ RDD REPORT =================================
# =========================================================================
# =========================================================================
def report_rdd(rdd, header):
    col = []
    d_type = []
    uniques = []
    n_uniques = []
    nan = []

    # Extract column names
    columns = list(header)

    # Function to calculate percentage of NaN values
    rdd_count = rdd.count()
    # Function to calculate the unique values and their count
    def map_unique_count(row):
        row_set = set(row)
        print(row)
        return len(row_set), list(row_set)[:2]
    # Collect statistics for each column
    report_data = []
    # Iterate over each column
    for i, col_name in enumerate(columns):
        # Append column name
        col.append(col_name)

        # Extract column data
        col_data = rdd.map(lambda x: x[i])
        print(f"{i} {col_data.take(2)}")

        # Determine data type
        dtype = type(col_data.first()).__name__
        d_type.append(dtype)

        # Collect unique values
        distinct_data = col_data.distinct()
        unique_sample = distinct_data.take(2)
        uniques.append(unique_sample)

        # Count number of unique values
        n_unique = distinct_data.count()
        n_uniques.append(n_unique)

        # n_unique = col_data.mapPartitions(map_unique_count)
        # print(n_unique.take(1))


        # Calculate percentage of NaN values
        nan_percentage_val = col_data.filter(lambda x: x is None).count() / rdd_count * 100
        nan.append(nan_percentage_val)

        report_data.append([col_name, dtype, unique_sample, n_unique, nan_percentage_val])

    return report_data
# =========================================================================
# =========================================================================
# ============================= DF REPORT =================================
# =========================================================================
# =========================================================================
def report_df(df, header):


    # Calculate the total number of rows
    rdd_count = df.count()

    # Initialize lists to store column statistics
    col_names = []
    data_types = []
    unique_samples = []
    num_uniques = []
    nan_percentages = []
    report_data = []

    # Iterate over each column
    for col_name in header:
        # Append column name
        col_names.append(col_name)
        selected_col = col(col_name)
        selected_col_df = df.select(selected_col)

        # Determine data type
        dtype = selected_col_df.dtypes[0][1]
        data_types.append(dtype)
        distinct_df = selected_col_df.distinct()
        # Collect unique values
        unique_sample = [row[col_name] for row in distinct_df.limit(2).collect()]
        unique_samples.append(unique_sample)

        # Count number of unique values
        n_unique = distinct_df.count()
        num_uniques.append(n_unique)

        # Calculate percentage of NaN values
        none_percentage_val = df.filter(selected_col.isNull()).count() / rdd_count * 100
        nan_percentages.append(none_percentage_val)
        report_data.append([col_name, dtype, unique_sample, n_unique, none_percentage_val])

    return report_data

In [167]:
# Create a new RDD with only the first 5 rows
sample_rdd = loan_data_rdd.take(2)
# Create a new RDD from the sample data
sample_rdd = sc.parallelize(sample_rdd)

In [163]:
# Define the schema for your DataFrame
# TODO: To be automated
schema = StructType([
    StructField("LoanNr_ChkDgt", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Zip", IntegerType(), True),
    StructField("Bank", StringType(), True),
    StructField("BankState", StringType(), True),
    StructField("NAICS", IntegerType(), True),
    StructField("ApprovalDate", DateType(), True),
    StructField("ApprovalFY", IntegerType(), True),
    StructField("Term", IntegerType(), True),
    StructField("NoEmp", IntegerType(), True),
    StructField("NewExist", IntegerType(), True),
    StructField("CreateJob", IntegerType(), True),
    StructField("RetainedJob", IntegerType(), True),
    StructField("FranchiseCode", IntegerType(), True),
    StructField("UrbanRural", StringType(), True),
    StructField("RevLineCr", StringType(), True),
    StructField("LowDoc", StringType(), True),
    StructField("ChgOffDate", DateType(), True),
    StructField("DisbursementDate", DateType(), True),
    StructField("DisbursementGross", DoubleType(), True),
    StructField("BalanceGross", DoubleType(), True),
    StructField("MIS_Status", StringType(), True),
    StructField("ChgOffPrinGr", DoubleType(), True),
    StructField("GrAppv", DoubleType(), True),
    StructField("SBA_Appv", DoubleType(), True)
])
# Create a DataFrame from the RDD with the specified schema
df = spark.createDataFrame(sample_rdd, schema)

In [166]:

%%time
report_res = report_df(df, header)

# Display the result
column_names = ['Column', 'Type', 'Unique Sample', 'N Unique', '%None']
print(tabulate(report_res, headers=column_names, tablefmt='grid'))

+-------------------+--------+----------------------------------------------------------+------------+---------+
| Column            | Type   | Unique Sample                                            |   N Unique |   %None |
| LoanNr_ChkDgt     | int    | [1000014003, 1000024006]                                 |          2 |       0 |
+-------------------+--------+----------------------------------------------------------+------------+---------+
| Name              | string | ['ABC HOBBYCRAFT', 'LANDMARK BAR & GRILLE (THE)']        |          2 |       0 |
+-------------------+--------+----------------------------------------------------------+------------+---------+
| City              | string | ['EVANSVILLE', 'NEW PARIS']                              |          2 |       0 |
+-------------------+--------+----------------------------------------------------------+------------+---------+
| State             | string | ['IN']                                                   |       

In [157]:
# Call the report function with the sample RDD and header
%%time
report_res = report_rdd(sample_rdd, header)

# Display the result
column_names = ['Column', 'Type', 'Unique Sample', 'N Unique', '%None']
print(tabulate(report_res, headers=column_names, tablefmt='grid'))

0 [1000014003, 1000024006]
1 ['ABC HOBBYCRAFT', 'LANDMARK BAR & GRILLE (THE)']
2 ['EVANSVILLE', 'NEW PARIS']
3 ['IN', 'IN']
4 [47711, 46526]
5 ['FIFTH THIRD BANK', '1ST SOURCE BANK']
6 ['OH', 'IN']
7 [451120, 722410]
8 [datetime.date(1997, 2, 28), datetime.date(1997, 2, 28)]
9 [1997, 1997]
10 [84, 60]
11 [4, 2]
12 [2, 2]
13 [0, 0]
14 [0, 0]
15 [1, 1]
16 [0, 0]
17 ['N', 'N']
18 ['Y', 'Y']
19 [None, None]
20 [datetime.date(1999, 2, 28), datetime.date(1997, 5, 31)]
21 [60000.0, 40000.0]
22 [0.0, 0.0]
23 ['P I F', 'P I F']
24 [0.0, 0.0]
25 [60000.0, 40000.0]
26 [48000.0, 32000.0]
+-------------------+----------+----------------------------------------------------------+------------+---------+
| Column            | Type     | Unique Sample                                            |   N Unique |   %None |
| LoanNr_ChkDgt     | int      | [1000024006, 1000014003]                                 |          2 |       0 |
+-------------------+----------+----------------------------------------

## Apply Map

## Apply ReduceByKey

## Save