In [1]:
import findspark
findspark.init()

from pyspark import HiveContext

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .enableHiveSupport() \
    .getOrCreate()

hiveContext = HiveContext(spark.sparkContext)

In [2]:
# Mostra as databases existentes no hive
hiveContext.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|    sales|
+---------+



In [3]:
# Carregando todos os dados
df = spark.read.csv("../amazon_reviews_us_PC_v1_00.tsv", sep=r'\t', header=True)
df.createOrReplaceTempView("temp")
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



# Criando a tabela de avaliações

In [4]:
# Review table
## customer_id
## review_id
## product_id
## star_rating
reviewsTable = spark.sql("SELECT customer_id, review_id, product_id, star_rating FROM temp")

# Versão inicial da Review Table sem as transformações
reviewsTable.show()

+-----------+--------------+----------+-----------+
|customer_id|     review_id|product_id|star_rating|
+-----------+--------------+----------+-----------+
|   22873041|R3ARRMDEGED8RD|B00KJWQIIC|          5|
|   30088427| RQ28TSA020Y6J|B013ALA9LA|          5|
|   20329786| RUXJRZCT6953M|B00PML2GQ8|          1|
|   14215710| R7EO0UO6BPB71|B001NS0OZ4|          1|
|   38264512|R39NJY2YJ1JFSV|B00AQMTND2|          5|
|   30548466|R31SR7REWNX7CF|B00KX4TORI|          5|
|     589298| RVBP8I1R0CTZ8|B00P17WEMY|          3|
|   49329488|R1QF6RS1PDLU18|B00TR05L9Y|          4|
|   50728290|R23AICGEDAJQL1|B0098Y77OG|          1|
|   37802374|R2EY3N4K9W19UP|B00IFYEYXC|          5|
|   52027882| RC9AW4HKJ016M|B0091ITP0S|          1|
|   41770239|R2ALWJE9N6ZBXD|B008I21EA2|          1|
|   42560427|R2G5FPA4OX37GV|B00MRB7SBO|          5|
|   46345923|R1IKTSEVXSIMOD|B00LLER2CS|          5|
|   41751192|R2YA6G6SRFEWF6|B00B0CQCCC|          1|
|   21176481| RS9H1N9I3Z1IA|B00GU8W5AE|          5|
|   10674058

# Calculando média e medianas de estrelas - Contando o número de avaliações por produto

In [5]:
# Product table
## product_id
## product_title
## star_avg (CRIAR)
## star_median (CRIAR)
## total_reviews (CRIAR)

# Calcular número total de avaliações por produto
# Calcular média de estrelas por produto
# Calcular mediana de estrelas por produto

productInfos = spark.sql("SELECT product_id, percentile_approx(star_rating, 0.5) as star_median, \
avg(star_rating) as star_average, count(product_id) as total_reviews FROM temp GROUP BY product_id")
productInfos.createOrReplaceTempView("products")

# Versão inicial da tabela de produtos, ainda com produtos duplicados.
productInfos.show()

+----------+-----------+------------------+-------------+
|product_id|star_median|      star_average|total_reviews|
+----------+-----------+------------------+-------------+
|9875987018|        4.0|               4.5|            2|
|9966285946|        5.0|               5.0|            1|
|9966694242|        5.0|               4.6|           10|
|9967222247|        5.0|               5.0|            3|
|9985538803|        5.0|               5.0|            1|
|9985725344|        5.0|               5.0|            1|
|9989476071|        5.0|               5.0|            1|
|9990950369|        2.0|               2.0|            1|
|B00000J3SV|        1.0|               3.0|            2|
|B00000JBK6|        5.0| 4.153846153846154|           13|
|B00002S73F|        5.0|               3.7|           10|
|B00004VV4B|        5.0|               5.0|            1|
|B00004YNSK|        5.0|               4.5|            8|
|B00004Z7BU|        5.0|               5.0|            1|
|B00005045V|  

# Removendo linhas duplicadas

In [6]:
# Removendo os produtos duplicados
## Em testes foi verificado que o produto com ID = B0049SIJ7K está duplicado duplicado
productNames = spark.sql("SELECT DISTINCT product_id, product_title FROM temp")
productNames = productNames.dropDuplicates(["product_id"])
productTable = productNames.join(productInfos, 'product_id')
productTable.show()
productTable.createOrReplaceTempView("products")

+----------+--------------------+-----------+------------------+-------------+
|product_id|       product_title|star_median|      star_average|total_reviews|
+----------+--------------------+-----------+------------------+-------------+
|9875987018|Professional Ultr...|        4.0|               4.5|            2|
|9966285946|Professional King...|        5.0|               5.0|            1|
|9966694242|Professional King...|        5.0|               4.6|           10|
|9967222247|Professional King...|        5.0|               5.0|            3|
|9985538803|Samsung Galaxy St...|        5.0|               5.0|            1|
|9985725344|Professional King...|        5.0|               5.0|            1|
|9989476071|Professional King...|        5.0|               5.0|            1|
|9990950369|Samsung SGH-i780 ...|        2.0|               2.0|            1|
|B00000J3SV|Intel ICS2USB Cre...|        1.0|               3.0|            2|
|B00000JBK6|    ALTEC ACS SERIES|        5.0| 4.1538

In [7]:
# Removendo as reviews duplicadas
## Em testes foram encontradas 1764 reviews duplicadas
reviewsTable = reviewsTable.dropDuplicates(["customer_id", "product_id", "star_rating"])
reviewsTable.show()

+-----------+--------------+----------+-----------+
|customer_id|     review_id|product_id|star_rating|
+-----------+--------------+----------+-----------+
|   10001434|R15PWB2RWMDYY0|B0028QQC0Q|          1|
|   10002051|R2FV71G5DAQK2V|B0069ASUBQ|          3|
|   10002342|R2X44EUDV89CD0|B00CXAFG72|          5|
|   10002911|R2I3W2ZQ7WDNR1|B00HKEI3EY|          5|
|   10003679|R3PPUM8GC0P4VV|B00A1AK6EE|          4|
|   10003897| RU1J6AWUQDH8A|B000SR120M|          5|
|   10004084|R355XMTXY1GOQF|B001SH2AVQ|          5|
|   10004974|R236IN0TMKW8F9|B007PTCFFW|          1|
|   10006218|R3OJNQVUXB39PW|B00A1EDRR8|          5|
|   10006371|R1M16Q84UAFF6L|B00WBCBPDQ|          4|
|   10006579|R1VUBMDCR5R9ZS|B000GP844S|          5|
|   10007900|R1Q864KE8JGKUZ|B002VJJMSO|          5|
|   10008642| RKIHY06FGFBFU|B007C0Y8PY|          5|
|   10008821|R3RBBU21WYULB1|B0002K6RK0|          5|
|   10010828|R3BQA6G9M6GNYV|B000GHXTBO|          5|
|   10011267|R14OH1P4LDYMF0|B00HAHFQL4|          5|
|    1001155

# Salvando os dataframes no Hive

In [12]:
reviewsTable.write.mode("overwrite").saveAsTable("sales.user_reviews")
productTable.write.mode("overwrite").saveAsTable("sales.products")

# Consulta 1 - Retornar os produtos mais avaliados

In [25]:
consulta1 = hiveContext.sql("SELECT * FROM sales.products ORDER BY total_reviews DESC")
consulta1.show()

+----------+--------------------+-----------+------------------+-------------+
|product_id|       product_title|star_median|      star_average|total_reviews|
+----------+--------------------+-----------+------------------+-------------+
|B0051VVOB2|Kindle Fire (Prev...|        5.0|               4.0|        23313|
|B00JG8GOWU|Kindle Paperwhite...|        5.0| 4.555773955773955|        18315|
|B00BWYQ9YE|Kindle Fire HDX 7...|        5.0|  4.32381292395573|        16806|
|B006GWO5WK|Amazon Kindle 9W ...|        5.0| 4.308609693877551|        15680|
|B002Y27P3M|Kindle Keyboard, ...|        5.0| 4.440416946872898|        14870|
|B0083PWAPW|Kindle Fire HD 7"...|        5.0|4.1922288992078665|        14644|
|B00IKPYKWG|Fire HD 7, 7" HD ...|        5.0| 4.252794214332676|        12168|
|B00AWH595M|Kindle Paperwhite...|        5.0| 4.543020861312562|        11169|
|B00I15SB16|Kindle, 6" Glare-...|        5.0| 4.198477751756441|        10248|
|B0015T963C|Kindle Wireless R...|        5.0| 4.4164

# Consulta 2 - Retornar os produtos com melhor média de avaliação a partir do número de avaliações

In [21]:
min_reviews = 3
consulta2 = hiveContext.sql("SELECT * FROM sales.products WHERE total_reviews >= {} \
ORDER BY star_average DESC".format(min_reviews))
consulta2.show()

+----------+--------------------+-----------+------------+-------------+
|product_id|       product_title|star_median|star_average|total_reviews|
+----------+--------------------+-----------+------------+-------------+
|B001670JES|Sony 16GB Memory ...|        5.0|         5.0|            3|
|B004QDRU64|Boxed Intel H61 M...|        5.0|         5.0|            3|
|B001SH5OHI|CaseCrown Durable...|        5.0|         5.0|            4|
|9967222247|Professional King...|        5.0|         5.0|            3|
|B00284AM9U|Hitachi Travelsta...|        5.0|         5.0|            3|
|B00006B6TF|Antec TruePower 5...|        5.0|         5.0|            4|
|B002ATBNQO|Incipio TriFold N...|        5.0|         5.0|            3|
|B00006B870|3.5in 1.44MB Mult...|        5.0|         5.0|            3|
|B004MT6324|TOP CASE - Silico...|        5.0|         5.0|            3|
|B000E391MQ|M-Audio Audiophil...|        5.0|         5.0|            5|
|B000HLYLT8|Sony VAIO VGN-FE7...|        5.0|      

In [23]:
min_reviews = 3
consulta2 = hiveContext.sql("SELECT * FROM sales.products WHERE total_reviews >= {} \
ORDER BY star_average DESC".format(min_reviews))
consulta2.show()

+----------+--------------------+-----------+------------+-------------+
|product_id|       product_title|star_median|star_average|total_reviews|
+----------+--------------------+-----------+------------+-------------+
|B003DR4X50|Marware Sport Gri...|        5.0|         5.0|            3|
|B007NG4LPY|9.7" 10" 10.1" 10...|        5.0|         5.0|            3|
|B000UQ5MD4|Sumdex Fashion Pl...|        5.0|         5.0|            5|
|B00115U2UQ|1GB 800Mhz DDR2 P...|        5.0|         5.0|            3|
|B0001CWI2A|Sony VAIO PCV-RZ5...|        5.0|         5.0|            3|
|B003U8K4E6|Laptop/Notebook B...|        5.0|         5.0|            3|
|B000A0RIHS|VARTA CR2032 Equi...|        5.0|         5.0|            4|
|B003YEYMP8|Bear Motion ® Lux...|        5.0|         5.0|            4|
|B000IE5W7Y|HP RC465AA USB Ke...|        5.0|         5.0|           10|
|B0041LN94I|PrimoFlex Pro LRT...|        5.0|         5.0|            3|
|B001P2EUYE|HQRP 15V 90W Lapt...|        5.0|      

# Consulta 3 - Retornar os consumidores que mais avaliaram produtos 

In [20]:
consulta3 = hiveContext.sql("SELECT customer_id, count(customer_id) as \
count_reviews FROM sales.user_reviews GROUP BY customer_id ORDER BY count(customer_id) DESC")
consulta3.show()

+-----------+-------------+
|customer_id|count_reviews|
+-----------+-------------+
|   17957446|          458|
|   44834233|          442|
|   52938899|          366|
|   45664110|          275|
|   49452274|          261|
|   50820654|          256|
|   45070473|          251|
|   12200139|          251|
|   32038204|          241|
|   49266466|          240|
|   39789300|          236|
|   43856165|          228|
|   29863275|          226|
|   52859210|          225|
|   52340667|          222|
|   50027179|          217|
|   53037408|          213|
|   52690060|          211|
|   51346302|          209|
|   52318215|          195|
+-----------+-------------+
only showing top 20 rows



# Consulta 4 - Retornar média de estrelas das marcas presentes no sistema

In [38]:
! pip3 install pandas

Collecting pandas
  Downloading https://files.pythonhosted.org/packages/c3/e2/00cacecafbab071c787019f00ad84ca3185952f6bb9bca9550ed83870d4d/pandas-1.1.5-cp36-cp36m-manylinux1_x86_64.whl (9.5MB)
[K    100% |████████████████████████████████| 9.5MB 4.0kB/s eta 0:00:01��████████████████████▋   | 8.5MB 30.8MB/s eta 0:00:01
[?25hCollecting pytz>=2017.2 (from pandas)
  Using cached https://files.pythonhosted.org/packages/d3/e3/d9f046b5d1c94a3aeab15f1f867aa414f8ee9d196fae6865f1d6a0ee1a0b/pytz-2021.3-py2.py3-none-any.whl
Collecting python-dateutil>=2.7.3 (from pandas)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Collecting numpy>=1.15.4 (from pandas)
  Downloading https://files.pythonhosted.org/packages/45/b2/6c7545bb7a38754d63048c7696804a0d947328125d81bf12beaa692c3ae3/numpy-1.19.5-cp36-cp36m-manylinux1_x86_64.whl (13.4MB)
[K    100% |████████████████████████████████| 13.4MB

In [39]:
import pandas

marcas = ["apple", "microsoft", "dell", "nvidia", "sony", "polaroid", "hp", "tp-link", "lenovo", "linksys", "nintendo", "philips", "canon", "panasonic", "kingston", "hitachi", "fermax", "asus", "xiaomi", "nokia", "logitech", "samsung", "motorola", "amazfit", "altera", "cisco", "lg", "evga", "microsoft", "philco", \
          "western", "panasonic", "seagate", "alienware", "thermaltake", "sandisk", "ibm", "compaq", "acer", "toshiba", \
          "belkin", "siemens", "crucial", "fujitsu", "d-link", "rayovac", "steelseries", "duracell", "benq", "hitachi", \
          "asrock", "zotac", "gigabyte", "silverstone", "blackberry", "fujifilm", "logisys", "kodak", "casio", "yamaha"]

consulta4 = hiveContext.sql("SELECT product_title FROM sales.products")

df = consulta4.select("*").toPandas()

marcas_produtos = []

for index, row in df.iterrows():
   found = 0
   for marca in marcas:
     nome_produto = row["product_title"]
     nome_produto = nome_produto.lower()
     if marca in nome_produto:
       found = 1
       marcas_produtos.append(marca)
       break
   if found == 0:
     marcas_produtos.append("unknown")

df["marca"] = marcas_produtos
df.to_csv('marcas_produtos.csv', sep= '\t')

marcas = spark.read.csv("./marcas_produtos.csv", sep= '\t', header=True)
marcas.createOrReplaceTempView("marcas")

products = hiveContext.sql("SELECT * FROM sales.products")
marcas = spark.sql("SELECT marca FROM marcas")

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number
w = Window.orderBy(monotonically_increasing_id())

products = products.withColumn("columnindex", row_number().over(w))
marcas = marcas.withColumn("columnindex", row_number().over(w))

productTable = products.join(marcas, "columnindex").drop(products.columnindex)
productTable.write.mode("overwrite").saveAsTable("sales.products_with_brands")

brands = hiveContext.sql("SELECT marca, avg(star_average) FROM sales.products_with_brands WHERE \
marca <> 'unknown' GROUP BY marca ORDER BY avg(star_average) DESC")

In [40]:
brands.show()

+-----------+-----------------+
|      marca|avg(star_average)|
+-----------+-----------------+
|    rayovac|              5.0|
|     altera|4.553571428571428|
|    crucial|4.452437510967074|
|       evga|4.390014082874379|
|   kingston|4.307081524227381|
|    sandisk|4.288081357578996|
|silverstone| 4.21852950983423|
|     yamaha|4.199819051536674|
|   fujifilm|4.184492142336977|
|      kodak|4.180007634280386|
|thermaltake|4.179309089056252|
|      casio|4.172671138630891|
|      canon|4.170336757822482|
|      cisco|4.145100470521573|
|    hitachi|4.137024049274133|
|       benq|4.134088301070568|
|         lg| 4.11455968428097|
|      zotac|4.112458918172588|
|   logitech|4.091913417095392|
|   gigabyte|4.087722502355112|
+-----------+-----------------+
only showing top 20 rows

