In [1]:
# Casey Reyes & Joaquin Feria

In [2]:
# Copy every local CSV in data/ into the HDFS root (hdfs://nn:9000/),
# overwriting existing files (-f) and storing each with a replication factor of 1.
!hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/

## Part 1: Filtering: RDDs, DataFrames, and Spark SQL

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) a session on the standalone cluster with Hive support,
# keeping the Hive warehouse on HDFS and capping executor memory at 512M.
session_builder = (
    SparkSession.builder
    .appName("cs544")
    .master("spark://boss:7077")
)
session_builder = session_builder.config("spark.executor.memory", "512M")
session_builder = session_builder.config(
    "spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse"
)
spark = session_builder.enableHiveSupport().getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 06:32:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#q1: how many banks contain the word "first" in their name, ignoring case? Use an RDD to answer.

# Read the bank/LEI cross-reference CSV, using the first row as column names
# and letting Spark infer column types.
banks_df = (spark.read.format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .load("hdfs://nn:9000/arid2017_to_lei_xref_csv.csv"))
rdd = banks_df.rdd

# Look the name up by column rather than by position, and skip null names:
# calling .lower() on None would raise and fail the whole job.
filtered_banks = rdd.filter(
    lambda row: row["respondent_name"] is not None
    and "first" in row["respondent_name"].lower()
)
filtered_banks.count()

                                                                                

525

In [5]:
#q2 how many banks contain the word "first" in their name, ignoring case? Use a DataFrame to answer.
from pyspark.sql.functions import expr, col, lower

# Same case-insensitive substring test as q1, written with DataFrame column
# functions instead of a Python lambda. Null names make the predicate null,
# so those rows are simply filtered out.
filtered_df = banks_df.filter(lower(col("respondent_name")).like("%first%"))
filtered_df.count()

                                                                                

525

In [6]:
#q3 how many banks contain the word "first" in their name, ignoring case? Use Spark SQL to answer.

# Persist the banks data as a Hive table in the warehouse.
banks_df.write.saveAsTable("banks", mode="overwrite")

# Register a single temporary view exposing respondent_name as "name" for the query.
banks_df.withColumnRenamed("respondent_name", "name").createOrReplaceTempView("names")
filtered_df = spark.sql("SELECT * FROM names WHERE LOWER(name) LIKE '%first%'")
filtered_df.count()

23/11/09 06:33:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/09 06:33:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/11/09 06:33:35 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/11/09 06:33:35 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.3
23/11/09 06:33:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
23/11/09 06:33:41 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/11/09 06:33:42 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/11/09 06:33:42 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/09 06:33:42 W

525

## Part 2: Hive Data Warehouse

In [None]:
def read_hdfs_csv(file_name):
    """Read hdfs://nn:9000/<file_name> as a DataFrame with a header row and inferred schema."""
    return (spark.read
            .format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .load(f"hdfs://nn:9000/{file_name}"))


# Load the loan applications and save them as a Hive table bucketed by
# county_code into 8 buckets.
loans_df = read_hdfs_csv("hdma-wi-2021.csv")
(loans_df.write.format("csv")
    .bucketBy(8, 'county_code')
    .mode("overwrite")
    .saveAsTable('loans'))

# Register each lookup CSV as a temporary view named after its file.
# Previously every view was created from loans_df, so all of them aliased
# the loans data instead of their own lookup tables.
# NOTE(review): assumes each name has a matching hdfs://nn:9000/<name>.csv
# (q7 reads counties.csv this way) — confirm against the files in data/.
views_list = ["ethnicity", "race", "sex", "states", "counties", "tracts", "action_taken", "denial_reason", "loan_type", "loan_purpose", "preapproval", "property_type"]
for view in views_list:
    read_hdfs_csv(f"{view}.csv").createOrReplaceTempView(view)



In [None]:
#q4 what tables are in our warehouse?

# Run SHOW TABLES once and map each table/view name to its isTemporary flag
# (True for temporary views, False for tables saved to the warehouse).
tables_df = spark.sql("SHOW TABLES")
table_dict = {row["tableName"]: row["isTemporary"] for row in tables_df.collect()}
table_dict

In [None]:
#q5 how many loan applications has the bank "University of Wisconsin Credit Union" received in 2020 in this dataset?

bank_name = "University of Wisconsin Credit Union"

# Inner-join banks to loans on the bank's 2020 LEI, then keep only the target bank's rows.
join_condition = loans_df["lei"] == banks_df["lei_2020"]
matched_df = banks_df.join(loans_df, join_condition, "inner")
total_df = matched_df.filter(banks_df["respondent_name"] == bank_name)
total_df.count()

In [None]:
#q6 what does .explain("formatted") tell us about how Spark executes Q5?

# 1. Broadcast: Q5's join involves only banks_df and loans_df, so if the plan
#    contains a BroadcastExchange, its input is one of those two (denial_reason
#    is not part of this join). NOTE(review): read the BroadcastExchange input
#    in the output below and record which table it is.
# 2. HashAggregate: total_df itself has no groupBy/agg, so its plan should not
#    contain a HashAggregate; aggregation only appears when .count() is run.

total_df.explain("formatted")

## Part 3: Grouping Rows

In [None]:
#q7 what are the average interest rates for Wells Fargo applications for the ten counties where Wells Fargo receives the most applications?
from pyspark.sql.functions import avg, count

counties_df = (spark.read
            .format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .load("hdfs://nn:9000/counties.csv"))

# Wells Fargo loans: match each loan's LEI to the bank's 2020 LEI.
joined_df = banks_df.join(
    loans_df,
    loans_df["lei"] == banks_df["lei_2020"],
    "inner"
).filter(banks_df["respondent_name"].like("%Wells Fargo%"))

# Loans' county_code is matched against STATE * 1000 + COUNTY from counties.csv.
# Sort by application count (descending), breaking ties by county name so the
# top ten is deterministic across runs.
result = (joined_df
          .join(counties_df, on=loans_df["county_code"] == counties_df["STATE"] * 1000 + counties_df["COUNTY"])
          .groupBy("NAME")
          .agg(avg("interest_rate").alias("avg_interest_rate"),
               count("*").alias("application_count"))
          .orderBy(["application_count", "NAME"], ascending=[False, True])
          .select("NAME", "avg_interest_rate")
          .limit(10))

# Collect rows directly (no RDD round-trip). Dicts keep insertion order, so
# result_dict stays in most-applications-first order.
result_dict = {row["NAME"]: row["avg_interest_rate"] for row in result.collect()}
result_dict

In [None]:
import matplotlib.pyplot as plt

# Bar chart of q7's result: one bar per county, in result_dict's order
# (most Wells Fargo applications first).
counties = list(result_dict.keys())
avg_interest_rates = list(result_dict.values())

fig, ax = plt.subplots(figsize=(6, 6))
ax.bar(counties, avg_interest_rates, width=0.5)
ax.set_title("Wells Fargo: Average Interest Rate\n(Top 10 Counties by Applications)")
ax.set_xlabel("County")
ax.set_ylabel("Average Interest Rate")
# Vertical labels centered under each bar.
plt.setp(ax.get_xticklabels(), rotation=90, ha="center")
# Leave room for the rotated county names so they are not clipped.
fig.tight_layout()

plt.show()

In [None]:
#q8 when computing a MEAN aggregate per group of loans, under what situation (when) do we require network I/O between the partial_mean and mean operations?

# Use the Hive table, not loans_df: only the table saved with
# bucketBy(8, 'county_code') carries the bucketing information; loans_df is
# the raw CSV read and is not partitioned by any column.
loans_table = spark.table("loans")
loans_table.groupBy("county_code").agg({"interest_rate": "mean"}).explain()
loans_table.groupBy("lei").agg({"interest_rate": "mean"}).explain()

# Network I/O (an Exchange hashpartitioning step between partial_mean and
# mean) is required when the rows are not already partitioned by the grouping
# column. Grouping by county_code, the bucketing column, is expected to need
# no Exchange. Grouping by lei requires a shuffle so that every row with the
# same lei lands in the same partition before the final mean is computed.