In [1]:
##############################################################################
# Cell 0: Setup
##############################################################################

# Imports and Spark session setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Cluster/session settings applied to the builder before the session starts.
_spark_conf = {
    "spark.executor.memory": "1G",
    "spark.sql.warehouse.dir": "hdfs://nn:9000/user/hive/warehouse",
}

_builder = SparkSession.builder.appName("cs544").master("spark://boss:7077")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)

# Hive support lets saveAsTable() create permanent tables in the warehouse dir above.
spark = _builder.enableHiveSupport().getOrCreate()

# Load the problems data (JSON lines on HDFS) and print a 5-row sample
problems_df = spark.read.json("hdfs://nn:9000/problems.jsonl")
problems_df.limit(5).show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/02 21:58:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

In [2]:
##############################################################################
# Cell 1: Q1 - RDD filtering
##############################################################################
# #q1

def _q1_matches(row):
    """Return True when `row` satisfies the Q1 predicate.

    The predicate is: cf_rating >= 1600, private_tests > 0, and "_A."
    appearing in the problem name. A null (None) in any of those fields is
    treated as "no match" -- the same outcome the SQL versions in Q2/Q3 give
    for NULLs -- instead of raising TypeError on e.g. `None >= 1600`.
    """
    rating = row.cf_rating
    private_tests = row.private_tests
    name = row.name
    if rating is None or private_tests is None or name is None:
        return False
    return rating >= 1600 and private_tests > 0 and "_A." in name


q1_count = problems_df.rdd.filter(_q1_matches).count()

# The autograder expects a raw integer on the final line
q1_count


                                                                                

217

In [3]:
##############################################################################
# Cell 2: Q2 - DataFrame filtering
##############################################################################
# #q2

from pyspark.sql.functions import expr

# Same predicate as Q1, written as a Spark SQL expression. The Python literal
# `\\\\` becomes `\\` in the SQL text, which the SQL parser unescapes to a
# single backslash, so LIKE sees `\_` (a literal underscore, not the
# single-character wildcard).
q2_condition = expr("cf_rating >= 1600 AND private_tests > 0 AND name LIKE '%\\\\_A.%'")
q2_count = problems_df.where(q2_condition).count()

q2_count


217

In [4]:
##############################################################################
# Cell 3: Q3 - Spark SQL filtering
##############################################################################
# #q3

# Persist problems_df as a permanent Hive table so plain SQL can query it;
# "overwrite" makes re-running this cell safe.
problems_df.write.mode("overwrite").saveAsTable("problems")

# Same Q1 filter, this time in Spark SQL against the Hive table.
q3_df = spark.sql("""
    SELECT COUNT(*) as cnt
    FROM problems
    WHERE cf_rating >= 1600
      AND private_tests > 0
      AND name LIKE '%\\_A.%'
""")

# COUNT(*) always yields exactly one row; pull its single column by name.
q3_val = q3_df.first()["cnt"]

q3_val


25/04/02 21:58:37 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/04/02 21:58:37 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/04/02 21:58:39 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/04/02 21:58:39 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.19.0.4
25/04/02 21:58:40 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
25/04/02 21:58:41 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/04/02 21:58:41 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/04/02 21:58:41 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/04/02 21:58:41 W

217

In [5]:
# Q3 already saved problems_df as the permanent 'problems' table, so an
# unconditional overwrite here would just rewrite identical data. Only write
# it when no permanent table of that name exists (e.g. Q3 was skipped).
_existing_tables = {t.name for t in spark.catalog.listTables() if not t.isTemporary}
if "problems" not in _existing_tables:
    problems_df.write.mode("overwrite").saveAsTable("problems")

                                                                                

In [6]:
##############################################################################
# Cell 4: Q4 - Bucketing solutions
##############################################################################
# #q4

# Start from a clean slate so the bucketed layout below is what gets stored.
spark.sql("DROP TABLE IF EXISTS solutions")

# Load solutions (JSON lines on HDFS) and save them as a Hive table
# hash-bucketed into 4 buckets on 'language'.
solutions_df = spark.read.json("hdfs://nn:9000/solutions.jsonl")
solutions_df.write.mode("overwrite").bucketBy(4, "language").saveAsTable("solutions")

# Ask Spark for the physical plan of a GROUP BY on the bucketing column.
explain_plan = spark.sql("""
    EXPLAIN
    SELECT language, COUNT(*)
    FROM solutions
    GROUP BY language
""").collect()

# Each result row holds one column: the plan text.
for (plan_text,) in explain_plan:
    print(plan_text)


                                                                                

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[language#310], functions=[count(1)])
   +- HashAggregate(keys=[language#310], functions=[partial_count(1)])
      +- FileScan parquet spark_catalog.default.solutions[language#310] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://nn:9000/user/hive/warehouse/solutions], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<language:string>, SelectedBucketsCount: 4 out of 4




In [7]:
##############################################################################
# Cell 5: Q5 - Warehouse objects
##############################################################################
# #q5

def _read_csv(name):
    """Load hdfs://nn:9000/<name>.csv, using its header row and inferring column types."""
    return (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(f"hdfs://nn:9000/{name}.csv")
    )


# Read CSV files into DataFrames
languages_df = _read_csv("languages")
problem_tests_df = _read_csv("problem_tests")
sources_df = _read_csv("sources")
tags_df = _read_csv("tags")

# Register them as session-scoped temp views (not persisted to the warehouse)
languages_df.createOrReplaceTempView("languages")
problem_tests_df.createOrReplaceTempView("problem_tests")
sources_df.createOrReplaceTempView("sources")
tags_df.createOrReplaceTempView("tags")

# Ask the catalog rather than hard-coding the answer: maps each table/view
# name in the current database to True for a temp view and False for a
# permanent (Hive) table, so the result always matches the real state.
warehouse_objects = {t.name: t.isTemporary for t in spark.catalog.listTables()}

# Return the dictionary as the final line
warehouse_objects


                                                                                

{'problems': False,
 'solutions': False,
 'languages': True,
 'problem_tests': True,
 'sources': True,
 'tags': True}

In [8]:
##############################################################################
# Cell 6: Q6 - correct PYTHON3 solutions from CODEFORCES
##############################################################################
# #q6

# Count correct PYTHON3 solutions whose problem comes from the CODEFORCES
# source: solutions -> problems on problem_id, problems -> sources on source.
q6_df = spark.sql("""
    SELECT COUNT(*) AS correct_python3_cf
    FROM solutions s
    JOIN problems p
      ON s.problem_id = p.problem_id
    JOIN sources so
      ON p.source = so.source
    WHERE s.is_correct = true
      AND s.language = 'PYTHON3'
      AND so.source_name = 'CODEFORCES'
""")

# Single-row aggregate: read the count by its alias.
q6_count = q6_df.first()["correct_python3_cf"]

q6_count


10576

In [9]:
##############################################################################
# Cell 7: Q7 - categorize problem difficulty
##############################################################################
# #q7

# Label every problem Easy (<= 5), Medium (<= 10) or Hard (anything else,
# including NULL difficulty, which falls through to ELSE).
cat_df = spark.sql("""
    SELECT CASE
             WHEN difficulty <= 5 THEN 'Easy'
             WHEN difficulty <= 10 THEN 'Medium'
             ELSE 'Hard'
           END AS difficulty_cat
    FROM problems
""")

counts_df = cat_df.groupBy("difficulty_cat").count()

# {label: number of problems with that label}
result_q7 = {r["difficulty_cat"]: r["count"] for r in counts_df.collect()}

result_q7


{'Easy': 409, 'Medium': 5768, 'Hard': 2396}

In [10]:
##############################################################################
# Cell 8: Q8 - caching experiment
##############################################################################
# #q8

import time
from pyspark.sql.functions import avg

df_nogen = spark.sql("""
    SELECT *
    FROM problem_tests
    WHERE is_generated = false
""")

def compute_averages(df):
    """Return (mean of input_chars, mean of output_chars) over all rows of `df`."""
    row = df.select(
        avg("input_chars").alias("avg_in"),
        avg("output_chars").alias("avg_out")
    ).collect()[0]
    return (row["avg_in"], row["avg_out"])


def _time_averages(df):
    """Run compute_averages(df) once and return the elapsed time in seconds.

    Uses time.perf_counter(), a monotonic high-resolution clock intended for
    measuring intervals (time.time() follows the wall clock and can jump).
    The averages themselves are discarded; only the timing matters here.
    """
    start = time.perf_counter()
    compute_averages(df)
    return time.perf_counter() - start


times = []

# a) uncached baseline
times.append(_time_averages(df_nogen))

# b) mark for caching; cache() is lazy, so data is stored on the next action
df_nogen.cache()
try:
    # c) first cached pass: also pays for materializing the cache
    times.append(_time_averages(df_nogen))
    # d) second cached pass: should now be served from the cache
    times.append(_time_averages(df_nogen))
finally:
    # e) always release the cached data, even if a pass above fails
    df_nogen.unpersist()

# Return the list of three timings
times


                                                                                

[0.7722046375274658, 1.0936036109924316, 0.18015623092651367]

In [11]:
##############################################################################
# Cell 9: Q9 - decision tree regression (R^2)
##############################################################################
# #q9

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# --- Model definition -------------------------------------------------------
# Pack three numeric problem attributes into one feature vector, then fit a
# regression tree (max depth 5) that predicts cf_rating from it.
assembler = VectorAssembler(
    inputCols=["difficulty", "time_limit", "memory_limit_bytes"],
    outputCol="features",
)
dt = DecisionTreeRegressor(labelCol="cf_rating", featuresCol="features", maxDepth=5)
pipeline = Pipeline(stages=[assembler, dt])
evaluator = RegressionEvaluator(labelCol="cf_rating", predictionCol="prediction", metricName="r2")

# --- Data --------------------------------------------------------------------
# Codeforces problems only. Rows with a positive rating are split by
# problem_id parity (even -> train, odd -> test); rating 0 rows go to missing_df.
cf_problems_df = spark.sql("""
SELECT p.*
FROM problems p
JOIN sources s
  ON p.source = s.source
WHERE s.source_name = 'CODEFORCES'
""")

train_df = cf_problems_df.filter("cf_rating > 0 AND (problem_id % 2) = 0")
test_df = cf_problems_df.filter("cf_rating > 0 AND (problem_id % 2) = 1")
missing_df = cf_problems_df.filter("cf_rating = 0")

# --- Fit and score -------------------------------------------------------------
model = pipeline.fit(train_df)
predictions_test = model.transform(test_df)
r2_score = evaluator.evaluate(predictions_test)

r2_score


0.5929835263198762

In [12]:
##############################################################################
# Cell 10: Q10 - average predictions for missing cf_rating
##############################################################################
# #q10

from pyspark.sql.functions import avg

def _column_mean(df, column):
    """Return the average of `column` over all rows of `df` (a single aggregate value)."""
    return df.agg(avg(column)).first()[0]


avg_cf_train = _column_mean(train_df, "cf_rating")
avg_cf_test = _column_mean(test_df, "cf_rating")

# Score the unrated problems with the Q9 model and average its predictions.
predictions_missing = model.transform(missing_df)
avg_pred_missing = _column_mean(predictions_missing, "prediction")

# Return the tuple of 3 numbers
(avg_cf_train, avg_cf_test, avg_pred_missing)


(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)