In [1]:
# Do not delete or change this cell

import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    """Return True when the DATABRICKS_RUNTIME_VERSION environment variable is set.

    Used to tell a Databricks cluster apart from a student's own machine.
    Any value counts (including an empty string); only an unset variable
    yields False.
    """
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    """Return the path from which `data_file_name` should be read in this runtime.

    :param data_file_name: base name of the data file, e.g. "loan.csv"
    :return: "/FileStore/tables/<data_file_name>" when is_databricks() is True,
             otherwise `data_file_name` unchanged (a bare name is then presumably
             resolved relative to the current working directory)
    """
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        # NOTE(review): assumes the file was uploaded under /FileStore/tables — confirm
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

In [2]:
import re
import seaborn as sns

def shapeOf(df):
  """Return a (row_count, column_count) tuple for a Spark DataFrame.

  The row count comes from ``df.count()``, which runs a Spark job.
  """
  n_rows = df.count()
  n_cols = len(df.columns)
  return n_rows, n_cols

def Filter(string):
  """Return the column names in ``string`` whose leading non-digit prefix is not ``_c``.

  ``string`` is a list of column names. Names such as ``_c0`` or ``_c12``
  (presumably the placeholder names Spark assigns to unnamed CSV columns)
  are removed; everything else is kept in its original order.
  """
  kept = []
  for name in string:
    # Leading run of non-digit characters; the `^` alternative yields "" when
    # the name starts with a digit (or is empty), so a match always exists.
    prefix = re.match(r'[^\d]+|^', name).group(0)
    if prefix != '_c':
      kept.append(name)
  return kept

def getDataFrame(file_location):
  """Read a comma-delimited CSV (with a header row) into a Spark DataFrame.

  Columns whose names match the ``_c<digits>`` placeholder pattern are removed
  with ``Filter``. Relies on the notebook-global ``spark`` session. All
  columns are left as read (no type casting is done here).
  """
  reader = (spark.read
            .option("header", "true")
            .option("delimiter", ","))
  loaded = reader.csv(file_location)
  wanted_columns = Filter(loaded.columns)
  return loaded.select(wanted_columns)

def create_Scatter_plot(title, df, x_axis, y_axis):
  """Draw and display a seaborn scatter plot of two columns of a Spark DataFrame.

  :param title: title shown above the plot
  :param df: Spark DataFrame; it is collected to the driver with ``toPandas()``
  :param x_axis: name of the column plotted on the x axis
  :param y_axis: name of the column plotted on the y axis
  :return: None
  """
  scatter_plot = sns.scatterplot(x=x_axis, y=y_axis, data=df.toPandas())
  scatter_plot.set(title=title)
  # Work on the Axes' own Figure rather than the pyplot module: `plt` is not
  # imported anywhere above this cell, so `plt.tight_layout()` raised NameError.
  fig = scatter_plot.get_figure()
  fig.tight_layout()
  # `plt.show()` returns None, so `display(plt.show())` never handed the figure
  # to display(); pass the Figure object itself instead.
  display(fig)
  return None

def find_indices(df, n):
  """Return the positions of the ``n`` largest values in ``df``.

  :param df: an indexable sequence of comparable values, read as ``df[i]``
             for ``i`` in ``range(len(df))``
  :param n: how many positions to return
  :return: list of at most ``n`` positions, ordered from the smallest to the
           largest of the selected values (ties keep their original order).
           Returns ``[]`` when ``n <= 0``.
  """
  # `ranked[-0:]` is `ranked[0:]` (every index), and a negative n would slice
  # from the front, so non-positive n must be handled explicitly.
  if n <= 0:
    return []
  # Stable ascending sort of positions by value; the top n are at the end.
  ranked = sorted(range(len(df)), key=lambda i: df[i])
  return ranked[-n:]

def print_head_and_shape(df, n=5):
  """Print the first ``n`` rows of a Spark DataFrame and its (rows, columns) shape.

  :param df: Spark DataFrame to summarize
  :param n: number of leading rows to print (default 5, matching pandas ``head()``)
  :return: None
  """
  # Bring only n rows to the driver; calling toPandas() on the whole DataFrame
  # just to take .head() collected every row into driver memory.
  print(df.limit(n).toPandas())
  print("Shape: " + str(shapeOf(df)))
  return None

In [3]:
# Load the raw Lending Club data; the path depends on whether we run on Databricks or locally.
raw_lending_club_df = getDataFrame(file_location=get_training_filename(data_file_name="loan.csv"))

# Process the data
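1. Keep only the loan statuses of interest (the late statuses, 'Charged Off' and 'Current') and overwrite the `loan_status` column so the various late statuses are bucketed into one status, 'Late'.
1. Remove all columns where the number of NA values is greater than 50% of the total number of rows.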

In [5]:
from pyspark.sql.functions import isnan, when, count, col

# Loan statuses retained for the analysis. Rows with any other status
# (including 'Fully Paid') are removed by the filter below.
LATE_STATUSES = ['Late (31-120 days)', 'Late (16-30 days)']
KEPT_STATUSES = ['Late (31-120 days)', 'Charged Off', 'Late (16-30 days)', 'Current']

# Step 1: keep only rows whose loan_status is one of KEPT_STATUSES.
status_filtered_df = raw_lending_club_df.filter(col('loan_status').isin(KEPT_STATUSES))

# Step 2: overwrite loan_status, collapsing both late buckets into the single
# value 'Late'; 'Charged Off' and 'Current' pass through unchanged.
late_or_original = (
    when(status_filtered_df["loan_status"].isin(LATE_STATUSES), 'Late')
    .otherwise(status_filtered_df["loan_status"])
)
bucketed_df = status_filtered_df.withColumn("loan_status", late_or_original)

In [6]:
# Write a function to remove columns which contain null values

from pyspark.sql import functions as fn

def drop_null_columns(df):
    """
    Drop every column that contains at least one null or NaN value.

    :param df: A PySpark DataFrame
    :return: a new DataFrame without those columns (``df`` itself is unchanged)
    """
    # Nothing to drop. Selecting zero expressions is not an aggregation, so the
    # single count row indexed below would not be guaranteed to exist.
    if not df.columns:
        return df
    # One aggregation pass: per column, count the rows that are null or NaN.
    # `c` is a plain string, which `when` treats as a literal, so every matching
    # row contributes a non-null value to the count. `fn.isnan` (rather than a
    # bare `isnan` imported in another cell) keeps this function self-contained.
    null_counts = df.select(
        [fn.count(fn.when(fn.col(c).isNull() | fn.isnan(fn.col(c)), c)).alias(c) for c in df.columns]
    ).collect()[0].asDict()
    to_drop = [name for name, n_missing in null_counts.items() if n_missing > 0]
    return df.drop(*to_drop)

In [7]:
# Remove all columns where the number of NA values is greater than 50% of total values,
# i.e. keep a column when its NA count is at most MAX_NA_FRACTION of the row count.
MAX_NA_FRACTION = 0.50

# Single-row dataframe holding the count of NAs (null or NaN) of each column.
# `c` is a plain string, which `when` treats as a literal, so each NA row is counted.
checkna = bucketed_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in bucketed_df.columns])

# Per column: the column's own name when its NA count is NOT greater than the threshold,
# otherwise null. `<=` matches the "greater than 50%" removal rule above (the previous `<`
# also removed columns with exactly 50% NAs).
# NOTE(review): despite its name, this frame marks the columns to KEEP; the name is
# unchanged in case later cells refer to it.
No_of_rows = bucketed_df.count()
checkna_greater_than_50 = checkna.select([when(fn.col(c) <= (No_of_rows * MAX_NA_FRACTION), c).alias(c) for c in checkna.columns])

# Columns whose marker above is null are dropped; the surviving names are selected from the data.
final_cols = drop_null_columns(checkna_greater_than_50)
processed_lending_club_df = bucketed_df.select([fn.col(c) for c in final_cols.columns])

# Process columns with indices from 50 to 75
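1. Select all columns with indices from 50 to 75 (**TODO:** the code below slices `columns[51:76]`, i.e. indices 51–75 — confirm whether index 50 should be included)
1. Replace NA values with the median value of their column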

In [9]:
import pandas as pd
from pyspark.sql.types import *

# Slice out the columns at 0-based positions 51 through 75 (the slice end, 76, is exclusive).
# NOTE(review): the section heading says "indices from 50 to 75", but index 50 is not
# included here — confirm which is intended.
part3_columns = processed_lending_club_df.columns[51:76]
part3 = processed_lending_club_df.select(part3_columns)

# Cast every selected column to integer. With Spark's default (non-ANSI) casting, values
# that cannot be parsed as integers become null, so any text column would end up all-null.
# NOTE(review): confirm the selected columns are numeric before imputing medians.
part3 = part3.select([col(name).cast("integer") for name in part3.columns])

In [10]:
# Show the column names and Spark types of part3 after the integer cast above
part3.printSchema()