Create e Delete Table Athena utilziando Boto3

In [None]:
import boto3

def create_athena_table(database_name, table_name, s3_location, columns, format='CSV'):
    """Cria uma tabela no AWS Athena.

    Args:
        database_name (str): Nome do banco de dados.
        table_name (str): Nome da tabela.
        s3_location (str): Localização dos dados no S3.
        columns (list): Lista de tuplas (nome_coluna, tipo_dado).
        format (str): Formato dos dados (CSV, PARQUET, etc.).
    """
    athena = boto3.client('athena')

    columns_sql = ', '.join([f'{name} {data_type}' for name, data_type in columns])

    query = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name} (
        {columns_sql}
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\\n'
    LOCATION '{s3_location}'
    TBLPROPERTIES ('skip.header.line.count'='1')
    """

    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database_name},
        ResultConfiguration={'OutputLocation': 's3://seu-bucket-de-resultados/'} # Substitua pelo seu bucket de resultados
    )

    query_execution_id = response['QueryExecutionId']

    state = 'RUNNING'
    while state in ['RUNNING', 'QUEUED']:
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        state = response['QueryExecution']['Status']['State']

    if state == 'FAILED':
        raise Exception(f"Falha na criação da tabela: {response['QueryExecution']['Status']['StateChangeReason']}")

def delete_athena_table(database_name, table_name):
    """Exclui uma tabela no AWS Athena."""
    athena = boto3.client('athena')
    query = f"DROP TABLE IF EXISTS {database_name}.{table_name}"
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database_name},
        ResultConfiguration={'OutputLocation': 's3://seu-bucket-de-resultados/'} # Substitua pelo seu bucket de resultados
    )

    query_execution_id = response['QueryExecutionId']

    state = 'RUNNING'
    while state in ['RUNNING', 'QUEUED']:
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        state = response['QueryExecution']['Status']['State']

    if state == 'FAILED':
        raise Exception(f"Falha ao deletar a tabela: {response['QueryExecution']['Status']['StateChangeReason']}")

Teste com pytest

In [None]:
import pytest
import boto3
from athena_utils import create_athena_table, delete_athena_table

DATABASE_NAME = 'seu_banco_de_dados' # Substitua pelo seu banco de dados
TABLE_NAME = 'teste_tabela'
S3_LOCATION = 's3://seu-bucket-de-dados/dados/' # Substitua pela localização dos seus dados no S3
COLUMNS = [('id', 'INT'), ('nome', 'STRING')]

def test_create_athena_table():
    create_athena_table(DATABASE_NAME, TABLE_NAME, S3_LOCATION, COLUMNS)

    # Verifica se a tabela foi criada
    athena = boto3.client('athena')
    response = athena.get_table_metadata(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME)
    assert response['TableMetadata']['Name'] == TABLE_NAME

    # Limpa a tabela após o teste
    delete_athena_table(DATABASE_NAME, TABLE_NAME)

def test_delete_athena_table():
    create_athena_table(DATABASE_NAME, TABLE_NAME, S3_LOCATION, COLUMNS)
    delete_athena_table(DATABASE_NAME, TABLE_NAME)

    # Verifica se a tabela foi deletada
    athena = boto3.client('athena')
    with pytest.raises(athena.exceptions.InvalidRequestException):
        athena.get_table_metadata(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME)