# 0. Development Setting

In [None]:
#  !pip install pymysql boto3

In [1]:
!pip list

Package                      Version
---------------------------- ---------
absl-py                      1.4.0
anyio                        3.6.2
argon2-cffi                  21.3.0
argon2-cffi-bindings         21.2.0
arrow                        1.2.3
asttokens                    2.2.1
astunparse                   1.6.3
attrs                        22.2.0
backcall                     0.2.0
beautifulsoup4               4.11.2
bleach                       6.0.0
boto3                        1.26.72
botocore                     1.29.72
cachetools                   5.3.0
certifi                      2022.12.7
cffi                         1.15.1
charset-normalizer           3.0.1
comm                         0.1.2
debugpy                      1.6.6
decorator                    5.1.1
defusedxml                   0.7.1
easydict                     1.10
executing                    1.2.0
fastjsonschema               2.16.2
flatbuffers                  23.1.21
fqdn                         1.5.1

In [2]:
!python -V

Python 3.9.16


In [3]:
import pandas as pd 
import pymysql
import boto3

from smart_open import open as s_open
from dotenv import load_dotenv
from easydict import EasyDict

from datetime import datetime
import time
import os


load_dotenv()

True

In [4]:
settings = EasyDict()

settings.AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
settings.AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
settings.AWS_REGION_NAME = os.environ["REGION_NAME"]
settings.AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
settings.AWS_BUCKET_NAME = "genia-bucket"
settings.AWS_ATHENA_OUTPUT_LOCATION = "athena/quries"
settings.AWS_ATHENA_DATABASE = "mini_db"

In [5]:
class Boto3Client(object):
    aws_access_key_id = settings.AWS_ACCESS_KEY_ID
    aws_secret_access_key = settings.AWS_SECRET_ACCESS_KEY
    region_name = settings.AWS_REGION_NAME
    bucket_name = settings.AWS_BUCKET_NAME
    athena_database = settings.AWS_ATHENA_DATABASE
    athena_output_location = settings.AWS_ATHENA_OUTPUT_LOCATION
    
    service_name = None
    
    @classmethod
    def get_client(cls):
        options = dict(
            aws_access_key_id=Boto3Client.aws_access_key_id,
            aws_secret_access_key=Boto3Client.aws_secret_access_key,
            region_name=Boto3Client.region_name,
        )
        return boto3.client(cls.service_name, **options)

In [7]:
class S3Client(Boto3Client):
    
    service_name = "s3"
    
    @staticmethod
    def get_s3_df(file_name: str):
        
        clnt = S3Client.get_client()
        obj = clnt.get_object(
                Bucket=S3Client.bucket_name,
                Key=file_name
            )
        return pd.read_csv(obj["Body"])
    
    @staticmethod
    def upload_s3_df(df: pd.DataFrame, file_name: str):
        try:
            clnt = S3Client.get_client()
            file_name = f"s3://{S3Client.bucket_name}/{file_name}"
            with s_open(file_name, "wb", transport_params=dict(client=clnt)) as out_file:
                df.to_parquet(out_file, engine="pyarrow", compression="gzip", index=False)
            return True
        except Exception as e:
            print("Error occured: ", str(e))
            return False

In [30]:
# S3 csv 데이터 불러오기 예시
# df = S3Client.get_s3_df("raw/abalone.csv")
# df.head()

Unnamed: 0,Type,LongestShell,Diameter,Height,WholeWeight,ShuckedWeight,VisceraWeight,ShellWeight,Rings
0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [31]:
# parquet 파일 업로드 예시
# S3Client.upload_s3_df(df, file_name="upload-test/teacher.parquet")

True

In [8]:
class AthenaClient(Boto3Client):
    
    service_name = "athena"
    
    output_location = f"s3://{Boto3Client.bucket_name}/{Boto3Client.athena_output_location}"
    
    @staticmethod
    def get_athena_query_exec_id(sql: str):
        
        clnt = AthenaClient.get_client()
        response = clnt.start_query_execution(
            QueryString=sql,
            QueryExecutionContext={"Database": AthenaClient.athena_database},
            ResultConfiguration={"OutputLocation": AthenaClient.output_location},
        )
        # response 내의 StatusCode == 200 확인
        return response["QueryExecutionId"]
    
    @staticmethod
    def collect_query_result(query_exec_id: str):
        
        clnt = AthenaClient.get_client()
        
        WAIT = ["QUEUED", "RUNNING"]
        SUCCESS = ["SUCCEEDED"]
        FAILED = ["FAILED", "CANCELLED"]
        
        while True:
            try:
                result = clnt.get_query_execution(QueryExecutionId=query_exec_id)
                status = result["QueryExecution"]["Status"]["State"]

                if status in SUCCESS:
                    query_result_path = f"{AthenaClient.output_location}/{query_exec_id}.csv"
                    query_result_path = query_result_path.replace(f"s3://{AthenaClient.bucket_name}/", "")
                    return S3Client.get_s3_df(query_result_path)

                if status in FAILED:
                    print(f"FAILED!!! -> {status}")
                    break

                if status in WAIT:
                    print(f"Still Running... -> {status}")
                    time.sleep(0.5)
                    continue

                print(f"unexpected status... -> {status}")
                break

            except Exception as e:
                print(str(e))
                break
        return False
    
    @staticmethod
    def get_athena_sql(sql: str):
        query_exec_id = AthenaClient.get_athena_query_exec_id(sample_sql)
        return AthenaClient.collect_query_result(query_exec_id)

In [9]:
sample_sql = 'SELECT * FROM "mini_db"."teacher-hotel" limit 10;'
result = AthenaClient.get_athena_sql(sample_sql)

result

Still Running... -> QUEUED
Still Running... -> RUNNING
Still Running... -> RUNNING


Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,...,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,reservation_status,reservation_status_date,yyyy,mm
0,Resort Hotel,0,129,2015,August,31,1,1,1,2,...,,0,Transient,96.3,0,0,Check-Out,2015-08-03,2015,8
1,Resort Hotel,1,84,2015,August,31,1,2,1,2,...,,0,Transient-Party,118.06,0,0,Canceled,2015-05-12,2015,8
2,Resort Hotel,1,84,2015,August,31,1,2,1,2,...,,0,Transient-Party,118.06,0,0,Canceled,2015-05-12,2015,8
3,Resort Hotel,1,89,2015,August,31,1,0,1,2,...,,0,Transient,154.0,0,0,Canceled,2015-05-04,2015,8
4,Resort Hotel,1,10,2015,August,31,1,0,1,2,...,,0,Transient-Party,202.0,0,0,Canceled,2015-07-29,2015,8
5,Resort Hotel,1,80,2015,August,31,1,0,1,2,...,,0,Transient,134.0,0,2,Canceled,2015-07-16,2015,8
6,Resort Hotel,1,10,2015,August,31,1,0,1,2,...,,0,Transient-Party,202.0,0,0,Canceled,2015-07-29,2015,8
7,Resort Hotel,1,10,2015,August,31,1,0,1,2,...,,0,Transient-Party,252.0,0,0,Canceled,2015-07-29,2015,8
8,Resort Hotel,1,24,2015,August,31,1,0,1,2,...,,0,Transient,233.0,0,1,Canceled,2015-07-16,2015,8
9,Resort Hotel,0,1,2015,August,31,1,0,1,2,...,,0,Transient,211.0,0,2,Check-Out,2015-08-02,2015,8


# 1. Load Dataset (Amazon Athena)

# 2. Preprocessing (EDA)

# 3. DeepLearning Model Architectrure

# 4. Train Model

# 5. Evaluate Model & Hyperparameter Tuning

# 6. Inference Model & Upload S3