In [11]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-11-13
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-11-13
@Title : Athena and Glue BOTO3

'''

'\n\n@Author: Vighnesh Harish Bilgi\n@Date: 2022-11-13\n@Last Modified by: Vighnesh Harish Bilgi\n@Last Modified time: 2022-11-13\n@Title : Athena and Glue BOTO3\n\n'

In [12]:
import boto3
import time
import json
import pandas as pd

In [13]:
import os
os.environ['AWS_DEFAULT_REGION'] = 'ap-south-1'
# Copy the project-specific credential variables into the names boto3 reads.
# os.environ only accepts str values, so assigning os.environ.get(...) directly
# raises TypeError whenever the source variable is unset; skip missing ones so
# boto3 can use whatever other credential source is configured.
for aws_var, source_var in (
    ('AWS_ACCESS_KEY_ID', 'test1_access_key'),
    ('AWS_SECRET_ACCESS_KEY', 'test1_secret_access_key'),
):
    credential_value = os.environ.get(source_var)
    if credential_value is not None:
        os.environ[aws_var] = credential_value

In [14]:
# S3 bucket holding the raw .csv input scanned by the Glue crawler.
S3_BUCKET_NAME = "athena-glue-bucket"
# Prefix inside that bucket that the crawler is pointed at.
INPUT_LOCATION = f"s3://{S3_BUCKET_NAME}/input/"
# Glue Data Catalog database used by both the crawler and Athena queries.
DATABASE_NAME = "glue_db"
# Table read in section 1.a (expected to be created by the crawler).
TABLE_NAME = "input"
# Table produced by the CTAS query in section 1.b and dropped in section 3.
NEW_TABLE_NAME = "airline_history"
# S3 prefix where Athena writes query result files.
OUTPUT_LOCATION = "s3://boto3-athena-output/"

### 1.a. Create the table by running a Glue crawler over the .csv file in the S3 bucket, then read all rows with Athena.

In [16]:
def connect_to_glue():
    """

    Description:
        Build a boto3 client for the AWS Glue service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'glue'

    """
    return boto3.client('glue')

def connect_to_iam():
    """

    Description:
        Build a boto3 client for the AWS IAM service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'iam'

    """
    return boto3.client('iam')

def connect_to_athena():
    """

    Description:
        Build a boto3 client for the AWS Athena service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'athena'

    """
    return boto3.client('athena')

def create_iam_policy(iam, s3_bucket_name, policy_name="glueS3CrawlerPolicy"):
    """

    Description:
        To create a customer-managed IAM policy that lets the Glue crawler
        list the bucket and read its objects.
    Parameter:
        boto3 IAM client iam,
        string s3_bucket_name: bucket the crawler will read,
        string policy_name: name of the new policy (default "glueS3CrawlerPolicy")
    Return:
        string: ARN of the created policy

    """
    glue_s3_crawler_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListBucket is granted on the bucket ARN itself ...
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}"]
            },
            {
                # ... while GetObject is granted on the objects inside it.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"]
            }
        ]
    }

    # NOTE(review): create_policy is not idempotent; re-running this cell with an
    # existing policy of the same name will raise from the IAM API.
    response = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(glue_s3_crawler_policy)
    )

    return response["Policy"]["Arn"]


def create_iam_role(iam, role_name="glueS3CrawlerRole"):
    """

    Description:
        To create an IAM role whose trust policy allows the Glue service
        (glue.amazonaws.com) to assume it.
    Parameter:
        boto3 IAM client iam,
        string role_name: name of the new role (default "glueS3CrawlerRole")
    Return:
        string: the RoleName reported back by IAM

    """
    assume_role_policy_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "glue.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    })

    response = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=assume_role_policy_document
    )

    return response["Role"]["RoleName"]


def attach_iam_policy(iam, policy_arn, role_name):
    """

    Description:
        To attach an IAM policy (by ARN) to a role and print the API response.
    Parameter:
        boto3 IAM client iam,
        string policy_arn,
        string role_name
    Return:
        No values returned.

    """
    print(iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn))


def create_glue_crawler(glue, crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions=None):
    """

    Description:
        To create a Glue crawler that scans one S3 path and writes the
        discovered table(s) into a Glue Data Catalog database.
    Parameter:
        boto3 Glue client glue,
        string crawler_name,
        string iam_role_name: role the crawler runs as,
        string db_name: target catalog database,
        string s3_path: S3 path to crawl,
        list s3_path_exclusions: glob patterns to skip (default: none)
    Return:
        dict: the create_crawler response (also printed)

    """
    # Copy into a fresh list so a None default never becomes shared mutable state.
    s3_target = {
        "Path": s3_path,
        "Exclusions": list(s3_path_exclusions or []),
    }

    response = glue.create_crawler(
        Name=crawler_name,
        Role=iam_role_name,
        DatabaseName=db_name,
        Targets={"S3Targets": [s3_target]},
    )

    print(response)
    return response


def start_crawler(glue, crawler_name):
    """

    Description:
        To kick off a run of an existing Glue crawler and print the API response.
    Parameter:
        boto3 Glue client glue,
        string crawler_name
    Return:
        No values returned.

    """
    print(glue.start_crawler(Name=crawler_name))



def get_all_rows(athena, database_name, table_name, output_location):
    """

    Description:
        To start an Athena query selecting every row of database_name.table_name.
    Parameter:
        boto3 Athena client athena,
        string database_name,
        string table_name,
        string output_location: S3 prefix for the query result files
    Return:
        string: the QueryExecutionId of the started query

    """
    select_all = "SELECT * from {}.{}".format(database_name, table_name)
    started = athena.start_query_execution(
        QueryString=select_all,
        ResultConfiguration=dict(OutputLocation=output_location),
    )
    return started["QueryExecutionId"]


def get_query_results(athena, execution_id):
    """

    Description:
        To fetch every result row of a finished Athena query. GetQueryResults
        returns one page per call, so this follows NextToken until the last
        page instead of silently truncating larger results.
    Parameter:
        boto3 Athena client athena,
        string execution_id
    Return:
        list results: row dicts of the form {'Data': [...]}; for SELECT
        queries the first row holds the column names.

    """
    results = []
    request = {"QueryExecutionId": execution_id}
    while True:
        response = athena.get_query_results(**request)
        results.extend(response["ResultSet"]["Rows"])
        next_token = response.get("NextToken")
        if not next_token:
            return results
        request["NextToken"] = next_token

def has_query_succeeded(athena, execution_id, max_execution=5, sleep_seconds=30):
    """

    Description:
        To poll an Athena query until it finishes or the polling budget runs out.
    Parameter:
        boto3 Athena client athena,
        string execution_id,
        int max_execution: maximum number of status polls (default 5),
        float sleep_seconds: wait between polls (default 30)
    Return:
        bool: True once the state is SUCCEEDED; False if it reaches any other
        terminal state (e.g. FAILED/CANCELLED) or is still RUNNING/QUEUED after
        max_execution polls.

    """
    for attempt in range(max_execution):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = (
            response.get("QueryExecution", {})
            .get("Status", {})
            .get("State", "RUNNING")
        )
        if state == "SUCCEEDED":
            return True
        if state not in ("RUNNING", "QUEUED"):
            # Terminal non-success state: stop immediately instead of sleeping.
            return False
        if attempt < max_execution - 1:
            # No point sleeping after the final poll.
            time.sleep(sleep_seconds)
    return False

def query_to_df(query_result):
    """

    Description:
        To convert Athena result rows into a Pandas DataFrame and print it.
        The first row is used as the column header; rows containing any
        missing value are dropped (df.dropna()).
    Parameter:
        list query_result: rows as returned by get_query_results, each of the
        form {'Data': [{'VarCharValue': ...}, ...]}
    Return:
        pandas.DataFrame: the printed DataFrame (empty when query_result is empty)

    """
    if not query_result:
        # Nothing came back; avoid an IndexError on query_result[0].
        print("No records returned by the query.")
        return pd.DataFrame()

    def _row_values(row):
        # A datum may lack 'VarCharValue' (e.g. a NULL cell); .get() yields None
        # so each row keeps one entry per column, aligned with the header.
        return [datum.get("VarCharValue") for datum in row.get("Data", [])]

    header = _row_values(query_result[0])
    data = [_row_values(row) for row in query_result[1:]]

    # Create the pandas DataFrame, discarding rows with missing values.
    df = pd.DataFrame(data, columns=header).dropna()
    print(df)
    return df

def _wait_for_crawler(glue, crawler_name, poll_seconds=15, max_polls=40):
    """

    Description:
        To wait until a Glue crawler reports the READY state again after a run.
    Parameter:
        boto3 Glue client glue,
        string crawler_name,
        float poll_seconds: wait before each status check (default 15),
        int max_polls: maximum number of status checks (default 40)
    Return:
        bool: True if the crawler reported READY within the polling budget.

    """
    for _ in range(max_polls):
        # Sleep first so the state check does not race the run that was just started.
        time.sleep(poll_seconds)
        state = glue.get_crawler(Name=crawler_name)["Crawler"]["State"]
        if state == "READY":
            return True
    return False


def main():
    """

    Description:
        To set up IAM permissions and a Glue crawler for the input CSV, run the
        crawler, then read and print every row of the crawled table via Athena.
    Parameter:
        No parameters
    Return:
        No values returned.
    Raises:
        RuntimeError if the crawler does not finish or the Athena query fails.

    """
    athena = connect_to_athena()
    glue = connect_to_glue()
    iam = connect_to_iam()

    # 1. Create IAM policy granting read access to the input bucket.
    policy_arn = create_iam_policy(iam, S3_BUCKET_NAME)

    # 2. Create an IAM role that Glue can assume.
    role_name = create_iam_role(iam)

    # 3. Attach the custom policy to the role.
    attach_iam_policy(iam, policy_arn=policy_arn, role_name=role_name)

    # 4. Attach the AWS managed Glue service policy.
    service_role_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    attach_iam_policy(iam, policy_arn=service_role_arn, role_name=role_name)

    # 5. Create the Glue crawler for the input path, running as the role created above.
    crawler_name = "GlueAirlineCrawler"
    create_glue_crawler(
        glue=glue,
        crawler_name=crawler_name,
        iam_role_name=role_name,
        db_name=DATABASE_NAME,
        s3_path=INPUT_LOCATION,
        s3_path_exclusions=[],
    )

    # 6. Start the crawler and wait for it: the table it creates must exist
    #    before Athena can query it.
    print("Starting Glue crawler")
    start_crawler(glue, crawler_name=crawler_name)
    print("Waiting for Glue crawler to finish")
    if not _wait_for_crawler(glue, crawler_name):
        raise RuntimeError(f"Glue crawler {crawler_name} did not return to READY in time")

    # 7. Query all rows with Athena.
    print("Querying athena:")
    execution_id = get_all_rows(
        athena,
        database_name=DATABASE_NAME,
        table_name=TABLE_NAME,
        output_location=OUTPUT_LOCATION,
    )

    query_status = has_query_succeeded(athena, execution_id=execution_id)
    print(f"Query state: {query_status}")
    if not query_status:
        raise RuntimeError(f"Athena query {execution_id} did not succeed")

    # 8. Retrieve results.
    print("retrieving results")
    query_result = get_query_results(athena, execution_id=execution_id)

    # 9. Print query result as a pandas dataframe.
    print("Reading all records of the table:")
    query_to_df(query_result)

if __name__ == "__main__":
    main()



Querying athena:
Query state: True
retrieving results
Reading all records of the table:
  travel_id            airline from_country to_country hours_travelled  \
0      T001            Vistara        India     France             9.5   
1      T002           Aeroflot       France     Russia             3.5   
2      T003      China Eastern       Russia      China             4.5   
3      T004  Juneyao Airlines         China      Japan             3.5   
4      T005          Air India        Japan      India           10.17   

  amount_paid  
0     24547.4  
1    69806.66  
2   109016.75  
3    44251.07  
4     46431.0  


### 1.b. Creating new Table 'airline_history' from 'input' table.

In [None]:
def connect_to_athena():
    """

    Description:
        Build a boto3 client for the AWS Athena service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'athena'

    """
    return boto3.client('athena')
  

def has_query_succeeded(athena, execution_id, max_execution=5, sleep_seconds=30):
    """

    Description:
        To poll an Athena query until it finishes or the polling budget runs out.
    Parameter:
        boto3 Athena client athena,
        string execution_id,
        int max_execution: maximum number of status polls (default 5),
        float sleep_seconds: wait between polls (default 30)
    Return:
        bool: True once the state is SUCCEEDED; False if it reaches any other
        terminal state (e.g. FAILED/CANCELLED) or is still RUNNING/QUEUED after
        max_execution polls.

    """
    for attempt in range(max_execution):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = (
            response.get("QueryExecution", {})
            .get("Status", {})
            .get("State", "RUNNING")
        )
        if state == "SUCCEEDED":
            return True
        if state not in ("RUNNING", "QUEUED"):
            # Terminal non-success state: stop immediately instead of sleeping.
            return False
        if attempt < max_execution - 1:
            # No point sleeping after the final poll.
            time.sleep(sleep_seconds)
    return False


def run_query(athena, query, output_location=None):
    """

    Description:
        To start an Athena query whose result files are written to an S3 prefix.
        The call does not wait for the query to finish.
    Parameter:
        boto3 Athena client athena,
        string query,
        string output_location: S3 prefix for results (default: OUTPUT_LOCATION)
    Return:
        string: the QueryExecutionId of the started query

    """
    if output_location is None:
        # Resolved at call time so the module-level constant can still change.
        output_location = OUTPUT_LOCATION

    response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )

    return response["QueryExecutionId"]

def get_query_results(athena, execution_id):
    """

    Description:
        To fetch every result row of a finished Athena query. GetQueryResults
        returns one page per call, so this follows NextToken until the last
        page instead of silently truncating larger results.
    Parameter:
        boto3 Athena client athena,
        string execution_id
    Return:
        list results: row dicts of the form {'Data': [...]}; for SELECT
        queries the first row holds the column names.

    """
    results = []
    request = {"QueryExecutionId": execution_id}
    while True:
        response = athena.get_query_results(**request)
        results.extend(response["ResultSet"]["Rows"])
        next_token = response.get("NextToken")
        if not next_token:
            return results
        request["NextToken"] = next_token

def query_to_df(query_result):
    """

    Description:
        To convert Athena result rows into a Pandas DataFrame and print it.
        The first row is used as the column header; rows containing any
        missing value are dropped (df.dropna()).
    Parameter:
        list query_result: rows as returned by get_query_results, each of the
        form {'Data': [{'VarCharValue': ...}, ...]}
    Return:
        pandas.DataFrame: the printed DataFrame (empty when query_result is empty)

    """
    if not query_result:
        # Nothing came back; avoid an IndexError on query_result[0].
        print("No records returned by the query.")
        return pd.DataFrame()

    def _row_values(row):
        # A datum may lack 'VarCharValue' (e.g. a NULL cell); .get() yields None
        # so each row keeps one entry per column, aligned with the header.
        return [datum.get("VarCharValue") for datum in row.get("Data", [])]

    header = _row_values(query_result[0])
    data = [_row_values(row) for row in query_result[1:]]

    # Create the pandas DataFrame, discarding rows with missing values.
    df = pd.DataFrame(data, columns=header).dropna()
    print(df)
    return df

def main():
    """

    Description:
        To create the airline_history table with a CTAS query (input columns
        plus a derived rate_of_travel column), then read it back and print it.
    Parameter:
        No parameters
    Return:
        No values returned.
    Raises:
        RuntimeError if either Athena query does not succeed.

    """
    athena = connect_to_athena()

    # CTAS: copy the input table and add rate_of_travel = amount_paid / hours_travelled,
    # rounded to 2 decimals.
    # NOTE(review): re-running this after the table exists will presumably fail;
    # section 3 drops it.
    query = f"Create table {DATABASE_NAME}.{NEW_TABLE_NAME} as Select *, ROUND(amount_paid / hours_travelled, 2) as rate_of_travel From {DATABASE_NAME}.{TABLE_NAME}"
    execution_id = run_query(athena, query)
    print(f"Get query execution id: {execution_id}")

    query_status = has_query_succeeded(athena, execution_id=execution_id)
    print(f"Query state: {query_status}")
    if not query_status:
        raise RuntimeError(f"Athena query {execution_id} did not succeed: {query}")

    # Read back the newly created table.
    query = f"Select * from {DATABASE_NAME}.{NEW_TABLE_NAME}"
    execution_id = run_query(athena, query)
    print(f"Get query execution id: {execution_id}")

    query_status = has_query_succeeded(athena, execution_id=execution_id)
    print(f"Query state: {query_status}")
    if not query_status:
        raise RuntimeError(f"Athena query {execution_id} did not succeed: {query}")

    query_result = get_query_results(athena, execution_id=execution_id)
    query_to_df(query_result)

if __name__ == "__main__":
    main()

Get query execution id: 021893e4-1e31-4e08-be9b-f3c87ab6ef40
Query state: True
Get query execution id: 9122885c-4188-4d0f-a4cc-5be9f4391ee7
Query state: True
  travel_id            airline from_country to_country hours_travelled  \
0      T001            Vistara        India     France             9.5   
1      T002           Aeroflot       France     Russia             3.5   
2      T003      China Eastern       Russia      China             4.5   
3      T004  Juneyao Airlines         China      Japan             3.5   
4      T005          Air India        Japan      India           10.17   

  amount_paid rate_of_travel  
0     24547.4        2583.94  
1    69806.66       19944.76  
2   109016.75       24225.94  
3    44251.07       12643.16  
4     46431.0        4565.49  


### 2. Performing Aggregation queries

In [None]:
def connect_to_athena():
    """

    Description:
        Build a boto3 client for the AWS Athena service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'athena'

    """
    return boto3.client('athena')
  

def has_query_succeeded(athena, execution_id, max_execution=5, sleep_seconds=30):
    """

    Description:
        To poll an Athena query until it finishes or the polling budget runs out.
    Parameter:
        boto3 Athena client athena,
        string execution_id,
        int max_execution: maximum number of status polls (default 5),
        float sleep_seconds: wait between polls (default 30)
    Return:
        bool: True once the state is SUCCEEDED; False if it reaches any other
        terminal state (e.g. FAILED/CANCELLED) or is still RUNNING/QUEUED after
        max_execution polls.

    """
    for attempt in range(max_execution):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = (
            response.get("QueryExecution", {})
            .get("Status", {})
            .get("State", "RUNNING")
        )
        if state == "SUCCEEDED":
            return True
        if state not in ("RUNNING", "QUEUED"):
            # Terminal non-success state: stop immediately instead of sleeping.
            return False
        if attempt < max_execution - 1:
            # No point sleeping after the final poll.
            time.sleep(sleep_seconds)
    return False


def run_query(athena, query, output_location=None):
    """

    Description:
        To start an Athena query whose result files are written to an S3 prefix.
        The call does not wait for the query to finish.
    Parameter:
        boto3 Athena client athena,
        string query,
        string output_location: S3 prefix for results (default: OUTPUT_LOCATION)
    Return:
        string: the QueryExecutionId of the started query

    """
    if output_location is None:
        # Resolved at call time so the module-level constant can still change.
        output_location = OUTPUT_LOCATION

    response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )

    return response["QueryExecutionId"]

def get_query_results(athena, execution_id):
    """

    Description:
        To fetch every result row of a finished Athena query. GetQueryResults
        returns one page per call, so this follows NextToken until the last
        page instead of silently truncating larger results.
    Parameter:
        boto3 Athena client athena,
        string execution_id
    Return:
        list results: row dicts of the form {'Data': [...]}; for SELECT
        queries the first row holds the column names.

    """
    results = []
    request = {"QueryExecutionId": execution_id}
    while True:
        response = athena.get_query_results(**request)
        results.extend(response["ResultSet"]["Rows"])
        next_token = response.get("NextToken")
        if not next_token:
            return results
        request["NextToken"] = next_token

def query_to_df(query_result):
    """

    Description:
        To convert Athena result rows into a Pandas DataFrame and print it.
        The first row is used as the column header; rows containing any
        missing value are dropped (df.dropna()).
    Parameter:
        list query_result: rows as returned by get_query_results, each of the
        form {'Data': [{'VarCharValue': ...}, ...]}
    Return:
        pandas.DataFrame: the printed DataFrame (empty when query_result is empty)

    """
    if not query_result:
        # Nothing came back; avoid an IndexError on query_result[0].
        print("No records returned by the query.")
        return pd.DataFrame()

    def _row_values(row):
        # A datum may lack 'VarCharValue' (e.g. a NULL cell); .get() yields None
        # so each row keeps one entry per column, aligned with the header.
        return [datum.get("VarCharValue") for datum in row.get("Data", [])]

    header = _row_values(query_result[0])
    data = [_row_values(row) for row in query_result[1:]]

    # Create the pandas DataFrame, discarding rows with missing values.
    df = pd.DataFrame(data, columns=header).dropna()
    print(df)
    return df

def main():
    """

    Description:
        To run three aggregation queries against the airline_history table and
        print each result as a DataFrame.
    Parameter:
        No parameters
    Return:
        No values returned.
    Raises:
        RuntimeError if any Athena query does not succeed.

    """
    athena = connect_to_athena()
    source_table = f"{DATABASE_NAME}.{NEW_TABLE_NAME}"

    # (description printed before the query, SQL to run)
    aggregation_queries = [
        ("Find average number of hours spent in travelling all flights",
         f"SELECT AVG(hours_travelled) AS AverageTimeTravelled FROM {source_table}"),
        ("Find total amount spent in travelling all flights",
         f"SELECT SUM(amount_paid) AS TotalAmountPaid FROM {source_table}"),
        # LIMIT 1 returns a single airline even if several share the top rate.
        ("Find flight that had the highest rate of travel",
         f"SELECT airline, rate_of_travel FROM {source_table} "
         "ORDER BY rate_of_travel DESC LIMIT 1"),
    ]

    for description, query in aggregation_queries:
        print(description)
        execution_id = run_query(athena, query)
        print(f"Get query execution id: {execution_id}")

        query_status = has_query_succeeded(athena, execution_id=execution_id)
        print(f"Query state: {query_status}")
        if not query_status:
            raise RuntimeError(f"Athena query {execution_id} did not succeed: {query}")

        query_result = get_query_results(athena, execution_id=execution_id)
        query_to_df(query_result)

if __name__ == "__main__":
    main()

Find average number of hours spent in travelling all filghts
Get query execution id: 3fc847c2-3efd-4b73-b56e-9ab789a30c6b
Query state: True
  AverageTimeTravelled
0                6.234
Find total amount spent in travelling all flights
Get query execution id: 0933361e-9620-4102-ae1b-8b2f79b8ebef
Query state: True
  TotalAmountPaid
0       294052.88
Find flight that had the highest rate of travel
Get query execution id: eec66bb1-82b7-401c-bcd8-1bcaea2e9d46
Query state: True
         airline rate_of_travel
0  China Eastern       24225.94


### 3. Dropping Table

In [None]:
def connect_to_athena():
    """

    Description:
        Build a boto3 client for the AWS Athena service.
    Parameter:
        No parameters
    Return:
        boto3 client for 'athena'

    """
    return boto3.client('athena')
  

def has_query_succeeded(athena, execution_id, max_execution=5, sleep_seconds=30):
    """

    Description:
        To poll an Athena query until it finishes or the polling budget runs out.
    Parameter:
        boto3 Athena client athena,
        string execution_id,
        int max_execution: maximum number of status polls (default 5),
        float sleep_seconds: wait between polls (default 30)
    Return:
        bool: True once the state is SUCCEEDED; False if it reaches any other
        terminal state (e.g. FAILED/CANCELLED) or is still RUNNING/QUEUED after
        max_execution polls.

    """
    for attempt in range(max_execution):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = (
            response.get("QueryExecution", {})
            .get("Status", {})
            .get("State", "RUNNING")
        )
        if state == "SUCCEEDED":
            return True
        if state not in ("RUNNING", "QUEUED"):
            # Terminal non-success state: stop immediately instead of sleeping.
            return False
        if attempt < max_execution - 1:
            # No point sleeping after the final poll.
            time.sleep(sleep_seconds)
    return False


def run_query(athena, query, output_location=None):
    """

    Description:
        To start an Athena query whose result files are written to an S3 prefix.
        The call does not wait for the query to finish.
    Parameter:
        boto3 Athena client athena,
        string query,
        string output_location: S3 prefix for results (default: OUTPUT_LOCATION)
    Return:
        string: the QueryExecutionId of the started query

    """
    if output_location is None:
        # Resolved at call time so the module-level constant can still change.
        output_location = OUTPUT_LOCATION

    response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )

    return response["QueryExecutionId"]

def get_query_results(athena, execution_id):
    """

    Description:
        To fetch every result row of a finished Athena query. GetQueryResults
        returns one page per call, so this follows NextToken until the last
        page instead of silently truncating larger results.
    Parameter:
        boto3 Athena client athena,
        string execution_id
    Return:
        list results: row dicts of the form {'Data': [...]}; may be empty
        (e.g. for a DROP TABLE statement).

    """
    results = []
    request = {"QueryExecutionId": execution_id}
    while True:
        response = athena.get_query_results(**request)
        results.extend(response["ResultSet"]["Rows"])
        next_token = response.get("NextToken")
        if not next_token:
            return results
        request["NextToken"] = next_token

def query_to_df(query_result):
    """

    Description:
        To convert Athena result rows into a Pandas DataFrame and print it.
        The first row is used as the column header; rows containing any
        missing value are dropped (df.dropna()). An empty result (e.g. from
        DROP TABLE) prints a notice instead.
    Parameter:
        list query_result: rows as returned by get_query_results, each of the
        form {'Data': [{'VarCharValue': ...}, ...]}
    Return:
        pandas.DataFrame: the printed DataFrame, or an empty DataFrame when
        query_result is empty.

    """
    if not query_result:
        print("No records exist in table or table is dropped.")
        return pd.DataFrame()

    def _row_values(row):
        # A datum may lack 'VarCharValue' (e.g. a NULL cell); .get() yields None
        # so each row keeps one entry per column, aligned with the header.
        return [datum.get("VarCharValue") for datum in row.get("Data", [])]

    header = _row_values(query_result[0])
    data = [_row_values(row) for row in query_result[1:]]

    # Create the pandas DataFrame, discarding rows with missing values.
    df = pd.DataFrame(data, columns=header).dropna()
    print(df)
    return df

def main():
    """

    Description:
        To drop the airline_history table (if present) via Athena and report
        the outcome.
    Parameter:
        No parameters
    Return:
        No values returned.
    Raises:
        RuntimeError if the DROP TABLE query does not succeed.

    """
    athena = connect_to_athena()

    # DROPPING TABLE.
    # NOTE(review): Athena's DROP TABLE presumably removes only the catalog entry;
    # the CTAS output files in S3 may remain and need separate cleanup — confirm.
    print(f"DROPPING TABLE {NEW_TABLE_NAME}")
    query = f"""DROP TABLE IF EXISTS {DATABASE_NAME}.{NEW_TABLE_NAME}"""
    execution_id = run_query(athena, query)
    print(f"Get query execution id: {execution_id}")

    query_status = has_query_succeeded(athena, execution_id=execution_id)
    print(f"Query state: {query_status}")
    if not query_status:
        raise RuntimeError(f"Athena query {execution_id} did not succeed: {query}")

    query_result = get_query_results(athena, execution_id=execution_id)
    query_to_df(query_result)


if __name__ == "__main__":
    main()

DROPPING TABLE airline_history
Get query execution id: 6d20b180-a369-4e47-abd7-bc8a770fa0ed
Query state: True
No records exist in table or table is dropped.
