In [5]:
%idle_timeout 60
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::965478153338:role/service-role/AWSGlueServiceRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: ef86d98e-c77d-4e47-beef-9e9b0b64ced5
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true


Exception encountered while creating session: An error occurred (AlreadyExistsException) when calling the CreateSession operation: Session already created, sessionId=ef86d98e-c77d-4e47-beef-9e9b0b64ced5 
Traceback (most recent call last):
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/aws_glue_interactive_sessions_kernel/glue_pyspark/GlueKernel.py", line 293, in do_execute
    self.create_session()
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/aws_glue_interactive_sessions_kernel/glue_pyspark/GlueKernel.py", line 800, in create_session
    response = self.glue_client.create_session(
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/botocore/client.py", line 534, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/botocore/client.py", line 976, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.AlreadyExistsException: An err

In [5]:
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import when, substring, lower, sum, count
from awsglue.dynamicframe import DynamicFrame

session_id=ef86d98e-c77d-4e47-beef-9e9b0b64ced5 has reached TIMEOUT status. 
Please re-run the same cell to restart the session. You may also need to re-run previous cells if trying to use pre-defined variables.


In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType




In [4]:
# criando um DynamicFrame a partir da tabela dos dados no AWS Glue Data Catalog e exibindo seu esquema
dyf = glueContext.create_dynamic_frame.from_catalog(database='movies-raw-data', table_name='csv')
dyf.printSchema()

root
|-- adult: choice
|    |-- boolean
|    |-- string
|-- backdrop_path: string
|-- genre_ids: string
|-- id: choice
|    |-- long
|    |-- string
|-- original_language: string
|-- original_title: string
|-- overview: string
|-- popularity: double
|-- poster_path: string
|-- release_date: string
|-- title: string
|-- video: boolean
|-- vote_average: double
|-- vote_count: long


In [5]:
df_tmdb_crime = dyf.toDF()
df_tmdb_crime.show()

+-------------+--------------------+--------------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+------------+--------------------+-----+------------+----------+
|        adult|       backdrop_path|           genre_ids|             id|original_language|      original_title|            overview|popularity|         poster_path|release_date|               title|video|vote_average|vote_count|
+-------------+--------------------+--------------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+------------+--------------------+-----+------------+----------+
|{false, null}|                    |                [80]|{1170682, null}|               en|Convicts Two and Sex|Skin flick about ...|       0.6|/p8Xy4LgPHPQyE1ZH...|  1970-01-01|Convicts Two and Sex|false|         0.0|         0|
|{false, null}|                    |        [18, 80, 12]| {967070, null}|       

In [6]:
# Lista de nomes das colunas para remover
colunas_remover = ['adult','backdrop_path','poster_path','original_title','video']

# método drop para remover as colunas
df_tmdb_crime = df_tmdb_crime.drop(*colunas_remover)




In [7]:
df_tmdb_crime.show()

+--------------------+---------------+-----------------+--------------------+----------+------------+--------------------+------------+----------+
|           genre_ids|             id|original_language|            overview|popularity|release_date|               title|vote_average|vote_count|
+--------------------+---------------+-----------------+--------------------+----------+------------+--------------------+------------+----------+
|                [80]|{1170682, null}|               en|Skin flick about ...|       0.6|  1970-01-01|Convicts Two and Sex|         0.0|         0|
|        [18, 80, 12]| {967070, null}|               tr|This film is abou...|     0.932|  1970-01-01|       Blood and Gun|         0.0|         0|
|            [18, 80]| {923867, null}|               cs|                    |       0.6|  1970-01-01|   Bigbeatová svatba|         0.0|         0|
|                [80]| {844478, null}|               si|                    |       0.6|  1970-01-01|            මොකද 

In [8]:
# lista com expressão para calcular a contagem de nulos em cada coluna
expressao_contagem_nulos = [count(when(col(coluna).isNull(), coluna)).alias(coluna) for coluna in df_tmdb_crime.columns]

# função select para calcular a contagem de nulos em cada coluna
contagem_nulos = df_tmdb_crime.select(expressao_contagem_nulos)

# Exiba a contagem de nulos
contagem_nulos.show()

+---------+---+-----------------+--------+----------+------------+-----+------------+----------+
|genre_ids| id|original_language|overview|popularity|release_date|title|vote_average|vote_count|
+---------+---+-----------------+--------+----------+------------+-----+------------+----------+
|       15| 12|               12|      12|        19|          26|   26|          26|        26|
+---------+---+-----------------+--------+----------+------------+-----+------------+----------+


In [9]:
def convert_genres_id(genres_str):
    # Remove colchetes e espaços em branco e divide os valores
    genres_str = genres_str.strip("[]")
    values = genres_str.split(",")
    
    # Converte os valores para inteiros e retorna como uma string separada por vírgulas
    return ",".join(map(lambda x: x.strip(), values))

# Registre a função UDF
convert_genres_id_udf = udf(convert_genres_id, StringType())

# Aplique a função UDF à coluna "genres_id" e crie uma nova coluna
df_tmdb_crime = df_tmdb_crime.withColumn("genres_numeric", convert_genres_id_udf(df_tmdb_crime["genre_ids"]))




In [10]:
df_tmdb_crime.show(5)

+------------+---------------+-----------------+--------------------+----------+------------+--------------------+------------+----------+--------------+
|   genre_ids|             id|original_language|            overview|popularity|release_date|               title|vote_average|vote_count|genres_numeric|
+------------+---------------+-----------------+--------------------+----------+------------+--------------------+------------+----------+--------------+
|        [80]|{1170682, null}|               en|Skin flick about ...|       0.6|  1970-01-01|Convicts Two and Sex|         0.0|         0|            80|
|[18, 80, 12]| {967070, null}|               tr|This film is abou...|     0.932|  1970-01-01|       Blood and Gun|         0.0|         0|      18,80,12|
|    [18, 80]| {923867, null}|               cs|                    |       0.6|  1970-01-01|   Bigbeatová svatba|         0.0|         0|         18,80|
|        [80]| {844478, null}|               si|                    |       

In [11]:
# Mapeamento entre códigos e nomes dos gêneros
genre_mapping = {28: "Action",12: "Adventure",16: "Animation",35: "Comedy",80: "Crime",99: "Documentary",18: "Drama",10751: "Family",
                 14: "Fantasy",36: "History",27: "Horror",10402: "Music",9648: "Mystery",10749: "Romance",878: "Science Fiction",
                 10770: "TV Movie",53: "Thriller",10752: "War",37: "Western"}




In [17]:
# Defina uma função UDF para mapear os gêneros
def map_genres(genre_ids):
    genre_ids = genre_ids.split(",")
    genres = [genre_mapping.get(int(genre_id), "Unknown") for genre_id in genre_ids]
    return ",".join(genres)

# Registre a função UDF
# spark.udf.register("map_genres_udf", map_genres, StringType())
map_genres_udf = udf(map_genres, StringType())

# Aplique a função UDF à coluna "genres_numeric"
df_tmdb_crime = df_tmdb_crime.withColumn("genres_mapped", udf("map_genres_udf")(df_tmdb_crime["genres_numeric"]))

TypeError: Invalid function: not a function or callable (__call__ is not defined): <class 'pyspark.sql.column.Column'>


In [15]:
# Aplique a transformação para mapear os gêneros
df_tmdb_crime = df_tmdb_crime.withColumn("genres_mapped", expr("map_genres_udf(genres_numeric)"))

AnalysisException: Undefined function: 'map_genres_udf'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0


In [None]:
# Excluindo as colunas "genre_ids" e "genres_numeric"
df_tmdb_crime = df_tmdb_crime.drop("genres_numeric","genre_ids")

In [None]:
df_tmdb_crime.show(3)

In [None]:
# Defina a função UDF para extrair o valor numérico
def extract_numeric_value(value):
    # Remova colchetes, espaços em branco e vírgula e converta para inteiro
    value = value.strip("[] ,")
    return int(value) if value.isdigit() else None

# Registre a função UDF
spark.udf.register("extract_numeric_value_udf", extract_numeric_value, IntegerType())

In [None]:
# Aplique a função UDF à coluna "id" para extrair os valores numéricos
df_tmdb_crime = df_tmdb_crime.withColumn("id_numeric", udf("extract_numeric_value_udf")(df_tmdb_crime["id"]))

In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)

In [None]:
# Criando um DynamicFrame a partir do df
dyf_tmdb_crime = DynamicFrame.fromDF(df_tmdb_crime, glueContext, "dyf_tmdb_crime")

s3output = glueContext.getSink(
  path="s3://data-lake-daniele/Trusted/dados-tmdb/",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="movies-trusted-data", catalogTableName="dados-tmdb"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf_tmdb_crime)