# Projeto 2 - Megadados

### Eiki Luis Yamashiro | João Guilherme Almeida | William Silva

In [1]:
%pyspark
# Raw review dump as an RDD of text lines; marked for caching so repeated
# actions on it do not re-read from S3.
reviews_path = 's3://megadados-alunos/dados/all_reviews_clean_tsv/'
rdd = sc.textFile(reviews_path)
rdd = rdd.cache()

In [2]:
%pyspark
# Load the tab-separated review files into a DataFrame. The files carry no
# header row, so Spark assigns positional column names (_c0, _c1, ...).
csv_reader = spark.read.options(header="false", delimiter="\t")
df = csv_reader.csv("s3://megadados-alunos/dados/all_reviews_clean_tsv/")

In [3]:
%pyspark
# Column names in file order: position i in this list replaces Spark's
# auto-generated name "_c<i>".
review_columns = [
    "marketplace",
    "customer_id",
    "review_id",
    "product_id",
    "product_parent",
    "product_title",
    "product_category",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
    "review_headline",
    "review_body",
    "review_date",
]

for position, column_name in enumerate(review_columns):
    df = df.withColumnRenamed("_c{}".format(position), column_name)


In [4]:
%pyspark
# Preview the first 20 rows (long cell values truncated) to sanity-check the renamed columns.
df.show(n=20, truncate=True)

Métricas do Naive Bayes:
- 5 estrelas são positivas (Positivas = 2)
- 4 estrelas são neutras (Neutras = 1)
- 3 estrelas ou menos são negativas (Negativas = 0)

review_body | métrica do NB

In [6]:
%pyspark
from pyspark.sql.functions import col, when

# Build the labelled dataset for the Naive Bayes classifier.
#
# The original cell filtered a DataFrame called `df_renamed`, which is never
# defined anywhere in the notebook, so it failed with NameError on a fresh
# kernel. The renamed DataFrame produced by the cells above is `df`.

# Keep only rows whose star_rating is one of the five valid values. Ratings are
# compared as strings because the CSV was read without a schema.
valid_star_ratings = ["1", "2", "3", "4", "5"]
stars = df.where(col("star_rating").isin(valid_star_ratings))

# Only the review text and its rating are needed for the classifier.
nb_df = stars.select("review_body", "star_rating")

# Map the rating to a sentiment label: 5 -> "Positive", 4 -> "Neutro",
# anything else (1-3) -> "Negativo".
df = nb_df.withColumn(
    "metric",
    when(nb_df.star_rating == "5", "Positive")
    .when(nb_df.star_rating == "4", "Neutro")
    .otherwise("Negativo"),
)

# Drop rows with a null in any remaining column (e.g. an empty review body).
df = df.na.drop()

df.show()

In [7]:

%pyspark
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes

# ===================================================================
# Feature pipeline stages:
# RegexTokenizer -> CountVectorizer -> StringIndexer -> VectorAssembler
# ===================================================================

# Split each review body on runs of non-word characters.
regexTokenizer = RegexTokenizer(pattern="\\W+", inputCol="review_body", outputCol="tokens")

# Bag-of-words counts; minDF=2.0 drops terms appearing in fewer than two documents.
cv = CountVectorizer(minDF=2.0, inputCol="tokens", outputCol="token_features")

# Encode the string sentiment label ("metric") as a numeric "label" column.
indexer = StringIndexer(inputCol="metric", outputCol="label")

# Expose the token counts under the "features" column.
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

# Stages listed in the order the pipeline applies them.
stages = [regexTokenizer, cv, indexer, vecAssembler]

In [8]:
%pyspark
from pyspark.ml import Pipeline
# Chain the feature-engineering stages into a single pipeline.
pipeline = Pipeline().setStages(stages)

In [9]:
%pyspark
# Fit the feature pipeline on the labelled reviews, then apply it to the same
# rows to add the "features" and "label" columns.
# NOTE(review): the pipeline is fitted on the full dataset before the
# train/test split, so the vocabulary is learned from test rows too.
pipelineModel = pipeline.fit(df)
data = pipelineModel.transform(df)

In [10]:
%pyspark
# 70/30 train/test split; the fixed seed keeps the split reproducible.
split_weights = [0.7, 0.3]
train, test = data.randomSplit(split_weights, seed=42)

In [11]:
%pyspark
# Multinomial Naive Bayes with smoothing 1.0.
nb = NaiveBayes().setSmoothing(1.0).setModelType("multinomial")

In [12]:
%pyspark
# Train the classifier on the training split.
model = nb.fit(dataset=train)

In [13]:
%pyspark
# Score the held-out split with the trained model.
predictions = model.transform(dataset=test)

In [14]:
%pyspark
# Show true label, predicted label and class probabilities side by side.
preview_columns = ["label", "prediction", "probability"]
predictions.select(*preview_columns).show()

In [15]:
%pyspark
# BinaryClassificationEvaluator stays imported so any later cell that still
# references the name keeps working.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# The label has three classes (Positive / Neutro / Negativo), so a binary
# evaluator does not apply. Its default metric is also area under ROC, not
# accuracy, so the value printed before did not match its caption.
# Use the multiclass evaluator and request accuracy explicitly.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Model Accuracy: ", accuracy)

In [16]:
%pyspark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter search over the Naive Bayes smoothing value.
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()

# The task has three classes, so model selection uses multiclass accuracy
# rather than a binary evaluator.
cvEvaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# NOTE: this rebinds `cv`, which the feature-pipeline cell uses for the
# CountVectorizer. The pipeline keeps its own reference to that stage, so only
# the name is affected.
# The seed makes fold assignment reproducible, matching the seed used for the
# train/test split.
cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator, seed=42)
cvModel = cv.fit(train)

# Score the best model found by cross-validation on the held-out split.
cvPredictions = cvModel.transform(test)

# Evaluate with the same metric that drove model selection.
cvEvaluator.evaluate(cvPredictions)