## Data extraction using Athena from S3


In [1188]:
import boto3
import pandas as pd
import numpy as np
import io
import re
import time
import pickle 

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [5]:
# Settings shared by the helper functions below:
#   region   -> region_name of the Athena client
#   database -> Athena query execution context
#   bucket / path -> S3 location where Athena writes the result file
#   query    -> SQL text submitted to Athena
params = dict(
    region='eu-west-2',        # update region
    database='*****_staging',  # update database
    bucket='******',           # update bucket
    path='athena-output',
    query="***************",   # update query to fetch the records
)

# profile_name=None is passed explicitly; no named profile is selected here.
session = boto3.session.Session(profile_name=None)
print(session)

def athena_query(client, params):
    """Submit the configured SQL query to Athena and return the raw response.

    Args:
        client: Athena client exposing ``start_query_execution``.
        params: Mapping with the keys ``query``, ``database``, ``bucket`` and
            ``path``. An optional ``workgroup`` key selects the Athena
            workgroup; when it is absent the placeholder below is used, so
            existing callers keep their behavior.

    Returns:
        The response of ``client.start_query_execution``; callers read
        ``'QueryExecutionId'`` from it to poll the query.
    """
    # Athena writes the result file under this S3 prefix.
    output_location = f"s3://{params['bucket']}/{params['path']}"

    return client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={'Database': params['database']},
        ResultConfiguration={'OutputLocation': output_location},
        # Prefer the workgroup from params so it lives with the other
        # settings; the literal default must still be updated otherwise.
        WorkGroup=params.get('workgroup', '*********'),
    )

def athena_to_s3(session, params, max_execution=1000000, poll_interval=1):
    """Run the Athena query and wait until its result file is available.

    Args:
        session: Object whose ``client('athena', region_name=...)`` returns
            an Athena client (a boto3 session in this notebook).
        params: Settings mapping; ``region`` is read here and the remaining
            keys are consumed by ``athena_query``.
        max_execution: Maximum number of status polls before giving up.
        poll_interval: Seconds to sleep between two status polls.

    Returns:
        The file name (last path component of the query's S3 output
        location) when the query succeeds, otherwise ``False`` (query
        failed, was cancelled, or the poll budget ran out).
    """
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']

    state = 'QUEUED'
    while max_execution > 0 and state in ('RUNNING', 'QUEUED'):
        max_execution -= 1
        response = client.get_query_execution(QueryExecutionId=execution_id)
        query_execution = response.get('QueryExecution', {})
        status = query_execution.get('Status', {})

        # A response without a state is ignored; just poll again.
        if 'State' in status:
            state = status['State']
            print(state)

            if state in ('FAILED', 'CANCELLED'):
                # Surface the service-provided reason, when present, so a
                # failure is diagnosable from the notebook output.
                reason = status.get('StateChangeReason')
                if reason:
                    print(reason)
                return False

            if state == 'SUCCEEDED':
                s3_path = query_execution['ResultConfiguration']['OutputLocation']
                # Keep only the text after the last '/'. This replaces the
                # former regex, whose '\/' was an invalid string escape.
                filename = s3_path.rsplit('/', 1)[-1]
                print(filename)
                return filename

        time.sleep(poll_interval)

    return False

def s3_to_pandas(session, params, s3_filename):
    """Download a CSV result file from S3 and load it into a DataFrame.

    Args:
        session: Object whose ``client('s3')`` returns an S3 client.
        params: Settings mapping; ``bucket`` and ``path`` locate the file.
        s3_filename: Name of the file under ``params['path']``.

    Returns:
        pandas.DataFrame parsed from the CSV content of the object.

    Raises:
        ValueError: If ``s3_filename`` is empty or falsy. ``athena_to_s3``
            returns ``False`` on failure, which would otherwise surface as
            an obscure ``TypeError`` while building the object key.
    """
    if not s3_filename:
        raise ValueError(
            "s3_filename is empty; the Athena query did not produce a result file"
        )

    s3client = session.client('s3')
    key = params['path'] + '/' + s3_filename
    obj = s3client.get_object(Bucket=params['bucket'], Key=key)

    # The whole object is read into memory before parsing.
    payload = obj['Body'].read()
    return pd.read_csv(io.BytesIO(payload))

Session(region_name='eu-west-2')


In [6]:
%%time 
# Run the query; athena_to_s3 returns the result file name or False.
s3_filename = athena_to_s3(session, params)
if not s3_filename:
    # Stop here with a clear message instead of passing False on to
    # s3_to_pandas, where it would fail while building the S3 key.
    raise RuntimeError("Athena query did not succeed; no result file to load")
df = s3_to_pandas(session, params, s3_filename)

In [36]:
# Show the (rows, columns) dimensions of the loaded result.
df.shape

(25721403, 21)

#### Save the data to a pickle file

In [40]:
# Persist the DataFrame as 'data_all.pkl' (relative path, so it lands in the
# current working directory). Only unpickle files from trusted sources.
df.to_pickle('data_all.pkl')