In [1]:
# List the contents of the root directory of the HDFS filesystem served at nn:9000.
!hdfs dfs -ls hdfs://nn:9000/

In [2]:
# Create /data on HDFS, then copy the local *.jsonl and *.csv files from /nb/data
# into the HDFS root. Every command passes dfs.replication=1; -f on the copies
# overwrites files that already exist at the destination.
# NOTE(review): the copies target hdfs://nn:9000/ rather than the /data directory
# created on the first line. Later cells read from the root, so /data looks
# unused here -- confirm whether the mkdir is still needed.
!hdfs dfs -D dfs.replication=1 -mkdir -p hdfs://nn:9000/data
!hdfs dfs -D dfs.replication=1 -cp -f file:///nb/data/*.jsonl hdfs://nn:9000/
!hdfs dfs -D dfs.replication=1 -cp -f file:///nb/data/*.csv hdfs://nn:9000/

In [3]:
import time

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import IntegerType

In [4]:
# Session settings: 1G per executor, and a Hive warehouse directory that lives on HDFS.
spark_settings = {
    "spark.executor.memory": "1G",
    "spark.sql.warehouse.dir": "hdfs://nn:9000/user/hive/warehouse",
}

# Build (or reuse) the "cs544" session against the standalone master at boss:7077,
# with Hive support enabled so saveAsTable() / spark.sql() share one catalog.
session_builder = SparkSession.builder.appName("cs544").master("spark://boss:7077")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.enableHiveSupport().getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/29 19:48:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load the two JSON-lines datasets from the HDFS root, problems first, then solutions.
problems_df, solutions_df = (
    spark.read.json(jsonl_path)
    for jsonl_path in (
        "hdfs://nn:9000/problems.jsonl",
        "hdfs://nn:9000/solutions.jsonl",
    )
)

                                                                                

In [6]:
# Print the first five rows of the problems DataFrame as a quick look at its columns.
problems_df.limit(5).show()

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

In [7]:
#q1 RDD API
def _is_rated_a_problem(row):
    """Row predicate: cf_rating >= 1600, at least one private test, and "_A." in the name."""
    return (
        row.cf_rating >= 1600 and
        row.private_tests > 0 and
        "_A." in row.name
    )


# Count matching rows by filtering the DataFrame's underlying RDD.
q1_count = problems_df.rdd.filter(_is_rated_a_problem).count()
q1_count

                                                                                

217

In [12]:
#q2: DataFrame API
# Same filter as q1, expressed through DataFrame.filter() with a SQL expression.
# `expr` comes from the notebook's import cell rather than a mid-notebook import,
# so all dependencies are declared in one place.
#
# The quadruple backslash is intentional: once the Python literal and then the
# Spark SQL string literal have each been unescaped, LIKE receives `\_`, which
# matches a literal underscore instead of acting as the single-character
# wildcard. That keeps the pattern equivalent to the `"_A." in name` test in q1.
q2_count = problems_df.filter(
    expr("cf_rating >= 1600 AND private_tests > 0 AND name LIKE '%\\\\_A.%'")
).count()

q2_count

217

In [17]:
#q3: Spark SQL
# Persist the problems DataFrame as the table "problems", replacing any earlier
# copy, so that it can be queried by name from SQL.
problems_df.write.saveAsTable("problems", mode="overwrite")

# Same three conditions as q1/q2; `\_` (after unescaping) is a literal underscore in LIKE.
q3_query = """
    SELECT COUNT(*) 
    FROM problems 
    WHERE cf_rating >= 1600 
    AND private_tests > 0 
    AND name LIKE '%\\\\_A.%'
"""
q3_row = spark.sql(q3_query).first()
q3_result = q3_row[0]  # the single COUNT(*) column
q3_result

217

In [18]:
#q4: Bucketed Solutions Table
# Write the solutions as the table "solutions", split into 4 buckets on `language`
# and replacing any earlier copy.
bucketed_writer = solutions_df.write.bucketBy(4, "language")
bucketed_writer.saveAsTable("solutions", mode="overwrite")

# Print the physical plan of a per-language row count over the bucketed table.
per_language_counts = spark.sql("""
    SELECT language, COUNT(*)
    FROM solutions
    GROUP BY language
""")
per_language_counts.explain()



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[language#686], functions=[count(1)])
   +- HashAggregate(keys=[language#686], functions=[partial_count(1)])
      +- FileScan parquet spark_catalog.default.solutions[language#686] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://nn:9000/user/hive/warehouse/solutions], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<language:string>, SelectedBucketsCount: 4 out of 4




                                                                                

In [19]:
#q5: Views in Warehouse
# Register each CSV file from the HDFS root as a temporary view named after the file.
# No schema inference is requested, only the header row is used for column names.
csv_files = ["languages", "problem_tests", "sources", "tags"]
for file in csv_files:
    df = spark.read.csv(f"hdfs://nn:9000/{file}.csv", header=True)
    df.createOrReplaceTempView(file)

# Map every table/view visible in the current database to whether it is temporary.
# Deriving this from the catalog (instead of typing the answer by hand) means the
# displayed result always reflects what is actually registered.
{table.name: table.isTemporary for table in spark.catalog.listTables()}


{'problems': False,
 'solutions': False,
 'languages': True,
 'problem_tests': True,
 'sources': True,
 'tags': True}

In [20]:
#q6: Correct PYTHON3 Solutions from CODEFORCES
# Join solutions -> problems -> sources and count the solutions that are marked
# correct, written in PYTHON3, and whose problem source is named CODEFORCES.
q6_query = """
    SELECT COUNT(*) as count
    FROM solutions s
    JOIN problems p ON s.problem_id = p.problem_id
    JOIN sources src ON p.source = src.source
    WHERE s.is_correct = True 
    AND s.language = 'PYTHON3'
    AND src.source_name = 'CODEFORCES'
"""
# The query yields exactly one single-column row; unpack its only value.
(result,) = spark.sql(q6_query).first()
result

10576

In [21]:
#q7: Problem Difficulty Categories
# Label each problem: difficulty <= 5 -> "Easy", otherwise <= 10 -> "Medium",
# and anything not matched by those two conditions -> "Hard".
difficulty_category = (
    when(col("difficulty") <= 5, "Easy")
    .when(col("difficulty") <= 10, "Medium")
    .otherwise("Hard")
    .alias("difficulty_category")
)

# Count the problems per label and collect the small result into a plain dict.
result = (
    problems_df
    .select(difficulty_category)
    .groupBy("difficulty_category")
    .count()
)
difficulty_counts = dict(
    (category_row["difficulty_category"], category_row["count"])
    for category_row in result.collect()
)
difficulty_counts

{'Easy': 409, 'Medium': 5768, 'Hard': 2396}

In [22]:
#q8: Caching Performance
# Rows of problem_tests whose is_generated column equals the string 'False'.
filtered_tests = spark.sql("SELECT * FROM problem_tests WHERE is_generated = 'False'")


def _time_avg_query():
    """Compute the average input_chars/output_chars once; return the elapsed seconds."""
    began = time.time()
    filtered_tests.agg({"input_chars": "avg", "output_chars": "avg"}).collect()
    return time.time() - began


# times[0]: before cache() is requested.
times = [_time_avg_query()]

# times[1]: first run after cache() -- caching is lazy, so this run also pays for
# filling the cache. times[2]: second run after cache(), served from the cached data.
filtered_tests.cache()
times.append(_time_avg_query())
times.append(_time_avg_query())

# Release the cached data once the measurements are done.
filtered_tests.unpersist()

times

                                                                                

[0.7604794502258301, 1.2660470008850098, 0.19753289222717285]

In [23]:
#q9: Decision Tree Prediction
# Keep only the problems whose source is named CODEFORCES.
codeforces_df = spark.sql("""
    SELECT p.* 
    FROM problems p
    JOIN sources s ON p.source = s.source
    WHERE s.source_name = 'CODEFORCES'
""")

# Rows with a positive cf_rating are split by problem_id parity (even -> train,
# odd -> test); rows with cf_rating == 0 are set aside as "missing".
has_rating = col("cf_rating") > 0
even_problem_id = col("problem_id") % 2 == 0
odd_problem_id = col("problem_id") % 2 == 1
train_df = codeforces_df.filter(has_rating & even_problem_id)
test_df = codeforces_df.filter(has_rating & odd_problem_id)
missing_df = codeforces_df.filter(col("cf_rating") == 0)

# Pipeline: pack the three numeric columns into a feature vector, then fit a
# depth-5 decision tree regressor that predicts cf_rating from it.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["difficulty", "time_limit", "memory_limit_bytes"],
)
dt = DecisionTreeRegressor(
    maxDepth=5,
    featuresCol="features",
    labelCol="cf_rating",
)
pipeline = Pipeline(stages=[assembler, dt])
model = pipeline.fit(train_df)

# Score the held-out (odd problem_id) rows with the R^2 metric.
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(
    metricName="r2",
    predictionCol="prediction",
    labelCol="cf_rating",
)
r2_score = evaluator.evaluate(predictions)
r2_score

                                                                                

0.5929835263198762

In [24]:
#q10: Missing cf_rating Predictions
def _column_average(frame, column_name):
    """Return the average of `column_name` over `frame` as a plain Python value."""
    return frame.agg({column_name: "avg"}).collect()[0][0]


# Average of the known ratings in the train and test splits.
train_avg = _column_average(train_df, "cf_rating")
test_avg = _column_average(test_df, "cf_rating")

# Average of the model's predictions for the rows whose cf_rating is 0.
missing_predictions = model.transform(missing_df)
missing_avg = _column_average(missing_predictions, "prediction")

(train_avg, test_avg, missing_avg)

(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)