# Bibliotecas e variáveis

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
import re

In [2]:
# Create (or reuse) the Spark session used by every cell of this notebook.
if __name__ == '__main__':
    scSpark = (
        SparkSession.builder
        .appName('5Gflix')
        .getOrCreate()
    )

In [3]:
# Relative directories of the three data layers (Windows-style separators).
path_raw, path_refined, path_trusted = '.\\raw', '.\\refined', '.\\trusted'

# Importação e transformação de dados

## Netflix - Avaliações

Identificação dos nomes dos arquivos de avaliações do dataset

In [4]:
# Collect the Netflix rating files (combined_data*.txt) found in the raw layer.
# The dot before "txt" is escaped so only a literal ".txt" extension matches,
# and the directory listing is sorted so files are always processed in the
# same order (os.listdir returns entries in arbitrary order).
netflix_dir = path_raw + '\\netflix'
ds_netflix_file_list = [
    netflix_dir + '\\' + filename
    for filename in sorted(os.listdir(netflix_dir))
    if re.search(r'^combined_data.*\.txt$', filename) is not None
]

Agrupando os quatro arquivos do dataset em um, com a criação de uma coluna com o código do filme

In [5]:
%%time
if not os.path.isfile(path_refined + '\\netflix_rating.csv'):
    with open(path_refined + '\\netflix_rating.csv', 'w+') as combined_file:
        combined_file.write('movie_id,user_id,rating,date\n')
        for file_name in ds_netflix_file_list:
            with open(file_name, 'r') as file:
                for _, line in enumerate(file):
                    if ':' in line:
                        movie_id = line.replace(':', '').replace('\n', '')
                    else:
                        combined_file.write(movie_id + ',' + line)
            file.close()
        combined_file.close()

Wall time: 0 ns


Criação de DF de avaliação a partir do dataset

In [6]:
# Explicit schema for the merged ratings CSV; every field is nullable.
schema_netflix_rating = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('movie_id', IntegerType()),
        ('user_id', IntegerType()),
        ('rating', IntegerType()),
        ('date', DateType()),
    )
])

In [7]:
# Load the merged ratings CSV with the explicit schema and cache it, since the
# following cells reuse this DataFrame several times.
netflix_rating_csv = path_refined + '\\netflix_rating.csv'
df_netflix_rating = (
    scSpark.read
    .csv(netflix_rating_csv, schema=schema_netflix_rating, header=True, sep=',')
    .cache()
)
df_netflix_rating.show(10)

+--------+-------+------+----------+
|movie_id|user_id|rating|      date|
+--------+-------+------+----------+
|       1|1488844|     3|2005-09-06|
|       1| 822109|     5|2005-05-13|
|       1| 885013|     4|2005-10-19|
|       1|  30878|     4|2005-12-26|
|       1| 823519|     3|2004-05-03|
|       1| 893988|     3|2005-11-17|
|       1| 124105|     4|2004-08-05|
|       1|1248029|     3|2004-04-22|
|       1|1842128|     4|2004-05-09|
|       1|2238063|     3|2005-05-11|
+--------+-------+------+----------+
only showing top 10 rows



Removendo colunas desnecessárias para a análise

In [8]:
# Only movie_id and rating are needed from here on; drop the other two columns.
df_netflix_rating = df_netflix_rating.drop('user_id', 'date')

In [9]:
# Preview the first 10 rows after dropping the unused columns.
df_netflix_rating.show(10)

+--------+------+
|movie_id|rating|
+--------+------+
|       1|     3|
|       1|     5|
|       1|     4|
|       1|     4|
|       1|     3|
|       1|     3|
|       1|     4|
|       1|     3|
|       1|     4|
|       1|     3|
+--------+------+
only showing top 10 rows



In [10]:
# Print the column names and types of the reduced ratings DataFrame.
df_netflix_rating.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



Criação de DF com avaliação média e quantidade de votos por filme

In [29]:
# Per-movie aggregates: mean rating rounded to one decimal and number of votes.
# `round`, `avg` and `count` are the Spark column functions brought in by the
# star import of pyspark.sql.functions, not the Python builtins.
rating_avg_col = round(avg('rating'), 1).alias('rating_avg')
qty_rating_col = count('rating').alias('qty_rating')
df_netflix_rating_avg = (
    df_netflix_rating
    .groupBy('movie_id')
    .agg(rating_avg_col, qty_rating_col)
)

In [30]:
# Print the column names and types of the aggregated ratings DataFrame.
df_netflix_rating_avg.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- rating_avg: double (nullable = true)
 |-- qty_rating: long (nullable = false)



In [31]:
# Sort the aggregated ratings by movie_id (ascending).
df_netflix_rating_avg = df_netflix_rating_avg.orderBy(col('movie_id'))

In [32]:
# Preview the first 10 aggregated rows.
df_netflix_rating_avg.show(10)

+--------+----------+----------+
|movie_id|rating_avg|qty_rating|
+--------+----------+----------+
|       1|       3.7|       547|
|       2|       3.6|       145|
|       3|       3.6|      2012|
|       4|       2.7|       142|
|       5|       3.9|      1140|
|       6|       3.1|      1019|
|       7|       2.1|        93|
|       8|       3.2|     14910|
|       9|       2.6|        95|
|      10|       3.2|       249|
+--------+----------+----------+
only showing top 10 rows



## Netflix - Catálogo

Criação de DF com informações de código, ano e título dos filmes do catálogo da Netflix

In [19]:
# Explicit schema for the catalog file: movie id, release year and title.
# All three fields are declared non-nullable, but the schema printed after
# loading reports them as nullable, so the flag is not enforced on read.
schema_netflix_catalog = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ('movie_id', IntegerType()),
        ('year', IntegerType()),
        ('movie_title', StringType()),
    )
])

In [20]:
# Load the catalog (no header row) using the explicit three-column schema.
# NOTE(review): the file is split on every ',' — confirm that titles never
# contain a comma, otherwise the part after it would not land in movie_title.
netflix_catalog_csv = path_raw + '\\netflix\\movie_titles.csv'
df_netflix_catalog = scSpark.read.csv(
    netflix_catalog_csv,
    schema=schema_netflix_catalog,
    sep=',',
)
df_netflix_catalog.show(10, truncate=False)

+--------+----+----------------------------+
|movie_id|year|movie_title                 |
+--------+----+----------------------------+
|1       |2003|Dinosaur Planet             |
|2       |2004|Isle of Man TT 2004 Review  |
|3       |1997|Character                   |
|4       |1994|Paula Abdul's Get Up & Dance|
|5       |2004|The Rise and Fall of ECW    |
|6       |1997|Sick                        |
|7       |1992|8 Man                       |
|8       |2004|What the #$*! Do We Know!?  |
|9       |1991|Class of Nuke 'Em High 2    |
|10      |2001|Fighter                     |
+--------+----+----------------------------+
only showing top 10 rows



Limpeza do campo título

In [21]:
# Normalise titles so they can be compared across catalogs:
#   1. lower-case the title;
#   2. collapse every run of two or more spaces into a single space
#      (replacing only the literal pair '  ' would leave double spaces behind
#      whenever a title contains three or more consecutive spaces);
#   3. strip leading and trailing spaces.
df_netflix_catalog = (
    df_netflix_catalog
    .withColumn('movie_title', lower(col('movie_title')))
    .withColumn('movie_title', regexp_replace('movie_title', ' {2,}', ' '))
    .withColumn('movie_title', trim(col('movie_title')))
)

In [22]:
# Preview the cleaned titles without truncating long values.
df_netflix_catalog.show(10, truncate=False)

+--------+----+----------------------------+
|movie_id|year|movie_title                 |
+--------+----+----------------------------+
|1       |2003|dinosaur planet             |
|2       |2004|isle of man tt 2004 review  |
|3       |1997|character                   |
|4       |1994|paula abdul's get up & dance|
|5       |2004|the rise and fall of ecw    |
|6       |1997|sick                        |
|7       |1992|8 man                       |
|8       |2004|what the #$*! do we know!?  |
|9       |1991|class of nuke 'em high 2    |
|10      |2001|fighter                     |
+--------+----+----------------------------+
only showing top 10 rows



In [23]:
# Print the column names and types of the catalog DataFrame.
df_netflix_catalog.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- movie_title: string (nullable = true)



Quantidade de filmes no catálogo da Netflix

In [24]:
# Number of rows in the catalog DataFrame.
df_netflix_catalog.count()

17770

## Combinando DFs de catálogo e avaliações

In [33]:
# Attach the rating aggregates to the catalog. A left join keeps catalog
# entries that have no ratings (their aggregate columns are then null).
df_netflix = df_netflix_catalog.join(
    df_netflix_rating_avg,
    on='movie_id',
    how='left',
)

In [34]:
# Preview the joined data sorted by movie_id; the sort is for display only
# and is not stored back into df_netflix.
df_netflix.orderBy(col('movie_id')).show(10, truncate=False)

+--------+----+----------------------------+----------+----------+
|movie_id|year|movie_title                 |rating_avg|qty_rating|
+--------+----+----------------------------+----------+----------+
|1       |2003|dinosaur planet             |3.7       |547       |
|2       |2004|isle of man tt 2004 review  |3.6       |145       |
|3       |1997|character                   |3.6       |2012      |
|4       |1994|paula abdul's get up & dance|2.7       |142       |
|5       |2004|the rise and fall of ecw    |3.9       |1140      |
|6       |1997|sick                        |3.1       |1019      |
|7       |1992|8 man                       |2.1       |93        |
|8       |2004|what the #$*! do we know!?  |3.2       |14910     |
|9       |1991|class of nuke 'em high 2    |2.6       |95        |
|10      |2001|fighter                     |3.2       |249       |
+--------+----+----------------------------+----------+----------+
only showing top 10 rows



Adicionando coluna com informação da empresa no DF

In [35]:
# Tag every row with the constant company name 'Netflix'.
df_netflix = df_netflix.withColumn('company',lit('Netflix'))

In [36]:
# Print the final Netflix schema.
df_netflix.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- rating_avg: double (nullable = true)
 |-- qty_rating: long (nullable = true)
 |-- company: string (nullable = false)



In [37]:
# Preview the final Netflix data sorted by movie_id (display only).
df_netflix.orderBy(col('movie_id')).show(10, truncate=False)

+--------+----+----------------------------+----------+----------+-------+
|movie_id|year|movie_title                 |rating_avg|qty_rating|company|
+--------+----+----------------------------+----------+----------+-------+
|1       |2003|dinosaur planet             |3.7       |547       |Netflix|
|2       |2004|isle of man tt 2004 review  |3.6       |145       |Netflix|
|3       |1997|character                   |3.6       |2012      |Netflix|
|4       |1994|paula abdul's get up & dance|2.7       |142       |Netflix|
|5       |2004|the rise and fall of ecw    |3.9       |1140      |Netflix|
|6       |1997|sick                        |3.1       |1019      |Netflix|
|7       |1992|8 man                       |2.1       |93        |Netflix|
|8       |2004|what the #$*! do we know!?  |3.2       |14910     |Netflix|
|9       |1991|class of nuke 'em high 2    |2.6       |95        |Netflix|
|10      |2001|fighter                     |3.2       |249       |Netflix|
+--------+----+----------

## Amazon

Criação de DF geral de dados a partir do dataset

In [38]:
# Collect the Amazon review files (amazon_reviews_us_*.tsv) found in the raw
# layer. The dot before "tsv" is escaped so only a literal ".tsv" extension
# matches, and the listing is sorted so the input order is deterministic
# (os.listdir returns entries in arbitrary order).
amazon_dir = path_raw + '\\amazon'
ds_amazon_file_list = [
    amazon_dir + '\\' + filename
    for filename in sorted(os.listdir(amazon_dir))
    if re.search(r'^amazon_reviews_us_.*\.tsv$', filename) is not None
]

In [39]:
%%time
df_amazon_data = scSpark.read.csv(ds_amazon_file_list, inferSchema=True, header=True, sep=r'\t').select('product_id', 'product_title', 'star_rating', 'review_date')

Wall time: 7min 59s


In [40]:
# Print the inferred column types of the Amazon data.
df_amazon_data.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [42]:
# Show what the columns look like under the Netflix naming convention.
# The result is only displayed: it is not assigned, so df_amazon_data keeps
# its original column names (the following cells rely on them).
df_amazon_data.select(
    col('product_id').alias('movie_id'),
    col('product_title').alias('movie_title'),
    col('star_rating').alias('rating'),
    col('review_date'),
)

DataFrame[movie_id: string, movie_title: string, rating: int, review_date: timestamp]

In [43]:
# Preview the first 10 rows of the Amazon data.
df_amazon_data.show(10)

+----------+--------------------+-----------+-------------------+
|product_id|       product_title|star_rating|        review_date|
+----------+--------------------+-----------+-------------------+
|B00AYB1482|Enlightened: Seas...|          5|2015-08-31 00:00:00|
|B00KQD28OM|             Vicious|          5|2015-08-31 00:00:00|
|B01489L5LQ|         After Words|          4|2015-08-31 00:00:00|
|B008LOVIIK|Masterpiece: Insp...|          5|2015-08-31 00:00:00|
|B0094LZMT0|   On The Waterfront|          5|2015-08-31 00:00:00|
|B0112OSOQE|Rick and Morty Se...|          5|2015-08-31 00:00:00|
|B000NPE5SA|      Africa Screams|          4|2015-08-31 00:00:00|
|B00XWV4QXG| Entourage: Season 7|          3|2015-08-31 00:00:00|
|B00X8UKOUK|Catastrophe - Sea...|          2|2015-08-31 00:00:00|
|B00OOKXTFU|The Worricker Tri...|          3|2015-08-31 00:00:00|
+----------+--------------------+-----------+-------------------+
only showing top 10 rows



### Ano de lançamento

Estimando o ano de lançamento do filme com base no voto mais antigo

In [44]:
# Keep (movie_id, review_date) pairs that have a review date, sorted from the
# oldest review to the newest.
df_amazon_year = (
    df_amazon_data
    .select(col('product_id').alias('movie_id'), col('review_date'))
    .filter(col('review_date').isNotNull())
    .orderBy('review_date')
)

In [45]:
# Estimate the release year of each movie as the year of its earliest review.
# min() is used instead of first(): first() returns whichever row the
# aggregation happens to see first, and a preceding sort is not guaranteed to
# survive the shuffle done by groupBy, whereas min() always yields the
# earliest date. `min` here is the Spark column function brought in by the
# star import of pyspark.sql.functions, not the Python builtin.
df_amazon_year = (
    df_amazon_year
    .groupBy('movie_id')
    .agg(year(min('review_date')).alias('year'))
)

In [46]:
# Preview the estimated release year per movie.
df_amazon_year.show(10)

+----------+----+
|  movie_id|year|
+----------+----+
|6300181251|1997|
|6304873530|1998|
|6302917034|1998|
|6304492405|1998|
|6304684495|1998|
|6303471463|1998|
|6302227135|1998|
|6301334345|1998|
|6303402666|1998|
|6305123667|1998|
+----------+----+
only showing top 10 rows



### Código e nome dos filmes

Criação de DF com informações do código e título dos filmes

In [47]:
# Project the product id and title under the shared column names.
df_amazon_title = df_amazon_data.select(
    col('product_id').alias('movie_id'),
    col('product_title').alias('movie_title'),
)

In [48]:
# Preview the id/title pairs (still one row per review at this point).
df_amazon_title.show(10)

+----------+--------------------+
|  movie_id|         movie_title|
+----------+--------------------+
|B00AYB1482|Enlightened: Seas...|
|B00KQD28OM|             Vicious|
|B01489L5LQ|         After Words|
|B008LOVIIK|Masterpiece: Insp...|
|B0094LZMT0|   On The Waterfront|
|B0112OSOQE|Rick and Morty Se...|
|B000NPE5SA|      Africa Screams|
|B00XWV4QXG| Entourage: Season 7|
|B00X8UKOUK|Catastrophe - Sea...|
|B00OOKXTFU|The Worricker Tri...|
+----------+--------------------+
only showing top 10 rows



In [49]:
# Reduce to one row per movie_id, keeping a single title for each id.
# NOTE(review): no ordering is applied here, so if one id appears with several
# different titles, which one first() keeps is not controlled — confirm that
# this is acceptable.
df_amazon_title = df_amazon_title.groupBy('movie_id').agg(first('movie_title').alias('movie_title'))

In [50]:
# Preview the de-duplicated id/title pairs.
df_amazon_title.show(10)

+----------+--------------------+
|  movie_id|         movie_title|
+----------+--------------------+
|0000143502|Rise and Swine (G...|
|0000143529|My Fair Pastry (G...|
|000014357X|Everyday Italian ...|
|0000791156|Spirit Led—Moving...|
|0001499572|At Home with the ...|
|0001527665|   Peace Child [VHS]|
|0005000009|Where Jesus Walke...|
|0005022290|Live! The Young M...|
|0005041104|Camp Harmony (Rob...|
|0005048524|Dinosaurs and the...|
+----------+--------------------+
only showing top 10 rows



Limpeza do campo título

In [51]:
# Normalise titles so they can be compared across catalogs:
#   1. lower-case the title;
#   2. collapse every run of two or more spaces into a single space
#      (replacing only the literal pair '  ' would leave double spaces behind
#      whenever a title contains three or more consecutive spaces);
#   3. strip leading and trailing spaces.
df_amazon_title = (
    df_amazon_title
    .withColumn('movie_title', lower(col('movie_title')))
    .withColumn('movie_title', regexp_replace('movie_title', ' {2,}', ' '))
    .withColumn('movie_title', trim(col('movie_title')))
)

In [52]:
# Preview the cleaned titles without truncating long values.
df_amazon_title.show(10, truncate=False)

+----------+----------------------------------------------------------------------------------------------------+
|movie_id  |movie_title                                                                                         |
+----------+----------------------------------------------------------------------------------------------------+
|0000143502|rise and swine (good eats vol. 7)                                                                   |
|0000143529|my fair pastry (good eats vol. 9)                                                                   |
|000014357X|everyday italian with giada de laurentis v2: anytime italian                                        |
|0000791156|spirit led—moving by grace in the holy spirit's gifts                                               |
|0001499572|at home with the guitar [vhs]                                                                       |
|0001527665|peace child [vhs]                                                           

### Avaliação por filme

Criação de DF com código e avaliação

In [53]:
# Project the product id and star rating under the shared column names.
df_amazon_rating = df_amazon_data.select(col('product_id').alias('movie_id'), col('star_rating').alias('rating'))

In [54]:
# Preview the first 10 id/rating pairs.
df_amazon_rating.show(10)

+----------+------+
|  movie_id|rating|
+----------+------+
|B00AYB1482|     5|
|B00KQD28OM|     5|
|B01489L5LQ|     4|
|B008LOVIIK|     5|
|B0094LZMT0|     5|
|B0112OSOQE|     5|
|B000NPE5SA|     4|
|B00XWV4QXG|     3|
|B00X8UKOUK|     2|
|B00OOKXTFU|     3|
+----------+------+
only showing top 10 rows



Criação de DF com avaliação média e quantidade de votos por filme

In [55]:
# Per-movie aggregates: mean rating rounded to one decimal and number of votes.
# `round`, `avg` and `count` are the Spark column functions brought in by the
# star import of pyspark.sql.functions, not the Python builtins.
amazon_rating_avg_col = round(avg('rating'), 1).alias('rating_avg')
amazon_qty_rating_col = count('rating').alias('qty_rating')
df_amazon_rating_avg = (
    df_amazon_rating
    .groupBy('movie_id')
    .agg(amazon_rating_avg_col, amazon_qty_rating_col)
)

In [56]:
# Preview the first 10 aggregated rows.
df_amazon_rating_avg.show(10)

+----------+----------+----------+
|  movie_id|rating_avg|qty_rating|
+----------+----------+----------+
|B002QUKA84|       4.0|         3|
|B0035LFOGG|       4.4|        68|
|B002XZHDJG|       4.8|        41|
|B00UBRIHU2|       3.5|       269|
|B001ACPEBC|       4.8|       642|
|B00PRX640A|       4.1|       258|
|B00ECSMSXI|       4.5|       268|
|B000K28QEK|       3.8|        55|
|B0088W6Y7K|       4.8|       113|
|B009ZQA212|       4.3|        61|
+----------+----------+----------+
only showing top 10 rows



## Combinando DFs de nomes, ano e avaliações

In [57]:
# Start from the one-row-per-movie title table and attach the estimated year
# and the rating aggregates; left joins keep titles lacking either of them.
df_amazon_with_year = df_amazon_title.join(df_amazon_year, on='movie_id', how='left')
df_amazon = df_amazon_with_year.join(df_amazon_rating_avg, on='movie_id', how='left')

Reordenando as colunas para manter padronização

In [59]:
# Reorder the columns to movie_id, year, movie_title, rating_avg, qty_rating.
df_amazon = df_amazon.select('movie_id', 'year', 'movie_title', 'rating_avg', 'qty_rating')

Adicionando coluna com informação da empresa no DF

In [61]:
# Tag every row with the constant company name 'Amazon'.
df_amazon = df_amazon.withColumn('company',lit('Amazon'))

In [62]:
# Preview the final Amazon data.
df_amazon.show(10)

+----------+----+--------------------+----------+----------+-------+
|  movie_id|year|         movie_title|rating_avg|qty_rating|company|
+----------+----+--------------------+----------+----------+-------+
|0000143502|2013|rise and swine (g...|       5.0|         1| Amazon|
|0000143529|2013|my fair pastry (g...|       5.0|         1| Amazon|
|000014357X|2013|everyday italian ...|       4.8|         5| Amazon|
|0000791156|2014|spirit led—moving...|       5.0|         2| Amazon|
|0001499572|2015|at home with the ...|       5.0|         1| Amazon|
|0001527665|2013|   peace child [vhs]|       5.0|         1| Amazon|
|0005000009|2013|where jesus walke...|       1.0|         1| Amazon|
|0005022290|2015|live! the young m...|       5.0|         1| Amazon|
|0005041104|2009|camp harmony (rob...|       5.0|         1| Amazon|
|0005048524|2011|dinosaurs and the...|       3.5|         2| Amazon|
+----------+----+--------------------+----------+----------+-------+
only showing top 10 rows



In [63]:
# Print the final Amazon schema.
df_amazon.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- rating_avg: double (nullable = true)
 |-- qty_rating: long (nullable = true)
 |-- company: string (nullable = false)



# Carregamento dos dados

Armazenando DF Netflix na camada Trusted

In [64]:
# Write the Netflix DataFrame to the trusted layer as CSV with a header row,
# replacing any output left by a previous run.
netflix_trusted_dir = path_trusted + '\\df_netflix_data'
(
    df_netflix.write
    .option('header', 'true')
    .format('csv')
    .mode('overwrite')
    .save(netflix_trusted_dir)
)

Wall time: 26.7 s


Armazenando DF Amazon na camada Trusted

In [65]:
# Write the Amazon DataFrame to the trusted layer as CSV with a header row,
# replacing any output left by a previous run.
amazon_trusted_dir = path_trusted + '\\df_amazon_data'
(
    df_amazon.write
    .option('header', 'true')
    .format('csv')
    .mode('overwrite')
    .save(amazon_trusted_dir)
)

Wall time: 31min 31s


Combinando os DFs Amazon e Netflix e armazenando na camada Trusted

In [66]:
# Stack the Netflix and Amazon rows into one DataFrame, matching columns by
# name. movie_id is an integer on the Netflix side and a string on the Amazon
# side, so the Netflix ids are cast to string explicitly instead of relying on
# implicit type coercion inside the union.
df_complete = (
    df_netflix
    .withColumn('movie_id', col('movie_id').cast('string'))
    .unionByName(df_amazon)
)

In [None]:
# Persist the combined DataFrame as Parquet in the trusted layer, replacing
# any previous output.
# NOTE(review): 'header' is a CSV option; presumably it has no effect on the
# Parquet writer — confirm and drop it if so.
df_complete.write.option('header', 'true').mode('overwrite').parquet(path_trusted + '\\parquet_complete_data') 

# Consultas SQL

In [None]:
# Read the combined dataset back from the Parquet output in the trusted layer.
parquet_file = scSpark.read.parquet(path_trusted + '\\parquet_complete_data')

In [None]:
# Register the DataFrame as a temporary view so the SQL queries below can
# refer to it as `parquet_complete`.
parquet_file.createOrReplaceTempView('parquet_complete')

In [None]:
# Sanity check: show 10 rows of the registered view.
scSpark.sql('SELECT * FROM parquet_complete').show(10)

## Quantos filmes estão disponíveis na Amazon?

In [None]:
# Number of Amazon rows that have a title (COUNT(column) skips NULL values).
sql_amazon_qty = """
SELECT COUNT(movie_title) AS qty_movies_amazon
FROM parquet_complete
WHERE company = 'Amazon'
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_amazon_qty).show()

## Quantos filmes estão disponíveis na Netflix?

In [None]:
# Number of Netflix rows that have a title (COUNT(column) skips NULL values).
sql_netflix_qty = """
SELECT COUNT(movie_title) AS qty_movies_netflix
FROM parquet_complete
WHERE company = 'Netflix'
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_netflix_qty).show()

## Dos filmes disponíveis na Amazon, quantos % estão disponíveis na Netflix?

In [None]:
# Percentage of Amazon titles that also exist in the Netflix catalog.
# - The Netflix side is reduced to DISTINCT titles, so an Amazon row matches
#   at most one Netflix row; joining against the raw Netflix rows would count
#   an Amazon title once per Netflix entry sharing that title and inflate the
#   numerator.
# - COUNT(netflix.movie_title) counts only the Amazon rows that found a match;
#   COUNT(amazon.movie_title) counts the Amazon rows that have a title.
# - The ratio is multiplied by 100 and rounded, since the question asks for a
#   percentage.
sql_amazon_in_netflix = """
SELECT ROUND(100.0 * COUNT(netflix.movie_title) / COUNT(amazon.movie_title), 2) AS pct_amazon_in_netflix
FROM
  (SELECT movie_title
   FROM parquet_complete
   WHERE company = 'Amazon') AS amazon
LEFT JOIN
  (SELECT DISTINCT movie_title
   FROM parquet_complete
   WHERE company = 'Netflix') AS netflix ON (amazon.movie_title = netflix.movie_title)
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_amazon_in_netflix).show()

## O quão perto a média das notas dos filmes disponíveis na Amazon está da média dos filmes disponíveis na Netflix?

In [None]:
# Per company: mean of the per-movie rating_avg values, rounded to 2 decimals.
# Every movie weighs the same here, regardless of how many votes it received.
sql_avg_rating = """
SELECT company,
       ROUND(AVG(rating_avg), 2) AS rating_avg
FROM parquet_complete
GROUP BY company
"""

In [None]:
# Run the query and print up to 5 result rows.
scSpark.sql(sql_avg_rating).show(5)

## Qual ano de lançamento possui mais filmes na Amazon?

In [None]:
# Year with the largest number of Amazon rows (a single row is returned).
sql_amazon_biggest_year = """
SELECT year,
       count(*) AS qty
FROM parquet_complete
WHERE company = 'Amazon'
GROUP BY year
ORDER BY qty DESC
LIMIT 1
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_amazon_biggest_year).show()

## Qual ano de lançamento possui mais filmes na Netflix?

In [None]:
# Year with the largest number of Netflix rows (a single row is returned).
sql_netflix_biggest_year = """
SELECT year,
       count(*) AS qty
FROM parquet_complete
WHERE company = 'Netflix'
GROUP BY year
ORDER BY qty DESC
LIMIT 1
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_netflix_biggest_year).show()

## Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?

In [None]:
# Amazon titles with rating_avg >= 4 that have no title match on Netflix.
# The LEFT JOIN followed by "netflix.movie_title IS NULL" keeps only the
# Amazon rows for which the join found no Netflix row.
sql_best_rating_not_in_netflix = """
SELECT amazon.movie_title,
       amazon.rating_avg
FROM
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Amazon') AS amazon
LEFT JOIN
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Netflix') AS netflix ON (amazon.movie_title = netflix.movie_title)
WHERE amazon.rating_avg >=4
  AND netflix.movie_title IS NULL
"""

In [None]:
# Run the query and print the first 10 result rows.
scSpark.sql(sql_best_rating_not_in_netflix).show(10)

In [None]:
# Count of Amazon rows with rating_avg >= 4 that have no title match on
# Netflix (same join and filters as the listing query, returning a count).
sql_best_rating_not_in_netflix_qty = """
SELECT count(*) AS qty
FROM
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Amazon') AS amazon
LEFT JOIN
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Netflix') AS netflix ON (amazon.movie_title = netflix.movie_title)
WHERE amazon.rating_avg >=4
  AND netflix.movie_title IS NULL
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_best_rating_not_in_netflix_qty).show()

## Quais filmes que não estão disponíveis no catálogo da Amazon foram melhor avaliados (notas 4 e 5)?

In [None]:
# Netflix titles with rating_avg >= 4 that have no title match on Amazon.
# The LEFT JOIN followed by "amazon.movie_title IS NULL" keeps only the
# Netflix rows for which the join found no Amazon row.
sql_best_rating_not_in_amazon = """
SELECT netflix.movie_title,
       netflix.rating_avg
FROM
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Netflix') AS netflix
LEFT JOIN
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Amazon') AS amazon ON (netflix.movie_title = amazon.movie_title)
WHERE netflix.rating_avg >=4
  AND amazon.movie_title IS NULL
"""

In [None]:
# Run the query and print the first 10 result rows.
scSpark.sql(sql_best_rating_not_in_amazon).show(10)

In [None]:
# Count of Netflix rows with rating_avg >= 4 that have no title match on
# Amazon (same join and filters as the listing query, returning a count).
sql_best_rating_not_in_amazon_qty = """
SELECT count(*) as qty
FROM
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Netflix') AS netflix
LEFT JOIN
  (SELECT movie_title,
          rating_avg
   FROM parquet_complete
   WHERE company = 'Amazon') AS amazon ON (netflix.movie_title = amazon.movie_title)
WHERE netflix.rating_avg >=4
  AND amazon.movie_title IS NULL
"""

In [None]:
# Run the query and print its result.
scSpark.sql(sql_best_rating_not_in_amazon_qty).show()