# Assignment 1: Scalable Processing
## Yelp Reviews and Authenticity

Big Data Management | by ___ | ____@itu.dk | date

## Connecting to the Spark Cluster job using the two JobParameters.json files

To connect this Jupyter notebook to your Spark cluster, we need to tell Jupyter how to access the Spark cluster. The code below does that. Do not worry about how it works; just run the cell once to connect. 

In [None]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
from IPython.display import Javascript, display
import jupyterlab
import os, json, pyspark
from pyspark.sql import SparkSession, functions as F
from pyspark.conf import SparkConf
from py4j.protocol import Py4JJavaError


def show_popup(message):
    display(Javascript(f'alert("{message}")'))

def check_correct_file_location(root='/work'):
    """Warn via browser popups if the top-level work folder looks misconfigured.

    Two things are checked in the directory listing of ``root``:

    * Non-hidden entries other than the expected mounts are reported, because
      files stored there instead of inside ``Home`` may disappear.
    * The ``emails`` folder must be present in the listing.

    Args:
        root: Directory to inspect. Defaults to ``'/work'``.
    """
    items = os.listdir(root)
    items_expected = ['yelp', 'Home', 'JobParameters.json', 'emails']
    # Ignore hidden files starting with '.'; only report entries the user
    # actually needs to move (an empty list is not worth a popup).
    items_to_be_moved = [
        item for item in items
        if item not in items_expected and not item.startswith('.')
    ]
    if items_to_be_moved:
        show_popup(f"Warning: Found these files {items_to_be_moved} that should (most likely) be moved inside your Home folder. Make sure your Git repository and notebooks are all saved inside your Home folder and not at the 'root'/top of filesystem. Please move your files to prevent them from disappearing.")
    # Check the real directory listing for the mounted 'emails' folder.
    # Single quotes in the message keep the generated JavaScript alert valid.
    if 'emails' not in items:
        show_popup("Error: the folder 'emails' does not seem to be accessible - did you remember to add it to the Spark Cluster job and JupyterLab job?")
    
# Warn early if the /work folder is not laid out as the assignment guide expects.
check_correct_file_location()

# Versions this notebook was written for; a mismatch means the UCloud job was
# started with the wrong app version.
SUPPORTED_SPARK_VERSION = "3.3.1"
SUPPORTED_JUPYTERLAB_VERSION = "3.5.1"
if jupyterlab.__version__ != SUPPORTED_JUPYTERLAB_VERSION:
    show_popup(f"Wrong JupyterLab version :( When starting the UCloud job you selected {jupyterlab.__version__} but it should have been {SUPPORTED_JUPYTERLAB_VERSION}")
    show_popup("Please shutdown this JupyterLab job and follow the instructions carefully in the UCloud setup guide PDF on LearnIT") 
elif '_EXECUTED_' in globals(): # Only execute this cell once.
    # _EXECUTED_ is only set at the very end of a successful setup below, so
    # its presence means the configured Spark session already exists.
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before. Please restart the UCloud jobs if any error message pops up. Running setup cell now.")
    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters the Jupyter Lab app was launched with and take the
    # hostname of the (last) resource entry that defines one.
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']
    
    if MASTER_HOST_NAME != "spark-cluster":
        # No Spark session is created on this path, so later cells that use
        # `spark` will fail until the job is restarted with the right hostname.
        msg = f"The JupyterLab job was started using spark hostname {MASTER_HOST_NAME}. This is not recommended, please start it using spark-cluster instead"
        show_popup(msg)
        print(msg)
    else:
        MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

        # Temporary session, used only to read the cluster's job parameters.
        conf = SparkConf().setAll([
                ("spark.app.name", 'reading_job_params_app'), 
                ("spark.master", MASTER_HOST),
            ])

        spark = SparkSession.builder.config(conf=conf)\
                                    .getOrCreate()
        
        if spark.version != SUPPORTED_SPARK_VERSION:
            show_popup(f"Wrong Spark Cluster version :( When starting the UCloud job you selected {spark.version} but it should have been {SUPPORTED_SPARK_VERSION}")
            show_popup("Please shutdown this JupyterLab job, the Spark Cluster and follow the instructions carefully in the UCloud setup guide PDF on LearnIT") 

        CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')
        
        # Extract cluster info from the specific JobParameters.json.
        # One CPU per node is held back (the "- 1" below).
        NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
        CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
        MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

        CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
        CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 
        
        if CPUS_PER_NODE > 1:
            EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
        else:
            EXECUTOR_CORES = CPUS_PER_NODE 

        try:
            # Half of a node's memory, split across the executors that fit on
            # it (CPUS_PER_NODE / EXECUTOR_CORES executors per node).
            EXECUTOR_MEMORY = int(
                MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
            )  # set executor memory in GB on each worker node
        except ZeroDivisionError as err:
            # Raised when CPUS_PER_NODE is 0, i.e. the machine type reports a
            # single CPU. Stop here: continuing would leave EXECUTOR_MEMORY
            # undefined and fail further down with a confusing NameError.
            show_popup(f"Please make sure you selected 3 nodes for the Spark Cluster, each with 24 GB of ram. You selected {MEM_PER_NODE} GB ram and {NODES} node(s)")
            raise RuntimeError(
                f"Cannot size Spark executors: {CPUS_PER_NODE + 1} CPU(s) per node, "
                f"{MEM_PER_NODE} GB ram per node, {NODES} node(s)."
            ) from err
            
        # Make sure there is a dir for spark logs
        if not os.path.exists('spark_logs'):
            os.mkdir('spark_logs')

        conf = SparkConf().setAll(
            [
                ("spark.app.name", 'spark_assignment'), # Change to your liking 
                ("spark.sql.caseSensitive", False), # Optional: False resolves column/table names case-insensitively (set True to make them case-sensitive)
                ("spark.master", MASTER_HOST),
                ("spark.cores.max", CLUSTER_CORES_MAX),
                ("spark.executor.cores", EXECUTOR_CORES),
                ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
                ("spark.eventLog.enabled", True),
                ("spark.eventLog.dir", "spark_logs"),
                ("spark.history.fs.logDirectory", "spark_logs"),
                # NOTE(review): Spark's documented deploy-mode key is
                # spark.submit.deployMode; this key is probably ignored - confirm.
                ("spark.deploy.mode", "cluster"),
            ]
        )

        ## check executor memory, taking into account 10% of memory overhead (minimum 384 MiB, about 0.403 GB)
        CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
            EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
        )

        assert (
            int(CHECK) <= CLUSTER_MEMORY_MAX
        ), "Executor memory larger than cluster total memory!"

        # Stop previous session that was just for loading cluster params
        spark.stop()

        # Start new session with above config, that has better resource handling
        spark = SparkSession.builder.config(conf=conf)\
                                    .getOrCreate()
        sc = spark.sparkContext
        _EXECUTED_ = True
        print("Success!")

Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the Yelp datasets are located on UCloud and read them using the Spark session.

In [None]:
# Load the Yelp business, user and review data.
# /work/yelp is the shared dataset folder that was added as an input folder
# when submitting the Spark Cluster job; the file:/// prefix tells Spark to
# read from the cluster's filesystem.
#
# `business` and `reviews` feed many later computations, so both are persisted
# right after reading; `users` is not.
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json').persist()

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json').persist()

## PySpark example usage

In [None]:
# Display the first 20 rows of the reviews DataFrame (long values truncated):
reviews.show(n=20, truncate=True)

In [None]:
business.show(n=20, truncate=True)  # first 20 rows, long values truncated

In [None]:
# Get number of rows with no sampling (run this before the optional sampling
# cell below, which overwrites `reviews` with a sample):
reviews.count()

In [None]:
# OPTIONAL: develop against a small random sample of the reviews.
#
# Sampling reduces resource usage and makes queries run faster, but results
# computed on the sample are not final answers. This cell overwrites the
# variable `reviews` with the sample, so when you are done developing, re-read
# (and re-persist) the full data as in the loading cell:
#     reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
#     reviews = reviews.persist()
#
# The fixed seed makes the sample reproducible across kernel restarts, so
# intermediate results can be compared between runs.
reviews = reviews.sample(withReplacement=False, fraction=1/50, seed=42)

# Get number of rows after sampling:
reviews.count() 

In [None]:
business.show(n=20, truncate=True)  # first 20 rows, long values truncated

Example: Say we're only interested in reviews of good Mexican restaurants in Arizona. You can delete this example when you start on your own analysis. 

In [None]:
# Keep Arizona businesses whose categories match "Mexican", with id and name only
az_mex = (
    business
    .where(F.col("state") == "AZ")
    .where(F.col("categories").rlike("Mexican"))
    .select("business_id", "name")
)

# Attach the business name to every review of those businesses; the inner
# join drops reviews of all other businesses
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Keep the 5-star reviews, as (business name, review text) pairs
good_az_mex_rs = az_mex_rs.where(F.col("stars") == 5).select("name", "text")

# Display the first 20 rows
good_az_mex_rs.show()

# Collect the result into a pandas DataFrame on the driver and write it as a
# CSV file on the local filesystem
good_az_mex_rs.toPandas().to_csv(
    "good_az_reviews.csv", header=True, index=False, encoding='utf-8'
)


See assignment PDF for task descriptions.

### Task 3.1.1:

In [None]:
# Write your code here...

### Task 3.1.2:

In [None]:
# Write your code here...

### Task 3.1.3: 

In [None]:
# Write your code here...

### Task 3.1.4: 

In [None]:
# Write your code here...

### Task 3.1.5: 

In [None]:
# Write your code here...

### Task 3.2.1: Data Exploration

In [None]:
# Write your code here...

### Task 3.2.2: Hypothesis Testing

In [None]:
# Write your code here...

### Task 3.3: Building a Rating Prediction Model

In [None]:
# Write your code here...