In [1]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, udf, struct
from pyspark.sql.types import StringType
from awsglue.transforms import Map, DropFields, ApplyMapping
import psycopg2

sc = SparkContext()
context = GlueContext(sc)

In [2]:
path = "s3://network.cubo.datalake/airtable/raw/prod/"
path = path + "2025-03-17-18-05-57/data/"
path = path + "Calend√°rio de Eventos Cubo/Events/Plataforma Cubo Network/"
print(path)
dataframeBaseEventos = context.create_dynamic_frame.from_options(
                connection_type='s3',
                connection_options={
                    'paths': [path],
                    'recurse': True
                },
                format='json'
            )
dataframeBaseEventos.count()

s3://network.cubo.datalake/airtable/raw/prod/2025-03-17-18-05-57/data/Calend√°rio de Eventos Cubo/Events/Plataforma Cubo Network/


6

In [3]:
dataframeBaseEventos.printSchema()

root
|-- id: string
|-- createdTime: string
|-- fields: struct
|    |-- T√≠tulo do evento: string
|    |-- Categoria de evento: string
|    |-- Data e Hor√°rio: string
|    |-- Tipo do evento: string
|    |-- Link de inscri√ß√£o: string
|    |-- Formato: string
|    |-- Trimestre: string
|    |-- Semestre: string
|    |-- Publico: array
|    |    |-- element: string
|    |-- Hub: array
|    |    |-- element: string
|    |-- Imagem: array
|    |    |-- element: struct
|    |    |    |-- id: string
|    |    |    |-- width: int
|    |    |    |-- height: int
|    |    |    |-- url: string
|    |    |    |-- filename: string
|    |    |    |-- size: int
|    |    |    |-- type: string
|    |    |    |-- thumbnails: struct
|    |    |    |    |-- small: struct
|    |    |    |    |    |-- url: string
|    |    |    |    |    |-- width: int
|    |    |    |    |    |-- height: int
|    |    |    |    |-- large: struct
|    |    |    |    |    |-- url: string
|    |    |    |    |    |-- wid

In [4]:
mapping = []
for item in dataframeBaseEventos.unnest().toDF().dtypes:
    if item[0].split('.')[0] == "fields":
        if len(item[0].split(".")) == 2:
            mapping.append((item[0], item[0].split('.')[1]))
    else:
        mapping.append((item[0], item[0].split('.')[0]))  

In [5]:
#Fun√ß√£o para extrair a URL da imagem, lidando corretamente com Row objects
def extract_imagem_url(imagem):
    if isinstance(imagem, list) and len(imagem) > 0:
        first_image = imagem[0]  # Pegamos o primeiro elemento da lista

        #Verifica se o elemento √© um dicion√°rio ou um Row object
        if isinstance(first_image, dict):
            return first_image.get("url", None)
        elif hasattr(first_image, "asDict"):  # Caso seja um Row object
            return first_image.asDict().get("url", None)

    return None  # Retorna None se n√£o houver imagem

#Criar a UDF para uso no DataFrame
extract_imagem_url_udf = udf(extract_imagem_url, StringType())

#Converter DynamicFrame para DataFrame do Spark
newDf = dataframeBaseEventos.toDF()

#Atualizar a coluna "fields" para incluir "ImagemURL"
newDf = newDf.withColumn(
    "fields",
    struct(
        col("fields.*"),  # üî• Mant√©m todas as colunas j√° existentes em "fields"
        extract_imagem_url_udf(col("fields.Imagem")).alias("ImagemURL")  # üî• Adiciona ImagemURL corretamente
    )
)

#Converter DataFrame do Spark de volta para DynamicFrame no Glue
newDf = DynamicFrame.fromDF(newDf, context, "newDf")

In [None]:
#Verificar schema atualizado
newDf.printSchema()

In [6]:
#Aplicar apply_mapping() e resolver tipos
newDf = newDf.apply_mapping(mapping + [
    ("fields.ImagemURL", "string", "ImagemURL", "string")  #Adicionando ImagemURL ao mapeamento
]) \
    .resolveChoice([
        ("Evento Privado?", "cast:boolean"),
        ("ImagemURL", "cast:string")  #Garantindo que ImagemURL seja reconhecido
    ]) \
    .drop_fields([
        "id", "Tipo do evento", "createdTime", "Hub", "Publico", "Trimestre", "Semestre",
        "Categoria de evento", "Status do evento", "Created By", 
        "Nome do criador do evento", "Created",
        "Imagem"  #Removendo o campo original "Imagem"
    ])


In [None]:
#Verificar schema atualizado
newDf.printSchema()

In [None]:
# #Converter DynamicFrame de volta para DataFrame do Spark
# newDf_final = newDf.toDF().toDF(
#     "T√≠tulo do evento", "Data e Hor√°rio", "Link de inscri√ß√£o", "Formato", 
#     "Descri√ß√£o", "Ativo", "Listar em Eventos Recomendados na Plataforma?", 
#     "Evento Privado?", "Publicado?", "Label", "Data e Hor√°rio de T√©rmino", 
#     "Local do Evento", "Evento Destaque(site)", "Tipo de Ingresso(site)", 
#     "Local(Sala/Andar)(site)", "Tema(site)", "Resumo do tema(site)", 
#     "Programa√ß√£o(site)", "Palestrantes(site)", "ImagemURL"
# )

In [7]:
#Converter DynamicFrame de volta para DataFrame do Spark
newDf_final = newDf.toDF()

In [None]:
#Exibir o schema para confirmar que ImagemURL est√° presente
newDf_final.printSchema()

In [8]:
newDf_final.select("ImagemURL").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ImagemURL                                                                                                                                                                                                                                                                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
newDf_final.show(5)

In [9]:
try:
    connection = psycopg2.connect(
        host="db-dev-poc-rds.c3dnragwy3og.us-east-1.rds.amazonaws.com",
        port="5432",
        database="db_dev_poc_rds",
        user="postgres",
        password="P06n9VDnXZ5CLlg9UNrgd6h53"
    )
    print("Conex√£o bem-sucedida!")
    connection.close()
except Exception as e:
    print(f"Erro ao conectar ao banco: {e}")

Conex√£o bem-sucedida!


In [10]:
jdbc_url = "jdbc:postgresql://db-dev-poc-rds.c3dnragwy3og.us-east-1.rds.amazonaws.com:5432/db_dev_poc_rds"
jdbc_properties = {
    "user": "postgres",
    "password": "P06n9VDnXZ5CLlg9UNrgd6h53",
    "driver": "org.postgresql.Driver",
    "stringtype": "unspecified"
}

In [11]:
from pyspark.sql.functions import col, to_timestamp

newDf_final = newDf_final.select(
    col("T√≠tulo do evento").alias("name"),
    to_timestamp(col("Data e Hor√°rio")).alias("start_date"),  # Convers√£o correta de data
    col("Link de inscri√ß√£o").alias("url"),
    col("Formato").alias("modality"),
    col("Descri√ß√£o").alias("detail"),
    col("Ativo").cast("boolean").alias("active"),
    col("Listar em Eventos Recomendados na Plataforma?").cast("boolean").alias("is_main_event"),
    col("Evento Privado?").cast("boolean").alias("private_event"), 
    col("Publicado?").cast("boolean").alias("published"), 
    col("Label").alias("id"),
    to_timestamp(col("Data e Hor√°rio de T√©rmino")).alias("end_date"),  # Convers√£o correta de data
    col("Local do Evento").alias("address"), 
    col("Evento Destaque(site)").cast("boolean").alias("featured_events"), 
    col("Tipo de Ingresso(site)").alias("ticket_type"),
    col("Local(Sala/Andar)(site)").alias("room_and_floor"),
    col("Tema(site)").alias("theme"),
    col("Resumo do tema(site)").alias("theme_summary"), 
    col("Programa√ß√£o(site)").alias("schedule"),
    col("Palestrantes(site)").alias("speakers"),
    col("ImagemURL").alias("image")
)


In [12]:
newDf_final.select("image").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|image                                                                                                                                                                                                                                                                                                                                                                     |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
newDf_final.select("theme_summary").show(10, truncate=False)

In [None]:
newDf_final.select("id").show(10, truncate=False)

In [None]:
newDf_final.show(truncate=False)

In [None]:
newDf_final.printSchema()

In [None]:
# newDf_final.select("start_date", "end_date").show(truncate=False)

In [None]:
# newDf.limit(1).write.jdbc(
#     url=jdbc_url,
#     table="events_glue",
#     mode="append",
#     properties=jdbc_properties
# )

In [13]:
newDf_final.write.jdbc(
        url=jdbc_url,
        table="events",
        mode="append",  # Adiciona os registros sem sobrescrever
        properties=jdbc_properties
)