In [1]:
# Copy every local CSV in data/ into the HDFS root, overwriting existing files (-f)
# and writing them with a replication factor of 1 (dfs.replication=1).
! hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/

In [2]:
# List the HDFS root to confirm the CSVs arrived (second column is the replication factor).
! hdfs dfs -ls hdfs://nn:9000/

Found 22 items
-rw-r--r--   1 root supergroup        328 2023-11-09 06:08 hdfs://nn:9000/action_taken.csv
-rw-r--r--   1 root supergroup        317 2023-11-09 06:08 hdfs://nn:9000/agency.csv
-rw-r--r--   1 root supergroup     533472 2023-11-09 06:08 hdfs://nn:9000/arid2017_to_lei_xref_csv.csv
-rw-r--r--   1 root supergroup     319092 2023-11-09 06:08 hdfs://nn:9000/counties.csv
-rw-r--r--   1 root supergroup        237 2023-11-09 06:08 hdfs://nn:9000/denial_reason.csv
-rw-r--r--   1 root supergroup        109 2023-11-09 06:08 hdfs://nn:9000/edit_status.csv
-rw-r--r--   1 root supergroup        180 2023-11-09 06:08 hdfs://nn:9000/ethnicity.csv
-rw-r--r--   1 root supergroup  174944099 2023-11-09 06:08 hdfs://nn:9000/hdma-wi-2021.csv
-rw-r--r--   1 root supergroup         41 2023-11-09 06:08 hdfs://nn:9000/hoepa.csv
-rw-r--r--   1 root supergroup        114 2023-11-09 06:08 hdfs://nn:9000/lien_status.csv
-rw-r--r--   1 root supergroup         65 2023-11-09 06:08 hdfs://nn:9000/loan_purpo

In [3]:
from pyspark.sql import SparkSession

# Connect to the Spark master at spark://boss:7077 with Hive support enabled, so that
# saveAsTable() persists managed tables under the HDFS warehouse directory.
builder = SparkSession.builder
builder = builder.appName("cs544").master("spark://boss:7077")
builder = builder.config("spark.executor.memory", "512M")
builder = builder.config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
spark = builder.enableHiveSupport().getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 06:09:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the ARID-to-LEI cross-reference CSV: the header row supplies column names
# and inferSchema asks Spark to detect column types.
banks_df = spark.read.csv(
    "hdfs://nn:9000/arid2017_to_lei_xref_csv.csv",
    header=True,
    inferSchema=True,
)

                                                                                

In [5]:
# Column names and the Spark types that inferSchema chose for them.
banks_df.dtypes

[('respondent_name', 'string'),
 ('arid_2017', 'string'),
 ('lei_2018', 'string'),
 ('lei_2019', 'string'),
 ('lei_2020', 'string')]

In [6]:
#q1
# RDD version: count banks whose name contains "first", case-insensitively.
# A null respondent_name would raise AttributeError on .lower() and fail the job,
# so skip nulls. This matches SQL LIKE semantics (q2/q3), where NULL never matches.
banks_df.rdd.filter(
    lambda row: row['respondent_name'] is not None
    and 'first' in row['respondent_name'].lower()
).count()

                                                                                

525

In [7]:
#q2
from pyspark.sql.functions import col, expr
# DataFrame version of q1: case-insensitive substring match through a SQL LIKE expression.
banks_df.where(expr("lower(respondent_name) LIKE '%first%'")).count()

                                                                                

525

In [8]:
#q3
# Persist banks as a managed Hive table, then answer q1/q2 again with Spark SQL.
banks_df.write.mode("overwrite").saveAsTable("banks")
q3_sql = """
SELECT COUNT(respondent_name) AS count
FROM banks
WHERE LOWER(respondent_name) LIKE '%first%'
"""
spark.sql(q3_sql).first()[0]

23/11/09 06:09:48 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/09 06:09:48 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/11/09 06:09:53 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/11/09 06:09:53 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.26.0.6
23/11/09 06:09:55 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
23/11/09 06:10:00 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/11/09 06:10:00 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/11/09 06:10:00 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/09 06:10:00 W

525

In [9]:
# Inspect the Hive warehouse directory: saveAsTable("banks") should have created a banks/ folder.
! hdfs dfs -ls hdfs://nn:9000/user/hive/warehouse

Found 2 items
drwxr-xr-x   - root supergroup          0 2023-11-09 06:10 hdfs://nn:9000/user/hive/warehouse/banks
drwxr-xr-x   - root supergroup          0 2023-11-09 03:26 hdfs://nn:9000/user/hive/warehouse/loans


In [10]:
# Load the hdma-wi-2021 loan CSV with a header row and inferred column types.
loans_df = spark.read.csv(
    "hdfs://nn:9000/hdma-wi-2021.csv",
    inferSchema=True,
    header=True,
)

                                                                                

In [11]:
# Persist loans as a managed parquet table hashed into 8 buckets on county_code,
# so that later aggregations on county_code can avoid a shuffle.
loans_df.write.bucketBy(8, 'county_code').saveAsTable(
    'loans',
    format='parquet',
    mode='overwrite',
)

23/11/09 06:10:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [12]:
# Re-list the warehouse to confirm the loans table directory was (re)written.
!hdfs dfs -ls hdfs://nn:9000/user/hive/warehouse

Found 2 items
drwxr-xr-x   - root supergroup          0 2023-11-09 06:10 hdfs://nn:9000/user/hive/warehouse/banks
drwxr-xr-x   - root supergroup          0 2023-11-09 06:10 hdfs://nn:9000/user/hive/warehouse/loans


In [13]:
# Lookup tables to register as temporary views; each one is <name>.csv in the HDFS root.
mylist = (
    "ethnicity race sex states counties tracts action_taken "
    "denial_reason loan_type loan_purpose preapproval property_type"
).split()

In [14]:
# Register each lookup CSV as a session-scoped temporary view named after its file.
for view_name in mylist:
    lookup_df = (spark.read
                 .option("header", True)
                 .option("inferSchema", True)
                 .csv(f"hdfs://nn:9000/{view_name}.csv"))
    lookup_df.createOrReplaceTempView(view_name)

                                                                                

In [15]:
#q4
# Map every table/view visible to the session to its isTemporary flag.
mydict = {
    row['tableName']: row['isTemporary']
    for row in spark.sql("SHOW TABLES").collect()
}
mydict

{'banks': False,
 'loans': False,
 'action_taken': True,
 'counties': True,
 'denial_reason': True,
 'ethnicity': True,
 'loan_purpose': True,
 'loan_type': True,
 'preapproval': True,
 'property_type': True,
 'race': True,
 'sex': True,
 'states': True,
 'tracts': True}

In [16]:
#q5
# DataFrame handles for the two Hive tables (later cells reuse them).
banks = spark.table('banks')
loans = spark.table('loans')
# Count loans whose lei matches the 2020 LEI of University of Wisconsin Credit Union.
q5_sql = """
SELECT COUNT(*)
FROM banks AS b
INNER JOIN loans AS l
  ON b.lei_2020 = l.lei
WHERE b.respondent_name = 'University of Wisconsin Credit Union'
"""
spark.sql(q5_sql).first()[0]

                                                                                

19739

In [17]:
#q6
# Answer notes, read from the formatted physical plan this cell prints:
#+- Scan parquet spark_catalog.default.banks (1) => the table being sent to every executor is banks
#   (the filtered banks rows feed BroadcastExchange (4), the BuildLeft side of the BroadcastHashJoin)
#Functions [1]: [count(1)] => the HashAggregate operators (partial + final) come from the count(*) aggregation
spark.sql("""
select count(*)
from banks b, loans l
where b.lei_2020 = l.lei and
respondent_name = 'University of Wisconsin Credit Union'
""").explain('formatted')

== Physical Plan ==
AdaptiveSparkPlan (12)
+- HashAggregate (11)
   +- Exchange (10)
      +- HashAggregate (9)
         +- Project (8)
            +- BroadcastHashJoin Inner BuildLeft (7)
               :- BroadcastExchange (4)
               :  +- Project (3)
               :     +- Filter (2)
               :        +- Scan parquet spark_catalog.default.banks (1)
               +- Filter (6)
                  +- Scan parquet spark_catalog.default.loans (5)


(1) Scan parquet spark_catalog.default.banks
Output [2]: [respondent_name#65, lei_2020#69]
Batched: true
Location: InMemoryFileIndex [hdfs://nn:9000/user/hive/warehouse/banks]
PushedFilters: [IsNotNull(respondent_name), EqualTo(respondent_name,University of Wisconsin Credit Union), IsNotNull(lei_2020)]
ReadSchema: struct<respondent_name:string,lei_2020:string>

(2) Filter
Input [2]: [respondent_name#65, lei_2020#69]
Condition : ((isnotnull(respondent_name#65) AND (respondent_name#65 = University of Wisconsin Credit Union)) AND i

In [18]:
# Peek at 3 rows of the banks Hive table as a pandas DataFrame.
banks.limit(3).toPandas()

                                                                                

Unnamed: 0,respondent_name,arid_2017,lei_2018,lei_2019,lei_2020
0,First National Bank,110004,5493003EW6T31TGECO83,5493003EW6T31TGECO83,5493003EW6T31TGECO83
1,"First Mid Bank & Trust, National Association",110045,549300XOTES5TCS8T794,549300XOTES5TCS8T794,549300XOTES5TCS8T794
2,"First Hope Bank, A National Banking Association",110118,5493003XLOX5FDT9R120,5493003XLOX5FDT9R120,5493003XLOX5FDT9R120


In [19]:
# Peek at 3 rows of the loans Hive table as a pandas DataFrame.
loans.limit(3).toPandas()

                                                                                

Unnamed: 0,activity_year,lei,derived_msa-md,state_code,county_code,census_tract,conforming_loan_limit,derived_loan_product_type,derived_dwelling_category,derived_ethnicity,...,denial_reason-2,denial_reason-3,denial_reason-4,tract_population,tract_minority_population_percent,ffiec_msa_md_median_family_income,tract_to_msa_income_percentage,tract_owner_occupied_units,tract_one_to_four_family_homes,tract_median_age_of_housing_units
0,2021,549300FQ2SN6TRRGB032,99999,WI,55055,55055100602,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,3351,2.98,69600,112,1201,1689,38
1,2021,254900HA4DQWAE0W3342,31540,WI,55025,55025012700,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Ethnicity Not Available,...,,,,4053,3.38,99000,109,1346,1638,27
2,2021,549300FQ2SN6TRRGB032,99999,WI,55113,55113100500,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,3378,3.46,69600,107,1475,4331,34


In [20]:
# Tabular view of the catalog: persistent Hive tables plus session temporary views.
spark.sql("SHOW TABLES").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|  default|        banks|      false|
|  default|        loans|      false|
|         | action_taken|       true|
|         |     counties|       true|
|         |denial_reason|       true|
|         |    ethnicity|       true|
|         | loan_purpose|       true|
|         |    loan_type|       true|
|         |  preapproval|       true|
|         |property_type|       true|
|         |         race|       true|
|         |          sex|       true|
|         |       states|       true|
|         |       tracts|       true|
+---------+-------------+-----------+



In [21]:
# Inspect the counties view: STATE and COUNTY are stored as separate columns.
spark.table('counties').limit(5).toPandas()

Unnamed: 0,AREA,PERIMETER,CO99_D00_,CO99_D00_I,STATE,COUNTY,NAME,LSAD,LSAD_TRANS
0,52.91341,60.56496,2.0,1.0,2,185,North Slope,4,Borough
1,74.22506,63.131,3.0,2.0,2,290,Yukon-Koyukuk,5,Census Area
2,19.67918,40.91407,4.0,3.0,2,188,Northwest Arctic,4,Borough
3,10.69172,31.55687,5.0,4.0,2,180,Nome,5,Census Area
4,0.00375,0.34984,6.0,5.0,2,180,Nome,5,Census Area


In [22]:
#q7
# loans.county_code combines state and county as STATE * 1000 + COUNTY
# (e.g. 55055 <-> STATE 55, COUNTY 55), so rebuild that value from counties to join.
q7_sql = """
SELECT *
FROM loans AS l
INNER JOIN counties AS c
  ON l.county_code = c.STATE * 1000 + c.COUNTY
"""
spark.sql(q7_sql).limit(5).toPandas()

                                                                                

Unnamed: 0,activity_year,lei,derived_msa-md,state_code,county_code,census_tract,conforming_loan_limit,derived_loan_product_type,derived_dwelling_category,derived_ethnicity,...,tract_median_age_of_housing_units,AREA,PERIMETER,CO99_D00_,CO99_D00_I,STATE,COUNTY,NAME,LSAD,LSAD_TRANS
0,2021,549300FQ2SN6TRRGB032,99999,WI,55055,55055100602,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,38,0.16668,1.6507,738.0,737.0,55,55,Jefferson,6,County
1,2021,254900HA4DQWAE0W3342,31540,WI,55025,55025012700,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Ethnicity Not Available,...,27,0.35443,2.51806,717.0,716.0,55,25,Dane,6,County
2,2021,549300FQ2SN6TRRGB032,99999,WI,55113,55113100500,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,34,0.40534,2.78246,335.0,334.0,55,113,Sawyer,6,County
3,2021,254900HA4DQWAE0W3342,33460,WI,55093,55093960700,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Ethnicity Not Available,...,41,0.174,1.85148,475.0,474.0,55,93,Pierce,6,County
4,2021,549300FQ2SN6TRRGB032,99999,WI,55055,55055101201,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,55,0.16668,1.6507,738.0,737.0,55,55,Jefferson,6,County


In [23]:
#q8
# Grouping by the bucketing column (county_code) needs no shuffle: the plan printed here
# has no Exchange, and its FileScan reports "Bucketed: true". The next cell groups by lei,
# which is not bucketed, and its plan does contain a shuffle:
#+- Exchange hashpartitioning(lei#995, 200), ENSURE_REQUIREMENTS, [plan_id=763] => network is only needed for grouping lei, as we only use bucket on county_code
spark.sql("""
select count(*)
from loans l
group by l.county_code
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[county_code#998], functions=[count(1)])
   +- HashAggregate(keys=[county_code#998], functions=[partial_count(1)])
      +- FileScan parquet spark_catalog.default.loans[county_code#998] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://nn:9000/user/hive/warehouse/loans], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<county_code:string>, SelectedBucketsCount: 8 out of 8




In [24]:
# Contrast with q8: lei is not the bucket column, so the scan reports "Bucketed: false" and
# Spark inserts an Exchange hashpartitioning (shuffle) between the partial and final aggregates.
spark.sql("""
select count(*)
from loans l
group by l.lei
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lei#995], functions=[count(1)])
   +- Exchange hashpartitioning(lei#995, 200), ENSURE_REQUIREMENTS, [plan_id=763]
      +- HashAggregate(keys=[lei#995], functions=[partial_count(1)])
         +- FileScan parquet spark_catalog.default.loans[lei#995] Batched: true, Bucketed: false (bucket column(s) not read), DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://nn:9000/user/hive/warehouse/loans], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<lei:string>




In [25]:
# Confirm that spark.table() returned a PySpark DataFrame (not a pandas one).
type(loans)

pyspark.sql.dataframe.DataFrame

In [26]:
from pyspark.sql.functions import col, when

# Build the modelling frame: three numeric feature columns plus a binary "approval" label.
loans = spark.table('loans')
mlcolumns = ['loan_amount', 'income', 'interest_rate', 'action_taken']
changed = mlcolumns[:-1]  # feature columns: everything except the label source

mldf = (
    loans
    .select(
        # Cast all features to double in one projection instead of a withColumn loop.
        # Under Spark's default (non-ANSI) cast rules, unparseable strings become null.
        *[col(name).cast('double').alias(name) for name in changed],
        col('action_taken').alias('approval'),
    )
    # Treat missing/unparseable feature values as 0.
    .fillna(0, subset=changed)
    # Label is 1 only when action_taken == 1; everything else, including a null
    # action_taken, is 0. (A `!= 1 -> 0, otherwise 1` form would label nulls as 1,
    # because a null CASE condition falls through to the otherwise branch.)
    .withColumn('approval', when(col('approval') == 1, 1).otherwise(0))
)


In [27]:
# Sanity-check a few rows of the prepared features and label.
mldf.limit(3).toPandas()

Unnamed: 0,loan_amount,income,interest_rate,approval
0,255000.0,210.0,0.0,1
1,435000.0,0.0,3.125,0
2,435000.0,190.0,0.0,1


In [28]:
# 80/20 train/test split with a fixed seed so the split is reproducible.
split_weights = [0.8, 0.2]
train, test = mldf.randomSplit(split_weights, seed=41)
# Keep the training rows cached: later cells reuse them (q9 count, model fit).
train.cache()

DataFrame[loan_amount: double, income: double, interest_rate: double, approval: int]

In [29]:
# Confirm the feature columns are double and the label is int.
mldf.dtypes

[('loan_amount', 'double'),
 ('income', 'double'),
 ('interest_rate', 'double'),
 ('approval', 'int')]

In [30]:
#q9
# Number of approved (approval = 1) rows in the training split.
train.filter("approval = 1").count()

                                                                                

242868

In [31]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Pipeline: pack the numeric feature columns into one vector, then fit a
# depth-limited decision tree that predicts the binary "approval" label.
va = VectorAssembler(inputCols=changed, outputCol="features")
dt = DecisionTreeClassifier(featuresCol="features", labelCol="approval", maxDepth=5)
pipe = Pipeline(stages=[va, dt])
model = pipe.fit(train)

# The fitted tree is the last pipeline stage; print its split rules.
print(model.stages[-1].toDebugString)

# Predictions on the held-out split (lazy until an action runs on it).
result = model.transform(test)

                                                                                

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ab15894b8280, depth=5, numNodes=19, numClasses=2, numFeatures=3
  If (feature 2 <= 0.495)
   If (feature 1 <= 0.5)
    If (feature 0 <= 380000.0)
     Predict: 0.0
    Else (feature 0 > 380000.0)
     If (feature 0 <= 550000.0)
      Predict: 0.0
     Else (feature 0 > 550000.0)
      If (feature 1 <= -13.5)
       Predict: 0.0
      Else (feature 1 > -13.5)
       Predict: 1.0
   Else (feature 1 > 0.5)
    Predict: 0.0
  Else (feature 2 > 0.495)
   If (feature 1 <= 0.5)
    If (feature 2 <= 2.4995000000000003)
     Predict: 1.0
    Else (feature 2 > 2.4995000000000003)
     If (feature 0 <= 550000.0)
      If (feature 0 <= 160000.0)
       Predict: 1.0
      Else (feature 0 > 160000.0)
       Predict: 0.0
     Else (feature 0 > 550000.0)
      Predict: 1.0
   Else (feature 1 > 0.5)
    Predict: 1.0



In [32]:
#q10
# Accuracy of the fitted tree on the held-out test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="approval",
    predictionCol="prediction",
    metricName="accuracy",
)
test_predictions = model.transform(test)
evaluator.evaluate(test_predictions)

                                                                                

0.8933759280490655