In [0]:
%pyspark
# Raw text view of the reviews dataset (one RDD element per line of the input files).
rdd = sc.textFile("s3://megadados-alunos/dados/all_reviews_clean_tsv/")

In [1]:
%pyspark

# Column names in file order. The files are read without a header row, so
# Spark assigns the positional names _c0, _c1, ... which are replaced below.
column_names = [
    "marketplace",
    "customer_id",
    "review_id",
    "product_id",
    "product_parent",
    "product_title",
    "product_category",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
    "review_headline",
    "review_body",
    "review_date",
]

# Tab-delimited read; no schema is supplied, so the columns are read as-is.
df = (
    spark.read
    .options(header="false", delimiter="\t")
    .csv("s3://megadados-alunos/dados/all_reviews_clean_tsv/")
)

# Map each positional column _c<i> to the i-th entry of column_names.
for _position, _column_name in enumerate(column_names):
    df = df.withColumnRenamed("_c{}".format(_position), _column_name)

# Tarefa 1:


In [3]:
%pyspark
# Task 1: total number of reviews, counted as elements of the raw text RDD.
count = rdd.count()
print(
    "Tarefa 1: Quantos reviews existem? ---> {0} reviews".format(count)
)

In [4]:
%pyspark
# Task 1: number of distinct customer ids in the reviews DataFrame.
clientes_existentes = df.select("customer_id").distinct().count()
print(
    "Tarefa 1: Quantos clientes existem? ---> {0} clientes".format(clientes_existentes)
)

In [5]:
%pyspark
# Task 1: number of distinct product ids in the reviews DataFrame.
produtos = df.select("product_id").distinct().count()
print(
    "Tarefa 1: Quantos produtos existem? ---> {0} produtos".format(produtos)
)

In [6]:
%pyspark
# Task 1: review count per star rating. Only rows whose rating is one of the
# strings '1'..'5' are kept; any other value (including null) is filtered out.
rating = df["star_rating"]
print("Tarefa 1: Quantos reviews existem para cada star rating?:")
(
    df.where(rating.isin("1", "2", "3", "4", "5"))
    .groupBy("star_rating")
    .count()
    .show()
)

# Tarefa 2:
######## Além do conteúdo das aulas, utilizamos a seguinte referência para aprofundar os quesitos relevantes na caracterização de bots: https://finance.yahoo.com/news/rise-fake-amazon-reviews-spot-175430368.html, e vimos que fazer várias reviews no mesmo dia é algo característico de bots.

In [8]:
%pyspark

# Count reviews sharing the same customer, product title, category, rating and
# date; groups with two or more reviews are the ones treated as bot activity
# by the following cells.
repeat_date_reviews = df.groupBy(
    "customer_id",
    "product_title",
    "product_category",
    "star_rating",
    "review_date",
).count()

# Largest groups first.
rdr_ordered = repeat_date_reviews.sort("count", ascending=False)

# Keep only groups with at least two reviews.
rdr_filtered = rdr_ordered.where(rdr_ordered["count"] >= 2)

# Re-apply the descending order on the filtered result.
rdr_filtered_ordered = rdr_filtered.sort("count", ascending=False)

In [9]:
%pyspark
# Distinct customers appearing in at least one repeated-review group, and their
# share of all distinct customers (clientes_existentes comes from an earlier cell).
# NOTE: this rebinds `count`, which an earlier cell used for the review total.
count = rdr_filtered_ordered.select("customer_id").distinct().count()
print("Número de bots: {}".format(count))
print("Porcentagem de bots: {}%".format(count / clientes_existentes * 100))

In [10]:
%pyspark
# Number of repeated-review groups per star rating, restricted to the strings '1'..'5'.
rating = rdr_filtered_ordered["star_rating"]
(
    rdr_filtered_ordered
    .where(rating.isin("1", "2", "3", "4", "5"))
    .groupBy("star_rating")
    .count()
    .show()
)

In [11]:
%pyspark

# Number of repeated-review groups per product category, most frequent first.
(
    rdr_filtered_ordered
    .groupBy("product_category")
    .count()
    .sort("count", ascending=False)
    .show()
)

# Tarefa 3:
######## Baseado no exemplo disponível em: https://ai.plainenglish.io/build-naive-bayes-spam-classifier-on-pyspark-58aa3352e244

In [13]:
%pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import when

In [14]:
%pyspark
# Derive the class label "nb" from the star rating:
#   '1', '2', '3' -> "negativo", '4' -> "neutro", '5' -> "positivo".
# There is no otherwise() branch, so any other rating value yields a null label.
naive_bayes = df.select("star_rating", "review_body")
df_class = naive_bayes.withColumn(
    "nb",
    when(naive_bayes["star_rating"].isin("1", "2", "3"), "negativo")
    .when(naive_bayes["star_rating"] == "4", "neutro")
    .when(naive_bayes["star_rating"] == "5", "positivo"),
)
df_class.show()


In [15]:
%pyspark
# Keep only the text and its label, discarding rows where either one is null.
naive_bayes_final = df_class.select("review_body", "nb").dropna()
naive_bayes_final.show()

In [16]:
%pyspark
# Feature-engineering stages, adapted from the referenced example.

# Tokenise the review text, using runs of non-word characters as the pattern.
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="tokens", pattern="\\W+")

# Term counts over the tokens, with a minimum document frequency of 2.
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)

# Encode the string class label "nb" as a numeric "label" column.
indexer = StringIndexer(inputCol="nb", outputCol="label")

# Expose the token counts under the "features" column name.
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

# Stage order matters: the tokenizer must run before the vectorizer, and the
# vectorizer before the assembler.
stages = [regexTokenizer, cv, indexer, vecAssembler]

# A plain loop is used for the side effect; a list comprehension here would
# build (and possibly echo) a throwaway list of None values.
for stage in stages:
    print('\n', stage)


In [17]:
%pyspark

# Fit the feature pipeline on the labelled reviews and apply it to the same rows.
# NOTE(review): the pipeline is fitted on the full dataset before the train/test
# split, so the fitted stages also see rows that later land in the test set —
# consider fitting on the training split only.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(naive_bayes_final)
data = pipeline_model.transform(naive_bayes_final)

In [18]:
%pyspark

# 70/30 train/test split with a fixed seed.
train, test = data.randomSplit([0.7, 0.3], seed=2018)


In [19]:
%pyspark

# Multinomial Naive Bayes with smoothing parameter 1.0.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)

# Train on the training split, then score the held-out test split.
model = nb.fit(train)
predictions = model.transform(test)

# Show true label, predicted label and class probabilities for a sample of rows.
predictions.select(["label", "prediction", "probability"]).show()

In [20]:
%pyspark

# The label has three classes (negativo / neutro / positivo), so a multiclass
# evaluator is required. The previous BinaryClassificationEvaluator reported
# area under ROC by default, which is neither accuracy nor meaningful for
# three classes.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print ("Model Accuracy: ", accuracy)

In [21]:
%pyspark


# Candidate values for the Naive Bayes smoothing parameter.
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()

# Select the model by multiclass accuracy: the label has three classes, so a
# binary evaluator (area under ROC by default) is not an appropriate criterion.
cvEvaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# NOTE: this rebinds `cv`, which an earlier cell used for the CountVectorizer
# stage; that stage object is already held in the `stages` list.
# The seed makes the fold assignment reproducible (same seed as the train/test split).
cv = CrossValidator(
    estimator=nb,
    estimatorParamMaps=paramGrid,
    evaluator=cvEvaluator,
    seed=2018,
)
cvModel = cv.fit(train)

# Score the held-out test split with the best model found by cross-validation.
cvPredictions = cvModel.transform(test)

# Evaluate with this cell's own evaluator rather than one defined in another
# cell, so the reported metric matches the selection criterion.
cvEvaluator.evaluate(cvPredictions)