## AWS Athena Usage Examples
---

In [None]:
import pandas as pd
import boto3
import time

In [None]:
# Download and unzip the MovieLens 100k dataset
# Each `!` line is run as a shell command by IPython/Jupyter, so this cell
# only works inside a notebook session (it is not plain Python).
# NOTE(review): the archive is fetched over plain HTTP; confirm this URL still resolves.
!wget http://www.grouplens.org/system/files/ml-100k.zip
# Extract the archive; later cells read files under the ml-100k/ directory
!unzip ml-100k
# Delete the downloaded archive once it has been extracted
!rm ml-100k.zip

In [None]:
# Labels applied, in order, to the columns of the u.data ratings file
column_names = [
    'user_id',
    'item_id',
    'rating',
    'timestamp',
]

In [None]:
# Relative location of the ratings file inside the extracted dataset folder
data_file_path = '/'.join(['ml-100k', 'u.data'])

In [None]:
# Read the tab-separated ratings file; the file supplies no header row,
# so the column labels come from column_names
df = pd.read_csv(
    data_file_path,
    delimiter = '\t',
    header = None,
    names = column_names,
)
df

In [None]:
# Read the pipe-separated u.item file, keeping only its first two fields
# and labelling them item_id / item_name
item_data = pd.read_csv(
    'ml-100k/u.item',
    delimiter = '|',
    encoding = 'ISO-8859-1',
    header = None,
    names = ['item_id', 'item_name'],
    usecols = [0, 1],
)

In [None]:
# Attach each rating's item_name by joining the ratings to item_data on 'item_id'
df = df.merge(item_data, on = 'item_id')
df

In [None]:
# Serialize the merged DataFrame to CSV text held in memory
# (no file path is given, so to_csv returns a string; index and header row are omitted)
merged_csv = df.to_csv(path_or_buf = None, header = False, index = False)

In [None]:
# AWS region and S3 destination used for the dataset upload
AWS_REGION = "us-east-1"            # region handed to the boto3 clients
S3_BUCKET_NAME = "csc555-jaewon"    # bucket that receives the CSV and the query results
S3_KEY = "athena/"                  # key prefix the CSV is stored under
filename = "ml-100k.csv"            # object name appended to S3_KEY

In [None]:
# Build a boto3 session from explicitly supplied credentials.
# Every '--' below is a placeholder to be replaced before running this cell;
# avoid committing real credentials to version control.
session = boto3.Session(**{
    'aws_access_key_id': '--',
    'aws_secret_access_key': '--',
    'aws_session_token': '--',
    'region_name': '--',
})

In [None]:
# S3 client bound to the credentials and region configured on `session`
s3 = session.client(service_name = 's3')

In [None]:
# S3 client created directly from boto3 (no explicit credentials are passed here)
# and pinned to AWS_REGION. This assignment rebinds `s3`, replacing any client
# previously stored under that name.
s3 = boto3.client(service_name = 's3', region_name = AWS_REGION)

In [None]:
# Store the in-memory CSV text as one S3 object whose key is S3_KEY followed by filename
s3.put_object(
    Bucket = S3_BUCKET_NAME,
    Key = S3_KEY + filename,
    Body = merged_csv,
)

In [None]:
# Athena identifiers used when creating and querying the table
DATABASE_NAME = "default"    # database the table is created in
TABLE_NAME = "test-table"    # name of the external table

In [None]:
# Athena client pinned to AWS_REGION; used to submit queries
athena_client = boto3.client(service_name = 'athena', region_name = AWS_REGION)

In [None]:
# Build the DDL statement that registers the uploaded CSV as an external Athena table.
# - `timestamp` is backtick-quoted because TIMESTAMP is a reserved keyword in Athena DDL.
# - LOCATION is built from S3_KEY (the same prefix the CSV is uploaded under), so the
#   table always points at the upload folder and ends with the trailing slash that
#   Athena expects for table locations.
# NOTE(review): to_csv presumably double-quotes item names that contain commas, while
# ROW FORMAT DELIMITED does not interpret quotes, so such names would be cut at the
# comma. Consider OpenCSVSerde if complete titles matter — confirm against the data.
create_table_query = f"""CREATE EXTERNAL TABLE IF NOT EXISTS `{DATABASE_NAME}`.`{TABLE_NAME}`
                                (user_id INT,
                                 item_id INT,
                                 rating INT,
                                 `timestamp` INT,
                                 item_name STRING)
                         ROW FORMAT DELIMITED
                         FIELDS TERMINATED BY ','
                         LOCATION 's3://{S3_BUCKET_NAME}/{S3_KEY}'"""
print(create_table_query.strip())

In [None]:
# Hand the CREATE TABLE statement to Athena, running it in DATABASE_NAME and
# directing its output to the query_results/ prefix of the bucket
response = athena_client.start_query_execution(
    QueryString = create_table_query.strip(),
    ResultConfiguration = dict(OutputLocation = f's3://{S3_BUCKET_NAME}/query_results/'),
    QueryExecutionContext = dict(Database = DATABASE_NAME),
)
# Only the submission is reported here; this cell does not wait for the statement to finish
print("Athena table creation query submitted.")

In [None]:
# Initialize the Athena client used by the query helper functions below.
# The region is pinned to AWS_REGION, matching the other boto3 clients in this
# notebook, so queries go to the same region regardless of the ambient AWS
# configuration (and client creation does not fail when no default region is set).
athena_client = boto3.client('athena', region_name = AWS_REGION)

In [None]:
# Submit a SQL statement to Athena and hand back its execution ID
def run_athena_query(query, database, output_location):
    """Start an Athena query and return its QueryExecutionId.

    Parameters:
        query: SQL text to execute.
        database: Athena database the query runs against.
        output_location: S3 URI where Athena stores the query results.

    Returns:
        The 'QueryExecutionId' value from the start_query_execution response.

    Uses the module-level `athena_client`. The query is only submitted here;
    this function does not wait for it to finish.
    """
    submission = athena_client.start_query_execution(
        QueryString = query,
        QueryExecutionContext = {'Database': database},
        ResultConfiguration = {'OutputLocation': output_location},
    )
    return submission['QueryExecutionId']

In [None]:
# Function to wait for a query execution to reach a terminal state
def check_query_status(query_execution_id, poll_interval = 2, timeout = None):
    """Poll Athena until the given query succeeds, fails, or is cancelled.

    Parameters:
        query_execution_id: ID returned when the query was started.
        poll_interval: Seconds to sleep between status checks (default 2,
            the interval this function has always used).
        timeout: Optional maximum number of seconds to wait. None (the
            default) waits indefinitely, as before.

    Returns:
        None. Prints "Query succeeded" once the state is 'SUCCEEDED'.

    Raises:
        RuntimeError: the query ended in state 'FAILED' or 'CANCELLED'; the
            full get_query_execution response is included in the message.
        TimeoutError: `timeout` seconds elapsed before a terminal state.

    Uses the module-level `athena_client`.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        # Get the current status of the query
        response = athena_client.get_query_execution(QueryExecutionId = query_execution_id)
        status = response['QueryExecution']['Status']['State']

        # Success: nothing more to wait for
        if status == 'SUCCEEDED':
            print("Query succeeded")
            return

        # Failure or cancellation: surface the whole response for diagnosis
        if status in ('FAILED', 'CANCELLED'):
            print(f"Query {status.lower()}")
            raise RuntimeError(f"Query failed or was cancelled: {response}")

        # Still queued or running: give up only if a deadline was requested and has passed
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_execution_id} still {status} after {timeout} seconds")

        # Wait before checking the status again
        time.sleep(poll_interval)

In [None]:
# Function to retrieve the results of a query and convert them into a pandas DataFrame
def get_query_results_as_dataframe(query_execution_id):
    """Fetch all result pages of a finished Athena query as a DataFrame.

    Parameters:
        query_execution_id: ID of a query whose results are available.

    Returns:
        A pandas DataFrame with one column per result column (named by the
        column 'Label') and one row per result row. Every cell is the
        'VarCharValue' string from the response, or None when absent.

    Uses the module-level `athena_client`.
    """
    # Get paginated results from the query
    results_paginator = athena_client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(QueryExecutionId = query_execution_id)

    columns = []
    rows = []

    for page_number, results_page in enumerate(results_iter):
        result_set = results_page['ResultSet']

        # Extract the column names from the first results page
        if not columns:
            columns = [col['Label'] for col in result_set['ResultSetMetadata']['ColumnInfo']]

        # Only the first page starts with the header row. Skipping the first
        # row on every page would silently drop one data row per later page.
        page_rows = result_set['Rows']
        if page_number == 0:
            page_rows = page_rows[1:]

        # Extract the data from the rows
        rows.extend([cell.get('VarCharValue') for cell in row['Data']] for row in page_rows)

    # Convert the results into a pandas DataFrame
    return pd.DataFrame(rows, columns = columns)

In [None]:
# Example query: first 10 rows of the external table.
# The database comes from DATABASE_NAME rather than a repeated 'default' literal,
# so the query and its execution context stay in step with the table definition.
query = f'SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}" limit 10;'
database = DATABASE_NAME
output_location = f's3://{S3_BUCKET_NAME}/query_results/' # S3 location to store query results

In [None]:
# Submit the example query and keep its execution ID for the status check and result fetch
query_execution_id = run_athena_query(
    query = query,
    database = database,
    output_location = output_location,
)

In [None]:
# Block until the submitted query finishes (raises if it failed or was cancelled)
check_query_status(query_execution_id = query_execution_id)

In [None]:
# Load the finished query's rows into a DataFrame (this rebinds `df`) and display it
df = get_query_results_as_dataframe(query_execution_id = query_execution_id)
df