# Install

In [4]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 61.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=cf043bdd5da3ee4b3aa3129cc57c3f4b35a5a17295c8cb124a2421b4947b96c6
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


# Google

In [2]:
import gspread
from google.auth import default
creds, _ = default()

gc = gspread.authorize(creds)

# Imports and Start the Spark Session

In [14]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [13]:
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType

import pyspark.pandas as ps


In [15]:
spark = (SparkSession.builder
                    .master("local[*]")
                    .appName("Films")
                    .getOrCreate())

spark

# Functions

In [21]:
PATH_NETFLIX_MOVIES = "/content/drive/MyDrive/Solvimm/neteflix_data/movie_titles.csv"
PATH_NETFLIX_DATA_01 = "/content/drive/MyDrive/Solvimm/neteflix_data/combined_data_1.txt"
PATH_NETFLIX_DATA_02 = "/content/drive/MyDrive/Solvimm/neteflix_data/combined_data_2.txt"
PATH_NETFLIX_DATA_03 = "/content/drive/MyDrive/Solvimm/neteflix_data/combined_data_3.txt"
PATH_NETFLIX_DATA_04 = "/content/drive/MyDrive/Solvimm/neteflix_data/combined_data_4.txt"

PATH_AMAZON_DIGITAL_VIDEO = "/content/drive/MyDrive/Solvimm/amazon_data/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz"
PATH_AMAZON_VIDEO_DVD = "/content/drive/MyDrive/Solvimm/amazon_data/amazon_reviews_us_Video_DVD_v1_00.tsv.gz"
PATH_AMAZON_VIDEO = "/content/drive/MyDrive/Solvimm/amazon_data/amazon_reviews_us_Video_v1_00.tsv.gz"

PATH_OUTPUT = "/content/drive/MyDrive/Solvimm" 

## Amazon

In [23]:
def amazon_transform_load():
  
  """ 
  Transform the amazon data. Create a spark dataframe and load the amazon data indo the Warehouse.
  :param : 
  :return: A spark dataframe with a amazon transform dataset.
  """
  
  amazon_video_v1 = spark.read.option("header","true").option("delimiter","\t").csv(PATH_AMAZON_DIGITAL_VIDEO)
  amazon_digital_video = spark.read.option("header","true").option("delimiter","\t").csv(PATH_AMAZON_VIDEO_DVD)
  amazon_video = spark.read.option("header","true").option("delimiter","\t").csv(PATH_AMAZON_VIDEO)

  amazon_rating_all = amazon_video_v1.union(amazon_digital_video)
  amazon_rating_all = amazon_rating_all.union(amazon_video)

  amazon_rating_resume = amazon_rating_all .drop(F.col("marketplace")).drop(F.col("customer_id"))\
                             .drop(F.col("review_id")).drop(F.col("product_id"))\
                             .drop(F.col("product_parent")).drop(F.col("product_category")) \
                             .drop(F.col("helpful_votes")).drop(F.col("total_votes")) \
                             .drop(F.col("vine")).drop(F.col("verified_purchase")) \
                             .drop(F.col("review_headline")).drop(F.col("review_body")) \
                             .drop(F.col("review_date")) 

  amazon_rating_resume  = amazon_rating_resume .withColumn("product_title", amazon_rating_resume.product_title.cast(StringType()))
  amazon_rating_resume  = amazon_rating_resume .withColumn("star_rating", amazon_rating_resume.star_rating.cast(IntegerType()))
 
  amazon_rating_resume.write.option("header","True").csv(PATH_OUTPUT+"/data_warehouse/solvimm.db/amazon_data", 'overwrite')
  
  return amazon_rating_resume  

## Netflix

In [24]:
def netflix_data_preprocessing ():

  """ 
  Make a pre proprocessing on netflix data to ajust the format. 
  :param db_file: 
  :return: A spark dataframe with the pre processed data.
  """

  
  # file to store all imported data
  if not os.path.isfile('/content/data.csv'):
      data = open('data.csv', mode='w')
      data.write( 'MovieIDs,CustomerIDs,Ratings,Dates')
      data.write('\n')
      
      files = [ PATH_NETFLIX_DATA_01,
                PATH_NETFLIX_DATA_02,
                PATH_NETFLIX_DATA_03,
                PATH_NETFLIX_DATA_04 ]

      # Remove the line with movie_id: and add a new column of movie_id
      # Combine all data files into a csv file
      for file in files:
        print("Opening file: {}".format(file))
        with open(file) as f:
          for line in f:
              line = line.strip()
              if line.endswith(':'):
                  movie_id = line.replace(':', '')
              else:
                  data.write(movie_id + ',' + line)
                  data.write('\n')
      data.close()
      print("Pre-processing is done")
    
  else:
    print("Data alreday pre_processed") 
  
  return

In [25]:
def netflix_transform_load():

  """ 
  Transform the netflix data. Create a spark dataframe and load the netflix data indo the Warehouse.
  :param : 
  :return: A spark dataframe with a amazon transform dataset.
  """

  netflix_data_preprocessing ()

  schema_rating = StructType([StructField("movie_ids",StringType(),True), 
                            StructField("customer_ids",StringType(),True),
                            StructField("ratings",IntegerType(),True), 
                            StructField("dates", DateType(), True) ])
  
  netflix_rating = (spark.read.option("header","true").option("delimiter",",")
                              .schema(schema_rating).csv("/content/data.csv"))
  
  netflix_rating.write.option("header","True").csv(PATH_OUTPUT+"/data_warehouse/solvimm.db/netflix_rating", 'overwrite')

  return netflix_rating

In [26]:
def netflix_transform_movies_titles ():

  """ 
  Transform the netflix movies. Create a spark dataframe and load the netflix movies data indo the Warehouse.
  :param : 
  :return: A spark dataframe with netflix movies transform dataset.
  """

  schema_movies = StructType([StructField("movie_id",StringType(),True), 
                            StructField("year_release",IntegerType(),True),
                            StructField("title",StringType(),True) ])
  
  netflix_movies = spark.read.option("delimiter",",").schema(schema_movies).csv(PATH_NETFLIX_MOVIES)
  netflix_movies.write.option("header","True").csv(PATH_OUTPUT+"/data_warehouse/solvimm.db/netflix_movie_titles", 'overwrite')

  return netflix_movies


# Main

In [27]:
if __name__ == "__main__":
    
    print(f"Oi ! Vamos iniciar o processo de ETL")
    print(f"É importante que todos dados iniciais estajam na pasta raiz")
    
    try:
      print(f"Iniciando o processo de transformação dos dados da Amazon")
      amazon_data = amazon_transform_load()
      print(f"Veja os dados Amazon abaixo. Estes dados já estão no Warehouse! Have fun!")
      amazon_data.show()
    except:
      print(f"Error so processar dados da Amazon")
    
    try:
      print(f"Iniciando o processo de transformação dos dados da Netflix")
      netflix_rating_data = netflix_transform_load()
      print(f"Veja os Rating Data da Netflix abaixo. Estes dados já estão no Warehouse! Have fun!")
      netflix_rating_data.show()
      print(f"Agora vamos caregar a lista de filmes da Netflix")
      netflix_movies = netflix_transform_movies_titles()
      print(f"Pronto! Veja abaixo a lista de fimes da Netflix. Estes dados já estão no Warehouse! Have fun!")
      netflix_movies.show()
    except:
      print(f"Error so processar dados da Netflix")


Oi ! Vamos iniciar o processo de ETL
É importante que todos dados iniciais estajam na pasta raiz
Iniciando o processo de transformação dos dados da Amazon
Veja os dados Amazon abaixo. Estes dados já estão no Warehouse! Have fun!
+--------------------+-----------+
|       product_title|star_rating|
+--------------------+-----------+
|Enlightened: Seas...|          5|
|             Vicious|          5|
|         After Words|          4|
|Masterpiece: Insp...|          5|
|   On The Waterfront|          5|
|Rick and Morty Se...|          5|
|      Africa Screams|          4|
| Entourage: Season 7|          3|
|Catastrophe - Sea...|          2|
|The Worricker Tri...|          3|
|Mr. Selfridge, Se...|          5|
|  Five Branded Women|          5|
|Downton Abbey Sea...|          4|
|    Extant, Season 1|          5|
|            Casanova|          1|
|Boardwalk Empire ...|          1|
|      Reunion Part 2|          5|
|Catastrophe - Sea...|          3|
|      Grimm Season 1|          4|
|

# Respostas das perguntas de negócios

## A.Quantos filmes estão disponíveis na Amazon?


### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk  

In [28]:
amazon_data.show(3)

+--------------------+-----------+
|       product_title|star_rating|
+--------------------+-----------+
|Enlightened: Seas...|          5|
|             Vicious|          5|
|         After Words|          4|
+--------------------+-----------+
only showing top 3 rows



In [29]:
total_amazon = amazon_data.dropDuplicates(["product_title"]).count()

print( "O total de filmes na Amazon é {} .".format(total_amazon))

O total de filmes na Amazon é 407777 .


In [30]:
total_amazon = amazon_data.select("product_title").distinct().count()

print( "O total de filmes na Amazon é {} .".format(total_amazon))

O total de filmes na Amazon é 407777 .


#### Usando SQL QUEREY com a API SQL do Pyspark

In [31]:
amazon_data.createGlobalTempView("amazon_data")
spark.sql("SELECT Count(Distinct(product_title)) FROM global_temp.amazon_data").show()

+-----------------------------+
|count(DISTINCT product_title)|
+-----------------------------+
|                       407777|
+-----------------------------+



## B. Quantos filmes estão disponíveis na Netflix?

### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk  

In [None]:
netflix_movies.show(3)

+--------+------------+--------------------+
|movie_id|year_release|               title|
+--------+------------+--------------------+
|       1|        2003|     Dinosaur Planet|
|       2|        2004|Isle of Man TT 20...|
|       3|        1997|           Character|
+--------+------------+--------------------+
only showing top 3 rows



In [None]:
netflix_movies.dropDuplicates(["movie_id"]).count()

total_netflix = netflix_movies.dropDuplicates(["movie_id"]).count()

print( "O total de filmes na Netflix é {}.".format(total_netflix))

O total de filmes na Netflix é 17770 .


#### Usando SQL QUEREY com a API SQL do Pyspark

In [None]:
#netflix_movies.createGlobalTempView("netflix_movies")
spark.sql("SELECT Count(Distinct(movie_id)) FROM global_temp.netflix_movies").show()

+------------------------+
|count(DISTINCT movie_id)|
+------------------------+
|                   17770|
+------------------------+



## C. Dos filmes disponíveis na Amazon, quantos % estão disponíveis na Netflix?

#### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk

In [None]:
amazon_data.show(3)

+--------------------+-----------+
|       product_title|star_rating|
+--------------------+-----------+
|Enlightened: Seas...|          5|
|             Vicious|          5|
|         After Words|          4|
+--------------------+-----------+
only showing top 3 rows



In [None]:
amazon_title = amazon_data.select("product_title")

In [None]:
netflix_titles = netflix_movies.select("title")

In [None]:
film_in_both = netflix_titles.join(amazon_title.dropDuplicates(), [amazon_title.product_title == netflix_titles.title], how = "inner").count()
film_on_amazon = amazon_data.dropDuplicates(["product_title"]).count()

print( "O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é {:.2%}.".format(film_in_both/film_on_amazon))


O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é 2.86%.


#### Usando SQL QUEREY com a API SQL do Pyspark

In [None]:
# spark.sql(" SELECT ( Count(Distinct(movie_id))/SELECT( Count(Distinct(product_title)) ), FROM amazon_data ), FROM netflix_movies INNER JOIN amazon_data.product_title ON netflix_movies.title = global_temp.amazon_data.product_title ;" ).show()

print( "O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é {:.2%}.".format(film_in_both/film_on_amazon))

O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é 2.86%.


## D. O quão perto a médias das notas dos filmes disponíveis na Amazon está dos filmes disponíveis na Netflix?

### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk

In [None]:
amazon_avg = amazon_data.agg(F.avg("star_rating")).toPandas().iloc[0,0]

In [None]:
netflix_avg = netflix_rating_data.agg(F.avg("ratings")).toPandas().iloc[0,0]

In [None]:
print( "A média das notas do filmes do Amazon é {:+.2} da media da Netflix.".format(amazon_avg - netflix_avg))

A média das notas do filmes do Amazon é +0.65 da media da Netflix.


#### Usando SQL QUEREY

In [None]:
diffrence = spark.sql(" SELECT AVG(star_rating) - (SELECT AVG(ratings) FROM global_temp.netflix_rating_data) as difference \
            FROM  global_temp.amazon_data ")

In [None]:
print( "O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é {:.2}.".format(amazon_avg - netflix_avg))

O percentual dos filmes disponíveis na Amazon que estão disponiveis no Netflix é 0.61.


## E. Qual ano de lançamento possui mais filmes na Amazon?

## F. Qual ano de lançamento possui mais filmes na Netflix?

### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk

In [None]:
Ano = netflix_movies.groupBy("year_release").count().orderBy("count", ascending= False).toPandas().iloc[0,0]
print( "O ano com a maior quantidade lançamentos é {:^.0f}.".format(Ano))


In [None]:
spark.sql("SELECT year_release \
            FROM  global_temp. netflix_movies\
            GROUP BY year_release\
             ORDER BY  count(movie_id) DESC\
             limit 1" ).show()

+------------+
|year_release|
+------------+
|        2004|
+------------+



## G. Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?

### Vejamos algumas formos de responder essa pergunta de negocio usando alguns caminho Pysarpk

In [None]:
spark.sql("SELECT global_temp.amazon_data.product_title \
          FROM  global_temp.amazon_data\
          LEFT JOIN global_temp.netflix_movies\
          on global_temp.netflix_movies.title =  global_temp.amazon_data.product_title\
          Where global_temp.amazon_data.star_rating >=4").show()

+--------------------+
|       product_title|
+--------------------+
|The Night They Sa...|
|Hamlet / Kline, N...|
|Nascar Dual Power...|
|The Man From U.N....|
|Cabaret Balkan - ...|
|Wrinkles:in Need ...|
|Ladies Club,the [...|
|The Campitelli Ad...|
|Return of the Jed...|
|Texas Carnival [VHS]|
|National Geograph...|
| The Intruder Within|
|       Robbery [VHS]|
|Special Kids Spee...|
|Arizona Bushwhack...|
|101 Dalmatians [VHS]|
|Tony Little's ONE...|
|   Fever Pitch [VHS]|
|Jigsaw Murders [VHS]|
|Heroes Stand Alon...|
+--------------------+
only showing top 20 rows



## H. Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?


In [None]:
spark.sql("SELECT global_temp.amazon_data.product_title \
          FROM  global_temp.amazon_data\
          LEFT JOIN global_temp.netflix_movies\
          on global_temp.netflix_movies.title =  global_temp.amazon_data.product_title\
          Where global_temp.amazon_data.star_rating >=4").show()

+--------------------+
|       product_title|
+--------------------+
|Enlightened: Seas...|
|             Vicious|
|         After Words|
|Masterpiece: Insp...|
|   On The Waterfront|
|Rick and Morty Se...|
|      Africa Screams|
|Mr. Selfridge, Se...|
|  Five Branded Women|
|Downton Abbey Sea...|
|    Extant, Season 1|
|      Reunion Part 2|
|      Grimm Season 1|
|Catastrophe - Sea...|
|     Bedtime Stories|
|The Last Ship Sea...|
|The Ambassador - ...|
|      Bosch Season 1|
| Lucky Number Slevin|
|An American Girl:...|
+--------------------+
only showing top 20 rows

