In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("Extraction_Data")\
        .config("spark.driver.extraClassPath", "/home/adriano/Documentos/airflow/jdbc/postgresql-42.7.4.jar")\
        .getOrCreate()

25/04/05 09:27:31 WARN Utils: Your hostname, Ubuntu-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/05 09:27:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/05 09:27:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from datetime import datetime

current_year = datetime.now().year

In [3]:
def sql_delete_duplicates(table_name, key_columns):
    select_key_columns = ' , '.join(key_columns)
    comparison_keys_columns = [f'a.{key_column} = b.{key_column}' for key_column in key_columns]
    where_key_columns = '\nAND '.join(comparison_keys_columns)
    sql = f'''DELETE FROM {table_name} a USING (
                SELECT MIN(ctid) as ctid, {select_key_columns}
                FROM {table_name} 
                GROUP BY ({select_key_columns}) HAVING COUNT(*) > 1
            ) b
            WHERE {where_key_columns}
            AND a.ctid <> b.ctid'''
    return sql


def delete_duplicates(table_name, key_columns, connection_properties):
    conn, curr = create_connection(connection_properties)
    sql_delete = sql_delete_duplicates(table_name, key_columns)
    curr.execute(sql_delete)
    conn.commit()
    conn.close()

def delete_duplicates_stg():
    connection_properties = {}
    tables_key_columns = [('stg_projeto_investimento_eixos', ['idunico','id']),
                          ('stg_projeto_investimento_executores', ['idunico', 'codigo']),
                          ('stg_projeto_investimento_fontes_de_recurso', ['idunico', 'origem', 'valorinvestimentoprevisto']),
                          ('stg_projeto_investimento_repassadores', ['idunico', 'codigo']),
                          ('stg_projeto_investimento_sub_tipos', ['idunico', 'id']),
                          ('stg_projeto_investimento_tipos', ['idunico', 'id']),
                          ('stg_projeto_investimento_tomadores', ['idunico', 'codigo']),
                          ('stg_projeto_investimento', ['idunico'])]
    for table_name, key_columns in tables_key_columns:
        print(f'Deleting duplicates on table {table_name}...')
        delete_duplicates(table_name, key_columns, connection_properties)


#delete_duplicates_stg()
#print(sql_delete_duplicates('public.stg_projeto_investimento_eixos', ['idunico','id']))

In [4]:
import psycopg2
import os


user_dw = os.getenv('USER_DW')
password_dw = os.getenv('PASSWORD_DW')
host_dw = os.getenv('HOST_DW')
database_dw = os.getenv('DATABASE_DW')
port_dw = os.getenv('PORT_DW')
driver = "org.postgresql.Driver"
connection_properties = {'db_name':database_dw, 'user':user_dw, 'password':password_dw, 'host':host_dw, 'port':port_dw}


def create_connection(connection_properties): 
    #Connect to the Postgresql database using the psycopg2 adapter. 
    #Pass your database name , username , password , hostname and port number 
    conn = psycopg2.connect(f"dbname='{connection_properties['db_name']}' user='{connection_properties['user']}' password='{connection_properties['password']}'\
                            host='{connection_properties['host']}' port='{connection_properties['port']}'") 
    #Get the cursor object from the connection object 
    curr = conn.cursor() 
    return conn,curr 

def delete_duplicates_distinct_years(table_name, years, connection_properties):
    conn, curr = create_connection(connection_properties)

    curr.execute(f'''create table IF NOT EXISTS {table_name}_temp (like {table_name})''')
    conn.commit()

    curr.execute(f'''INSERT INTO {table_name}_temp SELECT DISTINCT * FROM {table_name} where cast("nomeArquivo" as integer) in ({','.join(years)})''')
    conn.commit()

    curr.execute(f'''delete from {table_name} where cast("nomeArquivo" as integer) in ({','.join(years)})''')
    conn.commit()

    curr.execute(f'''insert into {table_name} select * from {table_name}_temp''')
    conn.commit()

    curr.execute(f'''drop table {table_name}_temp''')
    conn.commit()

    conn.close()

def delete_duplicates_distinct(table_name, connection_properties):
    conn, curr = create_connection(connection_properties)

    curr.execute(f'''create table IF NOT EXISTS {table_name}_temp (like {table_name})''')
    conn.commit()

    curr.execute(f'''INSERT INTO {table_name}_temp SELECT DISTINCT * FROM {table_name}''')
    conn.commit()

    curr.execute(f'''delete from {table_name}''')
    conn.commit()

    curr.execute(f'''insert into {table_name} select * from {table_name}_temp''')
    conn.commit()

    curr.execute(f'''drop table {table_name}_temp''')
    conn.commit()

    conn.close()

#delete_duplicates_distinct('stg_execucao_financeira', connection_properties)


25/04/05 09:27:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [16]:
type(datetime.now().year)

int

In [17]:
ano_inicial = 2023
ano_final = 2024
for ano in range(int(ano_inicial), int(ano_final) + 1):
    print(ano)

2023
2024


In [5]:
ano_inicial = current_year
ano_final = current_year

dif_anos = (ano_final - ano_inicial) + 1
for i in range(dif_anos):
    print(ano_inicial + i)

2025


In [6]:
from pyspark.sql.functions import input_file_name, substring, regexp_replace

origin = '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/uf'
process_all = True
ano_final = current_year
ano_inicial = current_year


if process_all == False:
    origin_file = origin + '/' + str(ano_inicial)+ '.json'
    df = spark.read.json(origin_file)
elif process_all == True:
    origin_file = origin + '/*.json'
    df = spark.read.json(origin_file) 
    df = df.withColumn('nomeArquivo',substring(input_file_name(),-9,4))
    df = df.withColumn('teste',regexp_replace(input_file_name(), '.*\/|\.json$', ''))

df.show()
#print(df.show())
#print(df.printSchema())

  df = df.withColumn('teste',regexp_replace(input_file_name(), '.*\/|\.json$', ''))
25/04/05 09:28:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+----------+------------+----------------+-----------------+------------------+-------------------+------------+----------------------------------+------------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+--------------+--------------------+----------------------+--------------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+---+-----------+-----+
|       cep|dataCadastro|dataFinalEfetiva|dataFinalPrevista|dataInicialEfetiva|dataInicialPrevista|dataSituacao|descPlanoNacionalPoliticaVinculado|descPopulacaoBeneficiada|           descricao|               eixos|            endereco|    especie|          executores|     fontesDeRecurso|        funcaoSocial|          geometrias|    idUnico|isModeladaPorBim|          metaGlobal|

                                                                                

In [7]:
from pyspark.sql.functions import input_file_name, substring, explode, col, to_date, length
from pyspark.sql.types import ArrayType, StringType
origin = '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date'
process_all = False
ano_final = current_year
ano_inicial = current_year


if process_all == False:
    origin_file = origin + '/2024-09-18.json'
    df = spark.read.json(origin_file)
    df = df.withColumn('nomeArquivo',substring(input_file_name(),-15,10))
elif process_all == True:
    origin_file = origin + '/*.json'
    df = spark.read.json(origin_file) 
    df = df.withColumn('nomeArquivo',substring(input_file_name(),-9,4))

list_drop = ['tomadores','executores', 'teste']

df_eixos = df.drop()
df_eixos = df_eixos.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))

drop_columns = []

for column in df.schema:
    if type(column.dataType) == ArrayType:
        drop_columns.append(column.name)

drop_columns

#df_eixos = df.drop(*drop_columns)
df_eixos.printSchema()
#df.schema[5].dataType



#curs = conn.cursor()
#curs.execute(f'''delete from stg_execucao_financeira where "nomeArquivo" = '{ano_final}' ''')

#print(df.show())
#print(df.printSchema())
#df.createOrReplaceTempView("eixos")

#results = spark.sql("select idUnico, eixos.eixos.* from eixos")
#results.show()

root
 |-- cep: string (nullable = true)
 |-- dataCadastro: date (nullable = true)
 |-- dataFinalEfetiva: string (nullable = true)
 |-- dataFinalPrevista: string (nullable = true)
 |-- dataInicialEfetiva: string (nullable = true)
 |-- dataInicialPrevista: string (nullable = true)
 |-- dataSituacao: string (nullable = true)
 |-- descPlanoNacionalPoliticaVinculado: string (nullable = true)
 |-- descPopulacaoBeneficiada: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- eixos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- descricao: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- endereco: string (nullable = true)
 |-- especie: string (nullable = true)
 |-- executores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- codigo: long (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |-- fontesDeRecurso: array (nullable = true)
 |    |-- element: struct (con

In [8]:
import jaydebeapi
import os
from dotenv import load_dotenv
from pathlib import Path
from datetime import timedelta

dotenv_path = Path('/home/adriano/Documentos/airflow/.env')
load_dotenv(dotenv_path=dotenv_path)

user_dw = os.getenv('USER_DW')
password_dw = os.getenv('PASSWORD_DW')
host_dw = os.getenv('HOST_DW')
database_dw = os.getenv('DATABASE_DW')
port_dw = os.getenv('PORT_DW')
driver = "org.postgresql.Driver"
mode = 'append'
properties = {"user": user_dw, "password": password_dw, "driver": driver}
url = f'jdbc:postgresql://{host_dw}:{port_dw}/{database_dw}'

df_geometria = df.withColumn("geometrias1", explode("geometrias")).select("idUnico","dataCadastro","geometrias1.*")
df_geometria = df_geometria.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))
df_geometria = df_geometria.withColumn("dataCriacao", to_date(col("dataCriacao"), "yyyy-MM-dd"))
df_geometria = df_geometria.withColumn("dataMetadado", to_date(col("dataMetadado"), "yyyy-MM-dd"))
df_geometria.write.jdbc(url=url, table='stg_projeto_investimento_geometria', mode=mode, properties=properties)

drop_columns = []
for column in df_geometria.schema:
    if type(column.dataType) == ArrayType:
        drop_columns.append(column.name)
    if type(column.dataType) == StringType:
        print(column.name,df_geometria.withColumn("len_col",length(col(column.name))).groupby().max("len_col").head()[0])

"""
df_projeto_investimento = df.drop(*drop_columns)
df_projeto_investimento = df_projeto_investimento.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataInicialPrevista", to_date(col("dataInicialPrevista"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataFinalPrevista", to_date(col("dataFinalPrevista"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataInicialEfetiva", to_date(col("dataInicialEfetiva"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataFinalEfetiva", to_date(col("dataFinalEfetiva"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))
df_projeto_investimento = df_projeto_investimento.withColumn("dataSituacao", to_date(col("dataSituacao"), "yyyy-MM-dd"))
df_projeto_investimento.write.jdbc(url=url, table='stg_projeto_investimento', mode=mode, properties=properties)

today = datetime.now()
date_before = today - timedelta(days=30)

mode = 'append'
url = f'jdbc:postgresql://{host_dw}:{port_dw}/{database_dw}'
properties = {"user": user_dw, "password": password_dw, "driver": driver}
conn = jaydebeapi.connect(driver, url, [user_dw, password_dw], '/home/adriano/Documentos/airflow/jdbc/postgresql-42.7.4.jar')
curs = conn.cursor()
curs.execute(f'''delete from stg_projeto_investimento_eixos         where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_tomadores     where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_executores    where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_repassadores  where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_tipos         where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_sub_tipos     where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_geometria     where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento_fontes_de_recurso where "datacadastro" between '{date_before}' and '{today}';
                 delete from stg_projeto_investimento               where "datacadastro" between '{date_before}' and '{today}';''')

#mode = 'append'
#properties = {"user": user_dw, "password": password_dw, "driver": driver}
#df_eixos.write.jdbc(url=url, table='stg_projeto_investimento_fontes_de_recurso', mode=mode, properties=properties)
"""

                                                                                

idUnico 11
cepAreaExecutora 9
datum 11
enderecoAreaExecutora 51
geometria 42
infoAdicionais 92
nomeAreaExecutora 60
origem 11
paisAreaExecutora 6


'\ndf_projeto_investimento = df.drop(*drop_columns)\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataInicialPrevista", to_date(col("dataInicialPrevista"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataFinalPrevista", to_date(col("dataFinalPrevista"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataInicialEfetiva", to_date(col("dataInicialEfetiva"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataFinalEfetiva", to_date(col("dataFinalEfetiva"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataCadastro", to_date(col("dataCadastro"), "yyyy-MM-dd"))\ndf_projeto_investimento = df_projeto_investimento.withColumn("dataSituacao", to_date(col("dataSituacao"), "yyyy-MM-dd"))\ndf_projeto_investimento.write.jdbc(url=url, table

In [9]:
from pyspark.sql.functions import cast

url = f'jdbc:postgresql://{host_dw}:{port_dw}/{database_dw}'
properties = {"user": user_dw, "password": password_dw, "driver": driver}

df_eixos = spark.read.format("jdbc")\
    .option("url", url)\
    .option("query", "select distinct cast(id as varchar) as nk_eixo, descricao from stg_projeto_investimento_eixos")\
    .option("driver", properties["driver"])\
    .option("user", properties["user"])\
    .option("password", properties["password"])\
    .load()

df_eixos.printSchema()

root
 |-- nk_eixo: string (nullable = true)
 |-- descricao: string (nullable = true)



In [10]:
rdd = df_eixos.rdd
data = rdd.map(tuple)

data = data.collect()
data

                                                                                

[('2', 'Militar'),
 ('1', 'Administrativo'),
 ('3', 'Econômico'),
 ('4', 'Social')]

In [12]:
from pyspark.sql.functions import concat
from pyspark.sql.functions import udf 

@udf(StringType())
def concat_names(column):
    print(column)
    if column not in ('Militar', 'Social'):
        abc = column + 'abc'
        return abc
    return column

df_eixos_2 = df_eixos
df_eixos_2 = df_eixos_2.withColumn('descricao', concat_names('descricao'))
df_eixos_2 = df_eixos_2.filter("descricao != 'Militar'")
df_eixos_2.show()

Militar32:>                                                         (0 + 1) / 1]
Administrativo
Econômico
Social
Administrativo
Econômico
Social
                                                                                

+-------+-----------------+
|nk_eixo|        descricao|
+-------+-----------------+
|      1|Administrativoabc|
|      3|     Econômicoabc|
|      4|           Social|
+-------+-----------------+



In [None]:
from pyspark.sql.functions import when, lit
from functools import reduce

def values_to_update(df_old, df_new, key_columns, compare_columns):
    """
    Compared two dataframes and find the rows to update.

    :param df_old: Old DataFrame
    :param df_new: New DataFrame
    :param key_columns: Join columns
    :param compare_columns: Comparisson coluns
    :return: Update DataFrame
    """
    # Criar a condição de join
    join_condition = [col(f"new.{key}") == col(f"old.{key}") for key in key_columns]
    
    # Realizar o join
    joined_df = df_new.alias("new").join(df_old.alias("old"), on=join_condition, how="inner")

    # Filtrar as diferenças
    # filter_condition = [
    #    (col(f"new.{column}") != col(f"old.{column}")) for column in compare_columns
    #]

    filter_condition = [
        f'not new.{column} <=> old.{column}' for column in compare_columns
    ]

    # Aplicar o filtro
    #differences = joined_df.filter(reduce(lambda a, b: a | b, filter_condition))
    differences = joined_df.filter(' or '.join(filter_condition))
    
    return differences.select("new.*")

def values_to_insert_delete(df1, df2, key_columns):
    """
    Compared two dataframes and find the rows to update.

    :param df1: Secondary DataFrame
    :param df2: Main DataFrame
    :param key_columns: Join columns
    :return: Insert Or Delete DataFrame
    """
    # Criar a condição de join
    join_condition = [col(f"new.{key}") == col(f"old.{key}") for key in key_columns]
    
    # Realizar o join
    joined_df = df2.alias("new").join(df1.alias("old"), on=join_condition, how="leftanti")

    return joined_df.select("new.*")

def values_to_delete(df1, df2, key_columns):
    """
    Compara dois DataFrames do Spark com base em colunas chave e colunas a serem comparadas.

    :param df1: Primeiro DataFrame
    :param df2: Segundo DataFrame
    :param key_columns: Lista de colunas chave para o join
    :param compare_columns: Lista de colunas a serem comparadas
    :return: DataFrame contendo as diferenças
    """
    # Criar a condição de join
    join_condition = [col(f"new.{key}") == col(f"old.{key}") for key in key_columns]
    
    # Realizar o join
    joined_df = df1.alias("new").join(df2.alias("old"), on=join_condition, how="leftanti")

    return joined_df.select("new.*")

teste = values_to_update(df_eixos, df_eixos_2, ['nk_eixo'], ['descricao'])

#teste = values_to_insert(df_eixos, df_eixos_2, ['nk_eixo'])

#teste = values_to_delete(df_eixos, df_eixos_2, ['nk_eixo'])
 
teste.show()

#conditions_ = len([when(df_eixos[c]!=df_eixos_2[c], lit(c)).otherwise("") for c in df_eixos.columns if c != 'nk_eixo']) == 0

#df_eixos.join(df_eixos_2, ['nk_eixo'], how='outer').select(df_eixos.descricao, df_eixos_2.descricao).show()

Militar
Administrativo
Econômico
Social
Administrativo
Econômico
Social


+-------+-----------------+
|nk_eixo|        descricao|
+-------+-----------------+
|      1|Administrativoabc|
|      3|     Econômicoabc|
+-------+-----------------+



In [13]:
from math import ceil

bulk_size = 3
split_w = ceil(df_eixos.count() / bulk_size)
df_result = df_eixos
for i in range(split_w):
    df_eixos_limited = df_result.limit(bulk_size)
    df_result = df_result.subtract(df_eixos_limited)
    print(df_eixos_limited.show())

print(split_w)

df_eixos.columns

+-------+--------------+
|nk_eixo|     descricao|
+-------+--------------+
|      2|       Militar|
|      1|Administrativo|
|      3|     Econômico|
+-------+--------------+

None
+-------+---------+
|nk_eixo|descricao|
+-------+---------+
|      4|   Social|
+-------+---------+

None
2


['nk_eixo', 'descricao']

In [14]:
import  psycopg2 
from math import ceil

f"dbname='{database_dw}' user='{properties['user']}' password='{properties['password']}' host='{host_dw}' port='{port_dw}'"

def create_sql(table_name, columns, key_columns, type):
    sql = ''
    if type in ('DELETE'):
        columns_comparisson = [f'"{column}" = %s' for column in key_columns]
        columns_comparisson = ' and '.join(columns_comparisson)
        sql = f'DELETE FROM {table_name} WHERE {columns_comparisson}'
    elif type in ('INSERT'):
        columns_str = list(map(lambda x: '"' + x + '"',columns + key_columns))
        columns_str = ', '.join(columns_str)
        placeholders = ', '.join(['%s'] * len(columns + key_columns))
        sql = f'INSERT INTO {table_name}({columns_str}) VALUES({placeholders})'
    elif type in ('UPDATE'):
        columns_comparisson = [f'"{column}" = %s' for column in key_columns]
        columns_comparisson = ' and '.join(columns_comparisson)
        columns_set = [f'"{column}" = %s' for column in columns]
        columns_set = ', '.join(columns_set)
        sql = f'UPDATE {table_name} SET {columns_set} WHERE {columns_comparisson}'

    return sql

def create_connection(connection_properties): 
    #Connect to the Postgresql database using the psycopg2 adapter. 
    #Pass your database name , username , password , hostname and port number 
    conn = psycopg2.connect(f"dbname='{connection_properties['db_name']}' user='{connection_properties['user']}' password='{connection_properties['password']}'\
                            host='{connection_properties['host']}' port='{connection_properties['port']}'") 
    #Get the cursor object from the connection object 
    curr = conn.cursor() 
    return conn,curr 

def bulk_values(table_name, df, bulk_size, connection_properties, type, columns, key_columns=[]):

    if df.count() == 0:
        return

    splits = ceil(df.count() / bulk_size)
    conn, curr = create_connection(connection_properties)
    rdn_splits = [float(bulk_size)] * splits
    splits_dfs = df.randomSplit(rdn_splits)


    for split_index, split_df in enumerate(splits_dfs):

        if split_df.count() == 0:
            continue

        query = create_sql(table_name, columns, key_columns, type)

        if type in ('UPDATE', 'INSERT'):
            split_df = split_df.select(columns + key_columns)
        elif type in ('DELETE'):
            split_df = split_df.select(key_columns)


        rdd = split_df.rdd
        data = rdd.map(tuple)
        data = data.collect()


        curr.executemany(query, data) 
        conn.commit() 

        print(f'Split: {split_index + 1}\\{splits}\
                Bulk Size: {split_df.count()}')
        split_df.unpersist()

    conn.close() 

df_execucao_fin = spark.read.format("jdbc")\
    .option("url", url)\
    .option("query", "select * from public.stg_projeto_investimento")\
    .option("driver", properties["driver"])\
    .option("user", properties["user"])\
    .option("password", properties["password"])\
    .load()

table_name = 'stg_projeto_investimento_teste'
connection_properties = {'db_name':database_dw, 'user':user_dw, 'password':password_dw, 'host':host_dw, 'port':port_dw}

#bulk_values(table_name, df_execucao_fin, 500, connection_properties, 'UPDATE' ,df_execucao_fin.columns, ['idunico'])
        

In [20]:
df_project_inv = spark.read.format("jdbc")\
    .option("url", url)\
    .option("query", "select * from public.stg_projeto_investimento")\
    .option("driver", properties["driver"])\
    .option("user", properties["user"])\
    .option("password", properties["password"])\
    .load().cache()

df_project_inv_teste = spark.read.format("jdbc")\
    .option("url", url)\
    .option("query", "select * from public.stg_projeto_investimento_teste")\
    .option("driver", properties["driver"])\
    .option("user", properties["user"])\
    .option("password", properties["password"])\
    .load().cache()

def crud_database(df_old, df_new, table_name, key_columns, connection_properties, bulk_size, insert = True, update = True, delete = False):
    columns_comparisson = df_new.columns
    columns_comparisson = [column for column in columns_comparisson if not column in key_columns] 

    if delete == True:
        df_delete = values_to_insert_delete(df_new, df_old, key_columns)
        print('Starting delete...')
        bulk_values(table_name, df_delete, bulk_size, connection_properties, 'DELETE' ,columns_comparisson, key_columns)
        print(f'Deleted {df_delete.count()} rows')
        df_delete.unpersist()

    if update == True:
        df_update = values_to_update(df_old, df_new, key_columns, columns_comparisson)
        print('Starting update...')
        bulk_values(table_name, df_update, bulk_size, connection_properties, 'UPDATE' ,columns_comparisson, key_columns)
        print(f'Updated {df_update.count()} rows')
        df_update.unpersist()

    if insert == True:
        df_insert = values_to_insert_delete(df_old, df_new, key_columns)
        print('Starting insert...')
        bulk_values(table_name, df_insert, bulk_size, connection_properties, 'INSERT' ,columns_comparisson, key_columns)
        print(f'Inserted {df_insert.count()} rows')
        df_insert.unpersist()


def crud_database_table(sql_old, sql_new, table_name, key_columns, connection_properties, bulk_size, insert = True, update = True, delete = False):

    url = f'jdbc:postgresql://{connection_properties['host']}:{connection_properties['port']}/{connection_properties['db_name']}'

    df_new = spark.read.format("jdbc")\
        .option("url", url)\
        .option("query", sql_new)\
        .option("driver", connection_properties['driver'])\
        .option("user", connection_properties['user'])\
        .option("password", connection_properties['password'])\
        .load().cache()

    df_old = spark.read.format("jdbc")\
        .option("url", url)\
        .option("query", sql_old)\
        .option("driver", connection_properties['driver'])\
        .option("user", connection_properties['user'])\
        .option("password", connection_properties['password'])\
        .load().cache()
    
    crud_database(df_old, df_new, table_name, key_columns, connection_properties, bulk_size, insert, update, delete)

    df_new.unpersist()
    df_old.unpersist()


connection_properties = {'db_name':database_dw, 'user':user_dw, 'password':password_dw, 'host':host_dw, 'port':port_dw, 'driver': "org.postgresql.Driver"}
table_name = 'public.stg_projeto_investimento_teste'
sql_new = 'select * from public.stg_projeto_investimento'
sql_old = 'select * from public.stg_projeto_investimento_teste'
key_columns = ['idunico']
bulk_size = 500

crud_database_table(sql_old, sql_new, table_name, key_columns, connection_properties, bulk_size, delete=True)


#columns_comparisson = df_project_inv_teste.columns
#columns_comparisson = [column for column in columns_comparisson if not column in key_columns] 

#crud_database(df_project_inv_teste, df_project_inv , table_name, key_columns, connection_properties, 500, delete=True)



    


#key_columns = ['idunico']
#columns_comparisson = df_project_inv.columns

#values_to_update(df_project_inv,df_project_inv_teste,key_columns,columns_comparisson).show()

#df_project_inv_teste.filter('idunico <=> "13414.41-30"').select('qdtempregosgerados').show()

#[column for column in columns_comparisson if not column in key_columns or key_columns.remove(column)] 

Starting delete...
Deleted 0 rows
Starting update...
Updated 0 rows
Starting insert...
Inserted 0 rows


In [37]:
def create_sql(table_name, columns, key_columns, type):
    sql = ''
    if type in ('DELETE'):
        columns_comparisson = [f'"{column}" = %s' for column in key_columns]
        columns_comparisson = ' and '.join(columns_comparisson)
        sql = f'DELETE FROM {table_name} WHERE {columns_comparisson}'
    elif type in ('INSERT'):
        columns_str = list(map(lambda x: '"' + x + '"',columns))
        columns_str = ', '.join(columns_str)
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f'INSERT INTO {table_name}({columns_str}) VALUES({placeholders})'
    elif type in ('UPDATE'):
        columns_comparisson = [f'"{column}" = %s' for column in key_columns]
        columns_comparisson = ' and '.join(columns_comparisson)
        columns_set = [f'"{column}" = %s' for column in columns]
        columns_set = ', '.join(columns_set)
        sql = f'UPDATE {table_name} SET {columns_set} WHERE {columns_comparisson}'
    return sql



columns = ['descricao', 'nk_eixo']
key_columns = ['nk_eixo']

df_eixos.select(columns + key_columns).show()

print(create_sql('public.dim_eixos', columns, key_columns, 'UPDATE'))


+--------------+-------+-------+
|     descricao|nk_eixo|nk_eixo|
+--------------+-------+-------+
|       Militar|      2|      2|
|Administrativo|      1|      1|
|     Econômico|      3|      3|
|        Social|      4|      4|
+--------------+-------+-------+

UPDATE public.dim_eixos SET "descricao" = %s, "nk_eixo" = %s WHERE "nk_eixo" = %s


TypeError: unsupported operand type(s) for -: 'list' and 'list'

In [9]:
df_eixos = spark.read.format("jdbc")\
    .option("url", url)\
    .option("query", "select * from stg_projeto_investimento_eixos")\
    .option("driver", properties["driver"])\
    .option("user", properties["user"])\
    .option("password", properties["password"])\
    .load()

NameError: name 'data' is not defined

In [19]:
import  psycopg2 
  
#Method to create a connection object to the database. 
#It creates a pointer cursor to the database and returns it along with  Connection object 
def create_connection(properties, database_dw, host_dw, port_dw): 
      #Connect to the Postgresql database using the psycopg2 adapter. 
    #Pass your database name , username , password , hostname and port number 
    conn = psycopg2.connect(f"dbname='{database_dw}' user='{properties['user']}' password='{properties['password']}' host='{host_dw}' port='{port_dw}'") 
    #Get the cursor object from the connection object 
    curr = conn.cursor() 
    return conn,curr 

conn, curr = create_connection(properties, database_dw, host_dw, port_dw)

deptpoints_update_query = """INSERT INTO dim_eixos(nk_eixos,descricao) values(%s,%s)"""
# Pass the new values and update query to the executemany() method of Cursor 
curr.executemany(deptpoints_update_query, data)
conn.commit()
conn.close() 


In [25]:
from pyspark.sql.functions import when, explode_outer

def generate_dates(date, days):
    days_date = []
    date_before = date - timedelta(days=days)
    date_generated = [date_before + timedelta(days=x) for x in range(0, (date-date_before).days)]

    for d in date_generated:
        days_date.append(str(d.strftime('%Y-%m-%d')))
    return days_date

today = datetime.now()
dates = generate_dates(today, 30)

df = []
origins = []

for date in dates:
    origin_file = origin + f'/{date}.json'
    if os.path.isfile(origin_file) == True:
        origins.append(origin_file)

print(origins)

df_novo = spark.read.json(origins)
df_novo = df_novo.withColumn('nomeArquivo',substring(input_file_name(),-15,10))

sql_full = ''

ufs = ['RP','SCA']

tables = ['stg_projeto_investimento_eixos', 'stg_projeto_investimento_tomadores']
for table in tables:
    sql_full += f'''delete from {table} stg
                    using stg_projeto_investimento stg_project
                    where stg_project."idunico" = stg."idunico" and stg_project."uf" in ({",".join("'" + uf + "'"  for uf in ufs)});\n'''
print(sql_full)


def generate_sql_deletion_ufs(tables, ufs):
    sql_full = ''
    for table in tables:
        sql_full += f'''delete from {table} stg
                        using stg_projeto_investimento stg_project
                        where stg_project."idunico" = stg."idunico" and stg_project.uf in ({",".join( "'" + uf + "'" for uf in ufs)});\n'''
    return sql_full

conn = jaydebeapi.connect(driver, url, [user_dw, password_dw], '/home/adriano/Documentos/airflow/jdbc/postgresql-42.7.4.jar')
curs = conn.cursor()
curs.execute(sql_full)
curs.execute(f'delete from stg_projeto_investimento where uf in ({",".join("'" + uf + "'"  for uf in ufs)});')

#print( '(' + ",".join(uf for uf in ufs) + ')')

#df += df_novo

#df_tomadores = df.withColumn("tomadores1", explode(col("tomadores"))).count()
#df_tomadores
len(origins)

['/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-20.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-21.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-22.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-23.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-24.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-25.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-26.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-27.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-28.json', '/home/adriano/Documentos/airflow/database/dest/bronze/projeto_investimento/date/2024-09-29.json', '/home/ad

29

In [7]:
import jaydebeapi
import os
from dotenv import load_dotenv
from pathlib import Path

dotenv_path = Path('/home/adriano/Documentos/airflow/.env')
load_dotenv(dotenv_path=dotenv_path)

user_dw = os.getenv('USER_DW')
password_dw = os.getenv('PASSWORD_DW')
host_dw = os.getenv('HOST_DW')
database_dw = os.getenv('DATABASE_DW')
port_dw = os.getenv('PORT_DW')
driver = "org.postgresql.Driver"

mode = 'append'
url = f'jdbc:postgresql://{host_dw}:{port_dw}/{database_dw}'
properties = {"user": user_dw, "password": password_dw, "driver": driver}
#conn = jaydebeapi.connect(driver, url, [user_dw, password_dw], '/home/adriano/Documentos/airflow/jdbc/postgresql-42.7.4.jar')
#df.write.jdbc(url=url, table='stg_execucao_financeira', mode=mode, properties=properties)





In [8]:
#curs = conn.cursor()
#curs.execute(f'''delete from stg_execucao_financeira where "nomeArquivo" = '{ano_final}' ''')

In [10]:
import requests
import time
import json

def extract_data_api(url_base, endpoint, uf, dest_path, page_size = 100, errors_limit = -1, errors_consecutives_limit = 5, executions_limit = 200):
    success = False
    page = 0
    errors_consecutives = 0
    errors = 0
    executions = 0
    method = "GET"
    dest_path_file = dest_path + '/' + str(uf) + '.json'

    Path(dest_path).mkdir(parents=True, exist_ok=True)

    if os.path.isfile(dest_path_file) == True:
        os.remove(dest_path_file)

    content_all = []

    while success == False and (errors_consecutives < errors_consecutives_limit or errors_consecutives_limit == -1) and (errors < errors_limit or errors_limit == -1) and (executions < executions_limit or executions_limit == -1):
        url = url_base + endpoint + '?' + 'pagina=' + str(page) + '&' + 'tamanhoDaPagina=' + str(page_size) + '&' + 'uf=' + str(uf)
        response = requests.request(method, url)
        if response.status_code == 200:
            if(len(response.json()["content"]) == 0):
                success = True 
            else:
                page += 1
                errors_consecutives = 0
                executions += 1
                content_all += response.json()["content"]
                #with open(dest_path_file, 'w') as f:
                #    json.dump(response.json()["content"],f)
        else:
            errors_consecutives += 1
            errors += 1
            executions += 1
            if response.status_code == 429:
                time.sleep(1)
        print(f'Status Code: {response.status_code}\n'
            f'Executions: {executions}\n'
            f'Pages: {page}\n' 
            f'N° Registers: {len(content_all)}\n'
            f'Errors: {errors}\n'
            f'Errors Consecutives: {errors_consecutives}\n')
        time.sleep(1)

    if success == True:
        print('Execution Finished with success!')
    else:
        print('Execution Finished with error!')
        if errors_consecutives < errors_consecutives_limit:
            raise Exception("Number of consecutives errors exceeded")
        elif errors < errors_limit:
            raise Exception("Number of total errors exceeded")
        elif executions < executions_limit:
            raise Exception("Number of executions exceeded")

    with open(dest_path_file, 'w', encoding='utf-8') as f:
        json.dump(content_all, f)
    f.close()


page_size = 100
url_base = "https://api.obrasgov.gestao.gov.br"
endpoint = "/obrasgov/api/projeto-investimento"
current_year = datetime.now().year
initial_year = int(current_year)
final_year = int(current_year)
dest_path = '/home/adriano/Documentos/airflow/database/dest/bronze/execucao-financeira'
errors_limit = - 1
errors_consecutives_limit = 5
executions_limit = -1

extract_data_api(url_base, endpoint, 'PR', dest_path, page_size, errors_limit, errors_consecutives_limit, executions_limit)

Status Code: 500
Executions: 1
Pages: 0
N° Registers: 0
Errors: 1
Errors Consecutives: 1

Status Code: 200
Executions: 2
Pages: 1
N° Registers: 99
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 3
Pages: 2
N° Registers: 198
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 4
Pages: 3
N° Registers: 298
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 5
Pages: 4
N° Registers: 398
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 6
Pages: 5
N° Registers: 497
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 7
Pages: 6
N° Registers: 597
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 8
Pages: 7
N° Registers: 696
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 9
Pages: 8
N° Registers: 796
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 10
Pages: 9
N° Registers: 896
Errors: 1
Errors Consecutives: 0

Status Code: 200
Executions: 11
Pages: 10
N° Registers: 994
Errors: 1
Errors Conse

In [11]:
from datetime import datetime, date, timedelta
today = datetime.today()
days_before = today - timedelta(days=2)


date_generated = [days_before + timedelta(days=x) for x in range(0, (today-days_before).days)]

for d in date_generated:
    print(d)

2024-10-11 10:22:48.977500
2024-10-12 10:22:48.977500


In [None]:
def extract_api(url_base, endpoint, param, method, path_dest_env, fun_api, page_size, errors_limit, errors_consecutives_limit, executions_limit):
    success = False
    page = 0
    errors_consecutives = 0
    errors = 0
    executions = 0
    dest_path = os.getenv(path_dest_env)
    dest_path_file = dest_path + '/' + str(param) + '.json'

    Path(dest_path).mkdir(parents=True, exist_ok=True)

    if os.path.isfile(dest_path_file) == True:
        os.remove(dest_path_file)

    content_all = []

    while success == False and (errors_consecutives < errors_consecutives_limit or errors_consecutives_limit == -1) and (errors < errors_limit or errors_limit == -1) and (executions < executions_limit or executions_limit == -1):
        success, content,  page, errors_consecutives, errors, executions, status_code =  fun_api(url_base, endpoint, param, method, page_size, page, errors_consecutives, errors, executions)

        if content is not None:
            content_all.append(content)

        print(f'Status Code: {status_code}\n'
            f'Executions: {executions}\n'
            f'Pages: {page}\n' 
            f'N° Registers: {len(content_all)}\n'
            f'Errors: {errors}\n'
            f'Errors Consecutives: {errors_consecutives}\n')
        time.sleep(1)

    if success == True:
        print('Execution Finished with success!')
    else:
        print('Execution Finished with error!')
        if errors_consecutives < errors_consecutives_limit:
            raise Exception("Number of consecutives errors exceeded")
        elif errors < errors_limit:
            raise Exception("Number of total errors exceeded")
        elif executions < executions_limit:
            raise Exception("Number of executions exceeded")

    with open(dest_path_file, 'w', encoding='utf-8') as f:
        json.dump(content_all, f)
    f.close()

def extract_api_projeto_investimento_uf(url_base, endpoint, param, method, page_size, page, errors_consecutives, errors, executions):
    params = {"pagina": str(page), "tamanhoDaPagina": str(page_size), "uf": str(param)}
    url = generate_url(url_base, endpoint, params)
    response = requests.request(method, url)
    if response.status_code == 200:
        if(len(response.json()["content"]) == 0):
            success = True
            return success, None,  page, errors_consecutives, errors, executions, response.status_code
        else:
            page += 1
            errors_consecutives = 0
            executions += 1
            content = response.json()["content"]
            return success, content,  page, errors_consecutives, errors, executions, response.status_code
    else:
        errors_consecutives += 1
        errors += 1
        executions += 1
        if response.status_code == 429:
            time.sleep(1)
        return success, None,  page, errors_consecutives, errors, executions, response.status_code

@task()
def extract_data_api_project(url_base, endpoint, date, page_size = 100, errors_limit = -1, errors_consecutives_limit = 5, executions_limit = 200):
    method = "GET"
    path_dest_env = "PATH_DEST_PROJETO_INVESTIMENTO"
    extract_api(url_base, endpoint, date, method, path_dest_env, extract_api_projeto_investimento_uf, page_size, errors_limit, errors_consecutives_limit, executions_limit)

def extract_api_execucao_financeira_year(url_base, endpoint, year, method, page_size, page, errors_consecutives, errors, executions):
    params = {"pagina": str(page), "tamanhoDaPagina": str(page_size), "anoFinal": str(year), "anoInicial": str(year)}
    url = generate_url(url_base, endpoint, params)
    success = False
    response = requests.request(method, url)
    if response.status_code == 200:
        page += 1
        errors_consecutives = 0
        executions += 1
        content = response.json()["content"]
        return success, content,  page, errors_consecutives, errors, executions, response.status_code
    elif response.status_code == 404:
        success = True
        return success, None,  page, errors_consecutives, errors, executions, response.status_code 
    else:
        errors_consecutives += 1
        errors += 1
        executions += 1
        if response.status_code == 429:
            time.sleep(1)
        return success, None,  page, errors_consecutives, errors, executions, response.status_code

In [15]:
def generate_dates_reprocessing(date, days):
    days_date = []
    date_before = date - timedelta(days=days)
    date_generated = [date_before + timedelta(days=x) for x in range(0, (date-date_before).days)]

    for d in date_generated:
        days_date.append(str(d.strftime('%Y-%m-%d')))
        print(d.strftime('%Y-%m-%d'))
    return days_date

generate_dates_reprocessing(today, 30)

2024-09-13
2024-09-14
2024-09-15
2024-09-16
2024-09-17
2024-09-18
2024-09-19
2024-09-20
2024-09-21
2024-09-22
2024-09-23
2024-09-24
2024-09-25
2024-09-26
2024-09-27
2024-09-28
2024-09-29
2024-09-30
2024-10-01
2024-10-02
2024-10-03
2024-10-04
2024-10-05
2024-10-06
2024-10-07
2024-10-08
2024-10-09
2024-10-10
2024-10-11
2024-10-12


['2024-09-13',
 '2024-09-14',
 '2024-09-15',
 '2024-09-16',
 '2024-09-17',
 '2024-09-18',
 '2024-09-19',
 '2024-09-20',
 '2024-09-21',
 '2024-09-22',
 '2024-09-23',
 '2024-09-24',
 '2024-09-25',
 '2024-09-26',
 '2024-09-27',
 '2024-09-28',
 '2024-09-29',
 '2024-09-30',
 '2024-10-01',
 '2024-10-02',
 '2024-10-03',
 '2024-10-04',
 '2024-10-05',
 '2024-10-06',
 '2024-10-07',
 '2024-10-08',
 '2024-10-09',
 '2024-10-10',
 '2024-10-11',
 '2024-10-12']