# Data Ingestion to Bronze Layer

Bronze layer เป็นชั้นแรกของ Medallion Architecture ที่เก็บ raw data โดยไม่มีการแปลงใด ๆ

## Step 1: Import Libraries ที่จำเป็น

In [13]:
import boto3
import json
import requests
from datetime import datetime
import pandas as pd
from io import StringIO

## Step 2: ตั้งค่าการเชื่อมต่อ MinIO

MinIO ใช้ protocol เดียวกับ AWS S3

In [14]:
# MinIO configuration
minio_endpoint = 'http://localhost:9010'
minio_access_key = 'minioadmin'
minio_secret_key = 'minioadmin'

# สร้าง S3 client สำหรับ MinIO
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key,
    use_ssl=False
)

print("เชื่อมต่อ MinIO สำเร็จ")

เชื่อมต่อ MinIO สำเร็จ


## Step 3: ตรวจสอบ Buckets ที่มีอยู่

In [16]:
# List all buckets
response = s3_client.list_buckets()
buckets = [bucket['Name'] for bucket in response['Buckets']]
print("Buckets ที่มีอยู่")
for bucket in buckets:
    print(f"  - {bucket}")

ReadTimeoutError: Read timeout on endpoint URL: "http://localhost:9010/"

## Step 4: ดึงข้อมูลจาก API (Coffee Image)

In [6]:
# ดึงข้อมูลรูปกาแฟ
def fetch_coffee_data():
    url = "https://coffee.alexflipnote.dev/random.json"
    headers = {
        'User-Agent': 'Brew-Right Data Collector 1.0',
        'Accept': 'application/json'
    }
    
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        
        # เพิ่ม metadata
        data = response.json()
        data['fetched_at'] = datetime.now().isoformat()
        data['source'] = url
        
        return data
    except Exception as e:
        print(f"Error: {e}")
        return None

# ทดสอบดึงข้อมูล
coffee_data = fetch_coffee_data()
print(json.dumps(coffee_data, indent=2, ensure_ascii=False))

{
  "file": "https://coffee.alexflipnote.dev/QlzxRWv5JME_coffee.jpg",
  "fetched_at": "2025-07-23T15:22:43.845040",
  "source": "https://coffee.alexflipnote.dev/random.json"
}


## Step 5: บันทึกข้อมูลลง Bronze Layer (JSON Format)

In [8]:
# สร้าง path ใน bronze bucket
# Format: bronze/coffee_images/year=2024/month=01/day=23/timestamp.json
now = datetime.now()
file_path = f"coffee_images/year={now.year}/month={now.month:02d}/day={now.day:02d}/{now.strftime('%Y%m%d_%H%M%S')}.json"

# แปลงข้อมูลเป็น JSON string
json_data = json.dumps(coffee_data, ensure_ascii=False)

# Upload ไปยัง MinIO
try:
    s3_client.put_object(
        Bucket='bronze',
        Key=file_path,
        Body=json_data,
        ContentType='application/json'
    )
    print(f"บันทึกข้อมูลสำเร็จ bronze/{file_path}")
except Exception as e:
    print(f"Error uploading: {e}")

Error uploading: Connection was closed before we received a valid response from endpoint URL: "http://localhost:9010/bronze/coffee_images/year%3D2025/month%3D07/day%3D23/20250723_152842.json".


## Step 6: ดึงข้อมูลราคากาแฟและบันทึกแบบ CSV

In [9]:
# ดึงข้อมูลราคากาแฟจาก ธปท.
from bs4 import BeautifulSoup

def fetch_coffee_prices():
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; Brew-Right Bot 1.0; +https://www.plearnjai.com/dstools/bot.html)'
    }
    url = 'https://app.bot.or.th/BTWS_STAT/statistics/BOTWEBSTAT.aspx?language=TH&reportID=588'
    
    try:
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.content, 'html.parser')

        prices = []
        for row in soup.find_all('tr'):
            if 'เมล็ดกาแฟ' in row.get_text():
                cols = row.find_all('td')
                if len(cols) >= 3:
                    prices.append({
                        'product': cols[0].get_text(strip=True),
                        'unit': cols[1].get_text(strip=True),
                        'price': cols[2].get_text(strip=True),
                        'scraped_at': datetime.now().isoformat(),
                        'source_url': url
                    })
        
        return prices
    except Exception as e:
        print(f"Error: {e}")
        return []

# ดึงข้อมูล
coffee_prices = fetch_coffee_prices()
coffee_prices

[{'product': '4',
  'unit': 'เมล็ดกาแฟคละ (บาท/กิโลกรัม) 3/',
  'price': '74.51',
  'scraped_at': '2025-07-23T15:35:46.277004',
  'source_url': 'https://app.bot.or.th/BTWS_STAT/statistics/BOTWEBSTAT.aspx?language=TH&reportID=588'}]

In [10]:
# บันทึกเป็น CSV ใน Bronze layer
if coffee_prices:
    # แปลงเป็น DataFrame
    df = pd.DataFrame(coffee_prices)
    
    # สร้าง path
    csv_path = f"coffee_prices/year={now.year}/month={now.month:02d}/day={now.day:02d}/{now.strftime('%Y%m%d_%H%M%S')}.csv"
    
    # แปลงเป็น CSV string
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_data = csv_buffer.getvalue()
    
    # Upload
    try:
        s3_client.put_object(
            Bucket='bronze',
            Key=csv_path,
            Body=csv_data,
            ContentType='text/csv'
        )
        print(f"บันทึก CSV สำเร็จ: bronze/{csv_path}")
    except Exception as e:
        print(f"Error: {e}")

Error: Connection was closed before we received a valid response from endpoint URL: "http://localhost:9010/bronze/coffee_prices/year%3D2025/month%3D07/day%3D23/20250723_152842.csv".


## Step 7: List ไฟล์ที่บันทึกใน Bronze

In [11]:
# List objects in bronze bucket
response = s3_client.list_objects_v2(Bucket='bronze', MaxKeys=10)

if 'Contents' in response:
    print("ไฟล์ล่าสุดใน Bronze layer")
    for obj in response['Contents']:
        print(f"  - {obj['Key']} ({obj['Size']} bytes)")
else:
    print("ยังไม่มีไฟล์ใน Bronze bucket")

ReadTimeoutError: Read timeout on endpoint URL: "http://localhost:9010/bronze?list-type=2&max-keys=10&encoding-type=url"

## Step 8: อ่านข้อมูลกลับมาตรวจสอบ

In [12]:
# อ่าน JSON file ล่าสุด
try:
    # หา JSON file ล่าสุด
    response = s3_client.list_objects_v2(
        Bucket='bronze',
        Prefix='coffee_images/',
        MaxKeys=10
    )
    
    if 'Contents' in response:
        latest_key = response['Contents'][1]['Key']
        
        # อ่านไฟล์
        obj = s3_client.get_object(Bucket='bronze', Key=latest_key)
        content = obj['Body'].read().decode('utf-8')
        data = json.loads(content)
        
        print(f"อ่านไฟล์: {latest_key}")
        print(json.dumps(data, indent=2, ensure_ascii=False))
except Exception as e:
    print(f"Error reading: {e}")

Error reading: Read timeout on endpoint URL: "http://localhost:9010/bronze?list-type=2&prefix=coffee_images%2F&max-keys=10&encoding-type=url"


## สรุป

Bronze Layer คือชั้นแรกที่เก็บ raw data โดย
1. **ไม่แปลงข้อมูล** - เก็บในรูปแบบดั้งเดิม
2. **Partition by date** - จัดเก็บตามวันที่
3. **เพิ่ม metadata** - timestamp, source
4. **รองรับหลายรูปแบบ** - JSON, CSV, Parquet

ข้อมูลใน Bronze พร้อมสำหรับการ transform ไป Silver layer ในขั้นตอนถัดไป!