In [17]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
#installing python libraries
sc.install_pypi_package("textblob")
from textblob import TextBlob
from pyspark.sql import functions as F
from math import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting textblob
  Downloading https://files.pythonhosted.org/packages/60/f0/1d9bfcc8ee6b83472ec571406bd0dd51c0e6330ff1a51b2d29861d389e85/textblob-0.15.3-py2.py3-none-any.whl (636kB)
Installing collected packages: textblob
Successfully installed textblob-0.15.3

In [18]:
conf = SparkConf().setAppName("classification").setMaster("spark://master:7077")
sc = SparkContext.getOrCreate()



spark = SparkSession.builder.config( conf = conf ).appName("classification").getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
from pyspark.sql.types import StructField,StructType,StringType,FloatType,IntegerType,DoubleType,BooleanType

#defining the schema to help read the file
DataSchema = StructType([
    StructField("ID",IntegerType(), True),
    StructField("business_id", StringType(), True), 
    StructField("Categories", StringType(), True),
    StructField("name", StringType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("stars", DoubleType(), True),
    StructField("cool", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("funny", IntegerType(), True),
    StructField("review_id", StringType(), True),
    StructField("review_stars", DoubleType(), True),
    StructField("text", StringType(), True),
    StructField("useful", IntegerType(), True),
    StructField("user_id", StringType(), True),
])

#reading the csv file got after preprocessing
idframe = spark.read.format('csv').option("header","true").schema(DataSchema).csv("s3://bigdaaassignmentbucket/Reviews.csv")

# final dropping of null values if any 
udframe = idframe.dropna()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
idframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+------------+-----+----+-------------------+-----+--------------------+------------+--------------------+------+--------------------+
| ID|         business_id|          Categories|                name|review_count|stars|cool|               date|funny|           review_id|review_stars|                text|useful|             user_id|
+---+--------------------+--------------------+--------------------+------------+-----+----+-------------------+-----+--------------------+------------+--------------------+------+--------------------+
|  0|-Gh9a15ijNii-8rnp...|Auto Repair, Auto...|Volvo of Mississauga|           3|  3.5|   0|2017-06-15 21:25:56|    0|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|     0|a05f7O5_8WZxiVKQF...|
+---+--------------------+--------------------+--------------------+------------+-----+----+-------------------+-----+--------------------+------------+--------------------+------+------------

In [21]:
# To construct a sepcial Feature Id using user_id, review_date, business_id
def FeatureId (user_id, review_date, business_id):
    return user_id + "-" + review_date + "-" + business_id

# To combine "Useful", "Funny" and "Cool" fields into one column
def useful_review_ct(useful, funny, cool):
    return useful + funny + cool;

# To calculate the sentiment score from data
def get_Sentiment(data):
    sentiment_data = TextBlob(data).sentiment.polarity
    return sentiment_data

# To compute the difference between individual value and average value
def compute_mean_dist(meanUnit,Unit):
     return abs(meanUnit - Unit)

# To define and return the GMM model
def gmmModel(k, dframe):
    gmm = GaussianMixture().setK(k).setFeaturesCol("scaledFeatures").setMaxIter(100)
    model = gmm.fit(dframe)
    return model
    
# To check the probability and labele it as either spam/ham
def probDist_scan(probability):
    return probability.toArray().tolist()[0] >= 0.90

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# To generate the feature ID from the method
FeatureId_udframe = udf(FeatureId, StringType())
featureId_dframe = udframe.withColumn("feature_id",FeatureId_udframe(udframe["user_id"],udframe["date"],udframe["business_id"]))
useful_review_count_udf = udf(useful_review_ct, IntegerType())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# To combine all votes into one variable
combined_dframe = featureId_dframe.withColumn("useful_review_ct",useful_review_count_udf(featureId_dframe['useful'],featureId_dframe["funny"],featureId_dframe["cool"]))
combined_dframe = combined_dframe.drop("useful","funny","cool");
combined_dframe.cache()
combined_dframe.show(1);

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+------------+-----+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+
| ID|         business_id|          Categories|                name|review_count|stars|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|
+---+--------------------+--------------------+--------------------+------------+-----+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+
|  0|-Gh9a15ijNii-8rnp...|Auto Repair, Auto...|Volvo of Mississauga|           3|  3.5|2017-06-15 21:25:56|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|a05f7O5_8WZxiVKQF...|a05f7O5_8WZxiVKQF...|               0|
+---+--------------------+--------------------+--------------------+------------+-----+-------------------+-----

In [24]:
# To select the feature ID for reviewing text and evaluating sentiment score
review_Summary_dframe = combined_dframe.select("feature_id","text")
sentiment_score_udframe = udf(get_Sentiment, DoubleType())
combined_dframe = combined_dframe.dropna()
sentiment_score_dframe = combined_dframe.withColumn("sentiment_score",sentiment_score_udframe(combined_dframe["text"]))
sentiment_score_dframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+------------+-----+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+
| ID|         business_id|          Categories|                name|review_count|stars|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|    sentiment_score|
+---+--------------------+--------------------+--------------------+------------+-----+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+
|  0|-Gh9a15ijNii-8rnp...|Auto Repair, Auto...|Volvo of Mississauga|           3|  3.5|2017-06-15 21:25:56|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|a05f7O5_8WZxiVKQF...|a05f7O5_8WZxiVKQF...|               0|0.20000000000000004|
+---+--------------------+------

In [25]:
# To find the average of the sentiment score and the average of the review star ratings
avg_reviewstars_dframe = sentiment_score_dframe.select("business_id","sentiment_score","review_stars")
avg_reviewstars_dframe = avg_reviewstars_dframe.groupBy("business_id").agg({'sentiment_score':'avg', 'review_stars':'avg'})
avg_reviewstars_dframe.cache()
avg_reviewstars_dframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+
|         business_id|avg(sentiment_score)|avg(review_stars)|
+--------------------+--------------------+-----------------+
|-Gh9a15ijNii-8rnp...|0.033806818181818216|              2.5|
+--------------------+--------------------+-----------------+
only showing top 1 row

In [26]:
# To merge and drop the dataframes 
avg_reviewstars_dframe = avg_reviewstars_dframe.join(sentiment_score_dframe, "business_id")
avg_reviewstars_dframe = avg_reviewstars_dframe.drop("name","stars","review_count")
avg_reviewstars_dframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+
|         business_id|avg(sentiment_score)|avg(review_stars)| ID|          Categories|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|    sentiment_score|
+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+
|-Gh9a15ijNii-8rnp...|0.033806818181818216|              2.5|  0|Auto Repair, Auto...|2017-06-15 21:25:56|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|a05f7O5_8WZxiVKQF...|a05f7O5_8WZxiVKQF...|               0|0.20000000000000004|
+--------------------+--------------

In [27]:
mean_dframe = udf(compute_mean_dist, DoubleType())

# To compute distance between Average score and the individual score
dSentiment_dframe =  avg_reviewstars_dframe.withColumn("dist_sentiment",mean_dframe(avg_reviewstars_dframe["avg(sentiment_score)"],avg_reviewstars_dframe["sentiment_score"]))
dSentiment_dframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+
|         business_id|avg(sentiment_score)|avg(review_stars)| ID|          Categories|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|    sentiment_score|     dist_sentiment|
+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+
|-Gh9a15ijNii-8rnp...|0.033806818181818216|              2.5|  0|Auto Repair, Auto...|2017-06-15 21:25:56|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|a05f7O5_8WZxiVKQF...|a05f7O5_8WZxiVKQF...|              

In [28]:
# To compute distance between Average score and the individual score
dReview_stars_dframe =  dSentiment_dframe.withColumn("dist_review_stars",mean_dframe(dSentiment_dframe["avg(review_stars)"],dSentiment_dframe["review_stars"]))
dReview_stars_dframe.show(1)                                                
                                                               

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+-----------------+
|         business_id|avg(sentiment_score)|avg(review_stars)| ID|          Categories|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|    sentiment_score|     dist_sentiment|dist_review_stars|
+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+-----------------+
|-Gh9a15ijNii-8rnp...|0.033806818181818216|              2.5|  0|Auto Repair, Auto...|2017-06-15 21:25:56|0MypdQ9_BH_-hsmUI...|         4.0|I've been there o...|a0

In [29]:
# To generate the features
features_assembler_dframe = VectorAssembler(
    inputCols=["dist_sentiment", "dist_review_stars", "useful_review_ct"],
    outputCol="features")

features_dframe= features_assembler_dframe.transform (dReview_stars_dframe)
features_dframe.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+-----------------+--------------------+
|         business_id|avg(sentiment_score)|avg(review_stars)| ID|          Categories|               date|           review_id|review_stars|                text|             user_id|          feature_id|useful_review_ct|    sentiment_score|     dist_sentiment|dist_review_stars|            features|
+--------------------+--------------------+-----------------+---+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+----------------+-------------------+-------------------+-----------------+--------------------+
|-Gh9a15ijNii-8rnp...|0.033806818181818216|              2.5|  0|Auto Repair, Auto...|2017-06-15 21:

In [31]:
# To Standardise and compute statistics of the model using MaxAbsScalar
scaler_feat = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scaler_Model = scaler_feat.fit(features_dframe)
transformed_scaleData = scaler_Model.transform(features_dframe)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# To create a dataframe which stores the output of computations
dataSchema = StructType([
    StructField("Silhouette_distance", DoubleType(), True),
    StructField("k", IntegerType(), True)        
])

k_silhouette_dframe = spark.createDataFrame(sc.emptyRDD(), dataSchema)
probability_distribution_scan_udf = udf(probDist_scan,  BooleanType())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# To iterate the model on various k values for evaluation
k=2
distance_array = []
while (k <= 8) :
    gmm = gmmModel(k,transformed_scaleData)
    gmmTransform = gmm.transform(transformed_scaleData)
    cosEvaluator = ClusteringEvaluator().setDistanceMeasure ("cosine")
    silhouetteDist = cosEvaluator.evaluate(gmmTransform);
    tupleRow = spark.createDataFrame([{"k": k, "Silhouette_distance": silhouetteDist}])
    k_silhouette_dframe = k_silhouette_dframe.union(tupleRow)
    distance_array.append(silhouetteDist)
    review_dframe = gmmTransform.withColumn("isHam",probability_distribution_scan_udf(gmmTransform["probability"]))
    ureview_dframe = review_dframe.select("business_id","user_id","review_id","prediction","isHam")
    ureview_dframe.write.csv("s3://bigdaaassignmentbucket/out/final"+str(k))
    if k== 2:
        k_silhouette_dframe.show(1)
    
    k = k+1;

k_silhouette_dframe.write.csv("s3://bigdaaassignmentbucket/out/s_new_distance11")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+---+
|Silhouette_distance|  k|
+-------------------+---+
|0.11950472014657175|  2|
+-------------------+---+
only showing top 1 row

In [37]:
 k_silhouette_dframe.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
| Silhouette_distance|  k|
+--------------------+---+
| 0.11950472014657175|  2|
| 0.11950472014657175|  2|
| -0.0634518694278572|  3|
|-0.10135028043040387|  4|
|-0.15671248130914417|  5|
| -0.0795122407621355|  6|
|-0.05629476779391...|  7|
|-0.08737859156300563|  8|
+--------------------+---+