In [1]:
%pip install pyspark==3.3.4 -q

Note: you may need to restart the kernel to use updated packages.


In [2]:
import time


In [3]:
from pyspark import SparkConf, SparkContext, RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split


# The code below may help you if your pc cannot find the correct python executable.
# Don't use this code on the server!
# import os
# import sys
# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# TODO: Make sure that if you installed Spark version 3.3.4 (recommended) that you install the same version of
#  PySpark. You can do this by running the following command: pip install pyspark==3.3.4

def get_spark_context(on_server: bool) -> SparkContext:
    spark_conf = SparkConf().setAppName("2AMD15")
    if not on_server:
        spark_conf = spark_conf.setMaster("local[*]") # check this

    spark_context = SparkContext.getOrCreate(spark_conf)

    if on_server:
        # TODO: You may want to change ERROR to WARN to receive more info. For larger data sets, to not set the
        #  log level to anything below WARN, Spark will print too much information.
        spark_context.setLogLevel("WARN")

    return spark_context


def q0a(spark_context: SparkContext, on_server: bool) -> DataFrame:
    plays_file_path = "/plays.txt" if on_server else "/kaggle/input/bigdm/plays.txt"

    spark_session = SparkSession(spark_context)

    # Write code in Spark to import the dataset: (a) as a data frame/dataset, and (b) as an RDD. Even
    # though this question receives 0 points, you will rely on this code for the following questions.
    # Therefore, if your code for this question is wrong, this will most likely lead to wrong answers for
    # the following questions.
    
    # TODO: Implement Q0a here by creating a Dataset of DataFrame out of the file at {@code plays_file_path}.
    df = spark_session.read.format("text").load(plays_file_path)

    # Split the values column based on the comma delimiter
    split_col = split(df['value'], ',')

    # Add the split columns to the DataFrame
    df = df.withColumn('userid', split_col.getItem(0))
    df = df.withColumn('songid', split_col.getItem(1))
    df = df.withColumn('rating', split_col.getItem(2))

    # Drop the original values column
    df = df.drop('value')
    print('q0a:')
    print(df.show(1))
    print('q0a end')
    df.createOrReplaceTempView("assignment")
    
    return df


def q0b(spark_context: SparkContext, on_server: bool) -> RDD:
    plays_file_path = "/plays.txt" if on_server else "/kaggle/input/bigdm/plays.txt"

    # TODO: Implement Q0b here by creating an RDD out of the file at {@code plays_file_path}.

    rdd_df = spark_context.textFile(plays_file_path)
    rdd_df_2 = rdd_df.map(lambda x: x.split(","))    

    return rdd_df_2


def q1(spark_context: SparkContext, data_frame: DataFrame):
    
    
    spark_session = SparkSession(spark_context)


    start = time.process_time()
    
    print('q1:')
    spark_session.sql("""SELECT COUNT(*) FROM (SELECT userid FROM assignment
    WHERE rating is not NULL
    GROUP BY userid  HAVING COUNT(rating) >= 100 AND 
    AVG(rating) < 2 )""").show(50)
    
    print(time.process_time() - start)
    print('q1 end')
    
    return


def q2(spark_context: SparkContext, rdd: RDD):
    
    #The user provided at least 100 ratings, and,
    #The user has the minimum average rating, compared to all 
    #users that provided at least 100 ratings
    print(rdd.map(lambda s: len(s)))

    
    return

def q3(spark_context: SparkContext, rdd: RDD):
    # TODO: Implement Q3 here
    return


def q4(spark_context: SparkContext, rdd: RDD):
    # TODO: Implement Q4 here
    return

def q5(spark_context: SparkContext, rdd: RDD):
    # 6 min with 4 partitions
    # filter before sort?
    # try to reduce caching?
    # remove users below certain threshold after first cartesian?
    # remove broadcast?
    
    print("q5 start")

    rdd = rdd.repartition(4)
    start = time.process_time()

    filtered_rdd = rdd.filter(lambda x: len(x) > 2)
    userSongPairs = filtered_rdd.map(lambda x: (int(x[0]), int(x[1])))

    userSongCounts = userSongPairs.aggregateByKey(
        set(),
        lambda songs, song: songs | {song},
        lambda songs1, songs2: songs1 | songs2
    ).filter(lambda x: len(x[1]) <= 8000)
    
    userSongsBroadcast: Broadcast[dict] = spark_context.broadcast(userSongCounts.collectAsMap())

    users = userSongCounts.map(lambda x: x[0]) # cache?

    def count_distinct_songs(*userslist):
        user_songs = userSongsBroadcast.value
        distinct_songs = set()
        for user in userslist:
            user_songs_set = user_songs.get(user, set())
            distinct_songs |= user_songs_set
        return len(distinct_songs)

    user_combinations = users.cartesian(users)
    
    users_filtered = user_combinations.filter(lambda x: x[0] < x[1])

    distinctSongs = users_filtered.map(lambda x: ((x[0], x[1]), count_distinct_songs(x[0], x[1])))

    filtering1 = distinctSongs.filter(lambda x: x[1] <= 8000)

    justUsers = filtering1.map(lambda x: (x[0][0], x[0][1]))
    #.sortBy(lambda x: (x[0], x[1]))
        
    cartesian2 = justUsers.cartesian(users)
    
    cartesian2_filtered = cartesian2.filter(lambda x: x[0][1] < x[1])
    
    distinctSongs2 = cartesian2_filtered.map(lambda x: (x[0][0],x[0][1],x[1],count_distinct_songs(x[0][0],x[0][1],x[1])))

    filtering2 = distinctSongs2.filter(lambda x: x[3] <= 8000)
    
    finalsorted = filtering2.sortBy(lambda x: (x[0], x[1], x[2]))
    
    for user in finalsorted.collect():
        print("Q5:", "<" + ",".join(map(str, user)) + ">")

    print(time.process_time() - start)

    
    
if __name__ == '__main__':
    print("Preparing to run...")
    on_server = False  # TODO: Set this to true if and only if deploying to the server

    spark_context = get_spark_context(on_server)

    #data_frame = q0a(spark_context, on_server)

    rdd = q0b(spark_context, on_server)

    #q1(spark_context, data_frame)

    #q2(spark_context, rdd)

    #q4(spark_context, rdd)

    q5(spark_context, rdd)

    spark_context.stop()
    print("Completed run !")



Preparing to run...


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/25 11:26:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
q5 start


                                                                                

Q5: <72,861,932,6086>
Q5: <72,861,1793,6086>
Q5: <72,932,1793,5663>
Q5: <112,563,861,7029>
Q5: <112,773,861,6973>
Q5: <112,845,861,6959>
Q5: <227,861,1087,6158>
Q5: <227,861,1948,6158>
Q5: <227,1087,1948,5725>
Q5: <373,861,1233,6204>
Q5: <373,861,2094,6204>
Q5: <373,1233,2094,5772>
Q5: <563,773,861,6957>
Q5: <563,845,861,6939>
Q5: <773,845,861,6887>
Q5: <861,932,1793,6086>
Q5: <861,1087,1948,6158>
Q5: <861,1233,2094,6204>
2.9101721339999997
Completed run !


# QUESTION 1

   The average rating of a user is the mean value of 
    all ratings provided by that user. For example,
    assuming that we have the following information for user 1:
    1, 44, 3
    1, 45, 5
    1, 56
    1, 53
    1, 46, 1
    The average rating for the user is (3+5+1)/3. The definition of average rating does not account
    for the lines without a rating.
    Write code that uses SparkSQL to find the total number of 
    all users that provided at least 100
    ratings, and have an average rating below 2.
    Your answer should be printed as follows:
    >> Q1: number
    

Constraints:
a) Your code should achieve the maximum degree of 
parallelism on the available hardware.
The code that is executed centrally (on the master node) should be of constant
complexity. This also means that the code executed on the 
master should contain no loops.

b) The actual computation of your answer should be with a single standard SQL
command, i.e., SQL that could also run in a relational database. 
Besides creating a view or a temporary table (which may use the dataframe API) 
you should not use the dataframe API for the actual computation of the answer.
For example, the following command:

spark.sql(“SELECT name FROM people WHERE age>10”).show()

satisfies this constraint (albeit, it does not correctly answer the question), 

whereas df.filter(col("age").gt(10)).select(“name”);

does not satisfy this constraint because it uses the dataframe API on 
something else than creating a view or a temporary table 
(in this example, for filtering and for projection).
Report/deliver the following information/code:

a) Fill in the missing code in question1.java or question1.py for 
answering the question.
b) The SQL query [Report and poster]
c) The output of the query plan [Report]
d) A paragraph (or pseudocode) describing how you would implement the same
functionality in Spark, but without using SparkSQL. 

Notice that you do not need to actually implement it. 
[Report and short discussion in the poster]
e) A paragraph where you compare your Spark implementation (point d above) 
and the SparkSQL implementation, in terms of efficiency. 
The paragraph should solely rely on the query plan of SparkSQL, 
and on your answer on point d above, i.e., without actually implementing 
point d [Report and short discussion in poster]

The expected length of this question in the report is 1 page. Sub-questions 
a, b, c receive 10
points total, and points d and e receive 10 points total (assuming that the previous
sub-questions are correct).
Hint: For all questions, you can test that your SQL 
    answers are correct on the small dataset, for
which we provide the expected answers. 
Keep in mind that correct answers on the small
dataset do not guarantee that your code is correct. 
However, wrong answers on the small
dataset mean that your code is wrong.
Answer on the small dataset:
>> Q1: 3

# QUESTION 2

You are asked to write a Spark program (not using SparkSQL) for finding the grumpiest user.
The formal definition of the grumpiest user is as follows:

a) The user provided at least 100 ratings, and,
b) The user has the minimum average rating, compared to all users that provided at least
100 ratings.

You can assume that there is exactly one grumpiest user, i.e., no two users have an identical
minimum average rating!

Notice that the definition of the grumpiest user only considers the users with at least 100 ratings!

Failing to take this into account will lead to wrong results.
Your answer should be printed as follows:
>> Q2: userid, averagerating
For example,
>> Q2: 861, 1.0

Constraints:
a) Your code should have a high (but configurable) degree of parallelism. The code that is
executed centrally (on the master node) should be of constant complexity. This also
means that the code executed on the master should contain no loops.
b) Your answer should not use SQL or the dataframe API. This also means that you should
not use the groupBy function. Think of how this functionality could be done with
reduceByKey instead!
Report/deliver the following information/code:

a) Fill in the missing method in question2.java or question2.py.
b) Brief description of your solution [Report and poster]
c) Investigate how parallelism helps on the performance of your code in Question 2. In
particular, test how fast is your code for different degrees of parallelism (indicatively, 1, 5,
10, 20, 40, 80). Plot, discuss, and explain your results. [Report and poster]
The expected length of this question in the report is 1 page. Points a and b receive 10 points
total, and point c receives 10 points total (assuming that the previous points are correct).
Answer on the small dataset:
>> Q2: 861, 1.0