# Exemplo projeto PySpark

## Importar bibliotecas necessárias

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Abrir sessão Spark

In [2]:
spark = SparkSession.builder.appName("MeuApp").getOrCreate()

your 131072x1 screen size is bogus. expect trouble
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/03 15:55:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/03 15:56:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Funções úteis

In [3]:
def get_name_source(col):
    return F.when(F.lower(F.col(col)).contains("netflix"), "Netflix")\
        .when(F.lower(F.col(col)).contains("amazon_prime"), "Amazon Prime")\
        .when(F.lower(F.col(col)).contains("disney_plus"), "Disney Plus")\
        .otherwise(F.col(col))

## Extração - Lendo os arquivos de origem

In [4]:
df_titles = spark.read.option("delimiter", ",").option("header", True).csv("sources/*.csv")
df_titles = df_titles.withColumn("source", F.input_file_name())
df_titles = df_titles.withColumn("id_title", F.md5(F.col("title")))
df_titles.show(truncate=False)

                                                                                

+-------+-------+---------------------------------+---------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+--------------+------------+------+--------+---------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------+
|show_id|type   |titl

## Extração - Preparando melhor os dados que foram lidos

In [5]:
df_final = df_titles.withColumn("source", get_name_source("source"))
df_final = df_final.select("id_title", "show_id", "source", "title", "type", "director", "country")
df_final.show(truncate=False)

+--------------------------------+-------+------------+---------------------------------+-------+---------------------------+--------------+
|id_title                        |show_id|source      |title                            |type   |director                   |country       |
+--------------------------------+-------+------------+---------------------------------+-------+---------------------------+--------------+
|0c5f3999f04d96be0dd4c124b22a693e|s1     |Amazon Prime|The Grand Seduction              |Movie  |Don McKellar               |Canada        |
|c40efb117bd151f189667b486996adb1|s2     |Amazon Prime|Take Care Good Night             |Movie  |Girish Joshi               |India         |
|c02c2d01fc26c302e46ae5a1d9599a64|s3     |Amazon Prime|Secrets of Deception             |Movie  |Josh Webber                |United States |
|47d18e7be0669e6c5dc270c3db5484cb|s4     |Amazon Prime|Pink: Staying True               |Movie  |Sonia Anderson             |United States |
|9e7dfa4faecc

In [6]:
print("Total items:", df_final.count())

Total items: 19932


## Transformação - Criando algumas visões analíticas

In [7]:
df_sources = df_final.groupBy("source").agg(F.count(F.col("source")).alias("total"))
df_sources.show(truncate=False)

+------------+-----+
|source      |total|
+------------+-----+
|Amazon Prime|9674 |
|Netflix     |8808 |
|Disney Plus |1450 |
+------------+-----+



In [10]:
df_type = df_final.groupBy("source", "type").agg(F.count(F.col("type")).alias("total"))
df_type = df_type.na.drop()
df_type.show(truncate=False)

+------------+-------+-----+
|source      |type   |total|
+------------+-------+-----+
|Amazon Prime|TV Show|1854 |
|Amazon Prime|Movie  |7814 |
|Netflix     |TV Show|2676 |
|Netflix     |Movie  |6131 |
|Disney Plus |TV Show|398  |
|Disney Plus |Movie  |1052 |
+------------+-------+-----+



In [9]:
df_filter = df_final.filter((F.col("title") == "Zombie Dumb"))
df_filter.show(truncate=False)

+--------------------------------+-------+-------+-----------+-------+--------+-------+
|id_title                        |show_id|source |title      |type   |director|country|
+--------------------------------+-------+-------+-----------+-------+--------+-------+
|e02a55ecc08c5d3780b4c593af41714c|s8804  |Netflix|Zombie Dumb|TV Show|NULL    |NULL   |
+--------------------------------+-------+-------+-----------+-------+--------+-------+

