In [1]:
import psycopg as psy
import os
from dotenv import load_dotenv
import re
from tabulate import tabulate
from psycopg.rows import dict_row
import pandas as pd

In [2]:
load_dotenv()

DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')

conn_info = f"dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} host={DB_HOST}"
try:
   with psy.connect(conn_info) as conn:
       print("Connected to database")
       cur = conn.cursor()
       cur.execute("SELECT version();")
       record = cur.fetchone()
       print("You are connected to - ", record)
except Exception as e:
   print(f"Error: {e}")

Connected to database
You are connected to -  ('PostgreSQL 18.0 on x86_64-windows, compiled by msvc-19.44.35217, 64-bit',)


In [3]:
connection_string = os.getenv('DB_POOL_URL')
print(connection_string)

postgresql+psycopg2://postgres:ufsc2025@localhost:5432/smart_city_os


In [8]:
def create_tables(conn_info, schema='public', sql_file="../sql/create_tables.sql"):
    try:
        with psy.connect(conn_info) as conn:
            with conn.cursor() as cur:

                # Tabelas existentes antes
                cur.execute("""
                    SELECT tablename
                    FROM pg_tables
                    WHERE schemaname = '{}'
                """.format(schema))
                before_tables = {row[0] for row in cur.fetchall()}

                # Executa o script
                with open(sql_file, "r", encoding="utf-8") as f:
                    cur.execute(f.read())

                conn.commit()

                # Tabelas existentes depois
                cur.execute("""
                    SELECT tablename
                    FROM pg_tables
                    WHERE schemaname = '{}'
                """.format(schema))
                after_tables = {row[0] for row in cur.fetchall()}

                # Apenas as novas
                created_tables = sorted(after_tables - before_tables)

                if created_tables:
                    if len(created_tables) == 1:
                        print(f'Table created: {created_tables[0]}')
                    else:
                        print(f'Tables created: {", ".join(created_tables)}')
                else:
                    print("No new tables were created")

    except Exception as e:
        print(f"Error creating tables: {e}")
        import traceback
        traceback.print_exc()

create_tables(conn_info)

Table created: app_user


In [None]:
def insert_test_data(conn_info,schema='public'):
    """
    Insert test data into the SmartCityOS database
    """
    try:
        with psy.connect(conn_info) as conn:
            with conn.cursor() as cur:
                # TODO: Implement test data insertion
                # Example: insert a test app_user
                cur.execute("""
                    INSERT INTO {}.app_user (first_name, last_name, cpf, birth_date, email, phone, address, username, password_hash)
                    VALUES ('Test', 'User', '12345678901', '1990-01-01', 'test@example.com', '1234567890', 'Test Address', 'test_user', 'test_hash')
                    RETURNING id, first_name, email
                """.format(schema))
                user_id, first_name, email = cur.fetchone()
                print(f"Inserted test user with ID: {user_id}, Name: {first_name}, Email: {email}")
                conn.commit()
    except Exception as e:
        print(f"Error inserting test data: {e}")
        raise

insert_test_data(conn_info)

In [7]:
def drop_especific_tables(conn_info, table_names=[],schema="public"):
    try:
        with psy.connect(conn_info) as conn:
            with conn.cursor() as cur:
                for table_name in table_names:
                    cur.execute("DROP TABLE IF EXISTS {}.{} CASCADE".format(schema,table_name))
                conn.commit()
    except Exception as e:
        print(f"Error dropping table: {e}")

tables = ['app_user']
drop_especific_tables(conn_info, tables)

In [4]:
def table_names_from_sql(file_path="../sql/create_tables.sql"):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            sql = f.read()

        pattern = re.compile(
            r'CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+("?[\w]+"?)',
            re.IGNORECASE
        )
        tables = pattern.findall(sql)
        tables = [t.replace('"', '') for t in tables]
        return tables
    except Exception as e:
        print(f"Erro ao ler o arquivo SQL: {e}")
        return None

print(table_names_from_sql())

['notification', 'payment_method', 'app_user', 'citizen', 'sensor', 'vehicle', 'reading', 'vehicle_citizen', 'traffic_incident', 'fine', 'fine_payment', 'app_user_notification', 'audit_log']


In [5]:
def drop_tables(conn_info, table_names):
    try:
      with psy.connect(conn_info) as conn:
         with conn.cursor() as cur:
            table_names = ', '.join(table_names)
            query = f'DROP TABLE IF EXISTS {table_names} CASCADE;'
            cur.execute(query)
            conn.commit()
    except Exception as e:
      print(f"Error: {e}")
      conn.rollback()


tables = table_names_from_sql()

print(f'Dropped table(s):')
for name in reversed(tables):
    drop_tables(conn_info, [name])
    print(f"{name}")
print()
print(f'Dropped {len(tables)} tables successfully')

Dropped table(s):
audit_log
app_user_notification
fine_payment
fine
traffic_incident
vehicle_citizen
reading
vehicle
sensor
citizen
app_user
payment_method
notification

Dropped 13 tables successfully


In [None]:
drop_tables(conn_info, ['notification'])
print("Dropped notification table")

In [None]:
def query_db(conn_info, query):
    query_upper = query.strip().upper()
    if not query_upper.startswith("SELECT"):
        raise ValueError("Only SELECT queries are allowed")
    with psy.connect(conn_info, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            rows = cur.fetchall()
            return rows
query = "SELECT * FROM app_user;"
try:
    result = query_db(conn_info, query)
except Exception as e:
    print(f"Error: {e}")
    result = []
print(result)

print(tabulate(result, headers="keys", tablefmt="github"))

In [None]:
query = "SELECT * FROM audit_log;"
df_log = pd.read_sql_query(query, connection_string)
df_log

In [None]:
def create_all_trigger(conn_info,file_path_func='../sql/create_triggers.sql',file_path_trig='../sql/create_triggers.sql'):
    """
    Create database triggers for Smart City OS
    """
    try:
        with psy.connect(conn_info, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                # Read and execute trigger_functions.sql
                with open(file_path_func, 'r') as f:
                    trigger_functions_sql = f.read()
                
                # Read and execute triggers.sql
                with open(file_path_trig, 'r') as f: 
                    triggers_sql = f.read()
                
                # Execute the SQL statements
                cur.execute(trigger_functions_sql)
                cur.execute(triggers_sql)
        
        print("Database triggers created successfully")
    except Exception as e:
        print(f"Error: {e}")
        conn.rollback()


create_all_trigger(conn_info)

In [None]:
def create_all_indexes(conn_info, file_path='../sql/index.sql'):
    """
    Create database indexes for Smart City OS
    """
    try:
        with psy.connect(conn_info, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                # Read and execute trigger_functions.sql
                with open(file_path, 'r') as f:
                    indexes = f.read()
                
                cur.execute(indexes)
    
        print("Database indexes created successfully")

    except Exception as e:
        print(f"Error: {e}")
        conn.rollback()

create_all_indexes(conn_info)