___

# Projeto 2 Spark Cluster - Megadados

**Grupo: Antonio Fuziy, Victor Vergara e André Tavernaro**

In [1]:
%pyspark
# Load the review TSV files from S3 as an RDD of text lines, then mark that
# RDD for caching and keep it under the name `rdd`.
raw_review_lines = sc.textFile('s3://megadados-alunos/dados/all_reviews_clean_tsv/')
rdd = raw_review_lines.cache()

___

## Tarefa 1

In [3]:
%pyspark

# Count the records in the RDD; this is reported as the number of reviews.
num_reviews = rdd.count()

for output_line in ("=======================================",
                    "reviews existentes",
                    num_reviews):
    print(output_line)

In [4]:
%pyspark
# Echo the `spark` session object as the cell result
# (presumably a sanity check that the session is available).
spark

In [5]:
%pyspark
# Configure a CSV reader for tab-separated files without a header row and
# with schema inference enabled, then load the review files into `df`.
reviews_reader = (
    spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
)
df = reviews_reader.csv("s3://megadados-alunos/dados/all_reviews_clean_tsv/")

In [6]:
%pyspark

# Rename the positional columns (_c0 ... _c14) to descriptive names.
# The list order defines the mapping: index i renames column "_c<i>".
review_column_names = [
    "marketplace",
    "customer_id",
    "review_id",
    "product_id",
    "product_parent",
    "product_title",
    "product_category",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
    "review_headline",
    "review_body",
    "review_date",
]
for column_position, column_name in enumerate(review_column_names):
    df = df.withColumnRenamed("_c{}".format(column_position), column_name)

In [7]:
%pyspark
# Mark the DataFrame for caching (the return value is shown as the cell result)
df.cache()

In [8]:
%pyspark
# Count the distinct customer_id values, reported as the number of customers.
distinct_customers = df.select("customer_id").distinct()
num_customers = distinct_customers.count()

for output_line in ("=======================================",
                    "clientes existentes",
                    num_customers):
    print(output_line)

In [9]:
%pyspark
# Preview the product-related columns.
df.select("product_id", "product_parent", "product_title", "product_category").show()

In [10]:
%pyspark

# Count the distinct product_id values, reported as the number of products.
distinct_products = df.select("product_id").distinct()
num_products = distinct_products.count()

for output_line in ("=======================================",
                    "produtos existentes",
                    num_products):
    print(output_line)

In [11]:
%pyspark
# Preview the review id next to its star rating.
df.select("review_id", "star_rating").show()

In [12]:
%pyspark

# Keep only rows whose star_rating equals one of '1'..'5', then show how many
# reviews fall under each rating. The predicate is OR-ed together one value
# at a time, in the order 1, 2, 3, 4, 5.
star_rating_col = df["star_rating"]
is_valid_star = star_rating_col == '1'
for star_value in ('2', '3', '4', '5'):
    is_valid_star = is_valid_star | (star_rating_col == star_value)
filter_ratings = df.where(is_valid_star)

print("===========================================")
print("numero de reviews por star rating de 1 a 5")
reviews_per_star = filter_ratings.groupBy("star_rating").count()
reviews_per_star.show()

In [13]:
%pyspark

# Customers that appear exactly once in the dataset (a single review each).
group_customer = df.groupBy("customer_id").count()
one_rating_customer = group_customer.where(group_customer["count"] == 1)
df_one_rating = df.join(one_rating_customer, ["customer_id"])

# Among those customers' reviews, keep star ratings '1'..'5' and count the
# reviews per product. The filter references the frame actually being filtered.
one_rating_star = df_one_rating["star_rating"]
group_product_id = df_one_rating.where(
    (one_rating_star == '1')
    | (one_rating_star == '2')
    | (one_rating_star == '3')
    | (one_rating_star == '4')
    | (one_rating_star == '5')
).groupBy("product_id").count()

# Attach the per-product count to every review and keep counts of at least 10.
df_count_ratings = df.join(group_product_id, ["product_id"])
df_more_than_10_ratings = df_count_ratings.where(df_count_ratings["count"] >= 10)
df_more_than_10_ratings.describe(["count"]).show()

# Compute the mean of "count" from the data instead of hard-coding a value
# copied from an earlier describe() output, so the cut-off follows the dataset.
# A global aggregation over an empty frame yields NULL; fall back to 0.0 so the
# comparison below stays well-defined (the filtered result is empty anyway).
mean_10_plus_ratings = df_more_than_10_ratings.agg({"count": "avg"}).first()[0]
if mean_10_plus_ratings is None:
    mean_10_plus_ratings = 0.0
df_10_best_ratings = df_more_than_10_ratings.where(
    df_more_than_10_ratings["count"] > mean_10_plus_ratings
)

print("===========================================")
print("conceito A, Tarefa 1")
df_10_best_ratings.orderBy(["count"], ascending=False).limit(10).show()

___

## Tarefa 2

In [15]:
%pyspark

# ----------------------------------------
# Build the per-customer DataFrame
# ----------------------------------------

# Number of rows (reviews) per customer_id, sorted with the largest counts first.
filter_customer = df.groupby("customer_id").count()
count_customers = filter_customer.orderBy(filter_customer["count"].desc())
count_customers.show()

In [16]:
%pyspark
from pyspark.sql.types import IntegerType

# Show basic summary statistics for the per-customer review count.
customer_count_stats = count_customers.describe("count")
customer_count_stats.show()

In [17]:
%pyspark

# Threshold separating bots from real customers: mean + 3 * standard deviation
# of the per-customer review count, rounded to the nearest integer.
# NOTE(review): the two statistics below are literal values, presumably copied
# from the describe() output of the per-customer counts -- confirm if data changes.
customer_mean = 4.506656831142033
customer_std = 20.132207420736695
norm_dist = customer_mean + (3 * customer_std)
bots_reviews = round(norm_dist)

for output_line in ("=======================================",
                    "threshold entre bots e customers reais",
                    bots_reviews):
    print(output_line)

In [18]:
%pyspark

# Split the per-customer counts into bots and real customers using the
# threshold. The two sets must be disjoint: a customer with exactly
# `bots_reviews` reviews is a bot (>=), so real customers use a strict `<`.
filter_bots = count_customers.where(count_customers["count"] >= bots_reviews)
filter_real_customers = count_customers.where(count_customers["count"] < bots_reviews)

# Bots DataFrame
print("======================================")
print("bots")
filter_bots.show()

# Real customers DataFrame
print("======================================")
print("real customers")
filter_real_customers.show()

# Number of bots according to the review-count threshold.
# Printed explicitly so the value appears under its header regardless of
# whether the interpreter echoes the last expression of a cell.
print("======================================")
print("number of bots")
print(filter_bots.count())

In [19]:
%pyspark

# Join every review with its customer's total review count, keep only the
# reviews written by customers at or above the bot threshold, and project the
# columns used to characterise bots.
df_join_count = df.join(count_customers, ["customer_id"])
just_bots = df_join_count.filter(df_join_count["count"] >= bots_reviews)
bot_characters = just_bots.select("customer_id", "product_category", "star_rating")

# One row per bot customer_id; its row count is the number of bots.
num_bots = bot_characters.groupby("customer_id").count()
for output_line in ("======================================", "number of bots"):
    print(output_line)
print(num_bots.count())

In [20]:
%pyspark

# Count bot reviews per product category and show the categories with the
# most reviews first.
bot_categories = bot_characters.groupby("product_category").count()
bot_categories_sorted = bot_categories.orderBy(bot_categories["count"].desc())
bot_categories_sorted.show()

In [21]:
%pyspark
# Review count per customer_id found in bot_characters.
bots_groupby_id = bot_characters.groupby("customer_id").count()

# Join back onto the full review DataFrame, keeping the reviews whose
# customer_id appears in bots_groupby_id.
df_customers_groupby = df.join(bots_groupby_id, ["customer_id"])

# Bucket the joined reviews by star rating:
#   5 -> positive, 4 -> neutral, 3 or lower -> negative.
bot_star_rating = df_customers_groupby["star_rating"]
positive_reviews = df_customers_groupby.filter(bot_star_rating == 5)
neutral_reviews = df_customers_groupby.filter(bot_star_rating == 4)
negative_reviews = df_customers_groupby.filter(bot_star_rating <= 3)

# Size of each bucket (computed in the order positive, neutral, negative).
num_pos_reviews = positive_reviews.count()
num_neu_reviews = neutral_reviews.count()
num_neg_reviews = negative_reviews.count()

# Report each bucket under its own separator and title.
sentiment_report = (
    ("positive reviews", num_pos_reviews),
    ("netral reviews", num_neu_reviews),
    ("negative reviews", num_neg_reviews),
)
for report_title, report_total in sentiment_report:
    print("======================================")
    print(report_title)
    print(report_total)

___

## Tarefa 3

In [23]:
%pyspark
from pyspark.sql.functions import when
# Drop every row that contains a NULL value.
df_no_nulls = df.na.drop()

# -------------------------------------------------------------------
# Build the rating label column (positive, neutral or negative)
# -------------------------------------------------------------------

# "5" -> positive, "4" -> neutral, "3"/"2"/"1"/"0" -> negative.
# No otherwise() branch is given, so any other rating gets a NULL label.
label_source = df["star_rating"]
rating_label_expr = when(label_source == "5", "positive").when(label_source == "4", "neutral")
for negative_star in ("3", "2", "1", "0"):
    rating_label_expr = rating_label_expr.when(label_source == negative_star, "negative")

df_naive_bayes = df_no_nulls.withColumn("rating_labels", rating_label_expr)
df_naive_bayes.show()

In [24]:
%pyspark
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes

# -------------------------------------------------------------------
# RegexTokenizer, CountVectorizer, StringIndexer and VectorAssembler
# -------------------------------------------------------------------

# 1. Split each review body into tokens on runs of non-word characters.
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="tokens", pattern="\\W+")

# 2. Turn the token lists into term-count vectors (minDF=2.0).
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)

# 3. Map the string rating labels to numeric indices in the "label" column.
indexer = StringIndexer(inputCol="rating_labels", outputCol="label")

# 4. Assemble the token count vector into the final "features" column.
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

# Pipeline stages, in execution order.
stages = [regexTokenizer, cv, indexer, vecAssembler]

In [25]:
%pyspark
from pyspark.ml import Pipeline

# -------------------------------------------------------------------
# Feature pipeline for the Naive Bayes model
# -------------------------------------------------------------------

# Fit the feature stages on the labelled DataFrame and apply them to that
# same DataFrame.
# NOTE(review): the fit uses the whole dataset, before the train/test split
# done in a later cell -- confirm this is intended.
pipeline = Pipeline(stages=stages)
fitted_pipeline = pipeline.fit(df_naive_bayes)
data = fitted_pipeline.transform(df_naive_bayes)

In [26]:
%pyspark

# -------------------------------------------------------------------
# Split the data into training (70%) and test (30%) sets
# -------------------------------------------------------------------
# A fixed seed is passed to the random split.
split_parts = data.randomSplit([0.7, 0.3], seed=494)
train = split_parts[0]
test = split_parts[1]

In [27]:
%pyspark
from pyspark.ml.classification import NaiveBayes

# Train a multinomial Naive Bayes classifier (smoothing 1.0) on the training
# split, score the test split and preview the predictions.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)
model = nb.fit(train)
predictions = model.transform(test)

prediction_preview = predictions.select("label", "prediction", "probability")
prediction_preview.show()

In [28]:
%pyspark
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# The labels built earlier have three classes (positive / neutral / negative),
# so a binary evaluator is the wrong tool; its default metric is also area
# under ROC, not accuracy. Use the multiclass evaluator with the "accuracy"
# metric so the printed number matches its label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Model Accuracy: ", accuracy)