In [1]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
# Only execute this cell once.
if '_EXECUTED_' in globals():
    # check if variable '_EXECUTED_' exists in the global variable namespace
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before, running...")
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 
    
    if CPUS_PER_NODE > 1:
        EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    else:
        EXECUTOR_CORES = CPUS_PER_NODE 

    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext
    _EXECUTED_ = True
    print("Success!")

Cell has not been executed before, running...
Success!


In [2]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()
# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

In [3]:
r = reviews.withColumnRenamed("stars", "review_stars")
b = business.withColumnRenamed("stars", "business_stars")
rb = r.join(b, "business_id")

In [4]:
rb.show()

+--------------------+----+-------------------+-----+--------------------+------------+--------------------+------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+-------+----------+-----------+-----------------+-----------+------------+--------------+-----+
|         business_id|cool|               date|funny|           review_id|review_stars|                text|useful|             user_id|         address|          attributes|          categories|           city|               hours|is_open|  latitude|  longitude|             name|postal_code|review_count|business_stars|state|
+--------------------+----+-------------------+-----+--------------------+------------+--------------------+------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+-------+----------+-----------+-----------------+-----------+------------+--------------+-----+
|---kPU91CF4Lq2-

In [5]:
# from pyspark.ml.feature import VectorAssembler

# assembler = VectorAssembler(inputCols=["funny"], outputCol='features')
# data = assembler.transform(rb)
# data = data.select(['features', 'review_stars'])
# data.show()

In [6]:
# from pyspark.ml.feature import VectorAssembler

# lr = LinearRegression(featuresCol="features", labelCol="review_stars")
# model = lr.fit(data)
# test_stats = model.evaluate(data)
# print(f"RMSE: {test_stats.rootMeanSquaredError}")
# print(f"R2: {test_stats.r2}")
# print(f"MSE: {test_stats.meanSquaredError}")

In [7]:
# test_stats.predictions.show()

In [8]:
r.dtypes

[('business_id', 'string'),
 ('cool', 'bigint'),
 ('date', 'string'),
 ('funny', 'bigint'),
 ('review_id', 'string'),
 ('review_stars', 'double'),
 ('text', 'string'),
 ('useful', 'bigint'),
 ('user_id', 'string')]

In [9]:
import pyspark.sql.functions as f
from functools import reduce
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.types import IntegerType

authent_words = ["authentic", "veritable", "legitimate"]
neg_words = [ "dirty", "kitsch", "cheap", "rude", "simple", "bland", "dodgy", "poisoning" ]
pos_words = [ "clean", "refined", "elegant", "stylish" ]

def count_word(word, column):
    return f.expr(f"size(split({column}, '{word}')) -1")
auth_counts = [count_word(word, "text") for word in authent_words]
neg_counts = [count_word(word, "text") for word in neg_words]
pos_counts = [count_word(word, "text") for word in pos_words]
auth_count = reduce(lambda a, b: a + b, auth_counts)
neg_count = reduce(lambda a, b: a + b, neg_counts)
pos_count = reduce(lambda a, b: a + b, pos_counts)

# Set up data
rbd = rb\
    .withColumn("year", f.substring(rb.date, 3, 2).cast(IntegerType()))\
    .withColumn("month", f.substring(rb.date, 6, 2).cast(IntegerType()))\
    .withColumn("day", f.substring(rb.date, 9, 2).cast(IntegerType()))\
    .withColumn("auth_count", auth_count)\
    .withColumn("neg_count", neg_count)\
    .withColumn("pos_count", pos_count)

# One-hot encode states
states = [i.state for i in rbd.select('state').distinct().collect()]
for state in states:
    rbd = rbd.withColumn(state, rbd.state == state)

In [10]:
# Prepare feature column
from pyspark.ml.feature import VectorAssembler

inputCols = [
    "cool", "funny", "useful", "review_count",
    "year", "month", "day", "auth_count", "neg_count", "pos_count",
] + states

assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
data = assembler.transform(rbd)
data = data.select(['features', 'review_stars'])
data.show()

+--------------------+------------+
|            features|review_stars|
+--------------------+------------+
|(37,[3,4,5,6,22],...|         3.0|
|(37,[0,2,3,4,5,6,...|         5.0|
|(37,[3,4,5,6,10],...|         3.0|
|(37,[0,2,3,4,5,6,...|         5.0|
|(37,[0,2,3,4,5,6,...|         4.0|
|(37,[0,1,2,3,4,5,...|         1.0|
|(37,[1,3,4,5,6,11...|         5.0|
|(37,[2,3,4,5,6,22...|         5.0|
|(37,[1,2,3,4,5,6,...|         3.0|
|(37,[3,4,5,6,15],...|         3.0|
|(37,[2,3,4,5,6,8,...|         5.0|
|(37,[3,4,5,6,20],...|         4.0|
|(37,[3,4,5,6,18],...|         4.0|
|(37,[3,4,5,6,22],...|         4.0|
|(37,[1,3,4,5,6,8,...|         4.0|
|(37,[3,4,5,6,24],...|         5.0|
|(37,[3,4,5,6,22],...|         5.0|
|(37,[2,3,4,5,6,18...|         4.0|
|(37,[0,2,3,4,5,6,...|         5.0|
|(37,[3,4,5,6,22],...|         5.0|
+--------------------+------------+
only showing top 20 rows



In [11]:
# Do linear regression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import numpy as np

SEED = 1337
CV_SPLITS = 5
SPLIT_SIZE = 1/CV_SPLITS

splits = data.randomSplit([SPLIT_SIZE]*CV_SPLITS, seed=SEED)

lr = LinearRegression(featuresCol="features", labelCol="review_stars", elasticNetParam=0.8)

mses = []
r2s = []
statss = []
for i in range(CV_SPLITS):
    print(f"{i+1} of {CV_SPLITS}")
    test = splits.pop(0)
    train = reduce(lambda df1, df2: df1.union(df2), splits)
    splits.append(test)

    model = lr.fit(train)
    stats = model.evaluate(test)
    r2s.append(stats.r2adj)
    mses.append(stats.meanSquaredError)
    statss.append(stats)

mmse = np.mean(mses)
mr2 = np.mean(r2s)
    
print(f"Adj R2: {mr2}")
print(f"MSE: {mmse}")

1 of 5
2 of 5
3 of 5
4 of 5
5 of 5
R2: 0.09394102445346586
MSE: 1.9811502075490552


In [12]:
test_stats.predictions.show()

NameError: name 'test_stats' is not defined