In [None]:
#Badri Narayanan Murali Krishnan

In [None]:
# Copy every local data/*.csv into the HDFS root, storing a single replica (-D dfs.replication=1)
# and overwriting files that already exist there (-f) so the cell can be re-run.
! hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession on the standalone cluster at boss:7077.
# Hive support is enabled so saveAsTable() persists managed tables under the
# HDFS warehouse directory configured below.
spark_builder = SparkSession.builder.appName("cs544").master("spark://boss:7077")
for conf_key, conf_value in (
    ("spark.executor.memory", "512M"),
    ("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse"),
):
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.enableHiveSupport().getOrCreate()

In [None]:
#q1
# Count banks whose respondent_name starts with "The" using the low-level RDD API.
banks_df = spark.read.csv(
    "hdfs://nn:9000/arid2017_to_lei_xref_csv.csv", header=True, inferSchema=True
)
count = banks_df.rdd.filter(
    # A null respondent_name would make None.startswith raise AttributeError;
    # treat it as "no match", the same way SQL LIKE does in q2/q3.
    lambda row: (row["respondent_name"] or "").startswith("The")
).count()
count

In [None]:
# Print the column names and the types Spark inferred (inferSchema=True) for the banks CSV.
banks_df.printSchema()

In [None]:
#q2
# Same question as q1, answered with the DataFrame API and a SQL-style LIKE
# predicate (null names simply do not match).
names_starting_with_the = banks_df.where("respondent_name LIKE 'The%'")
count = names_starting_with_the.count()
count

In [None]:
#q3
# Persist the banks DataFrame as a managed Hive table (replacing any previous
# copy), then answer the same question in pure SQL.
banks_df.write.mode("overwrite").saveAsTable("banks")
sql_table = spark.sql(
    "SELECT COUNT(*) as count FROM banks WHERE respondent_name LIKE 'The%'"
)
count = sql_table.first()["count"]
count

In [None]:
# Load the loan-application CSV and save it as the Hive table "loans",
# hash-bucketed into 8 buckets on county_code (q8 relies on this layout).
loans_csv = "hdfs://nn:9000/hdma-wi-2021.csv"
loans_df = spark.read.csv(loans_csv, header=True, inferSchema=True)
(
    loans_df.write
    .mode("overwrite")
    .bucketBy(8, "county_code")
    .saveAsTable("loans")
)

In [None]:
# Lookup tables used by later queries (e.g. counties in q7): each CSV in the
# HDFS root is registered as a session-scoped temp view with the same name.
view_names = [
    "ethnicity", "race", "sex", "states", "counties", "tracts",
    "action_taken", "denial_reason", "loan_type", "loan_purpose",
    "preapproval", "property_type",
]
for view_name in view_names:
    csv_path = f"hdfs://nn:9000/{view_name}.csv"
    lookup_df = spark.read.csv(csv_path, header=True, inferSchema=True)
    lookup_df.createOrReplaceTempView(view_name)

In [None]:
#q4
# SHOW TABLES lists both the persisted Hive tables and this session's temp
# views; map each name to its isTemporary flag.
tables_df = spark.sql("SHOW TABLES")
tables_dict = {}
for table_row in tables_df.collect():
    tables_dict[table_row.tableName] = table_row.isTemporary
tables_dict

In [None]:
#q5
# Count loan applications filed with University of Wisconsin Credit Union by
# joining banks.lei_2020 to loans.lei. `result` is kept so q6 can show its plan.
query = """
SELECT COUNT(*) as count 
FROM banks INNER JOIN loans ON banks.lei_2020 = loans.lei 
WHERE banks.respondent_name = 'University of Wisconsin Credit Union'
"""
result = spark.sql(query)
first_row = result.first()
count = first_row["count"]
count

In [None]:
#q6
result.explain(mode="formatted")
# How to read the formatted plan for the q5 join:
#
# 1. Join strategy: the banks side is broadcast to every executor. Spark applies
#    the respondent_name filter to banks first, so only UWCU's few rows are
#    copied to each executor. The large loans table is then joined locally
#    instead of being shuffled across the network or coordinated between nodes.
#
# 2. COUNT(*) is implemented by two HashAggregate operators:
#    - the first (labeled "partial" in the plan) lets each executor count the
#      matching loans in its own partitions;
#    - the second (labeled "final" in the plan) adds those local counts up into
#      a single number.
#    This is a divide-and-conquer pattern: combine locally, then reduce.

In [None]:
import matplotlib.pyplot as plt

In [None]:
#q7
# Top 10 counties by number of Wells Fargo loan applications, with each county's
# average interest rate. loans.county_code is matched to the counties view via
# STATE * 1000 + COUNTY.
# - The bank name uses a single-quoted SQL string literal. Double quotes are
#   only string literals while spark.sql.ansi.doubleQuotedIdentifiers is off.
# - county_name breaks ties in application_count so the LIMIT 10 result is
#   deterministic.
query = """
WITH stats AS (
    SELECT c.NAME as county_name, COUNT(*) as application_count, AVG(l.interest_rate) as avg_interest_rate
    FROM loans l
    INNER JOIN banks b ON l.lei = b.lei_2020
    INNER JOIN counties c ON l.county_code = c.STATE * 1000 + c.COUNTY
    WHERE b.respondent_name = 'Wells Fargo Bank, National Association'
    GROUP BY c.NAME
)
SELECT county_name, avg_interest_rate
FROM stats
ORDER BY application_count DESC, county_name
LIMIT 10
"""
results = spark.sql(query).collect()
# dicts keep insertion order, so county_rates follows the ORDER BY above.
county_rates = {row["county_name"]: row["avg_interest_rate"] for row in results}
counties = list(county_rates.keys())
rates = list(county_rates.values())

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(counties, rates)
ax.set(
    xlabel="County",
    ylabel="Average Interest Rate",
    title="Average Wells Fargo Interest Rates by County",
)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
fig.tight_layout()
fig.savefig("../q7.png")

county_rates

In [None]:
#q8
# Print the physical plans of the same AVG(interest_rate) aggregation over the
# loans table, grouped first by the bucketing column and then by a
# non-bucketed column.
for group_col in ("county_code", "lei"):
    spark.sql(
        f"""
        SELECT {group_col}, AVG(interest_rate) as avg_rate
        FROM loans
        GROUP BY {group_col}
        """
    ).explain()
# Network I/O (an Exchange, i.e. a shuffle) between the partial and final mean
# aggregations is required when:
# 1. the rows for a single group are spread across multiple partitions, so
# 2. partial results computed on different executors must be brought together
#    to produce the final mean.
#
# GROUP BY county_code (no Network I/O needed):
# - The loans table was written with bucketBy(8, "county_code"). This
#   hash-partitions rows so that all loans for a given county land in the same
#   bucket. Bucketing groups rows; it does not sort them unless sortBy is used.
# - Each county's mean can therefore be computed entirely within one
#   partition, with no data exchanged across the network.
#
# GROUP BY lei (Network I/O required):
# - The table is not bucketed by lei, so a bank's loan applications can be
#   spread across all 8 buckets.
# - To compute a bank's mean interest rate, Spark must:
#   1. compute a partial mean per partition on each executor (partial_mean),
#   2. shuffle those partial results over the network so all pieces for the
#      same lei end up in the same partition,
#   3. combine them into the final mean (mean).
#
# The printed plans confirm this: the county_code plan has no Exchange, while
# the lei plan needs an Exchange before the final aggregation.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
#q9
# Predict loan approval (action_taken = 1) from loan_amount, income and
# interest_rate with decision trees of increasing depth. Reports test-set
# accuracy per depth as {"depth=<d>": accuracy}.
df = spark.sql(
    """
    SELECT
        CAST(loan_amount AS DOUBLE) AS loan_amount,
        CAST(income AS DOUBLE) AS income,
        CAST(interest_rate AS DOUBLE) AS interest_rate,
        CASE WHEN action_taken = 1 THEN 1.0 ELSE 0.0 END AS approval
    FROM loans
    """
)
# Replace missing feature values with 0.0. VectorAssembler's default
# handleInvalid="error" would otherwise fail on null inputs.
df = df.fillna(0.0)
train, test = df.randomSplit([0.8, 0.2], seed=41)
# The training split is reused by every fit in the loop below.
train.cache()

assembler = VectorAssembler(
    inputCols=["loan_amount", "income", "interest_rate"], outputCol="X"
)
train_data = assembler.transform(train)
test_data = assembler.transform(test)

# Keep the depths to try separate from the results dict. The original used one
# dict for both roles while iterating over it.
max_depths = [1, 5, 10, 15, 20]
accuracy = {}
for depth in max_depths:
    decision_tree = DecisionTreeClassifier(
        maxDepth=depth, labelCol="approval", featuresCol="X", seed=41
    )
    classifier = decision_tree.fit(train_data)
    y_pred = classifier.transform(test_data)
    # Compute both counts in one aggregation job instead of two count() jobs.
    # count(expr) counts only non-null values, so these match the original
    # filter conditions (a null comparison never counts as correct).
    stats = y_pred.selectExpr(
        "count(CASE WHEN approval IS NOT NULL AND prediction IS NOT NULL THEN 1 END) AS total",
        "count(CASE WHEN approval = prediction AND approval IS NOT NULL THEN 1 END) AS correct",
    ).first()
    total, correct = stats["total"], stats["correct"]
    # Guard against an empty test split instead of raising ZeroDivisionError.
    accuracy[f"depth={depth}"] = correct / total if total else float("nan")
accuracy

In [None]:
# q10
# No, the test accuracy does not always increase with larger max_depth values.
# Looking at the numbers:
# - Accuracy increases from depth 1 to 10
# - But then starts decreasing after depth 10
# - At depth 20, accuracy is lower than at depth 10
#
# This happens because deeper trees can start "memorizing" the training data
# (including its noise) rather than learning general patterns, i.e. they overfit.
# At some point, making the tree deeper just makes it overfit to the training
# data, hurting its ability to make good predictions on new data it hasn't
# seen before.