In [1]:
import requests
import pathlib

pathlib.Path('data').mkdir(exist_ok=True)

# 楓葉紅葉日期
if not pathlib.Path('data/maple/maple_foliage.csv').exists():
    response = requests.get('https://www.data.jma.go.jp/sakura/ruinenchi/015.csv')
    response.encoding = 'shift-jis'
    with open('data/maple/maple_foliage.csv', 'w', encoding='utf-8') as file:
        file.write(response.text)


# 楓葉落葉日期
if not pathlib.Path('data/maple/maple_fall.csv').exists():
    response = requests.get('https://www.data.jma.go.jp/sakura/ruinenchi/016.csv')
    response.encoding = 'shift-jis'
    with open('data/maple/maple_shedding.csv', 'w', encoding='utf-8') as file:
        file.write(response.text)

In [None]:
import pandas as pd

foliage_data = pd.read_csv('data/maple/maple_foliage.csv', skiprows=1, encoding='utf-8')

foliage_data.head()

In [None]:
def drop_unused_columns(df):
    columns = ['番号', '平年値', '最早値', '最早年', '最晩値', '最晩年']
    remark_columns = [col for col in df.columns if col.startswith('rm')]
    return df.drop(columns=columns + remark_columns)

foliage_data = drop_unused_columns(foliage_data)

foliage_data.head()


In [None]:
foliage_data['地点名'] = foliage_data['地点名'].str.strip()
location_names = foliage_data['地点名'].unique()
pd.Series(location_names)

In [5]:
from time import sleep

def fetch_history_weather_data(latitude, longtitude, start_date, end_date, variables):
    url = (
        f'https://archive-api.open-meteo.com/v1/archive?'
        f'latitude={latitude}&'
        f'&longitude={longtitude}&'
        f'start_date={start_date}&'
        f'end_date={end_date}&'
        f'{variables}&'
        'timezone=Asia/Tokyo'
    )
    for _ in range(3):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            print('Timeout, retrying...')
            sleep(5)
        except requests.exceptions.RequestException as e:
            print(f'Request failed: {e}')
            break
    return None
    

In [6]:
from tqdm import tqdm
import json

def download_weather_data(geolocator, location_names, directory):
    for location_name in (pbar := tqdm(location_names)):
        pbar.set_description('Downloading')
        pbar.set_postfix_str(location_name)
        
        pathlib.Path(f'{directory}/{location_name}').mkdir(exist_ok=True, parents=True)

        location = geolocator.geocode(location_name)
        if location is None:
            print(f'{location_name} not found')
            continue

        for year in range(1953, 2024):
            if pathlib.Path(f'{directory}/{location_name}/{year}.json').exists():
                continue
            response = fetch_history_weather_data(
                location.latitude, 
                location.longitude, 
                f'{year}-09-01', 
                f'{year}-12-31', 
                'daily=temperature_2m_max,temperature_2m_min,temperature_2m_mean,daylight_duration,precipitation_sum'
            )

            if response is None:
                print(f'Failed to fetch data for {location_name} in {year}')
                return

            # 確認是否有錯誤 ( 如: API 每小時的限制 )
            if response.get('error'):
                print(response.get('reason'))
                return

            with open(f'{directory}/{location_name}/{year}.json', 'w', encoding='utf-8') as file:
                file.write(json.dumps(response))
            
            sleep(1)

In [7]:
from geopy.geocoders import Nominatim
import numpy as np


geolocator = Nominatim(user_agent="kaede")

download_completed = True

location_names = np.array_split(location_names, 4)

id = 2
if not download_completed:
    download_weather_data(geolocator, location_names[id], 'data/weather')

In [8]:
def get_date_index(year, date):
    """
    獲取日期在某段時間的索引
    """
    start_date = f'{year}-09-01'
    end_date = f'{year}-12-31'
    date_range = pd.date_range(start_date, end_date)

    return date_range.get_loc(date)

def number_to_date(num):
    """
    將數字轉換為日期格式
    """
    return f'{num // 100:02d}-{num % 100:02d}'

def get_weather_data(data, metric, index, offset, mode=None):
    """
    取得天氣資料中某個指標的指定範圍平均值、最大值或最小值

    Args:
        data (dict): 天氣資料
        metric (str): 指標名稱
        index (int): 日期索引
        offset (int): 日期偏移量
        mode (str): 計算模式 (mean, max, min)
    Returns:
        float: 計算結果
    """
    # 確認是否有足夠資料
    if index < offset:
        return None
    
    if offset != 0: # 如果偏移量不為 0 則進行不同模式的計算
        if mode == 'mean':
            return sum(data['daily'][metric][index-offset:index]) / (offset + 1)
        elif mode == 'max':
            return max(data['daily'][metric][index-offset:index])
        elif mode == 'min':
            return min(data['daily'][metric][index-offset:index])
    else:
        return data['daily'][metric][index]

In [9]:
# 天氣指標
metrics = ['temperature_2m_max', 'temperature_2m_min', 'temperature_2m_mean', 'daylight_duration', 'precipitation_sum']

# 將天氣指標轉換為縮寫
metric_mapping = {
    'temperature_2m_max': 't',
    'temperature_2m_min': 't',
    'temperature_2m_mean': 't',
    'daylight_duration': 'd',
    'precipitation_sum': 'p'
}

# 週期所對應的日數 (offset)
periods = {
    '': 0,
    '1w': 6,
    '2w': 13
}

# 計算模式
mode_mapping = {
    'temperature_2m_max': ['max'],
    'temperature_2m_min': ['min'],
    'temperature_2m_mean': ['mean'],
    'daylight_duration': ['max', 'mean', 'min'],
    'precipitation_sum': ['max', 'mean', 'min']

}

In [None]:
data_column = [
    'latitude',
    'd', 'p', 't_max', 't_min', 't_mean',
    't_1w_max', 't_1w_min', 't_1w_mean',
    't_2w_max', 't_2w_min', 't_2w_mean',
    'd_1w_max', 'd_1w_min', 'd_1w_mean',
    'd_2w_max', 'd_2w_min', 'd_2w_mean',
    'p_1w_max', 'p_1w_min', 'p_1w_mean',
    'p_2w_max', 'p_2w_min', 'p_2w_mean'
]

data = pd.DataFrame(columns=data_column)

record_data = foliage_data.to_dict(orient='records')
for record in (pbar := tqdm(record_data)):
    pbar.set_postfix_str(record['地点名'])
    location_name = record['地点名']
    location = geolocator.geocode(location_name)

    for year in range(1953, 2024):
        foliage_day = record[str(year)]
        if foliage_day == 0 or len(str(foliage_day)) == 3: # 沒有資料
            continue

        data_index = get_date_index(year, f'{year}-{number_to_date(foliage_day)}')

        # 讀取天氣資料
        with open(f'data/weather/{location_name}/{year}.json', 'r', encoding='utf-8') as file:
            weather_data = json.load(file)
        
        # 取紅葉當天的資料當作 true，前 6 天的資料當作 false
        for index in range(data_index - 6, data_index + 1):
            new_row = { 
                'latitude': location.latitude, # 緯度
                'is_foliage': 1 if index == data_index else 0, # 是否為紅葉當天
            }

            # 對於所有的組合 (metric, period, mode)
            for metric in metrics:
                for period, offset in periods.items():
                    for mode in mode_mapping[metric]:
                        # 跳過沒有用的組合 Ex: d_max, d_min
                        if metric in ['daylight_duration', 'precipitation_sum'] and mode in ['max', 'min'] and offset == 0:
                            continue

                        # 設定欄位名稱
                        if offset == 0 and metric in ['daylight_duration', 'precipitation_sum']:
                            key = f'{metric_mapping[metric]}' # Ex: d, p
                        elif offset == 0:
                            key = f'{metric_mapping[metric]}_{mode}' # Ex: t_max, t_min, t_mean
                        else:
                            key = f'{metric_mapping[metric]}_{period}_{mode}'
                        
                        # 取得資料
                        value = get_weather_data(weather_data, metric, index, offset, mode)
                        if value is None:
                            continue
                        new_row[key] = value

            new_row = pd.DataFrame(new_row, index=[0])
            data = pd.concat([data, new_row], ignore_index=True)
        
data.to_csv('data/data.csv', index=False)

In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# 避免多核心檢測錯誤的環境變數設定
os.environ["LOKY_MAX_CPU_COUNT"] = "6"  # 使用 6 個核心

# 讀取原始資料並隨機打亂
data = pd.read_csv('./data/data.csv').sample(frac=1, random_state=42).reset_index(drop=True)

# 將資料分為 train.csv 和 test.csv
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42, stratify=data['is_foliage'])
train_data.to_csv('./data/train.csv', index=False)
test_data.to_csv('./data/test.csv', index=False)

print("資料已分割並儲存為 train.csv 和 test.csv")

# 讀取 train.csv 和 test.csv
train_data = pd.read_csv('./data/train.csv')
test_data = pd.read_csv('./data/test.csv')

# 特徵與目標變數
X_train = train_data.drop(columns=['is_foliage'])
y_train = train_data['is_foliage'].apply(lambda x: 1 if x == 1.0 else 0)

X_test = test_data.drop(columns=['is_foliage'])
y_test = test_data['is_foliage'].apply(lambda x: 1 if x == 1.0 else 0)

# 確保特徵數據全為數值型
X_train = X_train.select_dtypes(include=['float64', 'int64'])
X_test = X_test.select_dtypes(include=['float64', 'int64'])

# 使用 SMOTE 平衡 train.csv 的類別分佈
smote = SMOTE(random_state=42)
X_train, y_train = smote.fit_resample(X_train, y_train)

# 標準化數值特徵
scaler = StandardScaler()
X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
X_test = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)

# 設定 XGBClassifier 模型，使用最佳參數
best_model = XGBClassifier(
    random_state=42,
    eval_metric='error',  # 使用準確率作為評估指標
    tree_method='hist',  # 使用 GPU 加速的話改為 'gpu_hist'
    gpu_id=0,            # 指定使用第 0 塊 GPU，如果有多塊 GPU 改為 0,1,2,3...
    scale_pos_weight=len(y_train[y_train == 0]) / len(y_train[y_train == 1]),
    subsample=0.7,
    reg_lambda=1,
    reg_alpha=0.1,
    n_estimators=1200,
    min_child_weight=8,
    max_depth=15,
    learning_rate=0.05,
    gamma=0.3,
    colsample_bytree=0.7
)

# 訓練模型
best_model.fit(X_train, y_train)

# 使用 test.csv 預測並計算準確率
y_pred = best_model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)
print(f"Train Accuracy: {best_model.score(X_train, y_train):.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")


資料已分割並儲存為 train.csv 和 test.csv
Train Accuracy: 0.9998
Test Accuracy: 0.8208
