In [None]:
import pandas as pd, re, json, os, boto3, time
from botocore.client import BaseClient
from botocore.exceptions import ClientError as AwsClientError


In [None]:
"""
bronze (partition):
- create external table if not exists ...
- alter table ... add partition (date=date(...)) location 's3://...'
silver (partition):
- create external table if not exists ...
- insert into table... select...
gold (partition only on fact):
- delete table if exists ...
- create external table ...
- insert overwrite into
"""

In [None]:
def athena_sql_executor(
    query: str,
    *,
    client: BaseClient | None = None,
    database: str | None = None,
    output_location: str | None = None,
    encrypt_config: dict | None = None,
):
    """
    Execute SQL query on AWS Athena.
    """
    data = {}
    is_select = (
        True
        if re.search(r"^\s*select|^\s*with.*?as.*?\(.*\)\s*select", query.lower(), re.S)
        else False
    )

    # get aws credentials
    with open("/home/jh97/MyWorks/Documents/.aws_cdt.json", "r") as file:
        content = json.load(file)
        key_id = content["access_key"]
        secret_key = content["secrect_access_key"]

    # initialize client
    if not client:
        client = boto3.client(
            "athena",
            region_name="us-east-1",
            aws_access_key_id=key_id,
            aws_secret_access_key=secret_key,
        )

    # execute the query
    try:
        resp = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                "Database": "default" if not database else database,
                "Catalog": "AwsDataCatalog",
            },
            ResultConfiguration={
                "OutputLocation": (
                    "s3://c2dwh-athena-queries/"
                    if not output_location
                    else output_location
                ),
                "EncryptionConfiguration": (
                    {"EncryptionOption": "SSE_S3"}
                    if not encrypt_config
                    else encrypt_config
                ),
            },
        )
    except AwsClientError as e:
        print(f"Cannot execute the query >> {e.response}")
        return

    # wait for executing
    while True:
        execution = client.get_query_execution(
            QueryExecutionId=resp["QueryExecutionId"]
        )

        if execution["QueryExecution"]["Status"]["State"] in [
            "FAILED",
            "CANCELLED",
        ]:
            print(
                f'Execution {execution["QueryExecution"]["Status"]["State"]} with error >>',
                execution["QueryExecution"]["Status"]
                .get("AthenaError", {})
                .get("ErrorMessage"),
            )
            return
        if execution["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
            if is_select:
                break
            else:
                print(f'Execution {execution["QueryExecution"]["Status"]["State"]}.')
                return

        time.sleep(0.2)

    # normalize result (only for SELECT queries)
    data["query_execution_id"] = resp["QueryExecutionId"]
    data["data"] = []
    paginator = client.get_paginator("get_query_results")

    columns = None
    for page in paginator.paginate(QueryExecutionId=resp["QueryExecutionId"]):
        rows = page["ResultSet"]["Rows"]

        if not columns:  # get list of columns and skip 1st row from 1st page only
            columns = [
                i.get("VarCharValue") for i in page["ResultSet"]["Rows"][0]["Data"]
            ]
            rows = rows[1:]

        for row in rows:
            data["data"].append(
                {
                    columns[i]: row["Data"][i].get("VarCharValue")
                    for i in range(len(columns))
                }
            )

    return data

In [None]:
test = athena_sql_executor(
    "select column_name "
    + "from information_schema.columns "
    + "where table_schema='c2dwh_silver'and table_name='laptops'",
    database="c2dwh_silver",
)

test



In [None]:
res=athena_sql_executor("drop table if exists earphones", database="c2dwh_gold")
res