In [104]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [105]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates I

In [106]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark_NLP").getOrCreate()

In [107]:
path="/content/drive/My Drive/Colab Notebooks/Resort_reviews_data_DRive"

In [108]:
df = spark.read.csv("/content/drive/My Drive/Colab Notebooks/Resort_reviews_data_DRive/resort_rating_up.csv", header=True, inferSchema=True)

In [109]:
df.show(5)

+-----------+--------------------+-----------+--------+-----------------------------+--------------------+
|Review Site|            Ski Area|Review Date|   class|Review Star Rating (out of 5)|         Review Text|
+-----------+--------------------+-----------+--------+-----------------------------+--------------------+
|Tripadvisor|Whitefish Mountai...|     17-Jan|positive|                            5|I love Big Mounta...|
|Tripadvisor|Whitefish Mountai...|     16-Dec|positive|                            5|We have come to W...|
|Tripadvisor|Whitefish Mountai...|     16-Dec|positive|                            4|We took our famil...|
|Tripadvisor|Whitefish Mountai...|     16-Dec|positive|                            4|We skied two days...|
|Tripadvisor|Whitefish Mountai...|     16-Dec|positive|                            4|Very good skiing....|
+-----------+--------------------+-----------+--------+-----------------------------+--------------------+
only showing top 5 rows



In [110]:
reviews_df = df[['class','Review Text']]

In [111]:
reviews_df.show()

+--------+--------------------+
|   class|         Review Text|
+--------+--------------------+
|positive|I love Big Mounta...|
|positive|We have come to W...|
|positive|We took our famil...|
|positive|We skied two days...|
|positive|Very good skiing....|
|negative|Champagne pow can...|
|positive|We were here the ...|
|positive|Amazing ski resor...|
|negative|We skied a couple...|
|positive|Big Mountain aka ...|
|negative|The breadth of sk...|
|negative|Our family loves ...|
|negative|Trying to get a b...|
|positive|Big Mountain ... ...|
|positive|Had another great...|
|negative|I was not as impr...|
|negative|Visited in June. ...|
|negative|the big mountain ...|
|positive|Many years ago I ...|
|negative|My hubby and I dr...|
+--------+--------------------+
only showing top 20 rows



In [112]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [113]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = reviews_df.withColumn('length', length(reviews_df['Review Text']))
data_df.show()

+--------+--------------------+------+
|   class|         Review Text|length|
+--------+--------------------+------+
|positive|I love Big Mounta...|   242|
|positive|We have come to W...|   279|
|positive|We took our famil...|   280|
|positive|We skied two days...|   230|
|positive|Very good skiing....|   118|
|negative|Champagne pow can...|   110|
|positive|We were here the ...|   279|
|positive|Amazing ski resor...|   215|
|negative|We skied a couple...|   277|
|positive|Big Mountain aka ...|   276|
|negative|The breadth of sk...|   279|
|negative|Our family loves ...|   223|
|negative|Trying to get a b...|   282|
|positive|Big Mountain ... ...|   226|
|positive|Had another great...|   148|
|negative|I was not as impr...|   279|
|negative|Visited in June. ...|   277|
|negative|the big mountain ...|   276|
|positive|Many years ago I ...|   275|
|negative|My hubby and I dr...|   191|
+--------+--------------------+------+
only showing top 20 rows



In [114]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="Review Text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashing = HashingTF(inputCol="stop_tokens", outputCol='hash_token', numFeatures=pow(2,18))
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [115]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [116]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [118]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [119]:
cleaned.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   class|         Review Text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|positive|I love Big Mounta...|   242|  0.0|[i, love, big, mo...|[love, big, mount...|(262144,[535,2182...|(262144,[535,2182...|(262145,[535,2182...|
|positive|We have come to W...|   279|  0.0|[we, have, come, ...|[come, whitefish,...|(262144,[535,1240...|(262144,[535,1240...|(262145,[535,1240...|
|positive|We took our famil...|   280|  0.0|[we, took, our, f...|[took, family, sk...|(262144,[329,2545...|(262144,[329,2545...|(262145,[329,2545...|
|positive|We skied two days...|   230|  0.0|[we, skied, two, ...|[skied, two, days...|(262144,[15775

In [120]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [121]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [122]:
#transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|         Review Text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"If you're going ...|   211|  1.0|["if, you're, goi...|["if, going, comp...|(262144,[12336,14...|(262144,[12336,14...|(262145,[12336,14...|[-1212.5704605873...|[1.0,7.9227746967...|       0.0|
|negative|"So this part of ...|   203|  1.0|["so, this, part,...|["so, part, count...|(262144,[8804,105...|(262144,[8804,105...|(262145,[8804,105...|[-1287.6725062614...|[1.0,1.7990717646.

In [123]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval = BinaryClassificationEvaluator(labelCol ='label', rawPredictionCol ="prediction")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)


Accuracy of model at predicting reviews was: 0.500000
