In [4]:
# Import the required packages

import boto3
import pandas as pd
from io import StringIO
import time
import io

In [5]:
# Load the details required to connect to different AWS Services

import configparser
config = configparser.ConfigParser()
config.read_file(open('config_file.config'))
AWS_ACCESS_KEY_ID=config.get("AWS","KEY")
AWS_SECRET_ACCESS_KEY=config.get("AWS","SECRET_KEY")
AWS_BUCKET_NAME=config.get("DWH","BUCKET_NAME")
FILE_PATH=config.get("AWS","FILE_PATH")
AWS_REGION=config.get("AWS","AWS_REGION")

In [6]:
# Connect to S3

s3 = boto3.resource(
    service_name='s3',
    region_name='eu-west-1',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

In [None]:
# Load the files to S3

s3.Bucket(AWS_BUCKET_NAME).upload_file(Filename=FILE_PATH + '/coaches.csv', Key='coaches.csv')
s3.Bucket(AWS_BUCKET_NAME).upload_file(Filename=FILE_PATH + '/technical_officials.csv', Key='technical_officials.csv')
s3.Bucket(AWS_BUCKET_NAME).upload_file(Filename=FILE_PATH + '/medals.csv', Key='medals.csv')
s3.Bucket(AWS_BUCKET_NAME).upload_file(Filename=FILE_PATH + '/athletes.csv', Key='athletes.csv')
s3.Bucket(AWS_BUCKET_NAME).upload_file(Filename=FILE_PATH + '/medals_total.csv', Key='medals_total.csv')

In [7]:
# Connect to Athena

athena = boto3.client(
    'athena',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)

In [6]:
# Code to query and load data from Athena to local


class QueryAthena:
    
    def __init__(self, query, database, input_folder):
        self.database = database
        self.folder = 'athena-query-results-1/'
        self.bucket_input = AWS_BUCKET_NAME
        self.bucket_output = 'athena-and-redshift-query-results-1'
        self.s3_input = 's3://' + self.bucket_input + input_folder
        self.s3_output =  's3://' + self.bucket_output + '/' + self.folder
        self.region_name = AWS_REGION
        self.aws_access_key_id = AWS_ACCESS_KEY_ID
        self.aws_secret_access_key = AWS_SECRET_ACCESS_KEY
        self.query = query
        
    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString = q,
                    QueryExecutionContext={
                    'Database': self.database
                    },
                    ResultConfiguration={
                    'OutputLocation': self.s3_output,
                    }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
            return response

        except Exception as e:
            print(e)                
  
    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))
            
            df = self.obtain_data()
            return df
            
        except Exception as e:
            print(e)      
            
    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket_output) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()
            
            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  
    
# Retrieve athletes table
if __name__ == "__main__":       
    query = """
    SELECT * FROM "tokyo-olympics"."athletes"
    """
    qa = QueryAthena(query=query, database='tokyo-olympics', input_folder='/athletes')
    df_athletes = qa.run_query()

Execution ID: 6d3ab3a1-f488-4f04-8941-e22b8f9fa643
QUEUED
SUCCEEDED
Query "
    SELECT * FROM "tokyo-olympics"."athletes"
    " finished.


In [7]:
# Retrieve coaches table
if __name__ == "__main__":       
    query = """
    SELECT * FROM "tokyo-olympics"."coaches"
    """
    qa = QueryAthena(query=query, database='tokyo-olympics', input_folder='/coaches')
    df_coaches = qa.run_query()

Execution ID: ee456b76-2676-4d90-876e-9fa08d0b79cb
QUEUED
SUCCEEDED
Query "
    SELECT * FROM "tokyo-olympics"."coaches"
    " finished.


In [8]:
# Retrieve medals_total table
if __name__ == "__main__":       
    query = """
    SELECT * FROM "tokyo-olympics"."medals_total"
    """
    qa = QueryAthena(query=query, database='tokyo-olympics', input_folder='/medals_total')
    df_medals_total = qa.run_query()

Execution ID: 15f68261-502a-4d4d-a0e6-7c7626cfaa22
QUEUED
SUCCEEDED
Query "
    SELECT * FROM "tokyo-olympics"."medals_total"
    " finished.


In [9]:
# Retrieve medals table
if __name__ == "__main__":       
    query = """
    SELECT * FROM "tokyo-olympics"."medals"
    """
    qa = QueryAthena(query=query, database='tokyo-olympics', input_folder='/medals')
    df_medals = qa.run_query()

Execution ID: dc7698d6-1595-4586-8f42-2123aae56149
QUEUED
SUCCEEDED
Query "
    SELECT * FROM "tokyo-olympics"."medals"
    " finished.


In [10]:
# Retrieve technical_officials table
if __name__ == "__main__":       
    query = """
    SELECT * FROM "tokyo-olympics"."technical_officials"
    """
    qa = QueryAthena(query=query, database='tokyo-olympics', input_folder='/technical_officials')
    df_technical_officials = qa.run_query()

Execution ID: d303f155-3552-4e62-adfd-f21e0cd69d63
QUEUED
SUCCEEDED
Query "
    SELECT * FROM "tokyo-olympics"."technical_officials"
    " finished.


In [11]:
# Drop the first row in the below dataframes

df_athletes = df_athletes.drop(0).reset_index(drop=True)
df_coaches = df_coaches.drop(0).reset_index(drop=True)
df_technical_officials = df_technical_officials.drop(0).reset_index(drop=True)

In [12]:
# Clean the athletes dataframe

# Function to convert names to CamelCase
def to_camel_case(s):
    parts = s.split(' ')
    return ' '.join(x.capitalize() for x in parts)

df_athletes['name'] = df_athletes['name'].apply(to_camel_case)

In [13]:
# Clean the athletes dataframe

df_athletes['short_name'] = df_athletes['short_name'].apply(to_camel_case)

df_athletes['birth_place'] = df_athletes['birth_place'].astype(str)
df_athletes['birth_place'] = df_athletes['birth_place'].apply(to_camel_case)

df_athletes['residence_place'] = df_athletes['residence_place'].astype(str)
df_athletes['residence_place'] = df_athletes['residence_place'].apply(to_camel_case)
df_athletes = df_athletes.fillna('Not mentioned')

In [14]:
df_athletes.head()

Unnamed: 0,name,short_name,gender,birth_date,birth_place,birth_country,country,country_code,discipline,discipline_code,residence_place,residence_country,height_m_ft
0,Aalerud Katrine,Aalerud K,Female,04-12-1994,Vestby,Norway,Norway,NOR,Cycling Road,CRD,Nan,Not mentioned,Not mentioned
1,Abad Nestor,Abad N,Male,29-03-1993,Alcoi,Spain,Spain,ESP,Artistic Gymnastics,GAR,Madrid,Spain,1.65/5'4''
2,Abagnale Giovanni,Abagnale G,Male,11-01-1995,Gragnano,Italy,Italy,ITA,Rowing,ROW,Sabaudia,Italy,1.98/6'5''
3,Abalde Alberto,Abalde A,Male,15-12-1995,Ferrol,Spain,Spain,ESP,Basketball,BKB,Nan,Not mentioned,2.00/6'6''
4,Abalde Tamara,Abalde T,Female,06-02-1989,Vigo,Spain,Spain,ESP,Basketball,BKB,Nan,Not mentioned,1.92/6'3''


In [15]:
# Clean the coaches dataframe

df_coaches['name'] = df_coaches['name'].apply(to_camel_case)
df_coaches['short_name'] = df_coaches['short_name'].apply(to_camel_case)
df_coaches = df_coaches.fillna('Not mentioned')

In [16]:
df_coaches.head()

Unnamed: 0,name,short_name,gender,birth_date,country_code,discipline,function,event
0,Abdelmagid Wael,Abdelmagid W,Male,02-08-1982,EGY,Football,Head Coach,Not mentioned
1,Abe Junya,Abe J,Male,25-07-1990,JPN,Volleyball,Head Coach,Not mentioned
2,Abe Katsuhiko,Abe K,Male,23-09-1979,JPN,Basketball,Coach,Not mentioned
3,Adama Cherif,Adama C,Male,06-05-1962,CIV,Football,Head Coach,Not mentioned
4,Ageba Yuya,Ageba Y,Male,30-09-1983,JPN,Volleyball,Head Coach,Not mentioned


In [17]:
# Clean the medals_totals dataframe

df_medals_total = df_medals_total.fillna('Not mentioned')
df_medals_total.head()

Unnamed: 0,rank,country_code,gold_medal,silver_medal,bronze_medal,total,country
0,1,USA,39,41,33,113,United States of America
1,2,CHN,38,32,18,88,People's Republic of China
2,3,JPN,27,14,17,58,Japan
3,4,GBR,22,21,22,65,Great Britain
4,5,ROC,20,28,23,71,ROC


In [18]:
# Clean the medals dataframe

df_medals.drop('medal_date', axis=1, inplace=True)
df_medals['athlete_short_name'] = df_medals['athlete_short_name'].apply(to_camel_case)
df_medals['athlete_name'] = df_medals['athlete_name'].apply(to_camel_case)
df_medals['athlete_sex'] = df_medals['athlete_sex'].replace('X', 'Not mentioned')
df_medals = df_medals.fillna('Not mentioned')

In [20]:
df_medals.head()

Unnamed: 0,medal_type,medal_code,athlete_short_name,athlete_name,athlete_sex,country_code,discipline_code,event,country,discipline
0,Gold Medal,1,Kim Jd,Kim Je Deok,Not mentioned,KOR,ARC,Mixed Team,Republic of Korea,Archery
1,Gold Medal,1,An S,An San,Not mentioned,KOR,ARC,Mixed Team,Republic of Korea,Archery
2,Silver Medal,2,Schloesser G,Schloesser Gabriela,Not mentioned,NED,ARC,Mixed Team,Netherlands,Archery
3,Silver Medal,2,Wijler S,Wijler Steve,Not mentioned,NED,ARC,Mixed Team,Netherlands,Archery
4,Bronze Medal,3,Alvarez L,Alvarez Luis,Not mentioned,MEX,ARC,Mixed Team,Mexico,Archery


In [21]:
# Clean the technical_officials dataframe

df_technical_officials['name'] = df_technical_officials['name'].apply(to_camel_case)
df_technical_officials['short_name'] = df_technical_officials['short_name'].apply(to_camel_case)
df_technical_officials = df_technical_officials.fillna('Not mentioned')

In [22]:
df_technical_officials.head()

Unnamed: 0,name,short_name,gender,birth_date,country,discipline,function
0,Abaeva Elena,Abaeva E,Female,21-04-1966,Uzbekistan,Wrestling,Judge
1,Abbar Bachir,Abbar B,Male,03-05-1965,Morocco,Boxing,Judge
2,Abdellatif Makfouni,Abdellatif M,Male,23-11-1972,Morocco,Boxing,Judge
3,Abe Miya,Abe M,Female,27-10-1992,Japan,Beach Volleyball,Referee
4,Aciga Fula Antonio Stephen,Aciga Fula As,Male,28-11-1957,Uganda,Boxing,Judge


In [23]:
bucket_after_etl = 'data-after-etl'

In [25]:
# Load the cleaned athletes dataframe back to S3

csv_convert = StringIO()
df_athletes.to_csv(csv_convert)
s3.Object(bucket_after_etl, 'Output/cleaned_athletes.csv').put(Body=csv_convert.getvalue())

{'ResponseMetadata': {'RequestId': 'TZFZBVTTSQSJ2YAR',
  'HostId': '5Uj0W1UUbr2T1JDNVvSqI18I9gmSPrJ/rEFUspMpZd7gLD/TGudy2JsIYDwVbO9VHf8371iV9Ps=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '5Uj0W1UUbr2T1JDNVvSqI18I9gmSPrJ/rEFUspMpZd7gLD/TGudy2JsIYDwVbO9VHf8371iV9Ps=',
   'x-amz-request-id': 'TZFZBVTTSQSJ2YAR',
   'date': 'Sun, 10 Mar 2024 14:34:29 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"8fe752391f260c9823569b649fd53965"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"8fe752391f260c9823569b649fd53965"',
 'ServerSideEncryption': 'AES256'}

In [27]:
# Load the cleaned coaches dataframe back to S3

csv_convert_1 = StringIO()
df_coaches.to_csv(csv_convert_1)
s3.Object(bucket_after_etl, 'Output/cleaned_coaches.csv').put(Body=csv_convert_1.getvalue())

{'ResponseMetadata': {'RequestId': 'GHVCP9BRAES1JYX9',
  'HostId': 'Teqx+RPN2ibzSMhFzYex2a4n7rNnoQaZXIW66Aiendxq9OVJgU7mF+uxzh/DAjNJ5fQZ+6L/OsI=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Teqx+RPN2ibzSMhFzYex2a4n7rNnoQaZXIW66Aiendxq9OVJgU7mF+uxzh/DAjNJ5fQZ+6L/OsI=',
   'x-amz-request-id': 'GHVCP9BRAES1JYX9',
   'date': 'Sun, 10 Mar 2024 14:42:17 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"2fa5faee0ee63cdc4b429acc4faef203"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"2fa5faee0ee63cdc4b429acc4faef203"',
 'ServerSideEncryption': 'AES256'}

In [28]:
# Load the cleaned medals_totals dataframe back to S3

csv_convert_2 = StringIO()
df_medals_total.to_csv(csv_convert_2)
s3.Object(bucket_after_etl, 'Output/cleaned_medals_total.csv').put(Body=csv_convert_2.getvalue())

{'ResponseMetadata': {'RequestId': 'MJHAETPPWKYM3CWK',
  'HostId': '3QL2mn1IVp+YdBMMrLmhHJtnjpeL21h+K18hstIJXM1ZTuW7lZi/XiDAeNOqBCzpwjrDgkQbyTFNOoRzViZNfg==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '3QL2mn1IVp+YdBMMrLmhHJtnjpeL21h+K18hstIJXM1ZTuW7lZi/XiDAeNOqBCzpwjrDgkQbyTFNOoRzViZNfg==',
   'x-amz-request-id': 'MJHAETPPWKYM3CWK',
   'date': 'Sun, 10 Mar 2024 14:45:44 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"fd3348cfa82f4714795acdca3dcac4f2"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"fd3348cfa82f4714795acdca3dcac4f2"',
 'ServerSideEncryption': 'AES256'}

In [29]:
# Load the cleaned medals dataframe back to S3

csv_convert_3 = StringIO()
df_medals.to_csv(csv_convert_3)
s3.Object(bucket_after_etl, 'Output/cleaned_medals.csv').put(Body=csv_convert_3.getvalue())

{'ResponseMetadata': {'RequestId': '8GEPRB698C1FQGYM',
  'HostId': 'nhS/gLaJSsC5A2clip1Ait8701vfOJ4EtlOesIWZE36f/R9o2QURCRmn9gvsvk7GSwobB5aS/sF+PrBlehN6qQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'nhS/gLaJSsC5A2clip1Ait8701vfOJ4EtlOesIWZE36f/R9o2QURCRmn9gvsvk7GSwobB5aS/sF+PrBlehN6qQ==',
   'x-amz-request-id': '8GEPRB698C1FQGYM',
   'date': 'Sun, 10 Mar 2024 14:47:50 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"6c027a38143b69f5e5c6a2772b453ac2"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"6c027a38143b69f5e5c6a2772b453ac2"',
 'ServerSideEncryption': 'AES256'}

In [30]:
# Load the cleaned technical_officials dataframe back to S3

csv_convert_4 = StringIO()
df_technical_officials.to_csv(csv_convert_4)
s3.Object(bucket_after_etl, 'Output/cleaned_technical_officials.csv').put(Body=csv_convert_4.getvalue())

{'ResponseMetadata': {'RequestId': '4RPPWGB37ADN62GB',
  'HostId': 'ErlW+3jJhQEh4T/o593hJcSGcGfXUe8KKylrpNg/mYJZlYkobWGsEspc0b6REVRvSyKslVpfi2raiyJht76ZbQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ErlW+3jJhQEh4T/o593hJcSGcGfXUe8KKylrpNg/mYJZlYkobWGsEspc0b6REVRvSyKslVpfi2raiyJht76ZbQ==',
   'x-amz-request-id': '4RPPWGB37ADN62GB',
   'date': 'Sun, 10 Mar 2024 14:53:05 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"3b5e3f387293e4f1055c89e43ac98150"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"3b5e3f387293e4f1055c89e43ac98150"',
 'ServerSideEncryption': 'AES256'}

In [31]:
# Retrieve the structure of the dataframe for creation on Redshift

df_athletes_structure = pd.io.sql.get_schema(df_athletes.reset_index(),'df_athletes')
print(''.join(df_athletes_structure))

CREATE TABLE "df_athletes" (
"index" INTEGER,
  "name" TEXT,
  "short_name" TEXT,
  "gender" TEXT,
  "birth_date" TEXT,
  "birth_place" TEXT,
  "birth_country" TEXT,
  "country" TEXT,
  "country_code" TEXT,
  "discipline" TEXT,
  "discipline_code" TEXT,
  "residence_place" TEXT,
  "residence_country" TEXT,
  "height_m_ft" TEXT
)


In [32]:
# Retrieve the structure of the dataframe for creation on Redshift

df_coaches_structure = pd.io.sql.get_schema(df_coaches.reset_index(),'df_coaches')
print(''.join(df_coaches_structure))

CREATE TABLE "df_coaches" (
"index" INTEGER,
  "name" TEXT,
  "short_name" TEXT,
  "gender" TEXT,
  "birth_date" TEXT,
  "country_code" TEXT,
  "discipline" TEXT,
  "function" TEXT,
  "event" TEXT
)


In [33]:
# Retrieve the structure of the dataframe for creation on Redshift

df_medals_total_structure = pd.io.sql.get_schema(df_medals_total.reset_index(),'df_medals_total')
print(''.join(df_medals_total_structure))

CREATE TABLE "df_medals_total" (
"index" INTEGER,
  "rank" INTEGER,
  "country_code" TEXT,
  "gold_medal" INTEGER,
  "silver_medal" INTEGER,
  "bronze_medal" INTEGER,
  "total" INTEGER,
  "country" TEXT
)


In [34]:
# Retrieve the structure of the dataframe for creation on Redshift

df_medals_structure = pd.io.sql.get_schema(df_medals.reset_index(),'df_medals')
print(''.join(df_medals_structure))

CREATE TABLE "df_medals" (
"index" INTEGER,
  "medal_type" TEXT,
  "medal_code" INTEGER,
  "athlete_short_name" TEXT,
  "athlete_name" TEXT,
  "athlete_sex" TEXT,
  "country_code" TEXT,
  "discipline_code" TEXT,
  "event" TEXT,
  "country" TEXT,
  "discipline" TEXT
)


In [35]:
# Retrieve the structure of the dataframe for creation on Redshift

df_technical_officials_structure = pd.io.sql.get_schema(df_technical_officials.reset_index(),'df_technical_officials')
print(''.join(df_technical_officials_structure))

CREATE TABLE "df_technical_officials" (
"index" INTEGER,
  "name" TEXT,
  "short_name" TEXT,
  "gender" TEXT,
  "birth_date" TEXT,
  "country" TEXT,
  "discipline" TEXT,
  "function" TEXT
)
