# Notebook: Collecting Order Data from the Pancake POS API

### 1.1. Import libraries & config
### Explanation
- Import the required libraries (`os`, `json`, `requests`, `pandas`, `sqlalchemy`, …).
- Read the environment variables from `.env` (API key, DB connection details, shop_id).
- This keeps secrets out of the code instead of hard-coding them.

In [9]:
import os
import json
import time
import random
import requests
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.mysql import LONGTEXT, VARCHAR
from sqlalchemy.types import BigInteger, DateTime
from dotenv import load_dotenv
from datetime import datetime

# Load environment variables
load_dotenv()

API_KEY   = os.getenv("API_KEY")
DB_USER   = os.getenv("DB_USER")
DB_PASS   = os.getenv("DB_PASS")
DB_HOST   = os.getenv("DB_HOST")
DB_PORT   = os.getenv("DB_PORT")
DB_BRONZE = os.getenv("DB_BRONZE")
SHOP_ID   = os.getenv("SHOP_ID")  # remember to add SHOP_ID to the .env file
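
Because `os.getenv` returns `None` for an unset variable, a missing entry in `.env` only surfaces later, for example as the literal text `None` inside the connection URL. A minimal sketch of an early check follows; the helper `missing_env_vars` and the `REQUIRED_VARS` list are illustrative names, not part of the original notebook.

```python
import os

# Variable names read by the notebook's config cell
REQUIRED_VARS = [
    "API_KEY", "DB_USER", "DB_PASS", "DB_HOST",
    "DB_PORT", "DB_BRONZE", "SHOP_ID",
]

def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]
```

In the notebook this could run right after `load_dotenv()`, raising an error when `missing_env_vars(REQUIRED_VARS)` returns a non-empty list.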


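### 1.2. Connect to the Bronze database
### Explanation
- Use `SQLAlchemy` to create an engine for the MySQL connection.
- Target database: the Bronze schema, where raw data is stored.
- Print a confirmation message. Note that `create_engine` is lazy and opens no connection until the engine is first used, so the message confirms that the engine was created, not that MySQL is reachable.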

In [10]:
# Create the engine for the Bronze database
engine_bronze = create_engine(
    f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_BRONZE}"
)

print(f"✅ Successfully connected to the Bronze database: {DB_BRONZE}")


✅ Successfully connected to the Bronze database: winner_bronze
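
One caveat with building the URL through an f-string: a password containing characters such as `@`, `:` or `/` would be misparsed as part of the URL structure. A possible workaround, sketched below with the standard library, is to percent-encode the credentials first. The helper name `build_mysql_url` and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus

def build_mysql_url(user, password, host, port, database):
    """Build a mysql+pymysql URL with percent-encoded credentials."""
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

# Example with a made-up password that contains '@' and '/'
url = build_mysql_url("etl", "p@ss/word", "localhost", "3306", "winner_bronze")
print(url)
```

The resulting string could be passed to `create_engine` in place of the f-string used in the cell above.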


### 1.3. Function to fetch one page of order data from the API
### Explanation
- Function `get_orders_page`:
  - Calls the `/orders` API with `shop_id`, `page`, and `page_size`.
  - Returns the list of orders (`data`) and the total number of pages (`total_pages`).
  - Retries up to 3 times on failure (timeout or HTTP error).

In [11]:
def get_orders_page(shop_id, page=1, page_size=1000, max_retries=3):
    """
    Call the Pancake API /shops/{shop_id}/orders to fetch one page.
    Retries with a delay on error.
    """
    url = f"https://pos.pages.fm/api/v1/shops/{shop_id}/orders"
    params = {"api_key": API_KEY, "page": page, "page_size": page_size}
    
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, params=params, timeout=20)
            if resp.status_code == 200:
                j = resp.json()
                data = j.get("data", [])
                total_pages = j.get("total_pages") or j.get("meta", {}).get("total_pages")
                return data, total_pages
            else:
                print(f"⚠️ HTTP status {resp.status_code} at page {page}, attempt {attempt}")
        except Exception as e:
            print(f"⚠️ Exception at page {page}: {e}, attempt {attempt}")
        time.sleep(2 * attempt)  # linear backoff (2 s, 4 s, 6 s), not exponential
    return [], None
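
The delay in `get_orders_page` is `2 * attempt` seconds, which grows linearly (2 s, 4 s, 6 s). For comparison, here is a small sketch of both schedules. `backoff_delay` and its parameters are illustrative assumptions, and the optional jitter is a common addition rather than something the original code does.

```python
import random

def backoff_delay(attempt, base=2.0, exponential=False, jitter=0.0):
    """Delay in seconds before retrying after the given 1-based attempt."""
    if exponential:
        delay = base * 2 ** (attempt - 1)   # doubles each attempt
    else:
        delay = base * attempt              # grows by `base` each attempt, as in the notebook
    # Add up to `jitter` seconds of random noise (none when jitter is 0)
    return delay + random.uniform(0, jitter)

linear = [backoff_delay(a) for a in (1, 2, 3)]
exponential = [backoff_delay(a, exponential=True) for a in (1, 2, 3)]
print(linear)
print(exponential)
```

With three attempts the two schedules differ only in the last wait, so the choice matters mostly when `max_retries` is raised.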


### 1.4. Function to write a batch to MySQL
### Explanation
- Function `insert_orders_batch`:
  - Takes a DataFrame `df_batch`.
  - Appends the data to the `orders_raw` table in the Bronze schema.
  - Columns: `shop_id`, `order_id`, `raw_json`, `extracted_at`.

In [12]:
def insert_orders_batch(df_batch):
    df_batch.to_sql(
        "orders_raw",
        con=engine_bronze,
        if_exists="append",
        index=False,
        dtype={
            "shop_id": BigInteger(),
            "order_id": VARCHAR(50),
            "raw_json": LONGTEXT(),
            "extracted_at": DateTime()
        }
    )
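
Because `if_exists="append"` only adds rows, receiving the same order twice would store it twice unless the table enforces uniqueness, which the notebook does not show. Below is a hedged sketch of de-duplicating a batch in plain Python before it is turned into a DataFrame; `dedupe_by_order_id` is a hypothetical helper and the sample rows are made up.

```python
def dedupe_by_order_id(rows):
    """Keep the last row seen for each order_id, in first-seen key order."""
    latest = {}
    for row in rows:
        latest[row["order_id"]] = row  # later rows overwrite earlier ones
    return list(latest.values())

# Made-up rows: order "A1" appears twice with different payloads
rows = [
    {"order_id": "A1", "raw_json": '{"v": 1}'},
    {"order_id": "B2", "raw_json": '{"v": 1}'},
    {"order_id": "A1", "raw_json": '{"v": 2}'},
]
unique_rows = dedupe_by_order_id(rows)
```

This only covers duplicates within one batch. Duplicates across separate runs would need a unique key on the table or a lookup against existing rows.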


### 1.5. Function to fetch & load multiple pages of orders
### Explanation
- Function `fetch_and_load_orders`:
  - Loops over the API pages one by one.
  - Converts each page of data to a DataFrame.
  - Saves the batch to MySQL with `insert_orders_batch`.
  - Sleeps for a random 0.5 to 1.5 s between pages to avoid being rate-limited.

In [13]:
def fetch_and_load_orders(shop_id, start_page=1, page_size=1000):
    page = start_page
    while True:
        orders, total_pages = get_orders_page(shop_id, page, page_size)
        if not orders:
            print("📦 No more data at page", page)
            break

        # Build the batch DataFrame
        df_batch = pd.DataFrame([{
            "shop_id": shop_id,
            "order_id": order.get("id"),
            "raw_json": json.dumps(order, ensure_ascii=False),
            "extracted_at": datetime.now()
        } for order in orders])

        # Insert the batch into Bronze
        insert_orders_batch(df_batch)
        print(f"‚úÖ Page {page}/{total_pages} - Loaded {len(orders)} orders")

        if total_pages and page >= total_pages:
            break

        # Sleep for a random 0.5 to 1.5 s to avoid being blocked
        time.sleep(random.uniform(0.5, 1.5))
        page += 1
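
The paging logic can also be looked at in isolation from the HTTP and database calls. The sketch below reproduces the same stop conditions (an empty page, or `page >= total_pages`) as a generator that takes the page fetcher as an argument. `iter_pages` and `fake_fetch` are illustrative names, and the fake data stands in for the real API.

```python
def iter_pages(fetch_page, start_page=1):
    """Yield (page, orders) until a page is empty or the last page is reached."""
    page = start_page
    while True:
        orders, total_pages = fetch_page(page)
        if not orders:
            break  # empty page: nothing (more) to load
        yield page, orders
        if total_pages and page >= total_pages:
            break  # reported last page reached
        page += 1

def fake_fetch(page):
    """Stand-in for get_orders_page: 3 pages with 2 fake orders each."""
    total_pages = 3
    if page > total_pages:
        return [], total_pages
    return [{"id": f"{page}-{i}"} for i in range(2)], total_pages

loaded = [(page, len(orders)) for page, orders in iter_pages(fake_fetch)]
print(loaded)
```

As in the original loop, a fetch that fails after all retries returns an empty list and therefore looks the same as the end of the data, so a run that stops early is worth checking against `total_pages`.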


### 1.6. Run the data collection process
### Explanation
- Call `fetch_and_load_orders` to collect all of the order data.
- Result: the `orders_raw` table in the Bronze schema holds the raw data from the API.


In [14]:
fetch_and_load_orders(SHOP_ID, start_page=1, page_size=1000)


✅ Page 1/41 - Loaded 1000 orders
✅ Page 2/41 - Loaded 1000 orders
✅ Page 3/41 - Loaded 1000 orders
✅ Page 4/41 - Loaded 1000 orders
✅ Page 5/41 - Loaded 1000 orders
✅ Page 6/41 - Loaded 1000 orders
✅ Page 7/41 - Loaded 1000 orders
⚠️ Exception at page 8: HTTPSConnectionPool(host='pos.pages.fm', port=443): Read timed out. (read timeout=20), attempt 1
✅ Page 8/41 - Loaded 1000 orders
⚠️ Exception at page 9: HTTPSConnectionPool(host='pos.pages.fm', port=443): Read timed out. (read timeout=20), attempt 1
✅ Page 9/41 - Loaded 1000 orders
⚠️ Exception at page 10: HTTPSConnectionPool(host='pos.pages.fm', port=443): Read timed out. (read timeout=20), attempt 1
✅ Page 10/41 - Loaded 1000 orders
⚠️ Exception at page 11: HTTPSConnectionPool(host='pos.pages.fm', port=443): Read timed out. (read timeout=20), attempt 1
✅ Page 11/41 - Loaded 1000 orders
⚠️ Exception at page 12: HTTPSConnectionPool(host='pos.pages.fm', port=443): Read timed out.