In [1]:
# ETAPA 1: Importação de bibliotecas
import pandas as pd
from sqlalchemy import create_engine, text
from hdfs import InsecureClient
import logging

In [2]:
# Configuração de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

In [3]:
# ETAPA 2: Configurações
hdfs_url = 'http://master:9870'
hdfs_user = 'alexandre'
hdfs_dir = 'sisagua/'

In [4]:
# Configurações do PostgreSQL
pg_host = 'localhost'
pg_database = 'sisagua'
pg_user = 'postgres'
pg_password = '1234'
pg_port = '5433'
table_name = 'dados_sisagua_transformados'

In [6]:
def test_postgres_connection():
    """Testa a conexão com o PostgreSQL"""
    try:
        engine = create_engine(
            f'postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_database}'
        )
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("✅ Conexão com PostgreSQL testada com sucesso!")
        return True
    except Exception as e:
        logger.error(f"❌ Falha na conexão com PostgreSQL: {e}")
        return False

In [7]:
def create_target_table(engine):
    """Cria a tabela de destino se não existir"""
    try:
        with engine.begin() as conn:
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    municipio TEXT,
                    ano INT,
                    tipo_da_forma_de_abastecimento TEXT,
                    mes TEXT,
                    parametro TEXT,
                    data_da_coleta DATE,
                    resultado TEXT,
                    __arquivo_origem TEXT,
                    resultado_num FLOAT,
                    conformidade TEXT,
                    tipo_analise TEXT
                );
            """))
            logger.info(f"✅ Tabela {table_name} verificada/criada")
    except Exception as e:
        logger.error(f"❌ Erro ao criar tabela: {e}")
        raise

In [8]:
def classificar_conformidade(row):
    """Classifica a conformidade dos resultados"""
    parametro = str(row['parametro']).strip().upper()
    valor = str(row['resultado']).strip().upper()
    valor_num = row['resultado_num']
    try:
        if parametro == 'TURBIDEZ (UT)':
            return 'CONFORME' if valor_num < 5 else 'NÃO CONFORME'
        elif parametro == 'ESCHERICHIA COLI':
            return 'CONFORME' if valor == 'AUSENTE' else 'NÃO CONFORME'
        elif parametro == 'COLIFORMES TOTAIS':
            return 'CONFORME' if valor == 'AUSENTE' else 'NÃO CONFORME'
        elif parametro == 'CLORO RESIDUAL LIVRE (MG/L)':
            return 'CONFORME' if 0.2 <= valor_num <= 5.0 else 'NÃO CONFORME'
        elif parametro == 'PH':
            return 'CONFORME' if 6 <= valor_num <= 9.5 else 'NÃO CONFORME'
        elif parametro == 'COR APARENTE (UH)':
            return 'CONFORME' if valor_num < 15 else 'NÃO CONFORME'
        elif parametro == 'FLUORETO (MG/L)':
            return 'CONFORME' if valor_num <= 1.5 else 'NÃO CONFORME'
    except:
        return 'ERRO'
    return 'IGNORADO'

In [9]:
def classificar_tipo_analise(parametro):
    """Classifica o tipo de análise"""
    parametro = str(parametro).strip().upper()
    if parametro in ['COLIFORMES TOTAIS', 'ESCHERICHIA COLI']:
        return 'Microbiológica'
    elif parametro in ['TURBIDEZ (UT)', 'CLORO RESIDUAL LIVRE (MG/L)', 'PH', 'COR APARENTE (UH)', 'FLUORETO (MG/L)']:
        return 'Físico-Química'
    return 'Outros'

In [10]:
if test_postgres_connection():
    print("Conexão com PostgreSQL está funcionando corretamente!")
else:
    print("Problemas na conexão com PostgreSQL. Verifique as configurações.")

INFO:root:✅ Conexão com PostgreSQL testada com sucesso!


Conexão com PostgreSQL está funcionando corretamente!


In [11]:
# Configurar engine SQLAlchemy
engine = create_engine(
    f'postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_database}',
    pool_pre_ping=True
)

# Criar tabela de destino
create_target_table(engine)

INFO:root:✅ Tabela dados_sisagua_transformados verificada/criada


In [12]:
# Conexão com HDFS e listagem de arquivos
try:
    client = InsecureClient(hdfs_url, user=hdfs_user)
    arquivos = client.list(hdfs_dir)
    arquivos_csv = [arq for arq in arquivos if arq.endswith('.csv')]
    
    if not arquivos_csv:
        logger.warning("Nenhum arquivo CSV encontrado no HDFS.")
    else:
        print(f"Arquivos encontrados no HDFS: {arquivos_csv}")
except Exception as e:
    logger.error(f"Erro ao acessar HDFS: {e}")

INFO:hdfs.client:Instantiated <InsecureClient(url='http://master:9870')>.
INFO:hdfs.client:Listing 'sisagua/'.


Arquivos encontrados no HDFS: ['dados_2020_2025_04_30.csv', 'dados_2020_2025_05_01.csv', 'dados_2021_2025_05_01.csv', 'dados_2021_2025_05_02.csv', 'dados_2022_2025_05_02.csv', 'dados_2023_2025_05_02.csv', 'dados_2024_2025_05_02.csv', 'dados_2025_2025_05_02.csv']


In [13]:
# Processamento dos arquivos
if 'arquivos_csv' in locals() and arquivos_csv:
    todos_dados = []
    
    for arq in arquivos_csv:
        try:
            with client.read(f'{hdfs_dir}{arq}', encoding='utf-8') as reader:
                df = pd.read_csv(reader)
                df['__arquivo_origem'] = arq
                todos_dados.append(df)
            print(f"Arquivo {arq} lido com sucesso!")
        except Exception as e:
            print(f"Erro ao processar arquivo {arq}: {e}")
    
    if todos_dados:
        df = pd.concat(todos_dados, ignore_index=True)
        print(f"Total de registros lidos: {len(df)}")

INFO:hdfs.client:Reading file 'sisagua/dados_2020_2025_04_30.csv'.
INFO:hdfs.client:Reading file 'sisagua/dados_2020_2025_05_01.csv'.


Arquivo dados_2020_2025_04_30.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2021_2025_05_01.csv'.


Arquivo dados_2020_2025_05_01.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2021_2025_05_02.csv'.


Arquivo dados_2021_2025_05_01.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2022_2025_05_02.csv'.


Arquivo dados_2021_2025_05_02.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2023_2025_05_02.csv'.


Arquivo dados_2022_2025_05_02.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2024_2025_05_02.csv'.


Arquivo dados_2023_2025_05_02.csv lido com sucesso!


INFO:hdfs.client:Reading file 'sisagua/dados_2025_2025_05_02.csv'.


Arquivo dados_2024_2025_05_02.csv lido com sucesso!
Arquivo dados_2025_2025_05_02.csv lido com sucesso!
Total de registros lidos: 1856071


In [14]:
if 'df' in locals():
    # Selecionar colunas
    colunas_selecionadas = [
        'municipio', 'ano', 'tipo_da_forma_de_abastecimento', 'mes',
        'parametro', 'data_da_coleta', 'resultado', '__arquivo_origem'
    ]
    df_teste = df[colunas_selecionadas].copy()
    
    # Filtrar apenas SAA
    df_saa = df_teste[df_teste['tipo_da_forma_de_abastecimento'] == 'SAA'].copy()
    print(f"Registros após filtrar SAA: {len(df_saa)}")
    
    # Converter resultados numéricos
    df_saa['resultado_num'] = pd.to_numeric(
        df_saa['resultado'].astype(str).str.replace(',', '.'),
        errors='coerce'
    )
    
    # Aplicar classificações
    df_saa['conformidade'] = df_saa.apply(classificar_conformidade, axis=1)
    df_saa['tipo_analise'] = df_saa['parametro'].apply(classificar_tipo_analise)
    
    # Mostrar amostra
    print("\nAmostra dos dados processados:")
    display(df_saa.head())

Registros após filtrar SAA: 1669856

Amostra dos dados processados:


Unnamed: 0,municipio,ano,tipo_da_forma_de_abastecimento,mes,parametro,data_da_coleta,resultado,__arquivo_origem,resultado_num,conformidade,tipo_analise
0,SOLONOPOLE,2020,SAA,12,Cloro residual livre (mg/L),2020-12-01,05,dados_2020_2025_04_30.csv,0.5,CONFORME,Físico-Química
1,IGUATU,2020,SAA,5,Escherichia coli,2020-05-14,AUSENTE,dados_2020_2025_04_30.csv,,CONFORME,Microbiológica
2,CRATO,2020,SAA,5,Turbidez (uT),2020-05-14,061,dados_2020_2025_04_30.csv,0.61,CONFORME,Físico-Química
3,MARCO,2020,SAA,4,Escherichia coli,2020-04-28,AUSENTE,dados_2020_2025_04_30.csv,,CONFORME,Microbiológica
4,TIANGUA,2020,SAA,5,Coliformes totais,2020-05-20,AUSENTE,dados_2020_2025_04_30.csv,,CONFORME,Microbiológica


In [15]:
if 'df_saa' in locals() and not df_saa.empty:
    try:
        with engine.begin() as conn:
            df_saa.to_sql(
                name=table_name,
                con=engine,
                if_exists='append',
                index=False
            )
        print(f"✅ {len(df_saa)} registros inseridos com sucesso em {table_name}")
    except Exception as e:
        print(f"❌ Erro ao inserir no PostgreSQL: {e}")

✅ 1669856 registros inseridos com sucesso em dados_sisagua_transformados
