#### Mini-projeto de análise no Glue

In [None]:
# Configurações
import sys
import pandas as pd
import awswrangler as wr
import boto3

# FROMs
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Meus Imports
from pyspark.sql.functions import col, lit
from datetime import datetime
from datetime import datetime, date, timedelta

# Inicializar o SparkContext e o GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Parâmetros do Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)

In [23]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

database_name = 'projetos_jony'
table_name = 'titulos_filmes_existentes'

# Base com todos os titulos existentes
df_titles = glueContext.create_dynamic_frame.from_catalog(
    database = database_name, 
    table_name = table_name,
    push_down_predicate = "ANOMES = 202407"
)

df_titles.toDF()

df_selec = df_titles.select('primaryTitle', 'isAdult', 'startYear', 'genres', 'runtimeMinutes', 'titleType')
df_selec.show()

+--------------------+-------+---------+--------------------+--------------+---------+
|        primaryTitle|isAdult|startYear|              genres|runtimeMinutes|titleType|
+--------------------+-------+---------+--------------------+--------------+---------+
|          Carmencita|      0|     1894|   Documentary,Short|             1|    short|
|Le clown et ses c...|      0|     1892|     Animation,Short|             5|    short|
|      Pauvre Pierrot|      0|     1892|Animation,Comedy,...|             4|    short|
|         Un bon bock|      0|     1892|     Animation,Short|            12|    short|
|    Blacksmith Scene|      0|     1893|        Comedy,Short|             1|    short|
|   Chinese Opium Den|      0|     1894|               Short|             1|    short|
|Corbett and Court...|      0|     1894|         Short,Sport|             1|    short|
|Edison Kinetoscop...|      0|     1894|   Documentary,Short|             1|    short|
|          Miss Jerry|      0|     1894|   

In [10]:
# df_netflix = spark.read.load(r'C:\Users\joth1\Documents\z_projetinho_pyspark\netflix_titles.csv',     header=True,  inferSchema=True,    sep=',',    format='csv')


database_name = 'projetos_jony'
table_name = 'netflix_titles'

# Base com todos os titulos netflix
df_netflix = glueContext.create_dynamic_frame.from_catalog(
    database = database_name, 
    table_name = table_name,
    push_down_predicate = "ANOMES = 202407"
)
df_netflix.toDF()


df_netflix = df_netflix.select('type', 'title', 'director','date_added', 'release_year', 'duration')
df_netflix.show()


# Pega o número total de filmes no catalogo de dados da netflix
numero = df_netflix.count()
display(numero)

+-------+--------------------+--------------------+------------------+------------+---------+
|   type|               title|            director|        date_added|release_year| duration|
+-------+--------------------+--------------------+------------------+------------+---------+
|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|September 25, 2021|        2020|   90 min|
|TV Show|       Blood & Water|                NULL|September 24, 2021|        2021|2 Seasons|
|TV Show|           Ganglands|     Julien Leclercq|September 24, 2021|        2021| 1 Season|
|TV Show|Jailbirds New Orl...|                NULL|September 24, 2021|        2021| 1 Season|
|TV Show|        Kota Factory|                NULL|September 24, 2021|        2021|2 Seasons|
|TV Show|       Midnight Mass|       Mike Flanagan|September 24, 2021|        2021| 1 Season|
|  Movie|My Little Pony: A...|Robert Cullen, Jo...|September 24, 2021|        2021|   91 min|
|  Movie|             Sankofa|        Haile Gerima|September

8809

In [22]:
database_name = 'projetos_jony'
table_name = 'idmb_filmes'

df_imdb = glueContext.create_dynamic_frame.from_catalog(
    database = database_name, 
    table_name = table_name,
    push_down_predicate = "ANOMES = 202407"
)
df_imdb.toDF()


df_imdb = df_imdb.select('genre', 'Series_Title', 'director', 'No_of_Votes', 'Gross', 'Runtime', 'Meta_score', 'IMDB_Rating')
# df_imdb.show()

from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import DoubleType

df_imdb = df_imdb.withColumn('Gross', regexp_replace('Gross',',', ''))
df_imdb = df_imdb.withColumn('Gross', col('Gross').cast(DoubleType()))
df_imdb.show()

+--------------------+--------------------+--------------------+-----------+------------+-------+----------+-----------+
|               genre|        Series_Title|            director|No_of_Votes|       Gross|Runtime|Meta_score|IMDB_Rating|
+--------------------+--------------------+--------------------+-----------+------------+-------+----------+-----------+
|               Drama|The Shawshank Red...|      Frank Darabont|    2343110| 2.8341469E7|142 min|        80|        9.3|
|        Crime, Drama|       The Godfather|Francis Ford Coppola|    1620367|1.34966411E8|175 min|       100|        9.2|
|Action, Crime, Drama|     The Dark Knight|   Christopher Nolan|    2303232|5.34858444E8|152 min|        84|        9.0|
|        Crime, Drama|The Godfather: Pa...|Francis Ford Coppola|    1129952|      5.73E7|202 min|        90|        9.0|
|        Crime, Drama|        12 Angry Men|        Sidney Lumet|     689845|   4360000.0| 96 min|        96|        9.0|
|Action, Adventure...|The Lord o

In [41]:
df_imdb.createOrReplaceTempView('dados_imdb')
df_netflix.createOrReplaceTempView('netflix')
df_titles.createOrReplaceTempView('titulos')



# Pega somentos os titulos que existem na netflix e estão cadastrados no catalogo de filmes certificados
df = spark.sql("""
SELECT 
          A.*
          ,B.*
FROM NETFLIX AS A 
INNER JOIN titulos AS B ON B.PRIMARYTITLE = A.TITLE
""")
# df.show()
df = df.dropDuplicates(['title'])
df.createOrReplaceTempView('tabela_final')


df_final = spark.sql(
"""
SELECT
    TRIM(A.PRIMARYTITLE) AS TITULO
    ,TRIM(A.DIRECTOR) AS DIRETOR
    -- ,TRIM(A.COUNTRY) AS PAIS
    -- ,A.DATE_ADDED AS DATA_QUE_FOI_ADICIONADO
    ,A.release_year AS DATA_LANCAMENTO
    ,A.duration
    ,A.titleType
    ,TRIM(A.genres) AS GENERO

    
    ,B.IMDB_RATING
    ,B.META_SCORE
    ,B.NO_OF_VOTES AS QTD_AVALIACOES
    ,CAST(B.GROSS AS DECIMAL(18, 2)) AS RECEITA_FILME
    ,CAST(B.GROSS AS BIGINT)/1000000 AS RECEITA_EM_MM

FROM tabela_final AS A 
INNER JOIN DADOS_IMDB AS B ON B.SERIES_TITLE = A.TITLE
ORDER BY IMDB_RATING DESC, NO_OF_VOTES DESC
""")
df_final.show()
numero = df_final.count()
display(numero)

+--------------------+--------------------+---------------+--------+---------+--------------------+-----------+----------+--------------+-------------+-------------+
|              TITULO|             DIRETOR|DATA_LANCAMENTO|duration|titleType|              GENERO|IMDB_RATING|META_SCORE|QTD_AVALIACOES|RECEITA_FILME|RECEITA_EM_MM|
+--------------------+--------------------+---------------+--------+---------+--------------------+-----------+----------+--------------+-------------+-------------+
|        Pulp Fiction|   Quentin Tarantino|           1994| 154 min|    movie|         Crime,Drama|        8.9|        94|       1826188| 107928762.00|   107.928762|
|The Lord of the R...|       Peter Jackson|           2003| 201 min|    movie|Action,Adventure,...|        8.9|        94|       1642758| 377845905.00|   377.845905|
|    Schindler's List|    Steven Spielberg|           1993| 195 min|    movie|Biography,Drama,H...|        8.9|        94|       1213505|  96898818.00|    96.898818|
|   

172

### Salva a base no catalogo de dados do Glue

In [None]:
hoje = datetime.today()
hoje_formatado = hoje.strftime('%Y%m%d')

print(hoje_formatado)

# Insere a coluna com partição dentro do dataframe
df = df.withColumn('anomesdia', lit(hoje_formatado))


# Converte o DataFrame para um DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Especifica o nome do database e tabela no Glue Catalog
database_name = "projetos_jony"
table_name = "tb_analise_netflix"

# Caminho do S3 onde os dados serão salvos
s3_path = "s3://aprendendo_glue/jony/"

# Configura o GlueSink para gravar no catálogo do Glue
sink = glueContext.getSink(
    connection_type="s3",
    path=s3_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["anomesdia"]
)

sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=database_name,catalogTableName=table_name)

# Grava o DynamicFrame no S3 e atualiza o catálogo do Glue
sink.writeFrame(dynamic_frame)

# logging.info('Processo Finalizado')