In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
import pandas as pd
from pyspark.sql.functions import col
import os
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


In [2]:
# Configure Spark: one local-mode context plus the SQL session used by the cells below.
conf = pyspark.SparkConf().setAppName('5GAnalisys').setMaster('local')
# getOrCreate() reuses a context that is already running in this kernel, so
# re-executing this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The builder returns the active session if there is one, otherwise creates it on top of `sc`.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
# Execute if the csv with all Netflix reviews is not available
def make_netflix_csv(files=None, output_path='netflix_reviews.csv'):
    """Merge the raw Netflix rating files into one flat CSV.

    Each input file is read as a sequence of sections: a header line ending in
    ``:`` (the movie id followed by a colon) and then comma-separated rating
    lines. Every rating line is written to the output prefixed with the movie
    id of the section it belongs to. Blank lines are ignored.

    Nothing is done when ``output_path`` already exists.

    Args:
        files: Iterable of input file paths. Defaults to the four
            ``./netflix_reviews/combined_data_N.txt`` files.
        output_path: Destination CSV path.

    Raises:
        ValueError: If a rating line appears before any movie-id header.
    """
    if files is None:
        files = ['./netflix_reviews/combined_data_1.txt', './netflix_reviews/combined_data_2.txt',
                 './netflix_reviews/combined_data_3.txt', './netflix_reviews/combined_data_4.txt']
    if os.path.isfile(output_path):
        return

    # Write to a sibling file first and rename at the end, so an interrupted or
    # failed run never leaves a truncated CSV that the check above would accept.
    partial_path = output_path + '.part'
    movie_id = None  # id of the section currently being read; carried across files
    try:
        with open(partial_path, mode='w') as data:
            for file in files:
                print('reading ratings from {}...'.format(file))
                with open(file) as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            # A blank line would otherwise become a bogus "<id>," row.
                            continue
                        if line.endswith(':'):
                            movie_id = line.replace(':', '')
                        elif movie_id is None:
                            raise ValueError(
                                'rating line found before any movie id header in {}'.format(file))
                        else:
                            data.write(movie_id + ',' + line + '\n')
                print('Done.\n')
        os.replace(partial_path, output_path)
    finally:
        # Only present if the rename above did not happen.
        if os.path.exists(partial_path):
            os.remove(partial_path)

# Using Spark

In [4]:
# Locations of the merged Netflix ratings CSV and of the movie-title lookup file
path_netflix_reviews = 'netflix_reviews.csv'
path_netflix_movie_titles = 'netflix_reviews/movie_titles.csv'
print('creating the dataframe from data.csv file..')  # building the DataFrames

# Neither read passes a header option, so the column names are assigned here by position.
netflix_reviews = (
    spark.read
    .csv(path_netflix_reviews)
    .toDF('movie_id', 'user', 'rating', 'date')
)
netflix_movie_titles = (
    spark.read
    .csv(path_netflix_movie_titles, encoding='ISO-8859-1')
    .toDF('movie_id', 'year_of_release', 'title')
)

creating the dataframe from data.csv file..


In [5]:
# Location of the Amazon Digital Video Download reviews file (read as tab-separated with a header row)
path_amazon_digital_video = './amazon_reviews_us_Digital_Video_Download_v1_00/amazon_reviews_us_Digital_Video_Download_v1_00.tsv'
print('creating the dataframe from data.csv file..')  # building the DataFrame
separator = '\t'
amazon_reviews = (
    spark.read
    .csv(path_amazon_digital_video, sep=separator, header=True)
    # Column names are (re)assigned by position.
    .toDF(
        'marketplace', 'customer_id', 'review_id', 'product_id',
        'product_parent', 'product_title', 'product_category', 'star_rating',
        'helpful_votes', 'total_votes', 'vine', 'verified_purchase',
        'review_headline', 'review_body', 'review_date',
    )
)
print('Done.\n')

creating the dataframe from data.csv file..
Done.



In [6]:
def transform_netflix_data(netflix_reviews):
    """Attach movie titles to the Netflix ratings and reshape them to the shared review layout.

    Reads the notebook-level ``netflix_movie_titles`` DataFrame.

    Args:
        netflix_reviews: Netflix reviews DataFrame; expected to carry the four
            columns movie_id, user, rating, date in that order, since the
            rename below is positional.

    Returns:
        DataFrame with columns product_id, star_rating, year_of_release,
        product_title and company_review (the constant 'netflix').
    """
    same_movie = netflix_movie_titles.movie_id == netflix_reviews.movie_id
    with_titles = netflix_reviews.join(netflix_movie_titles, same_movie)
    # toDF renames by position: the review columns first, then the title columns.
    # The second movie_id gets its own name so it can be dropped unambiguously.
    renamed = with_titles.toDF('product_id', 'user', 'star_rating', 'date',
                               'movie_id_2', 'year_of_release', 'product_title')
    trimmed = renamed.drop('user', 'movie_id_2', 'date')
    return trimmed.withColumn('company_review', lit('netflix'))

In [7]:
def transform_amazon_data(amazon_reviews):
    """Reduce the Amazon reviews to the columns shared with the Netflix data.

    Args:
        amazon_reviews: Amazon reviews DataFrame.

    Returns:
        The input with a constant 'amazon' company_review column added, the
        review-metadata columns listed below removed, and a null
        year_of_release column appended.
    """
    unused_columns = ['marketplace', 'customer_id', 'review_id', 'product_parent', 'review_date',
                      'product_category', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase',
                      'review_headline', 'review_body']
    tagged = amazon_reviews.withColumn('company_review', lit('amazon'))
    trimmed = tagged.drop(*unused_columns)
    # The Amazon columns include no release year; a null column stands in for it.
    return trimmed.withColumn('year_of_release', lit(None))

In [8]:
# Apply the Netflix transformation and preview the first three rows.
netflix_reviews_transformed = transform_netflix_data(netflix_reviews)
netflix_reviews_transformed.show(3)

+----------+-----------+---------------+---------------+--------------+
|product_id|star_rating|year_of_release|  product_title|company_review|
+----------+-----------+---------------+---------------+--------------+
|         1|          3|           2003|Dinosaur Planet|       netflix|
|         1|          5|           2003|Dinosaur Planet|       netflix|
|         1|          4|           2003|Dinosaur Planet|       netflix|
+----------+-----------+---------------+---------------+--------------+
only showing top 3 rows



In [9]:
# Apply the Amazon transformation and preview the first three rows.
amazon_reviews_transformed = transform_amazon_data(amazon_reviews)
amazon_reviews_transformed.show(3)

+----------+--------------------+-----------+--------------+---------------+
|product_id|       product_title|star_rating|company_review|year_of_release|
+----------+--------------------+-----------+--------------+---------------+
|B00AYB1482|Enlightened: Seas...|          5|        amazon|           null|
|B00KQD28OM|             Vicious|          5|        amazon|           null|
|B01489L5LQ|         After Words|          4|        amazon|           null|
+----------+--------------------+-----------+--------------+---------------+
only showing top 3 rows



In [10]:
def union_dataframes(netflix_reviews_transformed,amazon_reviews_transformed):
    """Stack the transformed Netflix and Amazon reviews into one DataFrame.

    Args:
        netflix_reviews_transformed: Output of ``transform_netflix_data``.
        amazon_reviews_transformed: Output of ``transform_amazon_data``.

    Returns:
        The Netflix rows followed by the Amazon rows, with star_rating and
        year_of_release cast to int.
    """
    # union() pairs columns by position, so the Amazon frame is first selected
    # in the same column order the Netflix frame has.
    shared_layout = ['product_id', 'star_rating', 'year_of_release', 'product_title', 'company_review']
    amazon_aligned = amazon_reviews_transformed.select(*shared_layout)
    combined = netflix_reviews_transformed.union(amazon_aligned)
    for numeric_column in ('star_rating', 'year_of_release'):
        combined = combined.withColumn(numeric_column, col(numeric_column).cast("int"))
    return combined

In [11]:
# Build the unified reviews DataFrame and preview the first five rows.
all_reviews = union_dataframes(netflix_reviews_transformed,amazon_reviews_transformed)
all_reviews.show(5)

+----------+-----------+---------------+---------------+--------------+
|product_id|star_rating|year_of_release|  product_title|company_review|
+----------+-----------+---------------+---------------+--------------+
|         1|          3|           2003|Dinosaur Planet|       netflix|
|         1|          5|           2003|Dinosaur Planet|       netflix|
|         1|          4|           2003|Dinosaur Planet|       netflix|
|         1|          4|           2003|Dinosaur Planet|       netflix|
|         1|          3|           2003|Dinosaur Planet|       netflix|
+----------+-----------+---------------+---------------+--------------+
only showing top 5 rows



In [12]:
# Persist the unified reviews as CSV and as Parquet. With the default save mode
# Spark refuses to write to an existing path; only that specific failure is
# reported and skipped. Any other error (bad schema, I/O failure, a manual
# interrupt) propagates instead of being reported as "already exists".
try:
    all_reviews.write.csv("all_reviews.csv")
except pyspark.sql.utils.AnalysisException as err:
    if "already exists" not in str(err):
        raise
    print("This csv file already exists")
try:
    all_reviews.write.parquet("all_reviews.parquet")
except pyspark.sql.utils.AnalysisException as err:
    if "already exists" not in str(err):
        raise
    print("This parquet file already exists")


This csv file already exists
This parquet file already exists


# Queries que respondem às perguntas de negócio

# Quantos filmes estão disponíveis na Amazon?

In [13]:
all_reviews.createOrReplaceTempView("all_reviews") # Register the DataFrame as a temp view so it can be queried with SQL statements

In [14]:
# Count the distinct product_id values among the Amazon rows of the unified view.
number_amazon_number = spark.sql("SELECT count(distinct product_id) from all_reviews WHERE company_review = 'amazon'").collect()[0][0]
print('A amazon dispõe de ' + str(number_amazon_number) + ' filmes.')

A amazon dispõe de 166748 filmes.


# Quantos filmes estão disponíveis na Netflix?

In [15]:
# Count the distinct product_id values among the Netflix rows of the unified view.
# NOTE(review): no cache()/persist() on all_reviews is visible in this notebook, so each
# query presumably recomputes the reads and the join — consider caching; confirm.
number_netflix_movie = spark.sql("SELECT count(distinct product_id) from all_reviews WHERE company_review = 'netflix'").collect()[0][0]
print('A netflix dispõe de ' + str(number_netflix_movie) + ' filmes.')

KeyboardInterrupt: 

# Dos filmes disponíveis na Amazon, quantos % estão disponíveis na Netflix?

In [None]:
# Count the distinct Amazon titles for which a row from the other company has the
# same title, compared case-insensitively. The result is a one-row, one-column list.
# NOTE(review): this counts titles, while number_amazon_number counts product_ids —
# confirm that dividing one by the other is intended.
number_amazon_movies_in_netflix = spark.sql("SELECT count(distinct product_title) from all_reviews t1 WHERE company_review='amazon' and EXISTS(SELECT product_title from all_reviews t2 where LOWER(t1.product_title)=LOWER(t2.product_title) and t2.company_review!=t1.company_review)").collect()

In [None]:
# Share (in percent) of the Amazon count that was also matched on Netflix.
# Raises ZeroDivisionError if number_amazon_number is 0.
percentage = (number_amazon_movies_in_netflix[0][0] / number_amazon_number)*100

In [None]:
# Report the percentage rounded to two decimals.
print('Dos filmes disponíveis na Amazon, '+ str(round(percentage,2))+'% estão disponíveis na Netflix')

# O quão perto a média das notas dos filmes disponíveis na Amazon está da média das notas dos filmes disponíveis na Netflix?

In [None]:
# Average star_rating over all Netflix rows (every review counts once, not every movie).
netflix_avg = spark.sql("SELECT avg(star_rating) from all_reviews WHERE company_review == 'netflix'").collect()

In [None]:
# Average star_rating over all Amazon rows (every review counts once, not every product).
amazon_avg = spark.sql("SELECT avg(star_rating) from all_reviews WHERE company_review == 'amazon'").collect()

In [None]:
# Report both averages rounded to three decimals.
print('A média de notas dos filmes disponíveis na Amazon é ' + str(round(amazon_avg[0][0],3)) + ', e a média de notas dos filmes da Netflix é '+ str(round(netflix_avg[0][0],3)))

# Qual ano de lançamento possui mais filmes na Amazon?

In [None]:
# This information is not available in the provided dataset

# Qual ano de lançamento possui mais filmes na Netflix?

In [None]:
# Release year with the most Netflix movies. The count is over distinct product_id
# values, because all_reviews holds one row per review and a plain row count would
# rank years by review volume instead. Rows are restricted to Netflix with a known
# year, so the null-year Amazon rows cannot form a group.
year_max = spark.sql("SELECT year_of_release, count(distinct product_id) AS n_movies from all_reviews WHERE company_review = 'netflix' AND year_of_release IS NOT NULL GROUP BY year_of_release ORDER BY n_movies desc limit 1").collect()

In [None]:
# Report the year from the single row returned above (first column).
print('O ano com mais lançamentos na netflix é ' + str(year_max[0][0]))

# Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?

In [None]:
# Distinct Amazon titles that received at least one rating of 4 or 5 and whose title
# (compared case-insensitively) does not appear on any row from the other company.
# The star_rating filter is what restricts the result to the well-rated titles the
# question asks about; it mirrors the Netflix-side query further down.
best_movies_not_in_netflix = spark.sql("SELECT * from (SELECT distinct(product_title) from all_reviews t1 WHERE company_review='amazon' and star_rating >= 4 and NOT EXISTS(SELECT product_title from all_reviews t2 WHERE LOWER(t1.product_title)=LOWER(t2.product_title) and t2.company_review!=t1.company_review and t1.company_review = 'amazon'))").collect()

In [None]:
# Number of titles collected by the previous query.
len(best_movies_not_in_netflix)

In [None]:
# Dump the collected rows to a UTF-8 text file, one per line; each row is written
# as the repr of its value list, e.g. ['Some title'].
with open("best_movies_not_in_netflix.txt", 'w',encoding="utf-8") as file:
    file.writelines(str(list(element)) + '\n' for element in best_movies_not_in_netflix)

# Quais filmes que não estão disponíveis no catálogo da Amazon foram melhor avaliados (notas 4 e 5)?

In [None]:
    # Distinct Netflix titles with at least one rating of 4 or 5 whose title (compared
    # case-insensitively) does not appear on any row from the other company.
    # NOTE(review): this cell starts indented; that presumably only runs because the
    # notebook front end strips leading indentation — confirm before exporting to .py.
    best_movies_not_in_amazon = spark.sql(
        "SELECT * from ("
        "SELECT distinct(product_title) from all_reviews t1 "
        "WHERE company_review='netflix' and star_rating >= 4 and NOT EXISTS("
        "SELECT product_title from all_reviews t2 "
        "WHERE LOWER(t1.product_title)=LOWER(t2.product_title) "
        "and t2.company_review!=t1.company_review and t1.company_review = 'netflix'))"
    ).collect()

In [None]:
# Number of titles collected by the previous query.
len(best_movies_not_in_amazon)

In [None]:
# Dump the collected rows to a UTF-8 text file, one per line; each row is written
# as the repr of its value list, e.g. ['Some title'].
with open("best_movies_not_in_amazon.txt", 'w',encoding="utf-8") as file:
    file.writelines(str(list(element)) + '\n' for element in best_movies_not_in_amazon)