In [25]:
import boto3
import pandas as pd
import pyarrow.parquet as pq
from pyarrow.fs import S3FileSystem
from io import BytesIO

In [13]:
# variáveis de entrada
nome_bucket = 'elasticbeanstalk-us-east-2-047710767346'
subpasta = 'Case_A3'
pasta_origem = f'{subpasta}/arquivos_extraidos'
pasta_destino = f'{subpasta}/arquivos_extraidos_parquet'

s3 = boto3.client('s3')

In [14]:
def verifica_ou_cria_pasta(s3, nome_bucket, pasta_destino):
    # Verifica se a pasta de destino existe
    try:
        s3.head_object(Bucket=nome_bucket, Key=f'{pasta_destino}/')
        print(f'Pasta {pasta_destino} já existe! Caminho: s3://{nome_bucket}/{pasta_destino}/')
    except Exception as e:
        # Se não existir, cria a pasta
        s3.put_object(Body='', Bucket=nome_bucket, Key=f'{pasta_destino}/')
        print(f'Pasta {pasta_destino} criada com sucesso! Caminho: s3://{nome_bucket}/{pasta_destino}/')
verifica_ou_cria_pasta(s3, nome_bucket, pasta_destino)

Pasta Case_A3/arquivos_extraidos_parquet já existe! Caminho: s3://elasticbeanstalk-us-east-2-047710767346/Case_A3/arquivos_extraidos_parquet/


In [None]:
def converte_csv_para_parquet(s3, nome_bucket, pasta_origem, pasta_destino):
    # Configura o sistema de arquivos S3
    s3_fs = S3FileSystem()

    # Lista os objetos no bucket e pasta de origem
    try:
        objetos = s3.list_objects(Bucket=nome_bucket, Prefix=pasta_origem)
    except Exception as e:
        print(f'Erro ao listar objetos em {pasta_origem}: {str(e)}')
        return

    for obj in objetos.get('Contents', []):
        # Verifica se o objeto é uma subpasta chamada "dados" (sem distinção de maiúsculas e minúsculas)
        if obj['Key'].lower() == f'{pasta_origem.lower()}dados/':
            print(f'Processando subpasta: {obj["Key"]}')

            # Lista os objetos dentro da subpasta
            try:
                objetos_csv = s3.list_objects(Bucket=nome_bucket, Prefix=obj['Key'])
            except Exception as e:
                print(f'Erro ao listar objetos em {obj["Key"]}: {str(e)}')
                continue

            for obj_csv in objetos_csv.get('Contents', []):
                # Verifica se o objeto é um arquivo CSV
                if obj_csv['Key'].lower().endswith('.csv'):
                    print(f'Convertendo arquivo CSV: {obj_csv["Key"]}')

                    try:
                        # Lê o conteúdo do arquivo CSV
                        csv_content = s3.get_object(Bucket=nome_bucket, Key=obj_csv['Key'])['Body'].read()

                        # Converte para DataFrame
                        df = pd.read_csv(BytesIO(csv_content))

                        # Converte para Parquet usando PyArrow
                        parquet_key = obj_csv['Key'].replace(pasta_origem, pasta_destino).replace('.csv', '.parquet')
                        
                        print(f'Convertendo para: {parquet_key}')

                        with s3_fs.open_output_stream(f'{nome_bucket}/{parquet_key}') as out_file:
                            pq.write_table(pq.Table.from_pandas(df), out_file)

                        print(f'Arquivo convertido e salvo como Parquet: {parquet_key}')

                    except Exception as e:
                        print(f'Erro ao converter {obj_csv["Key"]}: {str(e)}')

In [29]:
# Chama a função
converte_csv_para_parquet(s3, nome_bucket, pasta_origem, pasta_destino)
