In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Obtain the active Spark session (or start one) through the builder API
# instead of wrapping an implicit `sc` global: this notebook never defines
# `sc`, so the cell would fail on a fresh kernel unless the environment
# happens to pre-inject it.
session = SparkSession.builder.getOrCreate()

# Use 5 partitions for Spark SQL shuffles (aggregations / joins).
session.conf.set("spark.sql.shuffle.partitions", 5)

# Load every tab-separated file under /FileStore/tables into one DataFrame:
#   header=true          -> first line of each file supplies the column names
#   mode=DROPMALFORMED   -> rows that cannot be parsed are dropped
#   inferSchema=true     -> column types are inferred from the data
sample_df = (
    session.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("inferSchema", "true")
    .option("sep", "\t")
    .load("/FileStore/tables/*.tsv")
)

# Report how many rows were loaded (this triggers a Spark job).
print(sample_df.count())


In [2]:
# Compute all three column maxima in a single global aggregation (one Spark
# job) instead of running a separate full-table aggregation per column.
# The result columns are named "max(<column>)".
max_row = sample_df.groupby().max(
    'helpful_votes', 'sentiment_rating', 'subjectivity_rating'
).first().asDict()
max_help = max_row['max(helpful_votes)']
max_sent = max_row['max(sentiment_rating)']
max_sub = max_row['max(subjectivity_rating)']

# NOTE(review): %d truncates non-integer values; confirm the rating columns
# are integral, otherwise the printed maxima lose their fractional part.
print("Max help = %d Max sent = %d Max sub = %d" % (max_help,max_sent,max_sub))


In [3]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col, countDistinct
# Columns that are packed into the single "features" vector column.
feature_cols = [
    "helpful_votes",
    "sentiment_rating",
    "subjectivity_rating",
    "word_count",
    "sentence_count",
    "adjective_count",
    "adverb_count",
    "noun_count",
    "pronoun_count",
    "verb_count",
]

# Assemble the feature vector for every row of the loaded data.
va = VectorAssembler(inputCols=feature_cols, outputCol="features")
new_df = va.transform(sample_df)

# Fit a 3-cluster KMeans model (fixed seed) on the assembled vectors only,
# then apply it to the full frame so each row gets its cluster assignment.
kmeans = KMeans(seed=1, k=3)
model = kmeans.fit(new_df.select("features"))
clustered = model.transform(new_df)

# Optional sanity checks (disabled): preview the result and count the
# distinct cluster labels that were actually assigned.
# clustered.show()
# clustered.agg(countDistinct(col("prediction")).alias("count")).show()

In [4]:
# Columns reported for each cluster: the ten clustering inputs plus the
# assigned cluster label.
CLUSTER_STATS_COLS = (
    "helpful_votes",
    "sentiment_rating",
    "subjectivity_rating",
    "word_count",
    "sentence_count",
    "adjective_count",
    "adverb_count",
    "noun_count",
    "pronoun_count",
    "verb_count",
    "prediction",
)


def describe_cluster(clustered_df, cluster_id, columns=CLUSTER_STATS_COLS):
    """Show summary statistics for the rows assigned to one cluster.

    Args:
        clustered_df: DataFrame that carries a "prediction" column.
        cluster_id: Value of "prediction" identifying the cluster to keep.
        columns: Names of the columns to include in the summary.

    Returns:
        A ``(members, stats)`` tuple: ``members`` is ``clustered_df`` filtered
        to the requested cluster, ``stats`` is ``members`` restricted to
        ``columns``. The ``describe()`` table of ``stats`` is printed as a
        side effect.
    """
    members = clustered_df.filter(clustered_df["prediction"] == cluster_id)
    stats = members.select(*columns)
    stats.describe().show()
    return members, stats


# One summary per cluster. The KMeans model above is created with k=3, so the
# cluster ids are 0, 1 and 2; if k is raised, add a describe_cluster call for
# each additional id.
cl_1, cl_1_stats = describe_cluster(clustered, 0)
cl_2, cl_2_stats = describe_cluster(clustered, 1)
cl_3, cl_3_stats = describe_cluster(clustered, 2)