## 프로젝트
* 외부 DB에 있는 데이터를 끌어 와서
* Glue가 이해할 수 있는 형태로? 데이터를 저장해 주고 
* 크롤러가 해당 데이터를 크롤링을 해서 
* 뭔가 하나 만들어 두면
* Athena로 쿼리를 질의해서 마치 DB가 있는것마냥 쓸 수 있게 해 보는 과정이다.

In [1]:
# .env 파일을 환경변수로 설정시켜주기 위한 패키지
# !pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-0.21.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-0.21.1
[0m

In [1]:
# 이렇게 사용한다.
from dotenv import load_dotenv

load_dotenv()

True

In [None]:
# 아래 코드를 통해 환경변수로 설정이 되었음을 확인할 수 있다. 
import os

# os.environ

In [4]:
# 이제는 이전처럼 직접 칠 필요가 없이 환경변수에 설정되어 있는 값을 불러 오면 된다. 
from easydict import EasyDict
import boto3

settings = EasyDict()

settings.AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']     
settings.AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
settings.AWS_REGION_NAME = os.environ["AWS_REGION_NAME"]   
settings.AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]      
settings.DB_HOST = os.environ["DB_HOST"]
settings.DB_USER = os.environ["DB_USER"]
settings.DB_PASSWORD = os.environ["DB_PASSWORD"]
settings.DB_NAME = os.environ["DB_NAME"]
settings.DB_PORT = os.environ["DB_PORT"]


In [8]:
import pymysql
import pandas as pd

db = pymysql.connect(
    host = settings.DB_HOST,
    user = settings.DB_USER,
    passwd = settings.DB_PASSWORD,
    db = settings.DB_NAME,
    port = int(settings.DB_PORT)
)

cursor = db.cursor()

cursor

<pymysql.cursors.Cursor at 0x7f141c69afa0>

### 여기부터는 직접 생각해본 부분
* 데이터가 2015년 7월부터 언제까지 존재하는가? 에 대해 알아보기

In [68]:
from datetime import datetime

def get_query_to_df(sql: str):
    cursor.execute(sql)
    data = cursor.fetchall()
    columns = [elem[0] for elem in cursor.description]
    return pd.DataFrame(data, columns = columns)

# create_query 함수에서 자료가 어디까지 존재하는지를 빠르게 알기 위해 LIMIT 1을 붙인 상태
# 자료가 하나만 있어도 있다는 것이므로...
def create_query(year, month):
    month = datetime.strptime(str(month), '%m').strftime('%B')
    return f"SELECT * FROM hotel WHERE arrival_date_year = {year} && arrival_date_month = '{month}' LIMIT 1;"

# 데이터는 몇년 몇월까지 존재하는가..?
# 아래 방법을 통해 확인해 보자.
len(get_query_to_df(create_query(2015, 1)))

0

In [69]:
# 시작 연월은 알고있으므로 고정
year = 2015
month = 7

# 해당 연월에 데이터가 없는 곳 까지 계속 아래 코드를 실행
# 12월이 되기 전까지는 월에만 1씩 더하기
# 12월이 되면 연도에 1을 더하고, 월은 1로 초기화
while len(get_query_to_df(create_query(year, month))) > 0:
    
    if month < 12:
        month += 1
        
    elif month >= 12:
        year += 1
        month = 1
        
# 코드가 끝나면 해당 연월을 출력
print(year, month)        

2017 8


In [70]:
# 2017년 8월달 정보가 없어서 해당 연월에서 멈췄음을 알 수 있다.
get_query_to_df(create_query(2017, 8))

Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,...,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,reservation_status,reservation_status_date


In [71]:
# 그렇다면 2017년 7월까지는 정보가 존재한다는 의미이다. 
get_query_to_df(create_query(2017, 7))

Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,...,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,reservation_status,reservation_status_date
0,Resort Hotel,1,130,2017,July,26,1,2,1,2,...,No Deposit,181.0,,0,Transient-Party,138.4,0,1,Canceled,2017-03-10


* 위 코드를 통해 데이터는 2017년 7월까지만 존재한다는 사실을 알 수 있다.

### 수업으로 진행한 코드
* 위에 정의한 함수는 밑에서 다시 사용하지는 않고, 사용하는 함수는 밑에 다시 적어두었습니다.

In [75]:
# 오우~ SQL 천재..
get_query_to_df("SELECT DISTINCT arrival_date_year, arrival_date_month FROM hotel")

Unnamed: 0,arrival_date_year,arrival_date_month
0,2015,July
1,2015,August
2,2015,September
3,2015,October
4,2015,November
5,2015,December
6,2016,January
7,2016,February
8,2016,March
9,2016,April


In [63]:
# 테스트를 위한 코드
from datetime import datetime

int(datetime.strptime('March', '%B').strftime('%m'))

3

In [60]:
# 테스트를 위한 코드
datetime.strptime(str(3), '%m').strftime('%B')

'March'

In [77]:
period_df = get_query_to_df("SELECT DISTINCT arrival_date_year, arrival_date_month FROM hotel")
period_df.head()

Unnamed: 0,arrival_date_year,arrival_date_month
0,2015,July
1,2015,August
2,2015,September
3,2015,October
4,2015,November


In [80]:
# 1번
for _, row in period_df.iterrows():
    print(row['arrival_date_year'], row['arrival_date_month'])

2015 July
2015 August
2015 September
2015 October
2015 November
2015 December
2016 January
2016 February
2016 March
2016 April
2016 May
2016 June
2016 July
2016 August
2016 September
2016 October
2016 November
2016 December
2017 January
2017 February
2017 March
2017 April
2017 May
2017 June
2017 July


In [81]:
# 2번
for i in range(len(period_df)):
    row = period_df.iloc[i]
    print(row['arrival_date_year'], row['arrival_date_month'])

2015 July
2015 August
2015 September
2015 October
2015 November
2015 December
2016 January
2016 February
2016 March
2016 April
2016 May
2016 June
2016 July
2016 August
2016 September
2016 October
2016 November
2016 December
2017 January
2017 February
2017 March
2017 April
2017 May
2017 June
2017 July


* 1번과 2번은 같은 일을 해준다. 하지만 차이가 있다.
* 1번의 경우 데이터프레임을 메모리에 전부 올리지 않아도 처리할 수 있다.
* 2번의 경우 데이터프레임이 메모리에 전부 올라와야 처리를 할 수 있다.

* 1번의 경우 for 문이 돌아갈 범위 지정이 불가능하다.
* 2번의 경우 for 문이 돌아갈 범위 지정이 가능하다.

In [82]:
# 이런 방식으로 할 수 있다.
quaries = [f"SELECT * FROM hotel WHERE arrival_date_year = {row['arrival_date_year']} && arrival_date_month = '{row['arrival_date_month']}';" for _, row in period_df.iterrows()]
quaries

["SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'July';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'August';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'September';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'October';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'November';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2015 && arrival_date_month = 'December';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2016 && arrival_date_month = 'January';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2016 && arrival_date_month = 'February';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2016 && arrival_date_month = 'March';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2016 && arrival_date_month = 'April';",
 "SELECT * FROM hotel WHERE arrival_date_year = 2016 && arrival_date_month = 'May';",
 "SELECT * FROM h

In [116]:
# 아래 사용되는 함수를 정의한 부분
import re

def get_query_to_df(sql: str):
    cursor.execute(sql)
    data = cursor.fetchall()
    columns = [elem[0] for elem in cursor.description]
    return pd.DataFrame(data, columns = columns)

# def get_query_to_data(sql: str):
#     cursor.execute(sql)
#     data = cursor.fetchall()
    
#     return data

def get_year(sql: str):
    year = re.search("=(.+?)&", sql).group()[2:6]
    return year

def get_month(sql: str):
    month_str = re.search("'(.+?)'", sql).group().strip("'")
    month_num = datetime.strptime(month_str, '%B').strftime('%m')
    
    if int(month_num) < 10:
        month_num = month_num.zfill(2)
        
    else:
        month_num = month_num   
        
    return month_num

def upload_s3_df(df: pd.DataFrame, year: str, month: str):
    try:
        
        file_name = f"s3://genia-bucket/raw/student1/hotel/yyyy={year}/mm={month}/hotel.parquet"
        
        with s_open(file_name, "wb", transport_params = dict(client = s3_client)) as out_file:
            df.to_parquet(out_file, engine = 'pyarrow', compression = 'gzip', index = False)
    
        return print(f"{year}년 {month}월 처리완료")
        
    except:
        
        sys.exit()
        return print(f"{year}년 {month}월 처리에 문제가 있습니다.")
        
    

In [117]:
import sys
from smart_open import open as s_open

s3_client = boto3.client(
    service_name = 's3',
    aws_access_key_id = settings.AWS_ACCESS_KEY_ID,
    aws_secret_access_key = settings.AWS_SECRET_ACCESS_KEY,
    region_name = settings.AWS_REGION_NAME
)

for sql in quaries:
    df = get_query_to_df(sql)
    year = get_year(sql)
    month = get_month(sql)
    upload_s3_df(df, year, month)

2015년 07월 처리완료


---

In [106]:
# 테스트를 위한 코드
import re

re.search("=(.+?)&", "SELECT * FROM hotel WHERE arrival_date_year = 2017 && arrival_date_month = 'February';").group()[2:6]

'2017'

In [88]:
# 테스트를 위한 코드
int(datetime.strptime(re.search("'(.+?)'", "SELECT * FROM hotel WHERE arrival_date_year = 2017 && arrival_date_month = 'February';").group().strip("'"), '%B').strftime('%m'))

2

In [118]:
# 테스트를 위한 코드
'2'.zfill(10)

'0000000002'

In [5]:
# 테스트를 위한 코드 
year = '2017'
month = '03'

file_name = f"yyyy={year}/mm={month}/hotel.parquet"

file_name

'yyyy=2017/mm=03/hotel.parquet'

---

In [None]:
# 아래 코드를 통해 Athena에 쿼리를 질의하고 해당 내용을 csv파일로 받아와서 Pandas DataFrame으로 보여줄 수 있다.
import time

RETRY_COUNT = 10
DATABASE = '데이터베이스명'

Athena_client = boto3.client(
    service_name = 'athena',
    aws_access_key_id = settings.AWS_ACCESS_KEY_ID,
    aws_secret_access_key = settings.AWS_SECRET_ACCESS_KEY,
    region_name = settings.AWS_REGION_NAME
)


response = Athena_client.start_query_execution(
        QueryString='''SELECT * FROM "student1-hotel" WHERE arrival_date_year = 2017 AND arrival_date_month = 'February';''',
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': 'S3에저장할경로',
        }
    )

query_execution_id = response['QueryExecutionId']

for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = Athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        elif query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
else:
    Athena_client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')

result = Athena_client.get_query_results(QueryExecutionId=query_execution_id)

obj = s3_client.get_object(Bucket = 'genia-bucket', Key = f'raw/student1/output/{query_execution_id}.csv' )
res = pd.read_csv(obj['Body'], encoding = 'UTF8')
res

#### for...else??
* 위의 코드를 보면 for 다음에 else가 쓰이고 있다.
* 찾아보니 for문이 중간에 break되거나 raise Exception 되는 등으로 끝나지 않고 끝까지 돌았을 때 밖의 else로 넘어간다고 한다.
* 위의 코드에서는 RETRY_COUNT 만큼 쿼리를 날리면서 for문이 돌았는데도 성공이나 실패를 하지 않는 경우에 TIME OVER 를 띄워주는 것이다.
* 다른 언어에서는 거의 사용하지 않는 사용법인 듯 하다. 
---

In [None]:
# 위처럼 하면 아래 과정은 필요가 없다.
# 아래 부분은 직접 json을 잘라서 다시 DataFrame으로 만드는 과정이다. 
# 그냥 위처럼 csv파일을 받아오는것이 훨씬 낫다.
result

In [197]:
columns = [result['ResultSet']['Rows'][0]['Data'][i]['VarCharValue'] for i in range(len(result['ResultSet']['Rows'][0]['Data']))]

In [208]:
total_list = list()
for d in range(1, len(result['ResultSet']['Rows'])):
    row_list = list()
    
    for row_num in range(len(result['ResultSet']['Rows'][d]['Data'])):
        
        
        try:
            row_list.append(result['ResultSet']['Rows'][d]['Data'][row_num]['VarCharValue'])
            
        except:
            row_list.append('0')
            
    total_list.append(row_list)

In [209]:
total_list

[['Resort Hotel',
  '1',
  '76',
  '2017',
  'February',
  '5',
  '1',
  '0',
  '1',
  '2',
  '0.0',
  '0',
  'BB',
  'DEU',
  'Online TA',
  'TA/TO',
  '0',
  '0',
  '0',
  'A',
  'A',
  '0',
  'No Deposit',
  '240.0',
  '0',
  '0',
  'Transient',
  '42.0',
  '0',
  '1',
  'Canceled',
  '2016-11-21',
  '2017',
  '02'],
 ['Resort Hotel',
  '1',
  '11',
  '2017',
  'February',
  '5',
  '1',
  '0',
  '2',
  '2',
  '0.0',
  '0',
  'BB',
  'ESP',
  'Online TA',
  'TA/TO',
  '0',
  '0',
  '0',
  'A',
  'A',
  '0',
  'No Deposit',
  '240.0',
  '0',
  '0',
  'Transient',
  '48.0',
  '0',
  '0',
  'Canceled',
  '2017-01-21',
  '2017',
  '02'],
 ['Resort Hotel',
  '1',
  '0',
  '2017',
  'February',
  '5',
  '1',
  '2',
  '6',
  '2',
  '0.0',
  '0',
  'BB',
  'IRL',
  'Online TA',
  'TA/TO',
  '0',
  '0',
  '0',
  'D',
  'D',
  '0',
  'No Deposit',
  '240.0',
  '0',
  '0',
  'Transient',
  '59.0',
  '0',
  '0',
  'Canceled',
  '2017-02-01',
  '2017',
  '02'],
 ['Resort Hotel',
  '1',
  '123',
 

In [210]:
pd.DataFrame(total_list, columns = columns)

Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,...,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,reservation_status,reservation_status_date,yyyy,mm
0,Resort Hotel,1,76,2017,February,5,1,0,1,2,...,0,0,Transient,42.0,0,1,Canceled,2016-11-21,2017,02
1,Resort Hotel,1,11,2017,February,5,1,0,2,2,...,0,0,Transient,48.0,0,0,Canceled,2017-01-21,2017,02
2,Resort Hotel,1,0,2017,February,5,1,2,6,2,...,0,0,Transient,59.0,0,0,Canceled,2017-02-01,2017,02
3,Resort Hotel,1,123,2017,February,5,1,4,10,2,...,0,0,Transient,37.8,0,0,Canceled,2016-12-24,2017,02
4,Resort Hotel,1,85,2017,February,5,1,0,4,2,...,0,0,Transient,72.0,0,1,Canceled,2016-11-13,2017,02
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
994,Resort Hotel,0,207,2017,February,7,13,1,3,1,...,0,113,Transient-Party,56.0,0,0,Check-Out,2017-02-17,2017,02
995,Resort Hotel,0,94,2017,February,7,13,1,3,3,...,0,0,Transient-Party,88.0,0,0,Check-Out,2017-02-17,2017,02
996,Resort Hotel,0,94,2017,February,7,13,1,3,2,...,0,0,Transient-Party,62.0,0,1,Check-Out,2017-02-17,2017,02
997,Resort Hotel,0,94,2017,February,7,13,1,3,1,...,0,0,Transient-Party,44.0,0,0,Check-Out,2017-02-17,2017,02
