In [2]:
import os
import json
import time
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from datetime import datetime, timedelta

In [3]:
def get_lottery_result_mien_nam(date_str):
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--log-level=3')
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')

    service = Service(log_output=os.devnull)
    driver = None

    try:
        driver = webdriver.Chrome(options=options, service=service)
        url = f"https://xoso.com.vn/xsmn-{date_str}.html"
        driver.get(url)
        time.sleep(2)

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        result_table = soup.find('table', class_='table-result')
        if not result_table:
            print(f"[{date_str}] ❌ Không tìm thấy bảng.")
            return None

        print(f"[{date_str}] ✅ Đã tìm thấy bảng!")

        province_headers = result_table.find("thead").find_all("th")[1:]
        provinces = [th.get_text(strip=True) for th in province_headers]

        all_results = {province: {} for province in provinces}

        tbody = result_table.find("tbody")
        rows = tbody.find_all("tr")
        for row in rows:
            prize_name = row.find("th").get_text(strip=True)
            cells = row.find_all("td")
            for i, cell in enumerate(cells):
                spans = cell.find_all("span")
                values = [span.get_text(strip=True) for span in spans]
                all_results[provinces[i]][prize_name] = values

        special_prizes = []
        for province in provinces:
            special = all_results[province].get("ĐB", [])
            special_prizes.extend(special)

        final_results = {
            "date": date_str,
            "special_prize": special_prizes,
            "all_results": all_results
        }

        return final_results

    except Exception as e:
        print(f"[{date_str}] ⚠ Lỗi:", e)
        if driver:
            driver.quit()
        return None

In [4]:
def collect_lottery_data_range(date_start, date_end, output_path, overwrite=False):
    # Khởi tạo dữ liệu
    if overwrite or not os.path.exists(output_path):
        all_data = []
    else:
        with open(output_path, "r", encoding="utf-8") as f:
            try:
                all_data = json.load(f)
                if not isinstance(all_data, list):
                    all_data = []
            except json.JSONDecodeError:
                all_data = []

    # Tạo khoảng ngày
    start_date = datetime.strptime(date_start, "%d-%m-%Y")
    end_date = datetime.strptime(date_end, "%d-%m-%Y")
    current_date = start_date

    existing_dates = {entry['date'] for entry in all_data}

    while current_date <= end_date:
        date_str = current_date.strftime("%d-%m-%Y")
        if overwrite or date_str not in existing_dates:
            result = get_lottery_result_mien_nam(date_str)
            if result:
                all_data.append(result)
        else:
            print(f"[{date_str}] ⏩ Bỏ qua vì đã có trong file.")
        current_date += timedelta(days=1)

    # Ghi toàn bộ dữ liệu vào file
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(all_data, f, ensure_ascii=False, indent=2)
        print(f"\n✅ Đã lưu toàn bộ {len(all_data)} kết quả vào '{output_path}'")

In [5]:
def collect_lottery_data_auto(output_path, end_date=None, overwrite=False):
    """
    Tự động tìm ngày mới nhất đã có trong file JSON, tiếp tục thu thập từ ngày tiếp theo đến hôm nay hoặc đến end_date (nếu có).
    """
    from datetime import date

    # Đọc dữ liệu cũ
    if os.path.exists(output_path):
        with open(output_path, "r", encoding="utf-8") as f:
            try:
                all_data = json.load(f)
                if not isinstance(all_data, list):
                    all_data = []
            except json.JSONDecodeError:
                all_data = []
    else:
        all_data = []

    # Tìm ngày lớn nhất đã có trong file
    if all_data:
        existing_dates = [datetime.strptime(entry['date'], "%d-%m-%Y") for entry in all_data]
        last_date = max(existing_dates)
        start_date = last_date + timedelta(days=1)
    else:
        print("⚠ File chưa có dữ liệu. Hãy dùng collect_lottery_data_range() để khởi tạo.")
        return

    # Ngày kết thúc: mặc định là hôm nay nếu không truyền
    if end_date is None:
        end_date = datetime.today()
    else:
        end_date = datetime.strptime(end_date, "%d-%m-%Y")

    # Không có gì để thu thập
    if start_date > end_date:
        print("⏩ Không có ngày mới để cập nhật.")
        return

    # Gọi lại collect_lottery_data_range() với overwrite=False
    date_start_str = start_date.strftime("%d-%m-%Y")
    date_end_str = end_date.strftime("%d-%m-%Y")
    print(f"📅 Đang thu thập từ {date_start_str} đến {date_end_str}...")
    collect_lottery_data_range(date_start_str, date_end_str, output_path, overwrite=overwrite)


In [6]:
def collect_lottery_data_range(date_start=None, date_end=None, output_path="xoso_mien_nam.json", overwrite=False):
    from datetime import date

    # Đọc dữ liệu cũ nếu có
    if os.path.exists(output_path) and not overwrite:
        with open(output_path, "r", encoding="utf-8") as f:
            try:
                all_data = json.load(f)
                if not isinstance(all_data, list):
                    all_data = []
            except json.JSONDecodeError:
                all_data = []
    else:
        all_data = []

    # Nếu không truyền date_start → tự lấy từ file
    if date_start is None:
        if all_data:
            existing_dates = [datetime.strptime(entry["date"], "%d-%m-%Y") for entry in all_data]
            last_date = max(existing_dates)
            start_date = last_date + timedelta(days=1)
        else:
            raise ValueError("⚠️ File rỗng hoặc không tồn tại. Bạn cần truyền date_start để khởi tạo dữ liệu.")
    else:
        start_date = datetime.strptime(date_start, "%d-%m-%Y")

    # Nếu không truyền date_end → mặc định là hôm nay - 1
    if date_end is None:
        end_date = datetime.today() - timedelta(days=1)
    else:
        end_date = datetime.strptime(date_end, "%d-%m-%Y")

    # Kiểm tra logic thời gian
    if start_date > end_date:
        print(f"⏩ Không có ngày mới để thu thập. (start: {start_date.date()}, end: {end_date.date()})")
        return

    # Chuẩn bị danh sách ngày đã có (để tránh trùng)
    existing_dates = {entry["date"] for entry in all_data}

    # Bắt đầu thu thập
    current_date = start_date
    while current_date <= end_date:
        date_str = current_date.strftime("%d-%m-%Y")
        if overwrite or date_str not in existing_dates:
            result = get_lottery_result_mien_nam(date_str)
            if result:
                all_data.append(result)
        else:
            print(f"[{date_str}] ⏩ Đã có dữ liệu. Bỏ qua.")
        current_date += timedelta(days=1)

    # Ghi kết quả ra file
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(all_data, f, ensure_ascii=False, indent=2)
        print(f"\n✅ Đã lưu {len(all_data)} kết quả vào '{output_path}'")


In [None]:
output_path = r"D:\TechByte_LoDe\Lo-De-Prediction\xs_data\xsmn_data.json"
collect_lottery_data_range(output_path=output_path)

[30-06-2025] ✅ Đã tìm thấy bảng!

✅ Đã lưu 7080 kết quả vào 'D:\TechByte_LoDe\Lo-De-Prediction\xsmn_data.json'


In [None]:
output_path_2 = r"D:\TechByte_LoDe\Lo-De-Prediction\xs_data\xsmt_data.json"
collect_lottery_data_range(output_path=output_path_2)

[30-06-2025] ✅ Đã tìm thấy bảng!

✅ Đã lưu 6866 kết quả vào 'D:\TechByte_LoDe\Lo-De-Prediction\xsmt_data.json'
