<a href="https://colab.research.google.com/github/Tiago-Pontes/Desafio-Engenharia-de-Dados/blob/main/Tiago_Pontes_Gomes_Desafio_Data_Enginner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importing Spark

I start by installing the dependencies needed to run Spark on Colab: Java 8 and Apache Spark 3.2.1 built for Hadoop 3.2.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null  
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

Next, I set the environment variables so that the Colab environment can locate where Java and Spark are installed. The 'os' library is used to set these variables from Python.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
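Before initializing Spark, it can help to confirm that both variables point to directories that actually exist. The following is a minimal sketch; `missing_paths` is a hypothetical helper that is not part of the original notebook.

```python
import os

def missing_paths(env_names):
    """Return the names of environment variables that are unset
    or that do not point to an existing directory."""
    missing = []
    for name in env_names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both installations were found
print(missing_paths(["JAVA_HOME", "SPARK_HOME"]))
```

If the list is not empty, the download or extraction step most likely failed or used a different path.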

Next, I import the findspark library, which locates the Spark installation and makes the PySpark packages importable.

In [3]:
import findspark
findspark.init()

Starting PySpark: importing SparkSession and all of the PySpark SQL functions.



In [4]:
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.functions import *

---

SparkSession is the entry point to PySpark; creating a SparkSession instance is the first statement you write when programming with RDDs, DataFrames and Datasets. By default, a SparkSession is created through SparkSession.builder.


In [5]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

---

# Processing the Netflix dataset

Downloading and unzipping the Netflix dataset.

In [6]:
!wget -O netflix.csv "https://storage.googleapis.com/kaggle-data-sets/1636/792972/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20220702%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20220702T015558Z&X-Goog-Expires=259199&X-Goog-SignedHeaders=host&X-Goog-Signature=6bf787b68b1db6f81d7e917f3aa1b150e0a95c0786193c7ed50a1c655cfa04988c82e8c129b8e17c620fa1c6a8ea80b64cfe529b6333c1c2b33c44b0eecab309c7d0406ec7078eedc281452b5b8823210ddd5895c6afd11e1de5a03be954d255d64b22c2fe561f56fb12541b904a7b5f6cf2567411af2f2b25da544938886a35130200561c5d38c4e898a1d8e3665f2aa44c04baa2d0be658ff2a07858c044ec646d14e27dccb0f3c191c45f3bccf55785db256647d050ef0c609c44ca3fdea3971b37af548ebb047be5e74c8bfd6a342cf0e294f76bb39e88387d6ec79e31ef67ab00b9c2f722a18e558951582a52bcbb383f0576301225d0a88ae845f2530f"

!unzip "netflix.csv"

--2022-07-03 02:59:22--  https://storage.googleapis.com/kaggle-data-sets/1636/792972/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20220702%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20220702T015558Z&X-Goog-Expires=259199&X-Goog-SignedHeaders=host&X-Goog-Signature=6bf787b68b1db6f81d7e917f3aa1b150e0a95c0786193c7ed50a1c655cfa04988c82e8c129b8e17c620fa1c6a8ea80b64cfe529b6333c1c2b33c44b0eecab309c7d0406ec7078eedc281452b5b8823210ddd5895c6afd11e1de5a03be954d255d64b22c2fe561f56fb12541b904a7b5f6cf2567411af2f2b25da544938886a35130200561c5d38c4e898a1d8e3665f2aa44c04baa2d0be658ff2a07858c044ec646d14e27dccb0f3c191c45f3bccf55785db256647d050ef0c609c44ca3fdea3971b37af548ebb047be5e74c8bfd6a342cf0e294f76bb39e88387d6ec79e31ef67ab00b9c2f722a18e558951582a52bcbb383f0576301225d0a88ae845f2530f
Resolving storage.googleapis.com (storage.googleapis.com)... 173.194.215.128, 173.194.217.128, 173.194.218.128, ...
Connecting to storage.

Reading the CSV file with Spark, without a header row and inferring the schema, then renaming the columns.

In [7]:
netflix_1 = spark.read.csv('/content/movie_titles.csv', header=False, inferSchema=True)

netflix_1 = netflix_1.selectExpr("_c0 as movie_id", "_c1 as year", "_c2 as movies")

In [8]:
netflix_1.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- movies: string (nullable = true)



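Creating the file *netflix_2.csv* from the (*combined_data_*\*) files. In those files, each movie block starts with a line holding the movie id followed by ( : ), and the lines after it hold the ratings of that movie until the next ( : ) line. The loop strips the ( : ), keeps the current movie id, and writes it in front of each rating line, separated by commas.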

In [9]:
import time

start_t = time.time()

# Flatten the combined_data files into one CSV: movie_id,user,rating,date
if not os.path.isfile('netflix_2.csv'):
    files = ['/content/combined_data_1.txt', '/content/combined_data_2.txt',
             '/content/combined_data_3.txt', '/content/combined_data_4.txt']
    with open('netflix_2.csv', mode='w') as data:
        for file in files:
            with open(file) as f:
                for line in f:
                    line = line.strip()
                    if line.endswith(':'):
                        # A line such as "1:" starts the block of a new movie
                        movie_id = line.replace(':', '')
                    else:
                        # Rating line "user,rating,date": prepend the current movie id
                        data.write(f'{movie_id},{line}\n')

# Elapsed time in seconds
print(time.time() - start_t)

165.12935328483582
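To make the transformation easier to follow, here is a minimal sketch of the same logic applied to a small in-memory sample instead of the real files. The function name `flatten_ratings` and the sample are assumptions made for illustration; the first two rating lines mirror rows shown later in the notebook, and the third is invented.

```python
import io

def flatten_ratings(lines):
    """Yield 'movie_id,user,rating,date' rows from combined_data-style lines."""
    movie_id = None
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        if line.endswith(':'):
            movie_id = line[:-1]  # "1:" starts the block of movie 1
        else:
            yield f"{movie_id},{line}"

# Hypothetical sample in the same layout as combined_data_*.txt
sample = io.StringIO(
    "1:\n"
    "1488844,3,2005-09-06\n"
    "822109,5,2005-05-13\n"
    "2:\n"
    "2059652,4,2005-09-05\n"
)

rows = list(flatten_ratings(sample))
for row in rows:
    print(row)
```

Each output row carries the movie id in the first column, which is what allows the file to be read as a regular CSV afterwards.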


Reading the file *'netflix_2.csv'* without a header and inferring the schema, then renaming the columns.


In [10]:
netflix_2 = spark.read.csv('/content/netflix_2.csv', header=False, inferSchema=True)      

netflix_2 = netflix_2.selectExpr("_c0 as movie_id", "_c1 as user", "_c2 as rating","_c3 as date")

Creating the variable *netflix* by joining netflix_2 with netflix_1, for later use.

In [11]:
netflix = netflix_2.join(F.broadcast(netflix_1),['movie_id'],how='inner')
netflix.show()

+--------+-------+------+----------+----+---------------+
|movie_id|   user|rating|      date|year|         movies|
+--------+-------+------+----------+----+---------------+
|       1|1488844|     3|2005-09-06|2003|Dinosaur Planet|
|       1| 822109|     5|2005-05-13|2003|Dinosaur Planet|
|       1| 885013|     4|2005-10-19|2003|Dinosaur Planet|
|       1|  30878|     4|2005-12-26|2003|Dinosaur Planet|
|       1| 823519|     3|2004-05-03|2003|Dinosaur Planet|
|       1| 893988|     3|2005-11-17|2003|Dinosaur Planet|
|       1| 124105|     4|2004-08-05|2003|Dinosaur Planet|
|       1|1248029|     3|2004-04-22|2003|Dinosaur Planet|
|       1|1842128|     4|2004-05-09|2003|Dinosaur Planet|
|       1|2238063|     3|2005-05-11|2003|Dinosaur Planet|
|       1|1503895|     4|2005-05-19|2003|Dinosaur Planet|
|       1|2207774|     5|2005-06-06|2003|Dinosaur Planet|
|       1|2590061|     3|2004-08-12|2003|Dinosaur Planet|
|       1|   2442|     3|2004-04-14|2003|Dinosaur Planet|
|       1| 543

---

# How many movies are available on Netflix?

Keeping only the rows whose *movie_id* is not null, removing duplicates and counting the result.

In [12]:
# Spark
tot_netflix = netflix_1.filter(netflix_1.movie_id.isNotNull()).distinct().count()
print(f'There are {tot_netflix} movies available on Netflix.')

There are 17770 movies available on Netflix.


In [13]:
# SQL
netflix_1.createOrReplaceTempView('movie_id')
# count(DISTINCT ...) counts each id once; DISTINCT placed before count() has no effect
spark.sql('''select count(distinct movie_id) as total from movie_id where movie_id is not null''').show()

+-----+
|total|
+-----+
|17770|
+-----+
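When counting unique ids in SQL, the position of DISTINCT matters. The sketch below uses SQLite from the Python standard library rather than Spark, with a hypothetical table `titles`, to show the difference between `SELECT DISTINCT count(x)` and `SELECT count(DISTINCT x)`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE titles (movie_id INTEGER)")
# Hypothetical rows: id 1 appears twice and one id is NULL
conn.executemany("INSERT INTO titles VALUES (?)", [(1,), (1,), (2,), (None,)])

# DISTINCT is applied to the single row returned by count(),
# so duplicated ids are still counted
distinct_after = conn.execute(
    "SELECT DISTINCT count(movie_id) FROM titles"
).fetchone()[0]

# DISTINCT is applied inside count(), so each id is counted once
distinct_inside = conn.execute(
    "SELECT count(DISTINCT movie_id) FROM titles"
).fetchone()[0]

print(distinct_after, distinct_inside)  # 3 2
conn.close()
```

Both forms skip NULL values. They return the same number only when the column has no repeated values.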



---

# Processing the Amazon dataset

Downloading the Amazon dataset straight to the VM with wget.


In [14]:
!wget "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz"

--2022-07-03 03:04:53--  https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.242.214
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.242.214|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 506979922 (483M) [application/x-gzip]
Saving to: ‘amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz’


2022-07-03 03:05:00 (69.6 MB/s) - ‘amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz’ saved [506979922/506979922]



Reading the file with Spark as tab-separated values, with a header row and inferring the schema.


In [15]:
amazon = spark.read.csv('/content/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz', sep='\t', inferSchema=True, header=True)

Renaming the column *product_title* to *movies* and dropping the columns that will not be used, which simplifies the later join with the Netflix dataset.


In [16]:
# Keep only the columns needed to count titles
amazon_1 = amazon.select('product_parent', F.col('product_title').alias('movies'))

# Keep the customer, rating and date columns as well, for the rating questions
amazon_2 = amazon.select('customer_id', 'product_parent', F.col('product_title').alias('movies'), 'star_rating', 'review_date')

amazon.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [17]:
amazon_1.show(5)
amazon_2.show(5)

+--------------+--------------------+
|product_parent|              movies|
+--------------+--------------------+
|     668895143|Enlightened: Seas...|
|     246219280|             Vicious|
|     534732318|         After Words|
|     239012694|Masterpiece: Insp...|
|     535858974|   On The Waterfront|
+--------------+--------------------+
only showing top 5 rows

+-----------+--------------+--------------------+-----------+-----------+
|customer_id|product_parent|              movies|star_rating|review_date|
+-----------+--------------+--------------------+-----------+-----------+
|   12190288|     668895143|Enlightened: Seas...|          5| 2015-08-31|
|   30549954|     246219280|             Vicious|          5| 2015-08-31|
|   52895410|     534732318|         After Words|          4| 2015-08-31|
|   27072354|     239012694|Masterpiece: Insp...|          5| 2015-08-31|
|   26939022|     535858974|   On The Waterfront|          5| 2015-08-31|
+-----------+--------------+-------------

In [18]:
amazon_2 = amazon_2.filter((~amazon_2.movies.like("%Season%")) & (amazon_2.product_parent.isNotNull()))


---

# How many movies are available on Amazon?

Filtering the Amazon data on *product_parent*: removing null values and duplicates, and excluding series (titles containing *Season*) from the movies.


In [19]:
# Spark
amazon_1 = amazon_1.dropDuplicates(["product_parent"]).filter((~amazon_1.movies.like("%Season%")) & (amazon_1.product_parent.isNotNull()))
print(f'There are {amazon_1.count()} movies available on Amazon.')

There are 59784 movies available on Amazon.
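The `like("%Season%")` filter is a case-sensitive substring test, so it only removes titles that contain exactly "Season". The plain-Python sketch below mimics that test on hypothetical titles (not taken from the dataset) and compares it with a looser variant. The list of markers in the looser variant is an assumption about what should count as a series.

```python
def is_series(title, marker="Season"):
    """Mimic SQL LIKE '%Season%': a case-sensitive substring test."""
    return marker in title

def is_series_loose(title, markers=("season", "series")):
    """Case-insensitive test against several markers (an assumed rule)."""
    lowered = title.lower()
    return any(m in lowered for m in markers)

# Hypothetical titles used only for illustration
titles = [
    "Show A: Season 1",
    "Show B season 2",
    "Show C: The Complete Series",
    "Movie D",
]

kept = [t for t in titles if not is_series(t)]
kept_loose = [t for t in titles if not is_series_loose(t)]

print(kept)        # the lowercase "season" and "Series" titles survive
print(kept_loose)  # only "Movie D" survives
```

Which rule is appropriate depends on how titles are written in the real data, which this sketch does not verify.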


In [20]:
# SQL
amazon_1.createOrReplaceTempView('product_parent')
# count(DISTINCT ...) counts each product once; DISTINCT placed before count() has no effect
spark.sql('''SELECT count(DISTINCT product_parent) AS total FROM product_parent
           WHERE product_parent IS NOT NULL AND movies NOT LIKE "%Season%"''').show()

+-----+
|total|
+-----+
|59784|
+-----+



---

# What percentage of the movies available on Amazon is also available on Netflix?

Joining the Amazon and Netflix titles, counting the matches, multiplying by 100 and dividing by the Amazon total.


In [21]:
# Spark
porc_join = amazon_1.join(F.broadcast(netflix_1), on=['movies'], how='inner').count() * 100 / amazon_1.count()
print('Of the movies available on Amazon, {:.2f}% are also available on Netflix.'.format(porc_join))

Of the movies available on Amazon, 10.82% are also available on Netflix.


In [22]:
porc_join = amazon_1.join(netflix_1, on=['movies'], how='inner').count()* 100 / amazon_1.count()

In [23]:
# SQL
amazon_1.createOrReplaceTempView('product_parent')
netflix_1.createOrReplaceTempView('movies')
# Join the Amazon view with the Netflix view and divide by the Amazon total.
# The original query joined the Netflix view with itself and divided by a
# hard-coded 66239, which produced 28.58 instead of the percentage computed
# with the DataFrame API.
spark.sql('''SELECT count(*) * 100 / (SELECT count(*) FROM product_parent) AS percentage
             FROM product_parent a INNER JOIN movies n ON a.movies = n.movies''').show()



In [24]:
teste = amazon_1.join(F.broadcast(netflix_1), on=['movies'], how='inner')
teste.count()

6472
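The percentage in this section is the number of rows returned by the join, times 100, divided by the Amazon total. An inner join returns one row per matching pair, so a title that repeats on one side is counted more than once. The plain-Python sketch below contrasts the two ways of counting. The titles are hypothetical, and whether the real datasets contain repeated titles is not verified here.

```python
from collections import Counter

def join_row_count(left_titles, right_titles):
    """Rows an inner join on title would return (one per matching pair)."""
    right_counts = Counter(right_titles)
    return sum(right_counts[title] for title in left_titles)

def distinct_overlap_percentage(left_titles, right_titles):
    """Percentage of distinct left titles that also appear on the right."""
    left_set = set(left_titles)
    if not left_set:
        return 0.0
    return len(left_set & set(right_titles)) * 100 / len(left_set)

# Hypothetical catalogs; "Movie A" appears twice on the right side
amazon_titles = ["Movie A", "Movie B", "Movie C", "Movie D"]
netflix_titles = ["Movie A", "Movie A", "Movie B", "Movie X"]

join_pct = join_row_count(amazon_titles, netflix_titles) * 100 / len(amazon_titles)
distinct_pct = distinct_overlap_percentage(amazon_titles, netflix_titles)

print(join_pct, distinct_pct)  # 75.0 50.0
```

If repeated titles are a concern, removing duplicates from both sides before the join keeps the two numbers equal.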

In [25]:
# SQL
# Register each DataFrame under its own view name. Registering both as 'movies'
# makes the second call replace the first, so the query joins netflix_1 with itself.
amazon_1.createOrReplaceTempView('amazon_movies')
netflix_1.createOrReplaceTempView('netflix_movies')
spark.sql('''select * from amazon_movies a, netflix_movies n where a.movies == n.movies''').show()

---


# How close is the average rating of the movies available on Amazon to that of the movies available on Netflix?

In [26]:
# Spark
a_media = amazon_2.agg(F.avg('star_rating')).collect()[0][0]
n_media = netflix_2.agg(F.avg('rating')).collect()[0][0]
perto_media = (a_media)-(n_media)

In [27]:
print("The average ratings of Amazon and Netflix differ by {:.2f} points: Amazon averages {:.2f} and Netflix {:.2f}.".format(perto_media, a_media, n_media))

The average ratings of Amazon and Netflix differ by 0.35 points: Amazon averages 3.95 and Netflix 3.60.


In [28]:
# SQL
amazon_2.createOrReplaceTempView('movies')
spark.sql('''SELECT cast(avg(star_rating) as decimal(5,2)) as media_amazon From movies''').show()

netflix_2.createOrReplaceTempView('movies')
spark.sql('''SELECT cast(avg(rating) as decimal(5,2)) as media_netflix From movies''').show()

+------------+
|media_amazon|
+------------+
|        3.95|
+------------+

+-------------+
|media_netflix|
+-------------+
|         3.60|
+-------------+



---

# Which release year has the most movies on Netflix?

Grouping by *year*, counting the movies and sorting in descending order, then printing only the first row.


In [29]:
# Spark
# show() prints the table and returns None, so its result is not assigned or printed
netflix_1.groupBy('year').count().orderBy('count',ascending=False).show(1,False)

+----+-----+
|year|count|
+----+-----+
|2004|1436 |
+----+-----+
only showing top 1 row


In [30]:
ano_lanc_n = netflix_1.groupBy('year').count().orderBy('count',ascending=False).collect()[0][0]
print(f"The release year with the most movies was {ano_lanc_n}.")

The release year with the most movies was 2004.
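The group, count, sort descending, take first pattern used here has a direct plain-Python analogue in `collections.Counter`, which can be handy for checking the logic on a small sample. The years below are hypothetical.

```python
from collections import Counter

# Hypothetical release years, one entry per movie
years = ["2004", "2003", "2004", "1997", "2004", "2003"]

# Counter groups equal values and counts them;
# most_common(1) returns the largest group first
year_counts = Counter(years)
top_year, top_count = year_counts.most_common(1)[0]

print(top_year, top_count)  # 2004 3
```

As in the Spark version, the value has to be read from the returned data; a call that only prints the table gives nothing to reuse.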


In [31]:
# SQL
netflix_1.createOrReplaceTempView('movies')
spark.sql('''SELECT year, count(*) as total FROM movies\
          GROUP BY year
          ORDER BY count(year) desc
          LIMIT 1''').show()

+----+-----+
|year|total|
+----+-----+
|2004| 1436|
+----+-----+



---

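# Which release year has the most movies on Amazon?

The Amazon dataset has no release year column, so I join it with the Netflix dataset, which holds the release year of each movie. The result therefore covers only the titles present in both datasets.

Then I group by year and count the movies in each year.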


In [32]:
unionAN = amazon_1.join(netflix_1, on=['movies'],how='inner')

In [33]:
# Spark
ano_lanc_a = unionAN.groupBy('year').count().orderBy('count',ascending=False).collect()[0][0]
print(f"The release year with the most movies was {ano_lanc_a}.")

The release year with the most movies was 2002.


In [34]:
# SQL
unionAN.createOrReplaceTempView('movies')
spark.sql('SELECT year, count(*) as total FROM movies group by year order by count(year) desc').show(1)

+----+-----+
|year|total|
+----+-----+
|2002|  424|
+----+-----+
only showing top 1 row



In [35]:
# SQL
unionAN.createOrReplaceTempView('movies')
spark.sql('''SELECT year, count(*) as total FROM movies\
          GROUP BY year
          ORDER BY count(year) desc
          LIMIT 1''').show()

+----+-----+
|year|total|
+----+-----+
|2002|  424|
+----+-----+



---

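# Which movies that are not available in the Netflix catalog were the best rated (ratings 4 and 5)?

Counting, for each title, the ratings of 4 or 5 in the Netflix and Amazon data and sorting by that count. The cells below build both rankings and show the Netflix one; they do not yet remove the titles shared by the two catalogs. A left_semi join keeps the rows that have a match on the other side, so excluding the shared titles requires a left_anti join.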


In [36]:
n = netflix.filter(netflix.rating >= 4).groupBy('movies').count().orderBy('count',ascending=False)
a = amazon_2.filter(amazon_2.star_rating >= 4).groupBy('movies').count().orderBy('count',ascending=False)

In [37]:
n.createOrReplaceTempView('movies')
spark.sql('''SELECT * FROM movies''').show()

+--------------------+------+
|              movies| count|
+--------------------+------+
|       The Godfather|174050|
|Pirates of the Ca...|153325|
|      The Green Mile|152630|
|        Forrest Gump|151596|
|    Independence Day|135673|
|         The Patriot|134389|
|        Pretty Woman|133429|
|Lord of the Rings...|132570|
|     The Sixth Sense|130869|
|Lord of the Rings...|129794|
|The Shawshank Red...|129771|
|           Gladiator|124280|
|Finding Nemo (Wid...|123889|
|Indiana Jones and...|123036|
|             Shrek 2|120442|
|Lord of the Rings...|120400|
| The Bourne Identity|119884|
|      Ocean's Eleven|117242|
|        Pulp Fiction|116729|
|          The Matrix|113764|
+--------------------+------+
only showing top 20 rows



---

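# Which movies that are not available in the Amazon catalog were the best rated (ratings 4 and 5)?

Joining the Netflix ranking with the Amazon ranking in a left_semi join. A left_semi join keeps only the Netflix titles that also appear on Amazon, so the output below lists the shared titles. To keep only the titles missing from Amazon, the join type would have to be left_anti.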


In [38]:
n.join(a, on=['movies'], how='left_semi').drop(a.movies).show()

+--------------------+------+
|              movies| count|
+--------------------+------+
|       The Godfather|174050|
|      The Green Mile|152630|
|        Forrest Gump|151596|
|    Independence Day|135673|
|         The Patriot|134389|
|        Pretty Woman|133429|
|Lord of the Rings...|132570|
|     The Sixth Sense|130869|
|           Gladiator|124280|
|             Shrek 2|120442|
| The Bourne Identity|119884|
|        Pulp Fiction|116729|
|          The Matrix|113764|
|          Braveheart|112533|
|     The Incredibles|112466|
|     American Beauty|111768|
|The Silence of th...|110815|
| Saving Private Ryan|110324|
|            Monsters|110200|
|   Miss Congeniality|109557|
+--------------------+------+
only showing top 20 rows
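The difference between the two join types can be checked on a small example. In PySpark they are selected with `how='left_semi'` and `how='left_anti'`. The sketch below reproduces their semantics in plain Python; the rankings and the helper functions `left_semi` and `left_anti` are hypothetical and written only for illustration.

```python
# Hypothetical rankings: title -> number of ratings of 4 or 5
netflix_ranking = {"Movie A": 900, "Movie B": 750, "Movie C": 400}
amazon_ranking = {"Movie B": 120, "Movie D": 80}

def left_semi(left, right):
    """Keep the left entries whose title also exists on the right."""
    return {title: count for title, count in left.items() if title in right}

def left_anti(left, right):
    """Keep the left entries whose title does not exist on the right."""
    return {title: count for title, count in left.items() if title not in right}

shared = left_semi(netflix_ranking, amazon_ranking)
netflix_only = left_anti(netflix_ranking, amazon_ranking)
amazon_only = left_anti(amazon_ranking, netflix_ranking)

print(shared)        # {'Movie B': 750}
print(netflix_only)  # {'Movie A': 900, 'Movie C': 400}
print(amazon_only)   # {'Movie D': 80}
```

Under these assumptions, "not available on Amazon" corresponds to `netflix_only` and "not available on Netflix" to `amazon_only`; only the left side's columns are kept in both join types.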



In [39]:
unionAN.createOrReplaceTempView('movies')
spark.sql('''SELECT year, count(*) as total FROM movies\
          GROUP BY year
          ORDER BY count(year) desc
          LIMIT 1''').show()

+----+-----+
|year|total|
+----+-----+
|2002|  427|
+----+-----+



In [47]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
netflix.write.csv("/content/netflix.csv", header=True)
netflix_1.write.csv("/content/netflix_movies.csv", header=True)
netflix_2.write.csv("/content/netflix_rating.csv", header=True)

In [44]:
amazon.write.csv("/content/amazon.csv", header=True)
amazon_1.write.csv("/content/amazon_movies.csv", header=True)
amazon_2.write.csv("/content/amazon_rating.csv", header=True)