In [7]:
import os
import time

import boto3
import psycopg2

In [8]:
# Use case: calculate the total number of visits per country for a specific
# date range (e.g. 1st of January - 1st of February 2018).

# Credentials for boto3 to access resources on AWS.
# They are read from the environment so that secrets never live in the notebook
# (and never end up in version control). When a variable is unset the value is
# None, which makes boto3 fall back to its default credential chain
# (shared credentials file, instance/role credentials, ...).
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Region in which the Athena queries are run.
AWS_REGION = 'eu-west-2'

# We will use these values to configure how Athena will run the query.
# Make sure to replace these with your own.
# The database name (created in Glue)
ATHENA_DATABASE_NAME = 'christiana-athena-parquet'

# The full location of the folder for your query results
S3_OUTPUT_LOCATION = 's3://athena-learners-etl-bite05/christiana/'

# Set up the Athena client
athena_client = boto3.client(
    'athena',
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY)

# SQL to run on Athena: number of rows per country code in `vod_clickstream`
# whose `datetime` falls in [2018-01-01, 2018-02-01) - start inclusive, end exclusive.
sql_query = """
    SELECT server_request_country_code, COUNT(*) as total_visits
    FROM vod_clickstream
    WHERE datetime >= CAST('2018-01-01' AS timestamp) AND datetime < CAST('2018-02-01' AS timestamp)
    GROUP BY server_request_country_code;
"""

# Submit the query to Athena.
#
# The execution can be followed in the Athena web console:
# https://eu-west-2.console.aws.amazon.com/athena/home?region=eu-west-2#/query-editor/history
start_query_kwargs = dict(
    QueryString=sql_query,
    # Database the query is resolved against
    QueryExecutionContext=dict(Database=ATHENA_DATABASE_NAME),
    # S3 folder where Athena writes the result files
    ResultConfiguration=dict(OutputLocation=S3_OUTPUT_LOCATION),
)
query_execution = athena_client.start_query_execution(**start_query_kwargs)

# Poll the query status until it reaches a terminal state.
# QUEUED and RUNNING are the only in-progress states; anything else
# (SUCCEEDED, FAILED, CANCELLED) ends the loop.
query_execution_id = query_execution["QueryExecutionId"]

while True:
    query_execution = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    execution_status = query_execution["QueryExecution"]["Status"]
    query_status = execution_status["State"]
    if query_status not in ("QUEUED", "RUNNING"):
        break
    # Only wait when another poll is actually needed
    time.sleep(1)

# Fail loudly for every non-successful terminal state (not just FAILED), and
# surface Athena's own explanation so the problem can be diagnosed.
if query_status != "SUCCEEDED":
    failure_reason = execution_status.get("StateChangeReason", "no reason given")
    raise Exception(
        f"Athena query {query_execution_id} ended in state {query_status}: {failure_reason}"
    )

# Retrieve the query results.
# get_query_results returns the rows one page at a time, so walk all pages
# with the paginator. The first row of the first page is the column header.
results = []
results_paginator = athena_client.get_paginator("get_query_results")
for results_page in results_paginator.paginate(QueryExecutionId=query_execution_id):
    results.extend(results_page["ResultSet"]["Rows"])

In [10]:
# Transform & Load
#
# Reads the Athena rows in `results` and upserts one row per country into the
# local Postgres table `visits_per_country`.

# Upsert statement; defined once because it is identical for every row.
insert_query = """
    INSERT INTO visits_per_country (country_code, total_visits)
    VALUES (%s, %s)
    ON CONFLICT (country_code)
    DO UPDATE SET total_visits = EXCLUDED.total_visits;
"""

# Connect to the local Postgres database
conn = psycopg2.connect(database="etl_bites", user="", password="", host="localhost", port="5432")

try:
    with conn.cursor() as cursor:
        # Create the table if it doesn't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS visits_per_country (
                country_code VARCHAR(2) PRIMARY KEY,
                total_visits INTEGER
            );
        """)
        # Commit the DDL on its own so a later row failure can never undo it.
        conn.commit()

        # Process the query results (the first row is the column header)
        for row in results[1:]:
            country_cell = row["Data"][0]
            visits_cell = row["Data"][1]

            # Athena returns an empty dict for a NULL value
            if not country_cell or not visits_cell:
                # You could also print what a `row` has if you are curious!
                print(f"Skipping row: {row}")
                continue

            country_code = country_cell["VarCharValue"]
            total_visits = int(visits_cell["VarCharValue"])

            # Insert the data into the local PostgreSQL database.
            # A savepoint confines a failure to this single row: rolling back
            # the whole transaction here would also discard every row that was
            # upserted before it.
            cursor.execute("SAVEPOINT visits_row")
            try:
                cursor.execute(insert_query, (country_code, total_visits))
            except Exception as e:
                print("Error occurred inserting into analytical DB: %s"% e)
                cursor.execute("ROLLBACK TO SAVEPOINT visits_row")
            else:
                cursor.execute("RELEASE SAVEPOINT visits_row")

    # Commit all successful upserts in one go
    conn.commit()
finally:
    # Always release the connection; closing without a commit discards any
    # uncommitted work, so an unexpected error leaves no partial load behind.
    conn.close()


Skipping row: {'Data': [{}, {'VarCharValue': '243775'}]}
