# Problem 1

Assumptions:
1. If the input contains a line $(x, y)$, it does not also contain a line $(y, x)$, so each friendship appears exactly once.


## Part A


Map function:
$$
map : (null, (x,y)) \rightarrow [(x, 1), (y, 1)]
$$
Reduce function:
$$
reduce : (x, [1, 1, ...]) \rightarrow (x, sum([1, 1, ...]))
$$

Each line of the output file would be $x, \text{count\_of\_friends}$, i.e. a person followed by their number of friends.
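As a sanity check, the Part A job can be simulated in plain Python. This is a minimal sketch, not a real MapReduce runtime: `map_fn`, `reduce_fn` and `run_friend_count` are hypothetical names, and a dictionary stands in for the shuffle phase.

```python
from collections import defaultdict


def map_fn(edge):
    # Each friendship line (x, y) gives x one friend and y one friend.
    x, y = edge
    return [(x, 1), (y, 1)]


def reduce_fn(key, values):
    # Sum the 1s emitted for this person.
    return (key, sum(values))


def run_friend_count(edges):
    # Shuffle phase (simulated): group every emitted value by its key.
    groups = defaultdict(list)
    for edge in edges:
        for key, value in map_fn(edge):
            groups[key].append(value)
    return dict(reduce_fn(key, values) for key, values in groups.items())


edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "d")]
print(run_friend_count(edges))  # {'a': 2, 'b': 2, 'c': 3, 'd': 1}
```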

## Part B

### Phase 1

Map function:
$$
map : (null, (x,y)) \rightarrow [(x, y), (y, x)]
$$


Reduce function (identity):
$$
reduce : (x, [y_1, y_2, ...]) \rightarrow (x, [y_1, y_2, ...])
$$
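A minimal plain-Python sketch of Phase 1, assuming the input is a list of `(x, y)` tuples. `phase1_map` and `phase1` are hypothetical helpers; the friend lists are sorted only to make the output deterministic, since the reducer itself is the identity.

```python
from collections import defaultdict


def phase1_map(edge):
    # Emit the friendship in both directions.
    x, y = edge
    return [(x, y), (y, x)]


def phase1(edges):
    # Shuffle (simulated) groups friends by person; the reducer is the identity.
    groups = defaultdict(list)
    for edge in edges:
        for key, value in phase1_map(edge):
            groups[key].append(value)
    # Sorting only makes the printed output deterministic.
    return {person: sorted(friends) for person, friends in groups.items()}


print(phase1([("a", "b"), ("a", "c"), ("b", "c")]))
# {'a': ['b', 'c'], 'b': ['a', 'c'], 'c': ['a', 'b']}
```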


### Intermediary Step (optional)
Filter out all key-value pairs $(x, [y_1, y_2, \ldots])$ whose friend list has length $< k$.


### Phase 2

Map Function:
$$
map : (x, [y_1, y_2, \ldots]) \rightarrow [((\min(y_i, y_j), \max(y_i, y_j)), 1) \mid i < j]
$$

Reduce Function:
$$
reduce : ((y_i, y_j), [1,1,...]) \rightarrow ((y_i, y_j), sum([1,1,...]))
$$
Write out the pairs $(y_i, y_j)$ whose summed count (their number of common friends) is $\geq 7$.
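The Phase 2 counting can be sketched the same way. The hypothetical `phase2` takes the Phase 1 output as a dictionary from person to friend list; it applies `itertools.combinations` to the sorted list so each unordered pair of friends gets a single canonical key, and a `Counter` stands in for the shuffle and the summing reducer. The threshold defaults to 7 as in the text.

```python
from collections import Counter
from itertools import combinations


def phase2(adjacency, threshold=7):
    # adjacency: Phase 1 output, person -> list of that person's friends.
    counts = Counter()
    for friends in adjacency.values():
        # Map: one ((y_i, y_j), 1) per unordered pair of x's friends.
        # Sorting first gives each pair a single canonical key.
        for pair in combinations(sorted(friends), 2):
            counts[pair] += 1  # shuffle + summing reducer, simulated by a Counter
    # Keep only pairs whose number of common friends reaches the threshold.
    return {pair: c for pair, c in counts.items() if c >= threshold}


adjacency = {
    "x1": ["a", "b"], "x2": ["a", "b"], "x3": ["a", "b", "c"],
    "a": ["x1", "x2", "x3"], "b": ["x1", "x2", "x3"], "c": ["x3"],
}
print(phase2(adjacency, threshold=3))  # {('a', 'b'): 3}
```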



## Part C

Map function:
$$
map : (null, (x,y)) \rightarrow [(x, 1), (y, 1)]
$$
Reduce function:
$$
reduce: (x, [1, 1, ...]) \rightarrow 
\begin{cases}
    (x, \text{null}) & \text{with probability } 0.01 \\
    \text{nothing} & \text{otherwise}
\end{cases}
$$
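The sampling reducer can be sketched as below, assuming the input is a list of `(x, y)` tuples. `sample_users` is a hypothetical name; the probability `p` is a parameter (0.01 in the text), and `seed` is added only so that runs can be reproduced.

```python
import random
from collections import defaultdict


def sample_users(edges, p=0.01, seed=None):
    rng = random.Random(seed)
    # Map + shuffle (simulated): group the (x, 1) and (y, 1) records by user.
    groups = defaultdict(list)
    for x, y in edges:
        groups[x].append(1)
        groups[y].append(1)
    # Reduce: keep each user independently with probability p, emitting (user, None).
    return [(user, None) for user in groups if rng.random() < p]


edges = [("a", "b"), ("b", "c"), ("c", "d")]
print(sample_users(edges, p=0.5, seed=0))
```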



# Problem 2

Assumptions:
1. The reducer size is the input size of that particular reducer.

## Part A

Suppose, for contradiction, that some reducer $R$ solves our problem and requires an input of size $n+k$ for some $k>0$.

By definition, $R$ then needs an input sequence of length $n+k$ to operate correctly.

However, the problem's entire input is the sequence $(a_1, \ldots, a_n)$ of size $n$. Given this input, $R$ cannot run, because it expects elements at indices greater than $n$, which do not exist.

This is a contradiction: $R$ cannot solve the problem if it cannot even run on the problem's input. Hence the reducer size is at most $n$.

## Part B

### Idea
First, a key observation: $m_i = \min\{m_{i-1}, a_i\}$, so $m_i$ can be computed from $m_{i-1}$ instead of comparing all $a_k$ with $k \leq i$.
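The recurrence is exactly a running minimum, which Python's `itertools.accumulate` computes when given `min` as the binary function. This is a sequential illustration of the observation only, not the distributed algorithm; `prefix_minima` is a hypothetical name, and the list holds $a_1, \ldots, a_n$ at positions $0, \ldots, n-1$.

```python
from itertools import accumulate


def prefix_minima(a):
    # m_1 = a_1 and m_i = min(m_{i-1}, a_i): one comparison per new element.
    return list(accumulate(a, min))


a = [5, 3, 4, 1, 2]
print(prefix_minima(a))  # [5, 3, 3, 1, 1]
```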

The algorithm has two logical steps:
1. Split the input sequence into $\frac{n}{\sqrt{n}} = \sqrt{n}$ equal, ordered chunks of $\sqrt{n}$ consecutive elements, and compute each chunk's minimum (the minimum of the chunk up to its largest index).
2. We now have $\sqrt{n}$ chunk minima. We then run the naive algorithm, but for a target $m_i$ we use only the $a_k$'s with $k \leq i$ that lie in the same chunk as $i$, together with the minima of all earlier chunks. (Alternatively, a third pass could compute a running minimum over the chunk minima, so that each chunk needs only one value.) A reducer in the second pass therefore receives at most $\sqrt{n}-1$ earlier chunk minima plus $\sqrt{n}$ values, i.e. $2\sqrt{n}-1$ inputs, which is still $O(\sqrt{n})$.

### Algorithm

#### Pass 1 - chunk mins

Break array $A$ into $\sqrt{n}$ equal, ordered chunks $C_1, C_2, \ldots, C_{\sqrt{n}}$ (assuming $n$ is a perfect square). For the mapping function we need to map each index $i$ to a chunk id $c_i$. We do this as follows:

$$c_i=\lceil \frac{i}{\sqrt{n}} \rceil$$
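In integer arithmetic, $\lceil i/\sqrt{n} \rceil$ can be computed without floating point. A small sketch, assuming 1-based indices and $n$ a perfect square (`chunk_id` is a hypothetical name):

```python
import math


def chunk_id(i, n):
    # Sketch assumption: i is 1-based and n is a perfect square.
    s = math.isqrt(n)
    if s * s != n:
        raise ValueError("this sketch assumes n is a perfect square")
    return (i + s - 1) // s  # integer form of ceil(i / sqrt(n))


print([chunk_id(i, 9) for i in range(1, 10)])  # [1, 1, 1, 2, 2, 2, 3, 3, 3]
```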


Mapping function:
$$
map : (i, a_i) \mapsto (c_i, a_i)
$$

Reduce function:
$$
reduce: (c_j, [a_{(c_j-1)\sqrt{n}+1}, \ldots, a_{c_j\sqrt{n}}]) \mapsto (c_j, M_j)
$$
where $M_j = \min\{a_{(c_j-1)\sqrt{n}+1}, \ldots, a_{c_j\sqrt{n}}\}$ is the minimum of chunk $c_j$.
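Pass 1 can be simulated in a few lines. This sketch assumes $n$ is a perfect square and that the Python list `a` stores $a_i$ at position $i-1$; `pass1_chunk_minima` is a hypothetical helper, and a dictionary plays the role of the shuffle.

```python
import math
from collections import defaultdict


def pass1_chunk_minima(a):
    # a[i - 1] holds a_i; n = len(a) is assumed to be a perfect square.
    n = len(a)
    s = math.isqrt(n)
    if s * s != n:
        raise ValueError("this sketch assumes n is a perfect square")
    groups = defaultdict(list)
    for i, value in enumerate(a, start=1):
        groups[(i + s - 1) // s].append(value)  # map: (i, a_i) -> (c_i, a_i)
    # Reduce: (c_j, chunk values) -> (c_j, M_j)
    return {c: min(values) for c, values in groups.items()}


print(pass1_chunk_minima([5, 3, 4, 1, 2, 6, 7, 0, 8]))  # {1: 3, 2: 1, 3: 0}
```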



#### Pass 2 - Assemble

We now have inputs:
1. $(c_j, M_j)$
2. $(i, a_i)$

Mapping Function:
$$
map: (i, a_i) \mapsto (c_i, ('val', i, a_i)) \newline
map: (c_j, M_j) \mapsto (c_{k}, ('min', M_j)) \space \forall c_k>c_j
$$

After the shuffle phase, the reducer for chunk $c_i$ receives:
 $$(c_i, [('min', M_1), \ldots, ('min', M_{c_i-1}), ('val', (c_i-1)\sqrt{n}+1, a_{(c_i-1)\sqrt{n}+1}), \ldots, ('val', c_i\sqrt{n}, a_{c_i\sqrt{n}})])$$

Reduce Function:

Each reducer is responsible for a single chunk $c_i$:
1. Compute the minimum over the minima of all earlier chunks:
$$
P = \min\{M_j \mid ('min', M_j) \in \text{list}\}
$$

2. Initialize $\text{local\_min} = P$ (if there are no 'min' tuples, as in the first chunk, initialize it to $\infty$).
3. Sort the value tuples $('val', i, a_i)$ by $i$.
4. Iterate through the sorted value tuples:
    - for each $('val', i, a_i)$:
        - update $\text{local\_min} = \min(\text{local\_min}, a_i)$
        - emit the final pair $(i, \text{local\_min})$

Finally, we combine the outputs of all reducers; together they contain the pair $(i, m_i)$ for every $i = 1, \ldots, n$.
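Both passes can be simulated end to end to check that the output matches the naive prefix minima. This is a single-process sketch under the same assumptions as above ($n$ a perfect square, `a[i - 1]` holding $a_i$); `prefix_minima_two_pass` is a hypothetical name, and the `inbox` dictionary stands in for the shuffle. The assertion inside the reducer checks that each chunk receives at most $(\sqrt{n}-1) + \sqrt{n}$ records.

```python
import math
from collections import defaultdict


def prefix_minima_two_pass(a):
    # a[i - 1] holds a_i; n = len(a) is assumed to be a perfect square.
    n = len(a)
    s = math.isqrt(n)
    if s * s != n:
        raise ValueError("this sketch assumes n is a perfect square")

    def chunk(i):
        return (i + s - 1) // s  # c_i = ceil(i / sqrt(n)) for 1-based i

    # Pass 1: chunk minima M_j.
    by_chunk = defaultdict(list)
    for i, value in enumerate(a, start=1):
        by_chunk[chunk(i)].append(value)
    chunk_min = {c: min(values) for c, values in by_chunk.items()}

    # Pass 2 map: each value goes to its own chunk, each M_j to every later chunk.
    inbox = defaultdict(list)
    for i, value in enumerate(a, start=1):
        inbox[chunk(i)].append(("val", i, value))
    for j, m in chunk_min.items():
        for k in range(j + 1, s + 1):
            inbox[k].append(("min", m))

    # Pass 2 reduce: one reducer per chunk.
    result = {}
    for c, records in inbox.items():
        # At most (sqrt(n) - 1) earlier minima plus sqrt(n) own values.
        assert len(records) <= 2 * s - 1
        mins = [r[1] for r in records if r[0] == "min"]
        local_min = min(mins) if mins else math.inf  # P, or infinity for chunk 1
        for _, i, value in sorted(r for r in records if r[0] == "val"):
            local_min = min(local_min, value)
            result[i] = local_min  # emit (i, m_i)
    return [result[i] for i in range(1, n + 1)]


print(prefix_minima_two_pass([5, 3, 4, 1, 2, 6, 7, 0, 8]))
# [5, 3, 3, 1, 1, 1, 1, 0, 0]
```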

# Problem 3

## Part A

The goal is to broadcast the $k+2$ numbers $x_0, \ldots, x_{k+1}$ to $t$ machines.

The subsampling happens on the master, so it involves no communication. We simply keep each number with probability $k/n$, which yields $k$ samples in expectation.
The broadcast is the only phase with a communication cost.

This means we send $(k+2)\times t = kt + 2t$ numbers in total, which is $O(kt)$.
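A sketch of the sampling step and the resulting broadcast count. It assumes the sentinels $x_0 = -\infty$ and $x_{k+1} = +\infty$ bracket the sample; the text does not say how $x_0$ and $x_{k+1}$ are chosen, so this is an assumption. `sample_pivots` and `broadcast_cost` are hypothetical names, and with per-element sampling the number of sampled pivots is only $k$ in expectation.

```python
import math
import random


def sample_pivots(data, k, seed=None):
    # Keep each number independently with probability k / n (k in expectation),
    # then add the assumed sentinels x_0 = -inf and x_{k+1} = +inf.
    rng = random.Random(seed)
    n = len(data)
    sample = sorted(x for x in data if rng.random() < k / n)
    return [-math.inf] + sample + [math.inf]


def broadcast_cost(pivots, t):
    # The master sends the whole pivot list to each of the t machines.
    return len(pivots) * t


pivots = sample_pivots(range(1000), k=10, seed=1)
print(len(pivots), broadcast_cost(pivots, t=4))
```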

## Part B

The goal here is to compute how many numbers, across all machines, fall into each of the $k+1$ buckets defined by the pivots $x_0, \ldots, x_{k+1}$.

First, each of the $t$ machines counts how many of its local elements fall into each bucket, producing one count per bucket.

Next, each machine sends these counts to the master, which sums them to get the total number of elements in each bucket. Each of the $t$ machines sends $k+1$ numbers (or $2(k+1)$ if it sends (bucket, count) pairs). The global count $n_i$ of bucket $G_i$ is then the sum of the counts sent by all machines for that bucket.

Thus, in total we send at most $2(k+1) \times t$ numbers across the network, which is $O(kt)$.
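A sketch of the counting step, assuming the pivots are sorted with the first below and the last above all data, and that bucket $i$ (0-based here) holds the values in $[x_i, x_{i+1})$. `local_bucket_counts` and `global_bucket_counts` are hypothetical helpers, and each inner list of `machines` stands for one machine's local data.

```python
import bisect


def local_bucket_counts(local_data, pivots):
    # pivots: sorted list x_0 < ... < x_{k+1}, with x_0 below and x_{k+1} above all data.
    # counts[i] is the number of local values in [pivots[i], pivots[i + 1]).
    counts = [0] * (len(pivots) - 1)
    for v in local_data:
        counts[bisect.bisect_right(pivots, v) - 1] += 1
    return counts


def global_bucket_counts(machines, pivots):
    # Each machine ships len(pivots) - 1 counts; the master sums them bucket by bucket.
    per_machine = [local_bucket_counts(data, pivots) for data in machines]
    return [sum(column) for column in zip(*per_machine)]


inf = float("inf")
machines = [[1, 2, 3, 7], [4, 5, 6, 9, 0]]
print(global_bucket_counts(machines, [-inf, 3, 6, inf]))  # [3, 3, 3]
```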

## Part C

The goal here is to use the global counts on the master to determine which bucket $G_i$ contains the median, and the median's rank $r$ within $G_i$.

The master now holds the pairs $(i, n_i)$ of bucket index and global count (the list $\text{counts}$ below). We can find $G_i$ and $r$ as follows:
$$
\begin{aligned}
&\text{1. Calculate the median's overall rank}\\
&N = \sum \text{counts}\\
&\text{median\_rank} = \lceil N / 2 \rceil\\
\\
&\text{2. Find the bucket containing the median}\\
&\text{cumulative\_count} = 0\\
&\text{for } j \text{ from } 0 \text{ to length(counts)} - 1\text{:}\\
&\quad\text{// Check if the median falls within the current bucket}\\
&\quad\text{if } (\text{cumulative\_count} + \text{counts}[j]) \geq \text{median\_rank}\text{:}\\
&\quad\quad\text{// 3. Calculate the rank 'r' within this bucket}\\
&\quad\quad r = \text{median\_rank} - \text{cumulative\_count}\\
&\quad\quad\text{return } (j, r) \text{ // Return the bucket index and the rank}\\
\\
&\quad\text{// If not found, add the current bucket's count to the total}\\
&\quad\text{cumulative\_count} = \text{cumulative\_count} + \text{counts}[j]
\end{aligned}
$$

There is no communication cost since this is all done on the master.
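The pseudocode translates directly into Python. `find_median_bucket` is a hypothetical name; it returns a 0-based bucket index and the 1-based rank $r$ inside that bucket, matching the loop above.

```python
def find_median_bucket(counts):
    # counts[j]: global number of elements in bucket j (0-based), summed on the master.
    total = sum(counts)
    if total == 0:
        raise ValueError("counts must describe at least one element")
    median_rank = (total + 1) // 2  # ceil(N / 2) for a positive integer N
    cumulative_count = 0
    for j, c in enumerate(counts):
        if cumulative_count + c >= median_rank:
            return j, median_rank - cumulative_count  # bucket index and rank r
        cumulative_count += c


print(find_median_bucket([3, 3, 3]))  # (1, 2)
```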

## Part D

We now know $G_j$ and $r$, so we need to collect the numbers in $G_j$ and sort them to find the median.

The master instructs all $t$ machines to send it only the numbers they have that belong to bucket $G_j$. The total communication is the number of elements in $G_j$, which is $n_j$.

The network cost is therefore the expected value of $n_j$. Since we sampled about $k$ pivots uniformly at random, the data is divided into $k+1$ buckets of roughly equal size on average.

Thus the network communication cost is approximately $\frac{n}{k+1}$ which is $O(\frac{n}{k})$

Once the master has the numbers in $G_j$, it sorts them and returns the $r$-th smallest one, which is the median.
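A sketch of this last step under the same bucket convention as the counting sketch ($[x_j, x_{j+1})$, 0-based $j$). `median_from_bucket` is hypothetical; it returns both the median and the number of values shipped to the master, which is the $n_j$ communication cost.

```python
def median_from_bucket(machines, pivots, j, r):
    # Each machine sends only its values in bucket j, i.e. [pivots[j], pivots[j + 1]).
    lo, hi = pivots[j], pivots[j + 1]
    received = sorted(v for data in machines for v in data if lo <= v < hi)
    # r is the 1-based rank inside the bucket; len(received) is the n_j values shipped.
    return received[r - 1], len(received)


inf = float("inf")
machines = [[1, 2, 3, 7], [4, 5, 6, 9, 0]]
print(median_from_bucket(machines, [-inf, 3, 6, inf], j=1, r=2))  # (4, 3)
```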



## Part E

From the above parts we know communication cost is:
$$
\text{Total Cost}(k) \approx O(kt) + O(kt) + O(n/k) = O(kt + n/k)
$$

To minimize network cost we need to find $k$ that minimizes $f(k)=kt + n/k$

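$$
\begin{aligned}
f'(k) &= t-\frac{n}{k^2} \\
0 &= t-\frac{n}{k^2} \\
k^2 &= \frac{n}{t} \\
k &= \sqrt{\frac{n}{t}}
\end{aligned}
$$

Since $f''(k) = \frac{2n}{k^3} > 0$ for $k > 0$, this critical point is a minimum.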

Thus we choose $k = \sqrt{\frac{n}{t}}$ to minimize network cost.
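A quick numeric check of the derivation, using the illustrative values $n = 10^6$ and $t = 100$ chosen only for this example: the closed form gives $k = \sqrt{n/t} = 100$, and a brute-force search over integer $k$ lands on the same value. `cost` is a hypothetical helper for $f(k)$.

```python
import math


def cost(k, n, t):
    # f(k) = k t + n / k: communication cost up to constant factors.
    return k * t + n / k


n, t = 1_000_000, 100  # illustrative values only
k_star = math.sqrt(n / t)  # closed form: sqrt(n / t)
best_k = min(range(1, 1001), key=lambda k: cost(k, n, t))
print(k_star, best_k, cost(best_k, n, t))  # 100.0 100 20000.0
```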


# Problem 4



## Config

```python
# Import necessary libraries for Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
```
from pyspark.sql.window import Window

```python
# Initialize Spark session for local machine
spark = SparkSession.builder \
    .appName("Assignment1_Problem4") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to ERROR to suppress verbose output
spark.sparkContext.setLogLevel("ERROR")

print(f"Spark version: {spark.version}")
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")
```

```
Spark version: 3.5.4
Spark UI available at: http://10.228.244.25:4040
```


```python
datasources = {
    'links' : spark.read.csv('data/links.csv', header=True, inferSchema=True),
    'movies' : spark.read.csv('data/movies.csv', header=True, inferSchema=True),
    'ratings' : spark.read.csv('data/ratings.csv', header=True, inferSchema=True),
    'tags' : spark.read.csv('data/tags.csv', header=True, inferSchema=True)
}
```

## Part A

```python
df = datasources['ratings']

avg_count = (
    df.groupBy('movieID').agg(count('rating').alias('count')) # count ratings for each movie
    .agg(avg('count')) # get the average counts
    .collect()[0][0] # get the average
)
print('Average number of ratings per movie: ',avg_count)
```

```
Average number of ratings per movie:  10.369806663924312
```


## Part B

```python
movies = datasources['movies']
ratings = datasources['ratings']

# join genre on movie id in ratings
df = (
    ratings.join(
        movies.select('movieID', 'genres'),
        on='movieID', 
        how='left'
    )
)

genre_avg = (
    df.groupBy('genres')
    .agg(avg('rating').alias('avg_rating')) # get average rating for each genre
    .sort('avg_rating', ascending=False) # sort by average rating
)

genre_avg.show()
```

```
+--------------------+----------+
|              genres|avg_rating|
+--------------------+----------+
|Action|Crime|Dram...|       5.0|
|Adventure|Drama|F...|       5.0|
|Comedy|Drama|Fant...|       5.0|
|Animation|Childre...|       5.0|
|Action|Horror|Mys...|       5.0|
|Adventure|Comedy|...|       5.0|
|Drama|Fantasy|Mus...|       5.0|
|Drama|Horror|Romance|       5.0|
|Adventure|Romance...|       5.0|
|Animation|Crime|D...|       5.0|
|Fantasy|Mystery|W...|       5.0|
|Action|Comedy|Dra...|       5.0|
|Animation|Drama|F...|       5.0|
|Comedy|Crime|Dram...|       5.0|
|Animation|Drama|S...|       5.0|
|Comedy|Horror|Mys...|       5.0|
|Comedy|Crime|Fantasy|       5.0|
|Comedy|Crime|Dram...|      4.75|
|   Animation|Romance|      4.75|
|Children|Drama|Ro...|      4.75|
+--------------------+----------+
only showing top 20 rows
```



## Part C

```python
movies = datasources['movies']
ratings = datasources['ratings']

# we need ratings, movies, and genres
df = (
    ratings.join(
        movies.select('movieID', 'genres', 'title'),
        on='movieID',
        how='left'
    )
)

movie_avg = (
    df.groupBy('movieID', 'title', 'genres')
    .agg(avg('rating').alias('avg_rating')) # get average rating for each movie
    # keep only the top 3 movies in each genre
    .withColumn('rank', 
        row_number().over(Window.partitionBy('genres').orderBy(desc('avg_rating'))))
    .filter('rank <= 3')
    .sort('genres', 'rank')
    .drop('movieID')
)

movie_avg.show()
```

```
+--------------------+--------------------+------------------+----+
|               title|              genres|        avg_rating|rank|
+--------------------+--------------------+------------------+----+
|        Black Mirror|  (no genres listed)|               5.0|   1|
|Death Note: Desu ...|  (no genres listed)|               5.0|   2|
|The Adventures of...|  (no genres listed)|               5.0|   3|
|    Knock Off (1998)|              Action|               5.0|   1|
|Big Bird Cage, Th...|              Action|               4.5|   2|
|Master of the Fly...|              Action|               4.5|   3|
|Crippled Avengers...|    Action|Adventure|               5.0|   1|
|Shogun Assassin (...|    Action|Adventure|               5.0|   2|
|Touch of Zen, A (...|    Action|Adventure|               4.5|   3|
|Dragon Ball Z: Br...|Action|Adventure|...|               4.0|   1|
|Dragon Ball Z: Co...|Action|Adventure|...|               4.0|   2|
|The Boy and the B...|Action|Adventure|...|     
```

## Part D

```python
ratings = datasources['ratings']

user_movies = (
    ratings.select('userID', 'movieID')
    .groupBy('userID')
    .agg(count('movieID').alias('count_rated')) # count num ratings for each user
    .sort('count_rated', ascending=False) # sort by num ratings
)

user_movies.show(10)
```

```
+------+-----------+
|userID|count_rated|
+------+-----------+
|   414|       2698|
|   599|       2478|
|   474|       2108|
|   448|       1864|
|   274|       1346|
|   610|       1302|
|    68|       1260|
|   380|       1218|
|   606|       1115|
|   288|       1055|
+------+-----------+
only showing top 10 rows
```



## Part E

```python
ratings = datasources['ratings']

print('num_users:',ratings.select('userID').agg(countDistinct('userID')).collect()[0][0])

# first we get a dataframe with (userID, list_of_movies_rated)
user_movies = (
    ratings.select('userID', 'movieID')
    .groupBy('userID')
    .agg(collect_list('movieID').alias('movies_rated'))
)

# now we need to check for every user pair what the intersection of their movies is
# first do a cross join and remove rows where user1 == user2
user_pairs = (
    user_movies.alias('u1')
    .crossJoin(user_movies.alias('u2'))
    .filter('u1.userID != u2.userID')
    .select(
        col('u1.userID').alias('userID_u1'),
        col('u1.movies_rated').alias('movies_rated_u1'),
        col('u2.userID').alias('userID_u2'),
        col('u2.movies_rated').alias('movies_rated_u2')
    )
)

# now we need to create another column with the intersection between movies_rated_u1 and movies_rated_u2
# also get the cardinality of the intersection
user_pairs = (
    user_pairs
    .withColumn(
        'movies_rated_intersection',
        array_intersect('movies_rated_u1', 'movies_rated_u2')
    )
    .withColumn(
        'intersection_cardinality',
        size('movies_rated_intersection')
    )
)

# rename cols and drop unnecessary columns. sort by intersection_cardinality
user_pairs = (
    user_pairs
    .withColumnRenamed('userID_u1', 'user_1')
    .withColumnRenamed('userID_u2', 'user_2')
    .drop('movies_rated_u1', 'movies_rated_u2', 'movies_rated_intersection')
    .sort('intersection_cardinality', ascending=False)
)

user_pairs.show(10)
```

```
num_users: 610

+------+------+------------------------+
|user_1|user_2|intersection_cardinality|
+------+------+------------------------+
|   414|   599|                    1338|
|   599|   414|                    1338|
|   414|   474|                    1077|
|   474|   414|                    1077|
|    68|   414|                     950|
|   414|    68|                     950|
|   414|   448|                     914|
|   448|   414|                     914|
|   274|   414|                     856|
|   414|   274|                     856|
+------+------+------------------------+
only showing top 10 rows
```
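For intuition, the same pair counts can be sketched in plain Python by grouping users per movie instead of cross-joining all users; each unordered pair is then counted once, with `user_1 < user_2`, so symmetric duplicates like those in the output above do not arise. This is a hypothetical illustration on `(userID, movieID)` tuples, not a replacement for the Spark job; `co_rating_counts` is an assumed name.

```python
from collections import Counter, defaultdict
from itertools import combinations


def co_rating_counts(ratings):
    # ratings: iterable of (userID, movieID) tuples, one per rating.
    raters = defaultdict(set)
    for user, movie in ratings:
        raters[movie].add(user)
    # Each movie adds 1 to every unordered pair of users who both rated it.
    counts = Counter()
    for users in raters.values():
        for pair in combinations(sorted(users), 2):
            counts[pair] += 1
    return counts.most_common()  # [((user_1, user_2), shared_count), ...], largest first


ratings = [(1, 10), (2, 10), (1, 11), (2, 11), (3, 11)]
print(co_rating_counts(ratings))  # [((1, 2), 2), ((1, 3), 1), ((2, 3), 1)]
```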


