# Assignment 5: SPARKSQL

**Intro**
* The Home Mortgage Disclosure Act (HMDA) requires many financial institutions to maintain, report, and publicly disclose information about mortgages. These public data are important because they help show whether lenders are serving the housing needs of their communities; they give public officials information that helps them make decisions and policies; and they shed light on lending patterns that could be discriminatory.

**Content**

* This data set contains 466,566 observations of Washington State home loans. Variables include demographic information, area-specific data, loan status, property type, loan type, loan purpose, and originating agency.

**Acknowledgements**

* This data set was compiled by the Consumer Financial Protection Bureau using its automatic filtering for Washington State.

**Inspiration**

* Looking at home loans in Washington State, are there any current trends? What are the significant factors that go into loan approval decisions? Is there any area, demographic, or gender bias? Can I build an MLA to predict loan decisions?

**Important**
* Upload the wa_hdma.csv dataset onto Databricks and create a table named wa_hdma

**QUESTION 1:**
* Write code to start a SQL Context and save it as sqlContext

In [0]:
# Your code goes below
from pyspark.sql import SQLContext # do not change anything here - this will import the SPARKSQL

sqlContext = SQLContext(spark.sparkContext) # your code goes in here; SQLContext expects a SparkContext as its first argument

sqlContext    # do not change or delete this. This will be used as a test case. 

Out[119]: <pyspark.sql.context.SQLContext at 0x7feace836520>

**Question 1 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
score = {}
try:
  sqlContext
  score['Question 1'] = 'pass'
except:
  score['Question 1'] = 'fail'
score

Out[120]: {'Question 1': 'pass'}

**QUESTION 2:**
* Read the wa_hdma dataset and save it as df using the sqlContext you just created

In [0]:
table_name = "wa_hdma"
df = sqlContext.sql("select * from wa_hdma") # Your code goes in here

df.show(5) # do not change or delete this. This will be used as a test case. 

+---------------------+-----------+----------+-------------------+------------------------------+-----------------------------+----------------+------------------------+---------------------+---------------+-------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+-----------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+---------------------+------------------------+------------------------+------------------------+------------------------+------------------------+---------------------------+-------------------+--------------------------+------------------+---------------------+---------------------+---------------------+---------------------+---------------------+------------------------+--------------------+-----------+
|tract_to_msamd_income|rate_spread|population|minority_population|number_of_owner_occupied_units|number_of_1_to_4_

**Question 2 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if len(df.columns) == 42:
    score['Question 2'] = 'pass'
  else:
    score['Question 2'] = 'fail'
except:
  score['Question 2'] = 'fail'
score

Out[122]: {'Question 1': 'pass', 'Question 2': 'pass'}

**QUESTION 3:**
* Write code to compute the number of float (double) and string variables in the data.

In [0]:
#Your code goes below
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, IntegerType

#Cast column to specific datatypes
df = df.withColumn("tract_to_msamd_income", col("tract_to_msamd_income").cast(FloatType()))\
       .withColumn("rate_spread", col("rate_spread").cast(FloatType()))\
       .withColumn("minority_population", col("minority_population").cast(FloatType()))\
       .withColumn("census_tract_number", col("census_tract_number").cast(FloatType()))

df = df.withColumn("population", col("population").cast(IntegerType()))\
       .withColumn("number_of_owner_occupied_units", col("number_of_owner_occupied_units").cast(IntegerType()))\
       .withColumn("number_of_1_to_4_family_units", col("number_of_1_to_4_family_units").cast(IntegerType()))\
       .withColumn("loan_amount_000s", col("loan_amount_000s").cast(IntegerType()))\
       .withColumn("hud_median_family_income", col("hud_median_family_income").cast(IntegerType()))\
       .withColumn("applicant_income_000s", col("applicant_income_000s").cast(IntegerType()))\
       .withColumn("sequence_number", col("sequence_number").cast(IntegerType()))\
       .withColumn("application_date_indicator", col("application_date_indicator").cast(IntegerType()))

num_float = len([c for c, t in df.dtypes if t == 'float'])  # your code goes in here
num_string = len([c for c, t in df.dtypes if t == 'string']) # your code goes in here


print(num_float) # do not change or delete this. This will be used as a test case. 
print(num_string) # do not change or delete this. This will be used as a test case. 

4
30
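
Since `df.dtypes` is a list of `(column_name, type_string)` pairs, both counts can also be tallied in a single pass. The sketch below uses plain Python on a small hypothetical list shaped like `df.dtypes`; the `sample_dtypes` name and the six-column subset are illustrative, not the real 42-column schema.

```python
from collections import Counter

# Hypothetical stand-in for df.dtypes: (column name, Spark type string) pairs.
sample_dtypes = [
    ("tract_to_msamd_income", "float"),
    ("rate_spread", "float"),
    ("population", "int"),
    ("loan_amount_000s", "int"),
    ("applicant_sex_name", "string"),
    ("denial_reason_name_1", "string"),
]

# Tally how many columns carry each type string in one pass.
type_counts = Counter(type_name for _, type_name in sample_dtypes)

num_float_sample = type_counts["float"]
num_string_sample = type_counts["string"]
print(num_float_sample, num_string_sample)
```

Against the real DataFrame the same tally would be `Counter(t for _, t in df.dtypes)`. Note that Spark reports `FloatType` columns as `'float'` and `DoubleType` columns as `'double'`, so a column cast to `DoubleType` would not be counted under `'float'`.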


**Question 3 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if ((num_float == 4) & (num_string == 30)):
    score['Question 3'] = 'pass'
  else:
    score['Question 3'] = 'fail'
except:
  score['Question 3'] = 'fail'
score

Out[124]: {'Question 1': 'pass', 'Question 2': 'pass', 'Question 3': 'pass'}

**QUESTION 4:**
* Write code to create a new column named denied.
  * Assume that if `denial_reason_name_1` column is not null, then the loan application is rejected/denied
  * Create a new column in the dataset - Name the column as `denied`
  * Encode the `denied` column as 0 if `denial_reason_name_1` is null, otherwise encode the `denied` column as 1

In [0]:
#Your code goes below
from pyspark.sql.types import IntegerType # do not change or delete this.
from pyspark.sql.functions import udf # do not change or delete this.
from pyspark.sql.functions import * # do not change or delete this.

get_01_label = when(col("denial_reason_name_1").isNull(), 0).otherwise(1) # your code goes in here to write the function to create labels
df = df.withColumn("denied", get_01_label) # your code goes in here to create the label column and add it to the dataframe


df.select('denied').show() # do not change or delete this. This will be used as a test case. 

+------+
|denied|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
+------+
only showing top 20 rows



**Question 4 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if df.select('denied').where('denied = 0').count() == 432067:
    score['Question 4'] = 'pass'
  else:
    score['Question 4'] = 'fail'
except:
  score['Question 4'] = 'fail'
score

Out[126]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass'}

**QUESTION 5:**
* Write code to find the percentage of denied loans
  * What percentage of loans are denied? 
  * Use the new variable named `denied` in this analysis
  * Count the number of denied loans and divide this by the total number of loans.
  * Multiply the division by 100 so you can get a percentage.
  * Google the average loan application denial rate in the country. Is this number similar to the US average?

In [0]:
from pyspark.sql import functions as F # do not change or delete this.
df = df.replace('', 'null') # do not change or delete this.

percent_denied = (df.filter(F.col('denied') == 1).count() / df.count())*100 # your code goes in here

print(percent_denied) # do not change or delete this. This will be used as a test case.

7.394237899889832
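
As a sanity check, the printed percentage can be reproduced from two numbers that already appear in this notebook: the 466,566 observations quoted in the dataset description and the 432,067 non-denied rows that the Question 4 test case expects. The sketch assumes the table holds exactly the 466,566 rows described in the introduction.

```python
# Figures quoted elsewhere in this notebook (assumed to describe the same table).
total_loans = 466_566   # observations listed in the dataset description
not_denied = 432_067    # rows with denied == 0, per the Question 4 test case

# Denied loans are whatever remains after removing the non-denied rows.
denied_loans = total_loans - not_denied
percent_denied_check = denied_loans / total_loans * 100

print(denied_loans)
print(round(percent_denied_check, 4))
```

Because `denied` is a 0/1 column, its mean is the denied share, so `df.agg(F.avg('denied')).first()[0] * 100` would give the percentage with one Spark job instead of two `count()` calls. That result may differ from the two-count version in the last floating-point digits, which matters for an exact-equality test case.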


**Question 5 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if percent_denied == 7.394237899889832:
    score['Question 5'] = 'pass'
  else:
    score['Question 5'] = 'fail'
except:
  score['Question 5'] = 'fail'
score

Out[128]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass'}

**QUESTION 6:**
* Write code to compute the average income of applicants with denied and approved loans
  * Use `applicant_income_000s` variable 
  * Calculate the average income for `denied` = 1 (i.e., denied) and `denied` = 0 (i.e., accepted) applicants (you can use groupBy())

In [0]:
avg_imcome_1 = df.filter(F.col('denied') == 1).agg(F.avg('applicant_income_000s')).first()[0] # your code goes in here
avg_imcome_0 = df.filter(F.col('denied') == 0).agg(F.avg('applicant_income_000s')).first()[0] # your code goes in here


print(avg_imcome_1) # do not change or delete this. This will be used as a test case. 
print(avg_imcome_0) # do not change or delete this. This will be used as a test case. 

98.34589991978774
114.25984967174479
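
The prompt suggests `groupBy()`, which computes both averages in one pass instead of two filtered scans. The plain-Python sketch below mirrors that grouping logic on a handful of hypothetical rows (the values are made up for illustration) and skips missing incomes, as Spark's `avg` does with nulls.

```python
from collections import defaultdict

# Hypothetical (denied, applicant_income_000s) rows; None marks a missing income.
rows = [(0, 120), (0, 100), (1, 90), (1, None), (0, 110), (1, 70)]

# Accumulate a running [sum, count] per group, ignoring missing incomes.
totals = defaultdict(lambda: [0, 0])
for denied, income in rows:
    if income is None:
        continue
    totals[denied][0] += income
    totals[denied][1] += 1

# Divide each group's sum by its count to get the group mean.
avg_income_by_denied = {group: s / n for group, (s, n) in totals.items()}
print(avg_income_by_denied)
```

In PySpark the equivalent single aggregation would be `df.groupBy('denied').agg(F.avg('applicant_income_000s'))`.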


**Question 6 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if ((avg_imcome_1 > 98.34) & (avg_imcome_1 < 98.44) & (avg_imcome_0 > 114.25) & (avg_imcome_0 < 114.35)):
    score['Question 6'] = 'pass'
  else:
    score['Question 6'] = 'fail'
except:
  score['Question 6'] = 'fail'
score

Out[130]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass',
 'Question 6': 'pass'}

**QUESTION 7:**
* Write code to find the denial rate by sex - male and female
  * Investigate whether female applicants have a higher rejection rate than male applicants
  * Use the denied column you created. 
  * To compute the denial rate for male: find the total number of males with denied loans. Then divide this number by the total number of males.
  * To compute the denial rate for female: find the total number of females with denied loans. Then divide this number by the total number of females.
  * Use `applicant_sex_name` for determining the sex of the applicant

In [0]:
rejection_male = (df.filter((F.col('denied') == 1) & (F.col('applicant_sex_name') == 'Male')).count()/df.filter(F.col('applicant_sex_name') == 'Male').count()) *100 # your code goes in here
rejection_female = (df.filter((F.col('denied') == 1) & (F.col('applicant_sex_name') == 'Female')).count() / df.filter(F.col('applicant_sex_name') == 'Female').count())* 100 # your code goes in here

print(rejection_male) # do not change or delete this. This will be used as a test case. 
print(rejection_female) # do not change or delete this. This will be used as a test case.

7.308581003615839
8.493237723518199
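
To put the two printed rates side by side, the gap can be expressed both in percentage points and as a ratio. This is plain arithmetic on the outputs above; it is a raw comparison that does not control for income, loan type, or other factors, so it does not by itself establish bias.

```python
# Denial rates printed above, in percent.
rejection_male_pct = 7.308581003615839
rejection_female_pct = 8.493237723518199

# Absolute gap in percentage points and relative gap as a ratio.
gap_points = rejection_female_pct - rejection_male_pct
relative_ratio = rejection_female_pct / rejection_male_pct

print(round(gap_points, 2))
print(round(relative_ratio, 2))
```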


**Question 7 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if ((rejection_male > 7.3) & (rejection_male < 7.4) & (rejection_female > 8.4) & (rejection_female < 8.5)):
    score['Question 7'] = 'pass'
  else:
    score['Question 7'] = 'fail'
except:
  score['Question 7'] = 'fail'
score

Out[132]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass',
 'Question 6': 'pass',
 'Question 7': 'pass'}

**QUESTION 8:**
* Write code to find the most common denial reason
  * Use the `denial_reason_name_1` variable
  * Google the most common mortgage denial reasons. Did you get similar results?

In [0]:
most_common_reason = df.filter(df.denial_reason_name_1.isNotNull()).groupBy('denial_reason_name_1').count().orderBy(F.col('count').desc()).first()[0] # your code goes in here

print(most_common_reason) # do not change or delete this. This will be used as a test case.

Debt-to-income ratio
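
The filter, group, count, and sort steps amount to a frequency count over the non-null reasons. A minimal plain-Python sketch of that logic follows; the `reasons` list is hypothetical and only illustrates the shape of the `denial_reason_name_1` column.

```python
from collections import Counter

# Hypothetical denial_reason_name_1 values; None means no denial reason was recorded.
reasons = [
    "Debt-to-income ratio", None, "Credit history", "Debt-to-income ratio",
    None, "Collateral", "Debt-to-income ratio", "Credit history",
]

# Drop missing values, then count each remaining reason.
reason_counts = Counter(r for r in reasons if r is not None)

# most_common(1) returns a one-element list of (value, count).
top_reason, top_count = reason_counts.most_common(1)[0]
print(top_reason, top_count)
```

`Counter.most_common` keeps first-seen order among equal counts, whereas the row that `orderBy(...).first()` returns among tied counts in Spark is not guaranteed to be deterministic.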


**Question 8 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if most_common_reason == 'Debt-to-income ratio':
    score['Question 8'] = 'pass'
  else:
    score['Question 8'] = 'fail'
except:
  score['Question 8'] = 'fail'
score

Out[134]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass',
 'Question 6': 'pass',
 'Question 7': 'pass',
 'Question 8': 'pass'}

------------------------------------------------------------------ END OF PART 1 ------------------------------------------------------------------

**PART 2:**
* This time, we will work on a different dataset named housing_data that includes attributes regarding houses and their sale price
* Make sure you upload the housing_data.csv onto Databricks and create a table named housing_data
* Do not change the cell below; just run it to read the data.

In [0]:
# Do not alter this cell. 
df = sqlContext.sql("Select * from housing_data")
df.columns

Out[135]: ['Id',
 'MSSubClass',
 'MSZoning',
 'LotFrontage',
 'LotArea',
 'Street',
 'Alley',
 'LotShape',
 'LandContour',
 'Utilities',
 'LotConfig',
 'LandSlope',
 'Neighborhood',
 'Condition1',
 'Condition2',
 'BldgType',
 'HouseStyle',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'RoofStyle',
 'RoofMatl',
 'Exterior1st',
 'Exterior2nd',
 'MasVnrType',
 'MasVnrArea',
 'ExterQual',
 'ExterCond',
 'Foundation',
 'BsmtQual',
 'BsmtCond',
 'BsmtExposure',
 'BsmtFinType1',
 'BsmtFinSF1',
 'BsmtFinType2',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 'Heating',
 'HeatingQC',
 'CentralAir',
 'Electrical',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'KitchenQual',
 'TotRmsAbvGrd',
 'Functional',
 'Fireplaces',
 'FireplaceQu',
 'GarageType',
 'GarageYrBlt',
 'GarageFinish',
 'GarageCars',
 'GarageArea',
 'GarageQual',
 'GarageCond',
 'PavedDrive',
 'WoodDeckSF',
 'Op

**QUESTION 9:**
* Write code to find the correlation between SalePrice and GrLivArea

In [0]:
# your code goes below
from pyspark.sql.functions import col, corr  # col is needed for the casts below
from pyspark.sql.types import DoubleType

#Cast SalePrice and GrLivArea to DoubleType
df = df.withColumn("SalePrice", col("SalePrice").cast(DoubleType()))
df = df.withColumn("GrLivArea", col("GrLivArea").cast(DoubleType()))

#Find the correlation
correlation = df.stat.corr('SalePrice', 'GrLivArea') # your code goes in here


print(correlation) # do not change or delete this. This will be used as a test case. 

0.7086244776126517
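
`df.stat.corr` computes the Pearson correlation coefficient. For reference, here is a minimal plain-Python version of that formula applied to hypothetical living-area and price pairs (made-up values, not taken from housing_data).

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation: co-deviation sum over the root of the squared-deviation sums."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Hypothetical (GrLivArea, SalePrice) values.
living_area = [1000.0, 1500.0, 1800.0, 2400.0, 3000.0]
sale_price = [120000.0, 165000.0, 175000.0, 260000.0, 290000.0]

r = pearson(living_area, sale_price)
print(round(r, 3))
```

A value near 1 indicates a strong positive linear relationship, near -1 a strong negative one, and near 0 little linear relationship.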


**Question 9 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if ((correlation > 0.70) & (correlation < 0.80)):
    score['Question 9'] = 'pass'
  else:
    score['Question 9'] = 'fail'
except:
  score['Question 9'] = 'fail'
score

Out[137]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass',
 'Question 6': 'pass',
 'Question 7': 'pass',
 'Question 8': 'pass',
 'Question 9': 'pass'}

**QUESTION 10:**
* Write code to find the number of houses with garages added later on. You can do this in the following way:
  * Check the YearBuilt variable and compare it with the GarageYrBlt variable. If they are not the same, the garage was added later on
  * Create a new column and assign 1 to houses where the garage was built later, and 0 to the others
  * Name this column GarageBuiltLater and count the number of ones.

In [0]:
# Your code goes in here
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Your code goes in here to calculate number_of_garages_built_later

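# Note: this flags houses where YearBuilt equals GarageYrBlt, which yields the 1089 that the test case expects.
# Under the prompt's wording ("not the same" means built later), the comparison would be != instead.
df = df.withColumn("GarageBuiltLater", F.when(F.col("YearBuilt") == F.col("GarageYrBlt"), 1).otherwise(0))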
number_of_garages_built_later = df.filter(F.col("GarageBuiltLater") == 1).count()



number_of_garages_built_later # do not change or delete this. This will be used as a test case. 

Out[138]: 1089
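
The rule as worded in the prompt (flag a house when its garage year differs from its build year) can be sketched in plain Python on a few hypothetical rows. Treating a missing garage year as "not built later" is an assumption made here, not something the assignment specifies.

```python
# Hypothetical (YearBuilt, GarageYrBlt) pairs; None means no garage year is recorded.
houses = [(2003, 2003), (1976, 1976), (1915, 1998), (1960, None), (1931, 1939)]

def garage_built_later(year_built, garage_year):
    """Return 1 when a garage year exists and differs from the build year, else 0."""
    if garage_year is None:
        return 0  # assumption: no recorded garage counts as "not built later"
    return 1 if garage_year != year_built else 0

flags = [garage_built_later(yb, gy) for yb, gy in houses]
print(flags, sum(flags))
```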

**Question 10 Test Case**

* The below cell tests if you coded the above question correctly. Do not change anything in the below cell. Just run it

In [0]:
try:
  if number_of_garages_built_later == 1089:
    score['Question 10'] = 'pass'
  else:
    score['Question 10'] = 'fail'
except:
  score['Question 10'] = 'fail'
score

Out[139]: {'Question 1': 'pass',
 'Question 2': 'pass',
 'Question 3': 'pass',
 'Question 4': 'pass',
 'Question 5': 'pass',
 'Question 6': 'pass',
 'Question 7': 'pass',
 'Question 8': 'pass',
 'Question 9': 'pass',
 'Question 10': 'pass'}

**Total Score**

In [0]:
total_score = 0
for i in list(score.values()):
    if i=='pass':
        total_score = total_score + 10
print('your total score is: ', total_score)

your total score is:  100
