# Home Exam 52002 - 2024-2025 - Part 4: Spark

# **Instructions**
* **Fill your ID Here:** [replace the bracketed text with your ID number]

Rename the solution file to 'HomeExam_52002_2024_25_[ID]_Spark.ipynb', replacing [ID] with your ID number.
* Submit your completed solution on Moodle by February 28th, 23:59.

Carefully read all instructions in the Parts 1-3 file, as they also apply here.

### Submission Guidelines:

By the end of the exercise, please submit the following **four** files:


1. **Networks, Streaming, Unix, and Batch Task Processing:**  
   - Provide your solutions in both `.ipynb` (Jupyter Notebook) and `.html` formats. Submit after running all parts of the `.ipynb` notebook except the Unix part, and check that the outputs of each question were created and saved. For the Unix part, copy the code and results manually into the `.ipynb` notebook.

2. **Spark Section:**  
   - Submit the fully executed Jupyter Notebook (`.ipynb`) with all expected outputs, after running it in the Databricks environment.
   - Include an `.html` export of the executed notebook displaying the outputs.  


Ensure that all submitted files are clearly labeled and display the required outputs where applicable.

* **Good luck!**


# Part 4: Spark - 25 points

## Q1. Upload the data and preliminary analysis (_ points)

Log in to your account using the **community** login: [link](https://community.cloud.databricks.com/login.html)

In this part we will work with Spark. To upload files into Databricks, you may follow [this guide](https://www.youtube.com/watch?v=7zT9GKtBVHM).

Use the `network_review_Oregon.json` file from the first section as data. Run the loading cell below to check that the upload was successful.

Use RDD operations and Map-Reduce Spark commands on the `network_rdd` variable to:
1.  Find the top 3 users who posted the earliest reviews in the data. What does the `time` value represent, and what is the date in human-readable form?
2.  Find the top 3 users with the highest number of reviews


In [37]:

%python
# Run this code to load the data first
import json

network_rdd = sc.textFile("/FileStore/tables/final_Term/network_review_Oregon.json") \
        .map(lambda x: json.loads(x))
print(network_rdd.take(10))

UsageError: Line magic function `%python` not found (But cell magic `%%python` exists, did you mean that instead?).


**Solution**

1.

In [38]:
%pip install pyspark

from pyspark.sql import SparkSession
import json

spark = SparkSession.builder \
        .appName("LocalSparkApp") \
        .getOrCreate()
sc = spark.sparkContext

network_rdd = sc.textFile("Data/network-review-Oregon.json") \
        .map(lambda x: json.loads(x))

print(network_rdd.take(10))

from datetime import datetime

# Get (user, time) pairs, sort by time (earliest first) and take top 3
earliest_reviews = network_rdd.map(lambda record: (record['name'], record['time'])) \
                              .sortBy(lambda x: x[1]) \
                              .take(3)

print("\nTop 3 users with the earliest reviews:")
for name, timestamp in earliest_reviews:
    # `time` is treated as milliseconds since the Unix epoch; fromtimestamp expects seconds
    human_time = datetime.fromtimestamp(timestamp / 1000)
    print(f"Name: {name}, Time: {timestamp} (which is {human_time})")


Note: you may need to restart the kernel to use updated packages.
[{'user_id': '113524129708146481732', 'name': 'Alyssa Miller', 'time': 1534099975802, 'rating': 4, 'text': None, 'pics': None, 'resp': None, 'gmap_id': '0x54950a0305f0bc7d:0xd427df7d61f305d3'}, {'user_id': '114476130197618351498', 'name': 'ColdGhost', 'time': 1605617141349, 'rating': 5, 'text': 'An older gentleman helped us find the exact washers and screws we needed and all the staff was extremely pleasant and helpful great place really.', 'pics': None, 'resp': None, 'gmap_id': '0x549576bf17534d6b:0xa44ebc408880ae85'}, {'user_id': '114675375479747577746', 'name': 'Maria Ortiz', 'time': 1472189695563, 'rating': 5, 'text': None, 'pics': None, 'resp': None, 'gmap_id': '0x549576b3224401e1:0xb9a9291588ca7ef9'}, {'user_id': '106743434380913011087', 'name': 'Dustin Wright', 'time': 1562774566765, 'rating': 5, 'text': 'Dimitri is a stand-up guy and he runs a good operation.', 'pics': None, 'resp': None, 'gmap_id': '0x5495a74d1f
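
As a hedged illustration of the `time` field: the values look like milliseconds since the Unix epoch (an assumption based on their magnitude, consistent with the division by 1000 in the cell above). The sketch below converts the first timestamp from the printed sample into a timezone-aware UTC date. It passes `tz` explicitly because `datetime.fromtimestamp` without it uses the machine's local timezone, so the printed date could differ between Databricks and a local run.

```python
from datetime import datetime, timezone

# Timestamp of the first record in the sample output above,
# assumed to be milliseconds since 1970-01-01 00:00:00 UTC.
time_ms = 1534099975802

# fromtimestamp expects seconds; tz=timezone.utc makes the result
# independent of the timezone of the machine running the notebook.
human_time = datetime.fromtimestamp(time_ms / 1000, tz=timezone.utc)
print(human_time.isoformat())
```

Under that assumption, this review falls on 12 August 2018 (UTC).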

2.

In [39]:
# Create (user, 1) pairs for each review and then count reviews per user
user_review_count = network_rdd.map(lambda record: (record['name'], 1)) \
                               .reduceByKey(lambda count1, count2: count1 + count2)

# Sort by the count in descending order and take the top 3
top_reviewers = user_review_count.sortBy(lambda x: x[1], ascending=False).take(3)

print("Top 3 users with the highest number of reviews:")
for name, count in top_reviewers:
    print(f"Name: {name}, Number of reviews: {count}")


Top 3 users with the highest number of reviews:
Name: A Google User, Number of reviews: 69
Name: Google User, Number of reviews: 22
Name: Matt, Number of reviews: 21
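
One caveat worth illustrating: the counts above are keyed on `name`, and display names such as "A Google User" are likely shared by many accounts. The sketch below is a plain-Python stand-in for the `map` and `reduceByKey` steps, run on hypothetical records (the IDs `u1`, `u2`, `u3` are invented for illustration). It shows how keying on `(user_id, name)` can change the ranking.

```python
from collections import Counter

# Hypothetical records: two different accounts share one display name.
records = [
    {"user_id": "u1", "name": "A Google User"},
    {"user_id": "u2", "name": "A Google User"},
    {"user_id": "u2", "name": "A Google User"},
    {"user_id": "u3", "name": "Matt"},
]

# Keying on name merges distinct accounts into one counter entry.
by_name = Counter(r["name"] for r in records)

# Keying on (user_id, name) counts each account separately
# while still carrying the display name along for printing.
by_user = Counter((r["user_id"], r["name"]) for r in records)

top_by_name = by_name.most_common(1)
top_by_user = by_user.most_common(1)
print(top_by_name)
print(top_by_user)
```

On the RDD, the analogous change would be to map each record to `((record['user_id'], record['name']), 1)` before `reduceByKey`. Whether that is the intended reading of "users" in the question is a judgment call.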


## Q2. Calculate Average rating (_ points)  
Use RDD operations (Map-Reduce) to calculate the `avg_rating` for each `gmap_id`. Show the top three `gmap_id` values, sorted by `avg_rating` and then by the number of reviews (both in descending order).  


**Solution**

In [40]:
# Step 1: Map each record to (gmap_id, (rating, 1))
gmap_rating_pairs = network_rdd.map(lambda record: (record['gmap_id'], (record['rating'], 1)))

# Step 2: Reduce by key to sum ratings and counts
gmap_rating_aggregated = gmap_rating_pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Step 3: Calculate average rating for each gmap_id
gmap_avg_rating = gmap_rating_aggregated.map(lambda kv: (kv[0], (kv[1][0] / kv[1][1], kv[1][1])))

# Step 4: Sort by average rating and number of reviews, both in descending order
# The key for sorting is a tuple (avg_rating, total_reviews)
sorted_gmap = gmap_avg_rating.sortBy(lambda kv: (kv[1][0], kv[1][1]), ascending=False)

# Step 5: Take the top three results
top_three_gmaps = sorted_gmap.take(3)

print("Top 3 gmap_id by average rating and review count:")
for gmap_id, (avg_rating, total_reviews) in top_three_gmaps:
    print(f"gmap_id: {gmap_id}, Average Rating: {avg_rating:.2f}, Number of Reviews: {total_reviews}")


Top 3 gmap_id by average rating and review count:
gmap_id: 0x54956d7d623d3741:0x36f9814537de2d8e, Average Rating: 5.00, Number of Reviews: 38
gmap_id: 0x88d92749d865bfc7:0x386f8a39d715d4e2, Average Rating: 5.00, Number of Reviews: 32
gmap_id: 0x88f5a391e525f70b:0x5dec0de9516af60c, Average Rating: 5.00, Number of Reviews: 30
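
To see why the tuple sort key in Step 4 breaks ties by review count, here is a small plain-Python mirror of the same map, reduce-by-key, and sort steps. The review list and the IDs `g1`, `g2`, `g3` are hypothetical, and the dictionary loop only stands in for `reduceByKey`. It is a sketch of the logic, not Spark code.

```python
# Hypothetical (gmap_id, rating) reviews.
reviews = [("g1", 5), ("g1", 5), ("g2", 5), ("g3", 4),
           ("g3", 5), ("g2", 5), ("g2", 5)]

# Map step: (gmap_id, (rating, 1)), as in Step 1.
pairs = [(gmap_id, (rating, 1)) for gmap_id, rating in reviews]

# Reduce step: sum ratings and counts per key, as reduceByKey does in Step 2.
totals = {}
for key, (rating, count) in pairs:
    rating_sum, review_count = totals.get(key, (0, 0))
    totals[key] = (rating_sum + rating, review_count + count)

# Step 3: (gmap_id, (avg_rating, total_reviews)).
averages = [(k, (s / c, c)) for k, (s, c) in totals.items()]

# Step 4: tuples compare element by element, so equal averages
# are ordered by the review count; reverse=True makes both descending.
ranked = sorted(averages, key=lambda kv: (kv[1][0], kv[1][1]), reverse=True)
print(ranked)
```

Here `g1` and `g2` both average 5.0, and `g2` ranks first because it has more reviews. On a large RDD, `takeOrdered(3, key=...)` with negated values might be a cheaper alternative to a full `sortBy`, since only three rows are needed.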


## Q3. Multiplying Sparse Matrices - (_ points)

In the exam's data files, you'll find two files with sparse representations of matrices $A$ and $B$, called `A.txt` and `B.txt`.
Write Spark code to compute the product of these two matrices using only basic RDD operations. In addition, find and display the highest entry of the product $AB$. Explain the code implementation.

For guidance, you can refer to this [blog post](https://bmeyers.github.io/Spark_sparse_matrix_mult/), which provides a detailed implementation of matrix-vector multiplication.

**Solution**

In [41]:
# Step 1: Load matrix A and matrix B from text files.
# Each line is parsed as a comma-separated triple: row index, column index, value.
A = sc.textFile("Data/A.txt") \
      .map(lambda line: line.split(",")) \
      .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))

B = sc.textFile("Data/B.txt") \
      .map(lambda line: line.split(",")) \
      .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))

# Step 2: Re-key the matrices on the join key j.
# For matrix A, key by its column index j.
A_keyed = A.map(lambda x: (x[1], (x[0], x[2])))  # (j, (i, A_ij))
# For matrix B, key by its row index j.
B_keyed = B.map(lambda x: (x[0], (x[1], x[2])))  # (j, (k, B_jk))

# Step 3: Join on the common key j.
joined = A_keyed.join(B_keyed)
# Now, each record is of the form:
# (j, ((i, A_ij), (k, B_jk)))

# Step 4: For each joined record, compute the partial product.
# Re-map to key by (i, k) and multiply the values.
partial_products = joined.map(lambda x: ((x[1][0][0], x[1][1][0]), x[1][0][1] * x[1][1][1]))
# Each record now is: ((i, k), A_ij * B_jk)

# Step 5: Sum up the partial products for each (i, k) to get the final product AB.
AB = partial_products.reduceByKey(lambda x, y: x + y)
# AB now holds entries of the matrix product, i.e., ((i, k), value)

# Step 6: Find the highest entry in the product AB.
max_entry = AB.max(key=lambda x: x[1])
print("Highest entry in product AB:")
print(max_entry)


Highest entry in product AB:
((99, 99), 98990000.0)
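
As an explanation aid, the implementation relies on the identity $(AB)_{ik} = \sum_j A_{ij} B_{jk}$: only pairs of entries that share the inner index $j$ contribute, which is exactly what the join on $j$ selects. The sketch below mirrors Steps 2 to 6 in plain Python on two hypothetical 2x2 matrices, with dictionaries standing in for the keyed RDDs, so the result can be checked by hand.

```python
from collections import defaultdict

# Hypothetical sparse matrices as (row, col, value) triples; zeros are omitted.
# A = [[1, 2], [0, 3]],  B = [[4, 0], [5, 6]]
A = [(0, 0, 1.0), (0, 1, 2.0), (1, 1, 3.0)]
B = [(0, 0, 4.0), (1, 0, 5.0), (1, 1, 6.0)]

# Step 2: key A by its column index j and B by its row index j.
A_by_j = defaultdict(list)
for i, j, value in A:
    A_by_j[j].append((i, value))
B_by_j = defaultdict(list)
for j, k, value in B:
    B_by_j[j].append((k, value))

# Steps 3-5: join on j, emit ((i, k), A_ij * B_jk), and sum per (i, k).
AB = defaultdict(float)
for j in A_by_j.keys() & B_by_j.keys():
    for i, a_ij in A_by_j[j]:
        for k, b_jk in B_by_j[j]:
            AB[(i, k)] += a_ij * b_jk

# Step 6: the largest entry of the product, as ((i, k), value).
max_entry = max(AB.items(), key=lambda kv: kv[1])
print(dict(AB))
print(max_entry)
```

Working it out by hand gives $AB = \begin{pmatrix} 14 & 12 \\ 15 & 18 \end{pmatrix}$, so the largest entry is 18 at position (1, 1). Entries of $AB$ that are zero because no $j$ matches never appear in the result, which keeps the output sparse.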


                                                                                