O arquivo lambda2_ons_aws.inpy tem como objetivos:

1- Ler arquivos do bucket "s3lambda" de forma temporaria

2- Tratar dados para armazenamento em um banco de dados

3- Envia os arquivos para o S3 buncket "etl_ons"

In [None]:
import boto3
import pandas as pd
import numpy as np
import os
from io import StringIO
from io import BytesIO
import shutil
import fnmatch
from datetime import datetime


def lambda_handler(event, context):

    s3 = boto3.client('s3')
    
    # Nome do bucket S3 e diretório a ser pesquisado
    bucket_name = 's3lambdaons'
    prefix_BEA = 'BEA/'

    # Lista todos os objetos no diretório do bucket
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix_BEA)
    
    # Cria uma lista vazia para armazenar os dataframes lidos
    dfs_BEA = []
    # Loop pelos objetos encontrados
    for obj in response.get('Contents', []):
        # Pula objetos que não são arquivos Excel
        if not obj['Key'].endswith('.xlsx'):
            continue
        # Baixa o arquivo para um diretório temporário
        tmp_dir = '/tmp/data'
        os.makedirs(tmp_dir, exist_ok=True)
        file_path = os.path.join(tmp_dir, os.path.basename(obj['Key']))
        with open(file_path, 'wb') as f:
            s3.download_fileobj(bucket_name, obj['Key'], f)
            
    # Lê o arquivo Excel como um dataframe
        df = pd.read_excel(file_path, sheet_name='Plan1')
    # Adiciona a coluna "regiao" com o nome da região extraído do nome do arquivo
        regiao = obj['Key'].split('/')[1].split('_')[2].lower()
        df['regiao'] = regiao
    # Adiciona o dataframe à lista
        dfs_BEA.append(df)

        
#ETL BEA   
    df_BEA_D = pd.concat(dfs_BEA, axis=0)
    # Renomeia colunas
    df_BEA_D = df_BEA_D.rename(columns={'Unnamed: 0': 'data',
                                 'Unnamed: 1': 'total',
                                 'Unnamed: 2': 'hidraulica',
                                 'Unnamed: 3': 'termica',
                                 'Unnamed: 4': 'eolica',
                                 'Unnamed: 5': 'solar',
                                 'Unnamed: 6': 'intercambio',
                                 'Unnamed: 7': 'carga'})
    
    to_remove = ['Dados Diários acumulados', 'Valores - MWmed', 'Subsistema Norte', 'Total', np.nan,'Data','Sistema Interligado Nacional', 'Subsistema Sul', 'Subsistema Nordeste', 'Subsistema Sudeste']
    df_BEA_D = df_BEA_D[~df_BEA_D['data'].isin(to_remove)]       
    df_BEA_D.dropna(subset=['total'], inplace=True)
    
    # Remove valores que se repetem em 'data' e 'regiao', incluindo valores vazios
    df_BEA_D.drop_duplicates(subset=['data', 'regiao'], inplace=True)  
    
    #atualizando formato dos dados
    df_BEA_D['total'] = pd.to_numeric(df_BEA_D['total'], errors='coerce')
    df_BEA_D['hidraulica'] = pd.to_numeric(df_BEA_D['hidraulica'], errors='coerce')
    df_BEA_D['termica'] = pd.to_numeric(df_BEA_D['termica'], errors='coerce')
    df_BEA_D['eolica'] = pd.to_numeric(df_BEA_D['eolica'], errors='coerce')
    df_BEA_D['solar'] = pd.to_numeric(df_BEA_D['solar'], errors='coerce')
    df_BEA_D['intercambio'] = pd.to_numeric(df_BEA_D['intercambio'], errors='coerce')
    df_BEA_D['carga'] = pd.to_numeric(df_BEA_D['carga'], errors='coerce')
    #df_BEA_D['data'] = pd.to_datetime(df_BEA_D['data']')

# Envia df_BEA_D para o bucket etl-ons
    bucket_name2 = 'etl-ons'
    prefix_bea_etl = 'BEA_etl/'
        
    # Exporta o dataframe como um arquivo Excel
    excel_buffer = BytesIO()
    df_BEA_D.to_excel(excel_buffer, index=False)
                
    # Faz o upload do arquivo Excel para o bucket S3
    file_name = 'df_BEA_D.xlsx'
    file_path = os.path.join('/tmp', file_name)
    with open(file_path, 'wb') as f:
        f.write(excel_buffer.getvalue())
    s3.upload_file(file_path, bucket_name2, f'{prefix_bea_etl}{file_name}')
          
          
 
    # PROD          
              
    dfs_h1 = []
    dfs_h2 = []
    dfs_h3 = []
    dfs_t1 = []
    dfs_t2 = []
    dfs_t3 = []
    
    # Define o nome do bucket e o prefixo da pasta
    prefix_prod = 'PROD_D/'
    filename_pattern_h = 'BDE_D_PROD__H*.xlsx'
    
    # Lista os objetos no bucket que correspondem ao prefixo
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix_prod)
    
    for obj in response['Contents']:
        # Extrai o nome do arquivo do objeto
        filename = obj['Key'].split('/')[-1]
        
        # Verifica se o nome do arquivo corresponde ao padrão
        if fnmatch.fnmatch(filename, filename_pattern_h):
            try:
                # Extrai a data do nome do arquivo
                date_str = '_'.join(filename.split('_')[5:9]).split('.')[0]
                
                # Tenta converter a string em um objeto datetime
                try:
                    date_obj = datetime.strptime(date_str, '%Y_%m_%d')
                    
                    # Lê as tabelas prodh_rg, poth_rg e prodh_usina em seus respectivos dataframes e adiciona uma coluna com a data
                    obj = s3.get_object(Bucket=bucket_name, Key=obj['Key'])
                    data = obj['Body'].read()
                    prodh_rg = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=3, usecols='A:D', nrows=7)
                    prodh_rg['data'] = date_obj
                    dfs_h1.append(prodh_rg)
                    
                    poth_rg = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=11, usecols='A:D', nrows=7)
                    poth_rg['data'] = date_obj
                    dfs_h2.append(poth_rg)
                    
                    prodh_usina = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=22, usecols='A:D', nrows=1000)
                    prodh_usina['data'] = date_obj
                    dfs_h3.append(prodh_usina)
                except ValueError:
                    # A string não pode ser convertida em uma data
                    print(f"Valor inválido na coluna date do arquivo {filename}: {date_str}")
                    continue
            except Exception as e:
                print(f"Erro ao ler o arquivo {filename}: {e}")
     
    
    df_prodh_rg = pd.concat(dfs_h1)
    df_poth_rg = pd.concat(dfs_h2)
    df_prodh_usina = pd.concat(dfs_h3)
    
    # Envia os dataframes para o bucket etl_ons
    prefix_PROD_etl = 'PROD/'
    
    # Exporta os dataframes como arquivos Excel
    excel_buffers = []
    
    for i, df in enumerate([df_prodh_rg, df_poth_rg, df_prodh_usina]):
        excel_buffer = BytesIO()
        df.to_excel(excel_buffer, index=False)
        excel_buffers.append(excel_buffer)
        file_name = f'df_{["prodh_rg", "poth_rg", "prodh_usina"][i]}.xlsx'
        file_path = os.path.join('/tmp', file_name)
        with open(file_path, 'wb') as f:
            f.write(excel_buffer.getvalue())
        s3.upload_file(file_path, bucket_name2, f'{prefix_PROD_etl}{file_name}')
        
# PROD_T   
    filename_pattern_t = 'BDE_D_PROD__T*.xlsx'
    for obj in response['Contents']:
        # Extrai o nome do arquivo do objeto
        filename = obj['Key'].split('/')[-1]
        
        # Verifica se o nome do arquivo corresponde ao padrão
        if fnmatch.fnmatch(filename, filename_pattern_t):
            try:
                # Extrai a data do nome do arquivo
                date_str = '_'.join(filename.split('_')[5:9]).split('.')[0]
                
                # Tenta converter a string em um objeto datetime
                try:
                    date_obj = datetime.strptime(date_str, '%Y_%m_%d')
                    
                    # Lê as tabelas prodh_rg, poth_rg e prodh_usina em seus respectivos dataframes e adiciona uma coluna com a data
                    obj = s3.get_object(Bucket=bucket_name, Key=obj['Key'])
                    data = obj['Body'].read()
                    prodt_rg = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=3, usecols='A:D', nrows=7)
                    prodt_rg['data'] = date_obj
                    dfs_t1.append(prodt_rg)
                    
                    pott_rg = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=11, usecols='A:D', nrows=7)
                    pott_rg['data'] = date_obj
                    dfs_t2.append(pott_rg)
                    
                    prodt_usina = pd.read_excel(BytesIO(data), sheet_name='Plan1', header=22, usecols='A:D', nrows=1000)
                    prodt_usina['data'] = date_obj
                    dfs_t3.append(prodt_usina)
                except ValueError:
                    # A string não pode ser convertida em uma data
                    print(f"Valor inválido na coluna date do arquivo {filename}: {date_str}")
                    continue
            except Exception as e:
                print(f"Erro ao ler o arquivo {filename}: {e}")
                    
    df_prodt_rg = pd.concat(dfs_t1)
    df_pott_rg = pd.concat(dfs_t2)
    df_prodt_usina = pd.concat(dfs_t3)
    
# ETL prod, pot e usina 
    # A tabela df_prodt_rg apresentou valores em object mesmo a leitura correta
    df_prodt_rg['GWh no Dia'] = pd.to_numeric(df_prodt_rg['GWh no Dia'], errors='coerce')
    df_prodt_rg['GWh acum. no Mês até o Dia'] = pd.to_numeric(df_prodt_rg['GWh acum. no Mês até o Dia'], errors='coerce')
    df_prodt_rg['GWh acum. no Ano até o Dia'] = pd.to_numeric(df_prodt_rg['GWh acum. no Ano até o Dia'], errors='coerce')
    
    
    # Removendo linhas duplicadas
    dfs_prod = [df_prodh_rg, df_poth_rg, df_prodh_usina, df_prodt_rg, df_pott_rg, df_prodt_usina]
    dfs_prod = list(map(lambda df: df.drop_duplicates(), dfs_prod))
    
    
    # Normatizando nome das colunas para criar relações no PostgreSQL
    df_prodh_rg = df_prodh_rg.rename(columns={'Subsistema': 'subsistema',
                              'GWh no Dia': 'gwmed_dia',
                              'GWh acum. no Mês até o Dia': 'gwmed_mes',
                              'GWh acum. no Ano até o Dia': 'gwmed_ano'})
    
    df_prodt_rg = df_prodt_rg.rename(columns={'Subsistema': 'subsistema',
                              'GWh no Dia': 'gwmed_dia',
                              'GWh acum. no Mês até o Dia': 'gwmed_mes',
                              'GWh acum. no Ano até o Dia': 'gwmed_ano'})
    
    df_poth_rg = df_poth_rg.rename(columns={'Subsistema': 'subsistema',
                              'MWmed no Dia': 'mwmed_dia',
                              'MWmed no Mês até o Dia': 'mwmed_mes',
                              'MWmed no Ano até o Dia': 'mwmed_ano'})
    
    df_pott_rg = df_pott_rg.rename(columns={'Subsistema': 'subsistema',
                              'MWmed no Dia': 'mwmed_dia',
                              'MWmed no Mês até o Dia': 'mwmed_mes',
                              'MWmed no Ano até o Dia': 'mwmed_ano'})
    
    df_prodt_usina = df_prodt_usina.rename(columns={'Usina': 'usina',
                              'Código ONS': 'codigo_ons',
                              'Programado (MWmed)': 'programado',
                              'Verificado (MWmed)': 'verificado'})
    
    df_prodh_usina = df_prodh_usina.rename(columns={'Usina': 'usina',
                              'Código ONS': 'codigo_ons',
                              'Programado (MWmed)': 'programado',
                              'Verificado (MWmed)': 'verificado'})

    # Exporta os dataframes como arquivos Excel
    excel_buffers = []
    for i, df in enumerate([df_prodt_rg, df_pott_rg, df_prodt_usina]):
        excel_buffer = BytesIO()
        df.to_excel(excel_buffer, index=False)
        excel_buffers.append(excel_buffer)
        file_name = f'df_{["prodt_rg", "pott_rg", "prodt_usina"][i]}.xlsx'
        file_path = os.path.join('/tmp', file_name)
        with open(file_path, 'wb') as f:
            f.write(excel_buffer.getvalue())
        s3.upload_file(file_path, bucket_name2, f'{prefix_PROD_etl}{file_name}')          



# RESERVATORIOS
    # Lista todos os objetos no diretório do bucket
    prefix_res = 'RES_D/'
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix_res)
    
    # Cria uma lista vazia para armazenar os dataframes lidos
    dfs_res = []

    # Loop pelos objetos encontrados
    for obj in response.get('Contents', []):
        # Pula objetos que não são arquivos Excel
        if not obj['Key'].endswith('.xlsx'):
            continue
        # Baixa o arquivo para um diretório temporário
        tmp_dir = '/tmp/data'
        os.makedirs(tmp_dir, exist_ok=True)
        file_path = os.path.join(tmp_dir, os.path.basename(obj['Key']))
        with open(file_path, 'wb') as f:
            s3.download_fileobj(bucket_name, obj['Key'], f)

        # Lê o arquivo Excel como um dataframe
        df = pd.read_excel(file_path, sheet_name='Plan1')
        # Adiciona a coluna "regiao" com o nome da região extraído do nome do arquivo
        regiao = obj['Key'].split('/')[1].split('_')[2].lower()
        date_str = '_'.join(file_path.split('_')[3:6]).split('.')[0]
        df['regiao'] = regiao
        df['data'] = date_str

        # Adiciona o dataframe à lista
        dfs_res.append(df)

    # Concatena todos os dataframes da lista em um único dataframe
    df_RES_D = pd.concat(dfs_res, ignore_index=True)
   
   
# ELT
    # Renomeia colunas
    df_RES_D = df_RES_D.rename(columns={'Unnamed: 0': 'bacia',
                                 'Unnamed: 1': 'reservatorio',
                                 'Unnamed: 2': 'del',
                                 'Unnamed: 3': 'nivel_m',
                                 'Unnamed: 4': 'vol_util_%',
                                 'Unnamed: 5': 'afluencia',
                                 'Unnamed: 6': 'defluencia',
                                 'Unnamed: 7': 'vertida',
                                 'Unnamed: 8': 'transferida'
                                       })
    
    # remover valores duplicados na lista to_remove
    to_remove = list(set(['Dados Hidráulicos dos Reservatórios', 'Subsistema Sul', 'Subsistema Norte', 'Bacia', 'Subsistema Sul', 'Subsistema Nordeste', 'Subsistema Sudeste']))
    df_RES_D.drop('del', axis=1, inplace=True)
    
    # remover linhas do dataframe que contenham valores presentes em to_remove na coluna col_name
    df_RES_D = df_RES_D[~df_RES_D['bacia'].isin(to_remove)]
    df_RES_D.dropna(subset=['reservatorio'], inplace=True)
    df_RES_D = df_RES_D[df_RES_D['afluencia'] != 'Afluência']
    df_RES_D.drop_duplicates(subset=['reservatorio', 'data'], inplace=True)
    
    # Corrige falores da col 'bacia' que estavam mesclados no excel
    df_RES_D['bacia'].fillna(method='ffill', inplace=True)
    
    #atualizando formato dos dados
    df_RES_D['nivel_m'] = pd.to_numeric(df_RES_D['nivel_m'], errors='coerce')
    df_RES_D['vol_util_%'] = pd.to_numeric(df_RES_D['vol_util_%'], errors='coerce')
    df_RES_D['afluencia'] = pd.to_numeric(df_RES_D['afluencia'], errors='coerce')
    df_RES_D['defluencia'] = pd.to_numeric(df_RES_D['defluencia'], errors='coerce')
    df_RES_D['vertida'] = pd.to_numeric(df_RES_D['vertida'], errors='coerce')
#    df_RES_D['transferida'] = pd.to_numeric(df_RES_D['transferida'], errors='coerce')
    df_RES_D['data'] = pd.to_datetime(df_RES_D['data'],  format='%Y_%m_%d')
   
   

# Envia df_BEA_D para o bucket etl-ons
    bucket_name2 = 'etl-ons'
    prefix_RES_etl = 'RESD/'
        
    # Exporta o dataframe como um arquivo Excel
    excel_buffer = BytesIO()
    df_RES_D.to_excel(excel_buffer, index=False)
                
    # Faz o upload do arquivo Excel para o bucket S3
    file_name = 'df_RES_D.xlsx'
    file_path = os.path.join('/tmp', file_name)
    with open(file_path, 'wb') as f:
        f.write(excel_buffer.getvalue())
    s3.upload_file(file_path, bucket_name2, f'{prefix_RES_etl}{file_name}')



    return {
        'statusCode': 200,
        'body': 'sucesso'
    }