In [1]:
import findspark
findspark.init("/usr/local/spark")

from pyspark.sql import SparkSession 
import pyspark.sql.functions as F 
from pyspark.ml import Pipeline
from pyspark.sql.types import *

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
from delta import *

# Import Spark NLP
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline


builder = SparkSession.builder.appName("Sentiment Analysis - presentation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .master("spark://namenode:7077")\
    .config("spark.executor.cores", "4")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    #.config("spark.executor.instances", "1")\
    

    
spark = configure_spark_with_delta_pip(builder).getOrCreate()


:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d34bd907-b8d2-42b6-aa22-4ef4fcf0354a;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 570ms :: artifacts dl 37ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   

In [2]:
# Prepare Dataframes
# Review data
spark.sparkContext.setJobDescription('load review dataset')
review_schema = StructType([StructField("review_id", StringType(), False),
      StructField("user_id", StringType(), False),
      StructField("business_id", StringType(), False),
      StructField("stars", StringType(), False), 
      StructField("useful", IntegerType(), False),
      StructField("funny", IntegerType(), False),
      StructField("cool", IntegerType(), False),
      StructField("text", StringType(), False),
      StructField("date", StringType(), False),])
# review_df = spark.read.csv("hdfs://namenode:9000/project_data/review.csv", sep = '|', header = False, schema = review_schema)
review_df = spark.read.csv("hdfs://namenode:9000/project_data/review_small_b.csv", sep = '|', header = False, schema = review_schema)
review_df.createOrReplaceTempView("reviews")
review_df.persist()

#review_df.show()

# Business data
spark.sparkContext.setJobDescription('load review dataset')
business_schema = StructType([
    StructField("business_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("stars", DoubleType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("attributes", StringType(), True),
    StructField("categories", StringType(), True),
    StructField("hours", StringType(), True)
])
business_df = spark.read.csv("hdfs://namenode:9000/project_data/business.csv", sep = '|', header = False, schema = business_schema)
business_df.createOrReplaceTempView("business")
business_df.persist()

DataFrame[business_id: string, name: string, address: string, city: string, state: string, postal_code: string, latitude: double, longitude: double, stars: double, review_count: int, is_open: int, attributes: string, categories: string, hours: string]

## Test optimisation
### Combine and Filter

In [5]:
# Combine data on common restaurant id
spark.sparkContext.setJobGroup("combine datasets", "combine datasets")
reviews_with_category = spark.sql("SELECT r.review_id, b.business_id, r.text, r.date, b.categories, r.stars FROM reviews AS r LEFT JOIN business AS b ON b.business_id = r.business_id ")

# Filter for only restaurant reviews
spark.sparkContext.setJobGroup("filter reviews", "filter reviews")
restaurant_reviews = reviews_with_category.where(F.col('categories').contains("Restaurants"))
restaurant_reviews.show()

+--------------------+--------------------+--------------------+-------------------+--------------------+-----+
|           review_id|         business_id|                text|               date|          categories|stars|
+--------------------+--------------------+--------------------+-------------------+--------------------+-----+
|ApYU3mkJPVqpsrDd5...|LdECsE8lJS7v5GTFT...|We are here.and l...|2018-09-12 00:09:57|Seafood, Restaurants|  5.0|
|MN7wg1rpDJTKTkg1j...|2pLIQ0RHSmUbF0eIR...|I've wanted to vi...|2011-06-16 14:59:40|Wine Bars, Bars, ...|  4.0|
|gGepmRT-EenDRFiGf...|deWhFgwgU3F11jYMJ...|Okay for what it ...|2017-08-24 23:13:27|Seafood, Salad, R...|  2.0|
|z5i8OtKd4L4Kuyvu9...|PP3BBaVxZLcJU54uP...|5stars.          ...|2015-08-16 03:23:35|Italian, American...|  5.0|
|-KOxrHhTSayEk01XH...|CrP6JWXBmf_HyMnZJ...|Good place to eat...|2013-02-20 17:34:12|Restaurants, Burg...|  4.0|
|4okNZR_Z3whwioH3i...|BxfvdHqETU8jWYUjx...|I love shake shac...|2012-08-05 12:19:57|Food, Hot Dogs, B...

### Filter and Combine
We expect this to be faster since the restaurant dataframe is first made smaller..

In [3]:
# Filter for only restaurant reviews
spark.sparkContext.setJobDescription('filter reviews')
business_df = business_df.where(F.col('categories').contains("Restaurants"))


# Combine data on common restaurant id
spark.sparkContext.setJobDescription('combine datasets')
restaurant_reviews = spark.sql("SELECT r.review_id, b.business_id, r.text, r.date, b.categories, r.stars FROM reviews AS r LEFT JOIN business AS b ON b.business_id = r.business_id ")

restaurant_reviews.show()



+--------------------+--------------------+--------------------+-------------------+--------------------+-----+
|           review_id|         business_id|                text|               date|          categories|stars|
+--------------------+--------------------+--------------------+-------------------+--------------------+-----+
|z5i8OtKd4L4Kuyvu9...|PP3BBaVxZLcJU54uP...|5stars.          ...|2015-08-16 03:23:35|Italian, American...|  5.0|
|kXbVRmYjaUIBI8jLs...|iHdrLu8deF5GceB0V...|"My new favorite ...|2017-12-23 02:58:08|American (Traditi...|  5.0|
|E0m3ni49ruwkccjZe...|pN3O2ZLRiSLPyCWCe...|"America used to ...|2010-09-27 04:29:02|Restaurants, Amer...|  1.0|
|DhxtQTbPgjg5DIGDX...|pym7c6ZFEtmoH16xN...|Awesome service a...|2011-12-14 21:53:19|Restaurants, Bars...|  5.0|
|4okNZR_Z3whwioH3i...|BxfvdHqETU8jWYUjx...|I love shake shac...|2012-08-05 12:19:57|Food, Hot Dogs, B...|  4.0|
|wcNjtKfLSubxPp8kc...|MZ9ucV1npggMOYY9f...|I recently purcha...|2019-09-01 03:55:28|Home Services, Co...

                                                                                

### MODEL and Training

In [None]:
spark.sparkContext.setJobDescription('pipeline builder')
# Build a pipeline
# Document Assembler
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

spark.sparkContext.setJobDescription('pipeline -tokenizer')
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")#\
    #.fit(restaurant_reviews) # apperantly works without this

spark.sparkContext.setJobDescription('pipeline -normalizer')
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normal")

spark.sparkContext.setJobDescription('pipeline -model')
vivekn = ViveknSentimentModel.pretrained() \
    .setInputCols(["document", "normal"]) \
    .setOutputCol("result_sentiment")

spark.sparkContext.setJobDescription('pipeline -model=finish')
finisher = Finisher() \
    .setInputCols(["result_sentiment"]) \
    .setOutputCols("final_sentiment")

spark.sparkContext.setJobDescription('pipeline -pipeline executor')
pipeline = Pipeline().setStages([documentAssembler, tokenizer, normalizer, vivekn, finisher])

spark.sparkContext.setJobDescription('pipeline -pipeline fit')
pipelineModel = pipeline.fit(restaurant_reviews)

spark.sparkContext.setJobDescription('pipeline -pipeline transform')
# Calculate sentiment for all restaurant reviews
result = pipelineModel.transform(restaurant_reviews)

spark.sparkContext.setJobDescription('pipeline -result check')
# Check accuracy
result = result.withColumn("right_prediction", 
                   F.when(((F.array_contains(F.col("final_sentiment"),"positive")) & (F.col("stars").isin(["5.0", "4.0", "3.0"]))) |
                        ((F.array_contains(F.col("final_sentiment"),"negative")) & (F.col("stars").isin(["3.0", "2.0", "1.0"]))), 
                        1).otherwise(0))

### finding Accuracy without persist 

In [None]:
spark.sparkContext.setJobDescription('count right ones')
count_ones = result.agg(F.sum("right_prediction")).collect()[0][0]

total_count = result.count()

print(f"Prediction accuracy for sentiment: {count_ones/total_count}")
result.write.format("delta").mode("overwrite").save("/temp/sentiment_predicted_restaurant_reviews")


### finding accuracy with persist()

In [None]:
result.persist()

spark.sparkContext.setJobDescription('count right ones')
#### TO-DO --- try to optomise below line.
count_ones = result.agg(F.sum("right_prediction")).collect()[0][0]
result.write.format("delta").mode("overwrite").save("/temp/sentiment_predicted_restaurant_reviews")


### upsert 


In [None]:
# delta_table_path = "/temp/sentiment_predicted_restaurant_reviews"
delta_table_path = "/temp/upsert_presentation"
# Check if the Delta table exists
if DeltaTable.isDeltaTable(spark, delta_table_path):
    print("Updating delta")
    # If the Delta table exists, load it into a DataFrame
    deltaTableAnalyzedReviews = DeltaTable.forPath(spark, delta_table_path)
    deltaTableAnalyzedReviews.alias('old') \
        .merge(
        result.alias('updates'),
        'old.review_id = updates.review_id'
  ) \
  .whenMatchedUpdate(set =
    {
      "review_id": "updates.review_id",
      "business_id": "updates.business_id",
      "text": "updates.text",
      "date": "updates.date",
      "categories": "updates.categories",
      "final_sentiment": "updates.final_sentiment",
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "review_id": "updates.review_id",
      "business_id": "updates.business_id",
      "text": "updates.text",
      "date": "updates.date",
      "categories": "updates.categories",
      "final_sentiment": "updates.final_sentiment",
    }
  ) \
  .execute()
else:
    # If the Delta table does not exist, create it
    print("Creating delta")
    deltaTableAnalyzedReviews = result.write.format('delta').mode('overwrite').save(delta_table_path)
    deltaTableAnalyzedReviews = DeltaTable.forPath(spark, delta_table_path)
print("Finished!")