In [1]:
import psycopg2
import os

In [2]:
# PostgreSQL connection settings used by the cells below.
# Host/port point at the Airflow Postgres service; credentials come from the
# environment so they are never hard-coded in the notebook.
DB_HOST, DB_PORT = "airflow-postgres", "5432"
DB_USER, DB_PASSWORD = os.getenv('POSTGRES_USER'), os.getenv('POSTGRES_PASSWORD')

# Database created and populated by this notebook.
DB_NAME = "azurecost"

# Placeholder so the `finally: if conn:` clauses never see an undefined name.
conn = None

In [3]:
# Create the 'azurecost' database if it does not exist yet.
# CREATE DATABASE cannot run inside a transaction block, so this cell connects
# to the maintenance database 'postgres' with autocommit enabled.
from psycopg2 import sql

conn = None  # reset so `finally` never closes a connection from another cell
try:
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database="postgres",
        user=DB_USER,
        password=DB_PASSWORD
    )
    conn.autocommit = True

    # The cursor is closed automatically, even if a statement fails.
    with conn.cursor() as cur:
        # Check for the database with a bound parameter instead of splicing it into SQL.
        cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (DB_NAME,))
        exists = cur.fetchone()
        if not exists:
            # Identifiers cannot be bound as parameters; sql.Identifier quotes them safely.
            cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(DB_NAME)))
            print(f"Banco de dados '{DB_NAME}' criado com sucesso.")
        else:
            print(f"Banco de dados '{DB_NAME}' já existe.")

except psycopg2.Error as e:
    print(f"Erro ao conectar ou criar o banco de dados: {e}")
finally:
    if conn:
        conn.close()

Banco de dados 'azurecost' criado com sucesso.


In [4]:
# Create the 'resources' table (one row per resource) in the 'azurecost' database.
conn = None  # reset so `finally` never closes a connection from another cell
try:
    # Connect to the 'azurecost' database to create the table.
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    # With autocommit each statement is committed as soon as it runs,
    # so no explicit conn.commit() is needed.
    conn.autocommit = True

    # SQL command that creates the 'resources' table.
    create_table_query = """
    CREATE TABLE IF NOT EXISTS resources (
        Id SERIAL PRIMARY KEY,
        ResourceName VARCHAR,
        SubscriptionId VARCHAR,
        ResourceGroup VARCHAR,
        Provider VARCHAR,
        StatusRecourse VARCHAR,
        Currency VARCHAR,
        TendenciaCusto VARCHAR
    );
    """

    # The cursor is closed automatically, even if execute() fails.
    with conn.cursor() as cur:
        cur.execute(create_table_query)
    print(f"Tabela 'resources' criada com sucesso no banco de dados '{DB_NAME}'.")

except psycopg2.Error as e:
    print(f"Erro ao criar tabela: {e}")
finally:
    if conn:
        conn.close()

Tabela 'resources' criada com sucesso no banco de dados 'azurecost'.


In [5]:
# Create the 'costresources' table (cost rows per resource) in the 'azurecost' database.
conn = None  # reset so `finally` never closes a connection from another cell
try:
    # Connect to the 'azurecost' database to create the table.
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    # With autocommit each statement is committed as soon as it runs,
    # so no explicit conn.commit() is needed.
    conn.autocommit = True

    # SQL command that creates the 'costresources' table.
    # Id is the resource Id written by the Spark load, and it repeats for every
    # cost row of the same resource. It is therefore a plain INTEGER: SERIAL
    # would attach an unused sequence and suggest a per-row generated key.
    # NOT NULL keeps the nullability that SERIAL implied.
    create_table_query = """
    CREATE TABLE IF NOT EXISTS costresources (
        Id INTEGER NOT NULL,
        PreTaxCost DOUBLE PRECISION,
        Pct_Change DOUBLE PRECISION,
        PrevisaoProxima DOUBLE PRECISION,
        UsageDate TIMESTAMP
    );
    """

    # The cursor is closed automatically, even if execute() fails.
    with conn.cursor() as cur:
        cur.execute(create_table_query)
    print(f"Tabela 'costresources' criada com sucesso no banco de dados '{DB_NAME}'.")

except psycopg2.Error as e:
    print(f"Erro ao criar tabela: {e}")
finally:
    if conn:
        conn.close()

Tabela 'costresources' criada com sucesso no banco de dados 'azurecost'.


In [6]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import max, col,row_number, concat_ws
from pyspark.sql.window import Window
import os

# Delta Lake Maven coordinates.
# NOTE(review): this constant is never passed to the builder (there is no
# `spark.jars.packages` setting below), so the Delta jars must already be on the
# Spark classpath. Confirm the coordinates before wiring them in; Delta Lake 3.x
# appears to publish the artifact as `delta-spark` rather than `delta-core`.
DELTA_LAKE_PACKAGE = "io.delta:delta-core_2.12:3.3.2"

# Spark session with the Delta Lake SQL extension/catalog and S3A pointed at MinIO.
# MinIO credentials are read from the KEY_ACCESS / KEY_SECRETS environment variables.
spark = SparkSession.builder \
    .appName("PySpark Delta Lake MinIO Save") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("KEY_ACCESS")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("KEY_SECRETS")) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()


# Path of the gold Delta table in MinIO.
gold_path = "s3a://azurecost/gold"

# Load the current version of the whole gold table. No partition filter is
# applied, so every partition is read, not just the latest one.
df = spark.read.format("delta").load(gold_path)

# Surrogate resource Id. A resource is identified by the triple
# (ResourceName, SubscriptionId, ResourceGroup), flattened into a single
# "concat" string that the following cells use as the join key.
# Deduplicating on the key itself guarantees exactly one Id per key, so the
# later left joins on "concat" can never duplicate rows.
# NOTE: Ids follow the sort order of the key and are recomputed on every run;
# they may shift when new resources appear in the gold table.
# The window has no partitionBy, so all keys are numbered in one partition.
# That is acceptable because there is one row per distinct resource.
window_spec = Window.orderBy("concat")

df_id = (
    df.select(
        concat_ws("_", col("ResourceName"), col("SubscriptionId"), col("ResourceGroup")).alias("concat")
    )
    .distinct()
    .withColumn("Id", row_number().over(window_spec))
)

df_id.show()

+--------------------+---+
|              concat| Id|
+--------------------+---+
|appfunckabum_da48...|  1|
|appfuncmagalu_da4...|  2|
|dbstorage7ifgyhji...|  3|
|nintendoservplan_...|  4|
|nintendostorageac...|  5|
+--------------------+---+



In [7]:
# Keep only the gold-table columns used by the two PostgreSQL loads.
df = df.select(
    "ResourceName",
    "SubscriptionId",
    "ResourceGroup",
    "Provider",
    "StatusRecourse",
    "PreTaxCost",
    "Pct_Change",
    "Currency",
    "UsageDate",
    "TendenciaCusto",
    "PrevisaoProxima"
)

# Latest row per resource. A resource is identified by the same
# (ResourceName, SubscriptionId, ResourceGroup) triple used to build `df_id`.
# Partitioning by ResourceName alone would keep a single row for resources that
# share a name across subscriptions or resource groups, leaving the others out
# of the `resources` table. Ties on UsageDate are broken non-deterministically
# by row_number.
window_spec = (
    Window.partitionBy("ResourceName", "SubscriptionId", "ResourceGroup")
    .orderBy(col("UsageDate").desc())
)

df_latest_per_resource = (
    df.withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .drop("rank")
)

# Attach the surrogate Id through the same concatenated key used in `df_id`,
# then keep the columns of the `resources` table.
df_resources = (
    df_latest_per_resource
    .withColumn("concat", concat_ws("_", col("ResourceName"), col("SubscriptionId"), col("ResourceGroup")))
    .join(df_id, on="concat", how="left")
    .select(
        "Id",
        "ResourceName",
        "SubscriptionId",
        "ResourceGroup",
        "Provider",
        "StatusRecourse",
        "Currency",
        "TendenciaCusto"
    )
)

df_resources.show(truncate=False)

+---+----------------------+------------------------------------+-------------------------------------+-----------------+--------------+--------+--------------+
|Id |ResourceName          |SubscriptionId                      |ResourceGroup                        |Provider         |StatusRecourse|Currency|TendenciaCusto|
+---+----------------------+------------------------------------+-------------------------------------+-----------------+--------------+--------+--------------+
|1  |appfunckabum          |da483b95-1caf-404c-bfe4-36abef87f6e6|nintendoproject                      |microsoft.web    |Ativo         |BRL     |Estável       |
|2  |appfuncmagalu         |da483b95-1caf-404c-bfe4-36abef87f6e6|nintendoproject                      |microsoft.web    |Ativo         |BRL     |Estável       |
|3  |dbstorage7ifgyhjijpdgi|da483b95-1caf-404c-bfe4-36abef87f6e6|nintendodatabrickswi86no-workspace-rg|microsoft.storage|Ativo         |BRL     |Estável       |
|4  |nintendoservplan      |da483b

In [8]:
# JDBC target: the 'azurecost' database on the Airflow Postgres service.
jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

properties = {
    "user": DB_USER,
    "password": DB_PASSWORD,
    "driver": "org.postgresql.Driver"
}

# mode("overwrite") on its own makes Spark DROP and re-CREATE the table with
# types inferred from the DataFrame. That discards the DDL from the
# table-creation cell (SERIAL PRIMARY KEY, VARCHAR columns).
# truncate=true keeps the existing table definition and only empties it before
# inserting. If the table does not exist yet, Spark still creates it.
(
    df_resources.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "resources")
    .options(**properties)
    .option("truncate", "true")
    .mode("overwrite")
    .save()
)

print("Dados inseridos com sucesso na tabela 'resources'.")



Dados inseridos com sucesso na tabela 'resources'.


In [9]:
# Keep only the gold-table columns used by the two PostgreSQL loads.
df = df.select(
    "ResourceName",
    "SubscriptionId",
    "ResourceGroup",
    "Provider",
    "StatusRecourse",
    "PreTaxCost",
    "Pct_Change",
    "Currency",
    "UsageDate",
    "TendenciaCusto",
    "PrevisaoProxima"
)

# Attach the surrogate resource Id (from `df_id`) to every cost row, using the
# same concatenated key. The key column is added to a derived frame only, so
# the global `df` is left unchanged and this cell stays re-runnable.
df_prices = (
    df.withColumn("concat", concat_ws("_", col("ResourceName"), col("SubscriptionId"), col("ResourceGroup")))
    .join(df_id, on="concat", how="left")
    .select(
        "Id",
        "PreTaxCost",
        "Pct_Change",
        "PrevisaoProxima",
        "UsageDate"
    )
)

df_prices.show(truncate=False)

+---+----------+----------+---------------+-------------------+
|Id |PreTaxCost|Pct_Change|PrevisaoProxima|UsageDate          |
+---+----------+----------+---------------+-------------------+
|1  |0.0       |0.0       |0.0            |2025-08-01 16:00:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:55:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:50:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:45:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:40:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:35:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:30:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:25:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:20:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:15:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:10:00|
|1  |0.0       |0.0       |0.0            |2025-08-01 15:05:00|
|1  |0.0       |0.0       |0.0          

In [10]:
# JDBC target: the 'azurecost' database on the Airflow Postgres service.
jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

properties = {
    "user": DB_USER,
    "password": DB_PASSWORD,
    "driver": "org.postgresql.Driver"
}

# mode("overwrite") on its own makes Spark DROP and re-CREATE the table with
# types inferred from the DataFrame, discarding the DDL from the
# table-creation cell. truncate=true keeps the existing table definition and
# only empties it before inserting. If the table does not exist yet, Spark
# still creates it.
(
    df_prices.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "costresources")
    .options(**properties)
    .option("truncate", "true")
    .mode("overwrite")
    .save()
)

print("Dados inseridos com sucesso na tabela 'costresources'.")

Dados inseridos com sucesso na tabela 'costresources'.


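## Inspeção do banco de dados

As células abaixo listam os bancos, as tabelas e as colunas do servidor PostgreSQL. As duas últimas são destrutivas (excluem a tabela `costresources` e o banco `azurecost`); execute-as apenas quando quiser limpar o ambiente.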

In [12]:
import psycopg2
import os

# Connection settings, redefined here so this inspection cell can run on its own.
DB_HOST = "airflow-postgres"
DB_PORT = "5432"
DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')

# Open a connection to the default 'postgres' database. try/finally ensures
# it is closed even if the query fails.
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database="postgres"
)
try:
    with conn.cursor() as cur:
        # List every database on the server, including the template databases.
        cur.execute("SELECT datname FROM pg_database;")
        databases = cur.fetchall()
finally:
    conn.close()

print("Bancos de dados disponíveis:")
for (datname,) in databases:
    print(datname)

Bancos de dados disponíveis:
postgres
airflow
template1
template0
azurecost


In [None]:
import psycopg2
import os

# Connection settings, redefined here so this inspection cell can run on its own.
DB_HOST = "airflow-postgres"
DB_PORT = "5432"
DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')
# Use a dedicated name instead of overwriting DB_NAME. Other cells in this
# notebook drop the database/table named by DB_NAME, so setting it to "airflow"
# here would aim them at Airflow's metadata database.
AIRFLOW_DB_NAME = "airflow"

conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database=AIRFLOW_DB_NAME
)
try:
    with conn.cursor() as cur:
        # List the base tables (views excluded) of the 'public' schema.
        cur.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE';
        """)
        tables = cur.fetchall()
finally:
    conn.close()

print(f"Tabelas no banco '{AIRFLOW_DB_NAME}':")
for table in tables:
    print(table[0])


Tabelas no banco 'airflow':
log
dag_priority_parsing_request
job
slot_pool
callback_request
dag_code
dag_pickle
ab_user
ab_register_user
connection
sla_miss
variable
import_error
serialized_dag
dataset_alias
dataset_alias_dataset
dataset
dataset_alias_dataset_event
dataset_event
dag_schedule_dataset_alias_reference
dag
dag_schedule_dataset_reference
task_outlet_dataset_reference
dataset_dag_run_queue
log_template
dag_run
dag_tag
dag_owner_attributes
ab_permission
ab_permission_view
ab_view_menu
ab_user_role
ab_role
dagrun_dataset_event
trigger
task_instance
dag_run_note
ab_permission_view_role
rendered_task_instance_fields
task_fail
task_map
task_reschedule
xcom
task_instance_note
task_instance_history
session
alembic_version


In [None]:
import psycopg2
import os

# Connection settings, redefined here so this inspection cell can run on its own.
DB_HOST = "airflow-postgres"
DB_PORT = "5432"
DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')
# Use a dedicated name instead of overwriting DB_NAME. Other cells in this
# notebook drop the database/table named by DB_NAME.
AIRFLOW_DB_NAME = "airflow"

# Table of the Airflow metadata database whose columns are listed.
table_name = 'dag'

conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database=AIRFLOW_DB_NAME
)
try:
    with conn.cursor() as cur:
        # Column metadata in declaration order; the table name is a bound parameter.
        cur.execute("""
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = 'public'
            AND table_name = %s
            ORDER BY ordinal_position;
        """, (table_name,))
        columns = cur.fetchall()
finally:
    conn.close()

print(f"Colunas da tabela '{table_name}':")
# Unpack into named variables. A loop variable called `col` would shadow
# pyspark.sql.functions.col, which the Spark cells of this notebook use.
for column_name, data_type, is_nullable, column_default in columns:
    print(f"Nome: {column_name}, Tipo: {data_type}, Pode ser NULL: {is_nullable}, Default: {column_default}")


Colunas da tabela 'dag':
Nome: dag_id, Tipo: character varying, Pode ser NULL: NO, Default: None
Nome: root_dag_id, Tipo: character varying, Pode ser NULL: YES, Default: None
Nome: is_paused, Tipo: boolean, Pode ser NULL: YES, Default: None
Nome: is_subdag, Tipo: boolean, Pode ser NULL: YES, Default: None
Nome: is_active, Tipo: boolean, Pode ser NULL: YES, Default: None
Nome: last_parsed_time, Tipo: timestamp with time zone, Pode ser NULL: YES, Default: None
Nome: last_pickled, Tipo: timestamp with time zone, Pode ser NULL: YES, Default: None
Nome: last_expired, Tipo: timestamp with time zone, Pode ser NULL: YES, Default: None
Nome: scheduler_lock, Tipo: boolean, Pode ser NULL: YES, Default: None
Nome: pickle_id, Tipo: integer, Pode ser NULL: YES, Default: None
Nome: fileloc, Tipo: character varying, Pode ser NULL: YES, Default: None
Nome: processor_subdir, Tipo: character varying, Pode ser NULL: YES, Default: None
Nome: owners, Tipo: character varying, Pode ser NULL: YES, Default: Non

In [7]:
# Destructive: drops the 'costresources' table from the database named by DB_NAME.
# DB_NAME is a notebook global that any cell can reassign, so refuse to act on
# system or Airflow databases.
PROTECTED_DATABASES = {"postgres", "airflow", "template0", "template1"}
if DB_NAME in PROTECTED_DATABASES:
    raise ValueError(f"Operação recusada: '{DB_NAME}' é um banco de dados protegido.")

conn = None  # reset so `finally` never closes a connection from another cell
try:
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    conn.autocommit = True

    # The cursor is closed automatically, even if the DROP fails.
    with conn.cursor() as cur:
        drop_table_query = "DROP TABLE IF EXISTS costresources;"
        cur.execute(drop_table_query)
        print("Tabela 'costresources' excluída com sucesso.")

except psycopg2.Error as e:
    print(f"Erro ao excluir a tabela: {e}")

finally:
    if conn:
        conn.close()

Tabela 'costresources' excluída com sucesso.


In [16]:
# Destructive: terminates every session on DB_NAME and drops that database.
from psycopg2 import sql

# DB_NAME is a notebook global that any cell can reassign. Refuse to kill the
# sessions of, or drop, the server's system databases or Airflow's metadata DB.
PROTECTED_DATABASES = {"postgres", "airflow", "template0", "template1"}
if DB_NAME in PROTECTED_DATABASES:
    raise ValueError(f"Operação recusada: '{DB_NAME}' é um banco de dados protegido.")

conn = None  # reset so `finally` never closes a connection from another cell
try:
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database="postgres",  # must differ from the database being dropped
        user=DB_USER,
        password=DB_PASSWORD
    )
    # DROP DATABASE cannot run inside a transaction block.
    conn.autocommit = True

    with conn.cursor() as cur:
        # Close active sessions on the target database; DROP DATABASE fails
        # while any session is still connected to it.
        cur.execute("""
            SELECT pg_terminate_backend(pid)
            FROM pg_stat_activity
            WHERE datname = %s;
        """, (DB_NAME,))

        # Drop the target database. Identifiers cannot be bound as parameters,
        # so sql.Identifier quotes the name instead of an f-string.
        cur.execute(sql.SQL("DROP DATABASE IF EXISTS {};").format(sql.Identifier(DB_NAME)))
        print(f"Banco de dados '{DB_NAME}' excluído com sucesso.")

except psycopg2.Error as e:
    print(f"Erro ao excluir o banco de dados: {e}")

finally:
    if conn:
        conn.close()


Banco de dados 'azurecost' excluído com sucesso.
