# Projeto Spark
## Alunos: André Rocco, Beatriz Muniz, Marcelo Miguel

In [1]:
%pyspark
rdd = sc.textFile('s3://megadados-alunos/dados/all_reviews_clean_tsv/').cache()

In [2]:
%pyspark
type(rdd)

## Tarefa 1


### Quantos reviews existem?

In [5]:
%pyspark

print(f"há {rdd.count()} reviews nesse csv")

### Quantos clientes existem?

In [7]:
%pyspark
df = spark.read.option("header", "false") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv("s3://megadados-alunos/dados/all_reviews_clean_tsv/")
    
df = df \
    .withColumnRenamed("_c0", "marketplace") \
    .withColumnRenamed("_c1", "customer_id") \
    .withColumnRenamed("_c2", "review_id") \
    .withColumnRenamed("_c3", "product_id") \
    .withColumnRenamed("_c4", "product_parent") \
    .withColumnRenamed("_c5", "product_title") \
    .withColumnRenamed("_c6", "product_category") \
    .withColumnRenamed("_c7", "star_rating") \
    .withColumnRenamed("_c8", "helpful_votes") \
    .withColumnRenamed("_c9", "total_votes") \
    .withColumnRenamed("_c10", "vine") \
    .withColumnRenamed("_c11", "verified_purchase") \
    .withColumnRenamed("_c12", "review_headline") \
    .withColumnRenamed("_c13", "review_body") \
    .withColumnRenamed("_c14", "review_date")


In [8]:
%pyspark
df.count()

In [9]:
%pyspark
df[['customer_id']].distinct().count()

### Quantos produtos existem?

In [11]:
%pyspark
df[['product_id']].distinct().count()

### Quantos reviews existem para cada “star_rating” (de 1 a 5 estrelas)?

In [13]:
%pyspark
df_star_rating_clean = df.where((df['star_rating'] =='1') | (df['star_rating'] =='2') | (df['star_rating'] =='3')| (df['star_rating'] =='4')| (df['star_rating'] =='5'))

df_star_rating_clean.groupBy("star_rating").count().show()

## Tarefa 2

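## Tarefa 3

Classificação do sentimento das reviews a partir do texto (`review_body`) com Naive Bayes multinomial: 5 estrelas = positiva, 4 estrelas = neutra, 1 a 3 estrelas = negativa.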

In [16]:
%pyspark
from pyspark.sql.functions import when
df_star_rating_clean = df_star_rating_clean.na.drop()


In [17]:
%pyspark
from pyspark.sql.functions import when
df_star_rating_clean= df_star_rating_clean.withColumn("type_review", when(df_star_rating_clean["star_rating"] == "5", "positiva")\
                    .when(df_star_rating_clean["star_rating"] == "4","neutra").otherwise("negativa"))
                    

In [18]:
%pyspark
df_star_rating_clean.groupBy("type_review").count().show()

In [19]:
%pyspark
# Import the required packages
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes


In [20]:
%pyspark
# ===================================================================
# RegexTokenizer, CountVectorizer, StringIndexer e VectorAssembler
# ===================================================================

stages = []
# 1. clean data and tokenize sentences using RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="tokens", pattern="\\W+")
stages += [regexTokenizer]

# 2. CountVectorize the data
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)#, vocabSize=3, minDF=2.0
stages += [cv]

# 3. Convert the labels to numerical values using binariser
indexer = StringIndexer(inputCol="type_review", outputCol="label")
stages += [indexer]

# 4. Vectorise features using vectorassembler
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")
stages += [vecAssembler]
[print('\n', stage) for stage in stages]

In [21]:
%pyspark
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
data = pipeline.fit(df_star_rating_clean).transform(df_star_rating_clean)


In [22]:
%pyspark
train, test = data.randomSplit([0.7, 0.3], seed = 2018)


In [23]:
%pyspark
from pyspark.ml.classification import NaiveBayes
# Initialise the model
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# Fit the model
model = nb.fit(train)
# Make predictions on test data
predictions = model.transform(test)
predictions.select("label", "prediction", "probability").show()

In [24]:
%pyspark
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print ("Model Accuracy: ", accuracy)

In [25]:
%pyspark
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")

# Run Cross-validation
cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)

# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)

# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)