<a href="https://colab.research.google.com/github/Theetat-Saejaew/Cloud-Data-Pipeline/blob/main/Data_Collection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Collection: มาเก็บรวบรวมข้อมูลจากแหล่งต่าง ๆ (DB & REST API) กันเถอะ!

# อ่านข้อมูลจาก MySQL database

## Install PyMySQL 
ซึ่งเป็น package สำหรับเชื่อมต่อ MySQL database

In [None]:
! pip install pymysql

## Config DB credential: การใช้ config สำหรับเชื่อต่อ database

 

In [None]:
# ขั้นนตอนแรกสำหรับการต่อ database คือการสร้าง connection ซึ่งต้องอาศัย config ต่าง ๆ เช่น Host (IP address), Username, Password ในการเชื่อมต่อ
import os

class Config:
  MYSQL_HOST = 'xxxxxx'
  MYSQL_PORT = xxxxxx              # default สำหรับ port MySQL
  MYSQL_USER = 'xxxxxx'
  MYSQL_PASSWORD = 'xxxxxx'
  MYSQL_DB = 'xxxxxx'
  MYSQL_CHARSET = 'xxxxxx'

In [None]:
# ทดลอง print จาก config
print(Config.MYSQL_PORT)

## Connect to DB
หลังจากที่มี Credential ของ database แล้วก็สร้าง connection โดยการ connect ไปที่ DB ด้วย Config ของเรา

In [None]:
import pymysql

# Connect to the database
connection = pymysql.connect(host=Config.MYSQL_HOST,
                             port=Config.MYSQL_PORT,
                             user=Config.MYSQL_USER,
                             password=Config.MYSQL_PASSWORD,
                             db=Config.MYSQL_DB,
                             charset=Config.MYSQL_CHARSET,
                             cursorclass=pymysql.cursors.DictCursor)

In [None]:
connection

## Query Table

การใช้ `with connection.cursor() as cursor:` จะจัดการ scope ของการเรียกใช้งาน cursor ให้  ในที่นี้ถือว่าได้สร้างตัวแปร cursor แล้วในคำสั่ง with และ ไม่ต้องใช้ cursor.close()

In [None]:
with connection.cursor() as cursor:
  # query ข้อมูลจาก table audible_data
  cursor.execute("SELECT * FROM audible_data;")
  result = cursor.fetchall()

print("number of rows: ", len(result))

In [None]:
# สามารถดูผลลัพธ์ที่อ่าน result มาได้ 
result

In [None]:
# ดูประเภทของ result
type(result)

ประเภทของตัวแปร คือ list (เป็น list ของ dictionary แต่ละบรรทัด)

## Convert to Pandas

In [None]:
import pandas as pd

In [None]:
audible_data = pd.DataFrame(result)

In [None]:
type(audible_data)

In [None]:
audible_data

**ข้อสังเกต**
ตัวเลขข้างหน้าสุดของ pandas ที่เป็น 0 ถึง (จำนวน rows - 1) ในที่นี้คือ 0 - 2268 เรียกว่า **index** 

index คือ สิ่งที่ pandas เอาไว้ใช้เก็บ key ในแต่ละ row เอาไว้ โดยถ้าไม่กำหนด index มาก็จะสร้างให้เหมือนในตัวอย่าง

แต่ในที่นี้เรามี Book_ID ที่เป็นตัวเลย unique ประจำแถวอยู่แล้ว สามารถกำหนด index เป็น Book_ID ได้ เพื่อลดความซ้ำซ้อน

In [None]:
audible_data.set_index("Book_ID")

เท่านี้ก็สามารถ เอา Book_ID มาเป็น index ได้แล้ว

ถ้าไม่อยาก set_index() ทีหลังก็ สามารถใส่ `index_col="Book_ID"` เพิ่มเข้าไปในบรรทัดที่สร้าง DataFrame เลยได้ 
```
audible_data = pd.DataFrame(result), index_col="Book_ID"
``` 

## อีกวิธีหนึ่งในการ query โดยใช้ Pandas สะดวกมาก ๆ

แต่ว่า เนื่องจากว่า table เรามีสอง table เรามาดูอีกวิธีหนึ่งที่สะดวกขึ้น โดยใช้ `read_sql()` ของ pandas

In [None]:
sql = "SELECT * FROM audible_transaction"
audible_transaction = pd.read_sql(sql, connection)
audible_transaction

# Join table: audible_transaction & audible_data

ใน transaction dataframe เราจะไม่เห็นราคาและชื่อสินค้า ถ้าเราอยากรู้ว่าแต่ละ transaction มีจำนวนเงินเท่าไร จึงต้อง merge data รวมกับ dataframe ของ audible_data 

คีย์ที่ใช้ในการ merge คือ
- audible_transaction: `book_id`
- audible_data: `Book_ID`

In [None]:
transaction = audible_transaction.merge(audible_data, how="left", left_on="book_id", right_on="Book_ID")

ดูผลลัพธ์จากการ join 

In [None]:
transaction

ตอนนี้เราได้ข้อมูล transaction มาแล้ว แต่ว่าข้อมูล price เป็น USD (แถมยังเป็น string ที่มี $ ด้วย) 

ในส่วนถัดไป เราจะมาอ่าน data จาก API แปลงค่าเงิน เพื่อแปลงเป็นเงินบาท ตาม rate ของแต่ละวันในอดีตกัน ʕ•́ᴥ•̀ʔ



---


# Get data from REST API

หลังจากต่อกับ Database ได้แล้ว ก็อ่าน data จาก REST API กัน

Package `requests` ใช้สำหรับการเรียกใช้ REST API


วิธีการ install: `pip install requests`

In [None]:
import requests



## Requests library
สามารถศึกษาวิธีการสร้าง request และการใช้งาน package `requests` [ได้ที่นี่](https://requests.readthedocs.io/en/master/)

In [None]:
url = "---"
# ต้องการผลลัพธ์ให้อยู่ในรูปแบบของ dictionary ที่ชื่อว่า result_conversion_rate
r = requests.get(url)
result_conversion_rate = r.json()


In [None]:
result_conversion_rate

มาเช็คประเภทข้อมูล

In [None]:
print(type(result_conversion_rate))
assert isinstance(result_conversion_rate, dict) #assert เป็นการเช็คว่าเป็นจริงหรือไม่

<class 'dict'>


 ## Convert to Pandas
 แปลงกันอีกครั้งหนึ่ง ʕ•́ᴥ•̀ʔ

In [None]:
conversion_rate = pd.DataFrame(result_conversion_rate)

In [None]:
conversion_rate

แปลงจาก index เป็น column date ธรรมดาเพื่อความสะดวกในการ join กับ table transaction

In [None]:
conversion_rate = conversion_rate.reset_index().rename(columns={"index": "date"})
conversion_rate[:3]

# Join the data

ในตอนนี้เราจะนำข้อมูลการซื้อขายและข้อมูล Rate การแปลงค่าเงิน เราจะรวมข้อมูลจากทั้งสอง Dataframe มารวมกัน

เราจะนำข้อมูลจากทั้งสองมารวมกันผ่าน column date ใน transaction และ date ใน conversion_rate 

แต่ถ้าสังเกตดี ๆ แล้วจะพบว่า timestamp ใน retail จะเก็บข้อมูลในรูปแบบ timestamp ส่วน date ใน conversion_rate จะเก็บข้อมูลในรูปแบบ date (ที่เป็น string) เท่านั้น

In [None]:
transaction

In [None]:
# ก็อปปี้ column timestamp เก็บเอาไว้ใน column ใหม่ชื่อ date เพื่อที่จะแปลงวันที่เป็น date เพื่อที่จะสามารถนำมา join กับข้อมูลค่าเงินได้
transaction['date'] = transaction['timestamp']
transaction

In [None]:
# แปลงให้จาก timestamp เป็น date ในทั้ง 2 dataframe (transaction, conversion_rate)
transaction['date'] = pd.to_datetime(transaction['date']).dt.date
conversion_rate['date'] = pd.to_datetime(conversion_rate['date']).dt.date
transaction.head()

In [None]:
# ผลลัพธ์สุดท้ายตั้งชื่อว่า final_df
final_df = transaction.merge(conversion_rate, how="left", left_on="date", right_on="date")
final_df

แต่ตอนนี้ column Price เรายังเป็น string (มีเครื่องหมาย $ อยู่ ต้องเอาออก)
ในที่นี้จะใช้ function apply ของ DataFrame ภายใน apply จะเขียนในรูปแบบของ function หรือเป็น lambda function คือ function ที่สร้างขึ้นมา เพื่อประมวลผลในแต่ละแถว

สุดท้าย แปลงประเภทตัวแปลง เป็น float เพื่อรองรับ จำนวนที่มีทศนิยม

In [None]:
final_df["Price"] = final_df.apply(lambda x: x["Price"].replace("$",""), axis=1)
final_df["Price"] = final_df["Price"].astype(float) #astype แปลงเป็น float

In [None]:
final_df

พอ join ข้อมูลได้แล้ว เราก็ มา คูณ currency conversion กัน (Price * convertsion_rate)

In [None]:
final_df["THBPrice"] = final_df["Price"] * final_df["conversion_rate"]
final_df

อีกวิธีหนึ่ง

In [None]:
def convert_rate(price, rate):
  return price * rate

final_df["THBPrice"] = final_df.apply(lambda x: x["Price"] * x["conversion_rate"], axis=1)
final_df

#ใช้ function
# final_df["THBPrice"] = final_df.apply(lambda x: convert_rate(x["Price"], x["conversion_rate"]), axis=1)

สามารถ drop column ที่ไม่จำเป็นต้องใช้ได้ เช่น date ที่ซ้ำซ้อนกับ timestamp

axis = 1 หมายถึง drop column (ถ้า axis=0 จะใช้ drop row ได้)


In [None]:
final_df = final_df.drop("date", axis=1)

In [None]:
final_df 

## Save to CSV

เซฟ final_df เป็นไฟล์ csv
โดยปกติ pandas จะเซฟ index (0,1,2,3) ติดมาให้ด้วย ถ้าไม่ต้องการจะต้องใส่ `index=False`

In [None]:
final_df.to_csv('output.csv',index=False)

หรือสามารถเปิดดูไฟล์ด้วย bash command `head` ได้ด้วย

In [None]:
!head output.csv