In [2]:
import json
import sys

from pyspark import SparkConf, SparkContext

In [5]:
# Task 1 -- script form: task1.py <review_filepath> <output_filepath>
# When run as a script, take the paths from the command line instead
# (sys.argv[0] is the script name, so the arguments start at index 1):
#   review_path = sys.argv[1]
#   output_path = sys.argv[2]
# Inside a notebook sys.argv holds the kernel launcher's arguments, so fixed
# paths are used here.
review_path = "./test_review.json"
output_path = "./ans1.json"

# SparkContext.getOrCreate reuses an already-active context, so re-running
# this cell does not fail with "Cannot run multiple SparkContexts at once"
# the way calling the SparkContext(...) constructor a second time does.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]").setAppName("task1"))

# Each line of the input file is parsed as one JSON document. The RDD is
# cached because every question below triggers its own Spark action on it;
# without caching, each action would re-read and re-parse the file.
review = sc.textFile(review_path).map(json.loads).cache()

# Collects the Task 1 answers, keyed by answer name (n_review, top10_user, ...).
output = {}

In [22]:
# A. Total number of reviews, counted as the number of distinct review_id values.
output["n_review"] = (
    review
    .map(lambda record: record["review_id"])
    .distinct()
    .count()
)

In [23]:
# B. Number of reviews written in 2018: keep records whose "date" string
#    begins with "2018", then count their distinct review_id values.
output["n_review_2018"] = (
    review
    .filter(lambda record: record["date"].startswith("2018"))
    .map(lambda record: record["review_id"])
    .distinct()
    .count()
)

In [24]:
# C. Number of distinct users who wrote at least one review.
output["n_user"] = (
    review
    .map(lambda record: record["user_id"])
    .distinct()
    .count()
)

In [25]:
# D. Top 10 users by number of reviews written, as [user_id, count] pairs.
#    Ordered by count descending; ties are broken by user_id ascending.
output["top10_user"] = (
    review
    .map(lambda record: (record["user_id"], 1))
    .reduceByKey(lambda left, right: left + right)
    .map(list)  # emit [user_id, count] lists rather than tuples
    .takeOrdered(10, key=lambda pair: (-pair[1], pair[0]))
)

In [26]:
# E. Number of distinct businesses that received at least one review.
output["n_business"] = (
    review
    .map(lambda record: record["business_id"])
    .distinct()
    .count()
)

In [27]:
# F. Top 10 businesses by number of reviews received, as [business_id, count]
#    pairs. Ordered by count descending; ties are broken by business_id ascending.
output["top10_business"] = (
    review
    .map(lambda record: (record["business_id"], 1))
    .reduceByKey(lambda left, right: left + right)
    .map(list)  # emit [business_id, count] lists rather than tuples
    .takeOrdered(10, key=lambda pair: (-pair[1], pair[0]))
)

In [28]:
# Persist the Task 1 answers to output_path (the <output_filepath> argument of
# task1.py) as JSON, then display them.
with open(output_path, "w") as answer_file:
    json.dump(output, answer_file)
output

{'n_review': 30,
 'n_review_2018': 3,
 'n_user': 30,
 'top10_user': [['-mA3-1mN4JIEkqOtdbNXCQ', 1],
  ['2mxBNBeFrgDszqGS5tdEHA', 1],
  ['3CJUJILq7CLHk_9OrvpvQg', 1],
  ['5JVY32_bmTBfIGpCCsnAfw', 1],
  ['6Fz_nus_OG4gar721OKgZA', 1],
  ['86J5DwcFk4f4In1Vxe2TvA', 1],
  ['8NwU4TRsD3S6gIfBqFzDMQ', 1],
  ['DzZ7piLBF-WsJxqosfJgtA', 1],
  ['FIk4lQQu1eTe2EpzQ4xhBA', 1],
  ['GYNnVehQeXjty0xH7-6Fhw', 1]],
 'n_business': 30,
 'top10_business': [['3fw2X5bZYeW9xCz_zGhOHg', 1],
  ['6lj2BJ4tJeu7db5asGHQ4w', 1],
  ['8mIrX_LrOnAqWsB5JrOojQ', 1],
  ['9nTF596jDvBBia2EXXiOOg', 1],
  ['AakkkTuGZA2KBodKi2_u8A', 1],
  ['FQ1wBQb3aNeRMThSQEV0Qg', 1],
  ['FxLfqxdYPA6Z85PFKaqLrg', 1],
  ['Gyrez6K8f1AyR7dzW9fvAw', 1],
  ['I4Nr-MVc26qWr08-S3Q1ow', 1],
  ['LUN6swQYa4xJKaM_UEUOEw', 1]]}

![Screen%20Shot%202023-01-31%20at%2021.23.28.png](attachment:Screen%20Shot%202023-01-31%20at%2021.23.28.png)

In [None]:
# task2
# format: task2.py <review_filepath> <output_filepath> <n_partition>
# Show the number of partitions of the RDD used for Task 1 Question F
# and the number of items in each partition.
# Then use a customized partition function to improve the performance of the map and reduce tasks.

The number of partitions and the execution time are each reported as a single number; the number of items per partition is reported as a list of numbers.

![Screen%20Shot%202023-01-31%20at%2017.06.08.png](attachment:Screen%20Shot%202023-01-31%20at%2017.06.08.png)

In [None]:
# task3
# format: task3.py <review_filepath> <business_filepath> <output_filepath_question_a> <output_filepath_question_b>

#### a: What are the average stars for each city?
1. (DO NOT use the stars information in the business file).
2. (DO NOT discard records with empty “city” field prior to aggregation - this just means that you should not worry about performing any error handling, input data cleanup or handling edge case scenarios).
3. (DO NOT perform any round off for the average stars).

#### b: Compare the execution times of two methods for printing the top 10 cities with the highest average stars.
Please note that this task is not graded on the measured times; you will get full points only if you implement the logic to generate the output file required for this task.
1. To evaluate the execution time, start tracking the execution time from the point you load the file. 

    For M1: execution time = loading time + time to create and collect averages, sort using Python and print the first 10 cities.

    For M2: execution time = loading time + time to create and collect averages, sort using Spark and print the first 10 cities.

    The loading time will stay the same for both methods, the idea is to compare the overall execution time for both methods and understand which method is more efficient for an end-to-end solution.
    Please note that for Method 1, only sorting is to be done in Python. Creating and collecting averages needs to be done via RDD.

    You should store the execution times in the JSON file under the keys “m1” and “m2”.
2. Additionally, add a “reason” field and provide a hard-coded explanation for the observed execution times.
3. Do not round off the execution times.


![Screen%20Shot%202023-01-31%20at%2017.11.34.png](attachment:Screen%20Shot%202023-01-31%20at%2017.11.34.png)