In [None]:
%stop_session
#%additional_python_modules pyiceberg==0.3.0,pyspark==3.2.0

In [None]:
%idle_timeout 11520
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%region us-east-1
%connections connection1,connection2
%max_concurrent_runs 20
%%configure
{
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog.warehouse= --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
    "--datalake-formats": "iceberg",
    "--JOB_NAME":"",
    "--data":"CURRENT_DATE",
    "--intervalo":"0",
    "--catalog":"glue_catalog",
    "--database":"db_bronze",
    "--table":"",
    "--connection":"",
    "--query":"",
    "--match_id":"id",
    "--metodo":"drop-insert",
    "--partition": "true",
    "--enable-metrics": "true",
    "--enable-continuous-cloudwatch-log": "true",
    "--enable-spark-ui": "true",
    "--spark-event-logs-path":""
}

In [None]:
import sys
from awsglue.transforms import *
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, date_add, date_format
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import col, when, lit
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime, timedelta, date
import pandas as pd

# Spark / Glue session bootstrap.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Resolve the job parameters expected on the command line
# (the same keys are given default values in the %%configure cell).
args = getResolvedOptions(
    sys.argv,
    [
        'JOB_NAME',
        'data', 'intervalo',
        'catalog', 'database', 'table',
        'connection', 'query',
        'match_id', 'metodo', 'partition',
    ],
)
job.init(args['JOB_NAME'], args)

# Unpack the parameters into the module-level names used by the cells below.
par_date, par_intervalo = args['data'], int(args['intervalo'])
catalog_name, database_name, table_name = args['catalog'], args['database'], args['table']
par_connection, par_query = args['connection'], args['query']
match_id, metodo, partition = args['match_id'], args['metodo'], args['partition']

# The literal 'CURRENT_DATE' stands for today's date, formatted as YYYY-MM-DD.
if par_date == 'CURRENT_DATE':
    par_date = date.today().strftime("%Y-%m-%d")

def substituir_parametros(query, parametros):
    """Replace every ``{name}`` placeholder in ``query`` with its value.

    Args:
        query: Text containing zero or more ``{name}`` placeholders.
        parametros: Mapping of placeholder name -> replacement value. Values
            are converted with ``str()`` so non-string values (ints, dates)
            do not make ``str.replace`` raise ``TypeError``.

    Returns:
        The query with all known placeholders substituted. Placeholders whose
        name is not in ``parametros`` are left untouched.
    """
    for chave, valor in parametros.items():
        query = query.replace(f"{{{chave}}}", str(valor))
    return query

def get_driver(driver_name):
    """Return the JDBC driver class name for a database vendor.

    Args:
        driver_name: Vendor identifier ("mysql", "postgresql" or "sqlserver").
            Matching ignores letter case and surrounding whitespace.

    Returns:
        The fully qualified JDBC driver class name.

    Raises:
        KeyError: If the vendor is missing (``None``/empty) or not supported.
    """
    drivers = {
        "mysql": "com.mysql.cj.jdbc.Driver",
        "postgresql": "org.postgresql.Driver",
        "sqlserver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    # Normalise so "MySQL" or " mysql " resolve like "mysql"; None becomes "".
    chave = str(driver_name or "").strip().lower()
    if chave not in drivers:
        # Same exception type as the plain dict lookup, but with an actionable message.
        raise KeyError(
            f"Unsupported JDBC vendor {driver_name!r}; expected one of {sorted(drivers)}"
        )
    return drivers[chave]

In [2]:
# Look up the JDBC settings of the Glue connection named by the job parameter.
connection_options = glueContext.extract_jdbc_conf(par_connection)

# Pull the individual fields used by the JDBC reader (None when a key is absent).
source_url, source_user, source_password = (
    connection_options.get(chave) for chave in ("fullUrl", "user", "password")
)
source_driver = get_driver(connection_options.get("vendor"))

# Name of the temporary view that exposes the source rows to Spark SQL.
temp_table_name = "tmp_{}".format(table_name)







In [None]:
# Fully qualified name of the Iceberg target table.
tabela_destino = f"{catalog_name}.{database_name}.{table_name}"

# `partition` is a job argument and therefore text (e.g. "true"); comparing it with
# the boolean True never matches, so interpret the flag explicitly.
particionado = str(partition).strip().lower() in ("true", "1")


def _carregar_origem(consulta):
    """Read ``consulta`` over JDBC and register it as the temp view ``temp_table_name``.

    ``consulta`` is passed unchanged to the JDBC ``dbtable`` option.
    Returns the resulting Spark DataFrame.
    """
    print(f"Executando Consulta = {consulta}")
    df_origem = (
        spark.read
        .format("jdbc")
        .option("url", source_url)
        .option("dbtable", consulta)
        .option("user", source_user)
        .option("password", source_password)
        .option("driver", source_driver)
        .load()
    )
    df_origem.createOrReplaceTempView(temp_table_name)
    return df_origem


def _preparar_destino(df_origem):
    """Create the database/table when missing and add columns that are new in ``df_origem``."""
    # Create the database if it does not exist.
    query_database = f"""CREATE DATABASE IF NOT EXISTS {catalog_name}.{database_name}"""
    spark.sql(query_database)
    print(f"Criando DataBase caso nao exista = {query_database}")

    # Create the table if it does not exist (CTAS from the temp view).
    if particionado:
        query_table = f"""
        CREATE TABLE IF NOT EXISTS {tabela_destino}
        USING iceberg
        PARTITIONED BY (days(partition_date))
        TBLPROPERTIES ("format-version"="2")
        AS SELECT * FROM {temp_table_name} ORDER BY partition_date,{match_id}
        """
    else:
        query_table = f"""
        CREATE TABLE IF NOT EXISTS {tabela_destino}
        USING iceberg
        TBLPROPERTIES ("format-version"="2")
        AS SELECT * FROM {temp_table_name}
        """
    spark.sql(query_table)
    print(f"Criando Tabela caso nao exista = {catalog_name}.{database_name}.{table_name}")

    # Add source columns that the existing table does not have yet. Every table
    # column is considered (the former `campo != {match_id}` filter compared a
    # field object with a set and never excluded anything).
    colunas_tabela = {campo.name for campo in spark.table(tabela_destino).schema.fields}
    for coluna in df_origem.columns:
        if coluna in colunas_tabela:
            continue
        print(f"Adicionando nova coluna à tabela Iceberg: {coluna}")
        # Use the source column's own type instead of always adding `string`.
        tipo_de_dado = df_origem.schema[coluna].dataType.simpleString()
        sql_coluna = f"""
        ALTER TABLE {tabela_destino}
        ADD COLUMNS ({coluna} {tipo_de_dado})
        """
        spark.sql(sql_coluna)


if metodo == 'merge-incremental':

    # Reference dates: par_date and the par_intervalo days before it, oldest first.
    # Built in plain Python, so no Spark job or timestamp->date cast is involved.
    date_final = datetime.strptime(par_date, "%Y-%m-%d").date()
    date_inicial = date_final - timedelta(days=par_intervalo)
    lt_periodo = [
        date_inicial + timedelta(days=deslocamento)
        for deslocamento in range(par_intervalo + 1)
    ]

    for data_referencia in lt_periodo:
        dia_anterior = data_referencia - timedelta(days=1)
        dia_seguinte = data_referencia + timedelta(days=1)
        # Placeholders available to the source query. data_referencia is a date,
        # so %H:%M:%S renders as 00:00:00.
        parametros = {
            "par_referencia": data_referencia.strftime("%Y-%m-%d"),
            "par_data_inicial_timestamp": data_referencia.strftime("%Y-%m-%d %H:%M:%S"),
            "par_data_final_timestamp": data_referencia.strftime("%Y-%m-%d 23:59:59"),
            "par_data_inicial_extra": dia_anterior.strftime("%Y-%m-%d"),
            "par_data_final_extra": dia_seguinte.strftime("%Y-%m-%d"),
            "par_data_inicial_extra_timestamp": dia_anterior.strftime("%Y-%m-%d %H:%M:%S"),
            "par_data_final_extra_timestamp": dia_seguinte.strftime("%Y-%m-%d 23:59:59"),
        }
        print(f"Executando tabela {catalog_name}.{database_name}.{table_name} para a data de {data_referencia}")

        # Load the source rows for this reference date and make sure the target exists.
        df = _carregar_origem(substituir_parametros(par_query, parametros))
        _preparar_destino(df)

        # Build the merge dynamically from the source columns (all but the match key).
        colunas = [coluna for coluna in df.columns if coluna != match_id]
        condicao_select = " ".join([f"b.{c} as {c}," for c in colunas])
        condicao_where = " AND ".join([f"a.{c} = b.{c}" for c in colunas])
        par_referencia = parametros['par_referencia']

        # NOTE(review): the WHERE clause filters on b.partition_date, which is NULL for
        # target rows without a source match, so the 'D' (delete) case looks unreachable
        # as written. Left unchanged because enabling deletes is a behavior change —
        # confirm the intended semantics before touching it.
        query_merge = f"""
        WITH changes AS
        (SELECT
        COALESCE(b.{match_id}, a.{match_id}) AS {match_id}, {condicao_select}
        CASE WHEN b.{match_id} IS NULL THEN 'D' WHEN a.{match_id} IS NULL THEN 'I' ELSE 'U' END as cdc
        FROM {catalog_name}.{database_name}.{table_name} a
        FULL OUTER JOIN {temp_table_name} b ON a.{match_id} = b.{match_id}
        WHERE b.partition_date = '{par_referencia}'
        AND NOT coalesce(({condicao_where}), false))

        MERGE INTO {catalog_name}.{database_name}.{table_name}
        USING changes
        ON {catalog_name}.{database_name}.{table_name}.{match_id} = changes.{match_id}
        WHEN MATCHED AND changes.cdc = 'D' THEN DELETE
        WHEN MATCHED AND changes.cdc = 'U' THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
        print(f"Fazendo Merge da tabela tabela {catalog_name}.{database_name}.{table_name}")
        spark.sql(query_merge)
        print(query_merge)

elif metodo == 'drop-insert':

    # Load the full source result and make sure the target exists.
    df = _carregar_origem(par_query)
    _preparar_destino(df)

    # Empty the target before reloading it.
    query_truncate = f"""TRUNCATE TABLE {tabela_destino}"""
    print(f"Truncando tabela {catalog_name}.{database_name}.{table_name}")
    spark.sql(query_truncate)

    # INSERT ... SELECT is positional. Columns added by schema evolution sit at the
    # end of the target table, so select the source columns by name in the target's
    # order instead of relying on SELECT *.
    colunas_destino = ", ".join(
        "`" + nome.replace("`", "``") + "`"
        for nome in spark.table(tabela_destino).columns
    )
    ordenacao = f" ORDER BY partition_date,{match_id}" if particionado else ""
    query_insert = f"""
    INSERT INTO {tabela_destino}
    SELECT {colunas_destino} FROM {temp_table_name}{ordenacao}"""

    spark.sql(query_insert)

else:
    # Fail loudly instead of finishing a run that loaded nothing.
    raise ValueError(
        f"Metodo desconhecido: {metodo!r}. Use 'merge-incremental' ou 'drop-insert'."
    )

In [None]:
# Commit the Glue job that was initialised with job.init() in the setup cell.
# Run top to bottom, this line is reached only if the load cell did not raise.
job.commit()

In [None]:
%stop_session