In [None]:
import re, string
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName("TP2 - BD2")
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("textinputformat.record.delimiter", "\n\n")

In [None]:
regex_id = re.compile(r'Id:(.*)(\n(.)*)*Id:')

In [None]:
rdd_amazon = sc.textFile("./amazon-meta-pequeno.txt")

## Funções

In [None]:
def encode_and_get_Id(register):
    converted_register = register.encode('utf-8')
    aux = re.findall("Id:   \d*", converted_register)
    if aux: 
        id = re.findall("\d+", aux[0])
        if id:
            id = int(id[0])
            return id

In [None]:
def encode_and_get_ASIN(register):
    converted_register = register.encode('utf-8')
    aux = re.findall("ASIN: \d*", converted_register)
    if aux: 
        id = re.findall("\d+", aux[0])
        if id:
            id = int(id[0])
            return id

In [None]:
def get_reviews(product):
    converted_product = product.encode('utf-8')
    converted_product = converted_product.split('\n')
    reviews = []
    for index, line in enumerate(converted_product):
        if 'reviews' in line:
            for review_line in converted_product[index + 1:]:
                reviews.append(review_line)
    return reviews

In [None]:
# ESTRUTURA: NAME - RATING - VOTES - HELPFUL
def format_review(review):
    return (review.split('rating: ')[0], int(review.split('rating: ')[1][0:1]), int(review.split('votes: ')[1][1:3]), int(review.split('helpful: ')[1][1:3]))

In [None]:
def get_salesrank(product):
    converted_product = product.encode('utf-8')
    converted_product = converted_product.split('\n')
    return int(converted_product[4].split('salesrank: ')[1])

In [None]:
def get_similars(product):
    similars = []
    converted_product = product.encode('utf-8')
    converted_product = converted_product.split('\n')
    splitted_produts = converted_product[5].split('similar: ')[1]
    splitted_produts = splitted_produts.split(' ')
    quantity = int(splitted_produts[0])
    for product in splitted_produts[1:]:
        if product:
            similars.append(product)
    return similars

In [None]:
def return_greater_seller(product, sale_rank):
    converted_product = product.encode('utf-8')
    converted_product = converted_product.split('\n')
    product_sale_rank = int(converted_product[4].split('salesrank: ')[1])
    if product_sale_rank > sale_rank:
        return product

### rdd_mapeado é um rdd com todos os elementos mapeados pelo seu ID, facilitando o retrive do registro

In [None]:
rdd_mapeado_ids = rdd_amazon.map(lambda x: (encode_and_get_Id(x), x))
rdd_mapeado_asin = rdd_amazon.map(lambda x: (encode_and_get_ASIN(x), x))

In [None]:
#INSIRA AQUI O ID DO PRODUTO A SER BUSCADO
id_produto_desejado = 33
produto_procurado = rdd_mapeado_ids.lookup(id_produto_desejado)

## A)

Para essa parte foi feito primeiro uma extração das reviews que foram transformadas numa lista de estrutura:
Name - Rating- Votes - Helpfull.
Depois de extraidos, esses dados foram transformados em um RDD para poder aproveitarmos das funções do Spark, como no caso o TakeOrdered, que extrai os 5 melhores valores dependendo da situação pedida

In [None]:
reviews_list = get_reviews(produto_procurado[0])
reviews_rdd = sc.parallelize(reviews_list)
reviews_formated = reviews_rdd.map(format_review)

In [None]:
helpful_rating = reviews_formated.takeOrdered(5, lambda x: (-x[1],-x[3]))
print(helpful_rating)

In [None]:
helpful_bad_rating = reviews_formated.takeOrdered(5, lambda x: (x[1],-x[3]))
print(helpful_bad_rating)

## B)

Para essa atividade foi feito algo parecido com o quesito A, primeiro extraimos a posição de vendas do produto em analise, depois extraimos os ASINS dos produtos similares e geramos uma lista. Com essa lista gerada, fazemos um lookup para recuperarmos os registros e transformamos essa lista de registros em uma rdd. Uma vez transformada em um rdd podemos fazer um map para uma funcao que retorna apenas os produtos com salesrank maior que o objeto requisitado

In [None]:
product_salesrank = get_salesrank(produto_procurado[0])

In [None]:
similars = get_similars(produto_procurado[0])
similars_asins = []
for similar in similars:
    print(similar)
    s = rdd_mapeado_asin.lookup(similar)
    similars_asins.append(s)
print(similars_asins)

In [None]:
similars_rdd = sc.parallelize(similars_asins)

In [None]:
similars_best_salers = similars_rdd.map(lambda x: return_greater_seller(x,product_salesrank))    

In [None]:
similars_best_salers.collect()