# In [None]:
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import pandas as pd
from meteostat import Point, Daily

# ---- Weather cache ----
# Maps (round(lat, 2), round(long, 2), 'YYYY-MM-DD') -> weather dict, or None when
# no data could be obtained. Worker threads started by parallel_weather_fetch read
# and write it without a lock, so two threads that miss the same key at the same
# time may both fetch it (redundant work, not a correctness problem).
weather_cache = {}

logger = logging.getLogger(__name__)


def _fetch_daily_weather(lat, long, date_obj):
    """Fetch one day of meteostat ``Daily`` data; return the weather dict or ``None`` if empty."""
    data = Daily(Point(lat, long), date_obj, date_obj).fetch()
    if data.empty:
        return None
    day = data.iloc[0]
    return {
        "temperature_avg_C": day['tavg'],
        "temperature_min_C": day['tmin'],
        "temperature_max_C": day['tmax'],
        "precipitation_mm": day['prcp'],
        "wind_speed_kph": day['wspd'],
        "snow_mm": day['snow'],
    }


# ---- Weather lookup ----
def get_todays_forecast(lat, long, date, retries=3, retry_delay=1.0):
    """Return meteostat daily weather for one location and day, or ``None``.

    Queries meteostat's ``Daily`` source for exactly one day (start == end).
    Results, including "no data" (``None``), are memoised in ``weather_cache``.

    Args:
        lat: Latitude in decimal degrees. ``None``/NaN yields ``None``.
        long: Longitude in decimal degrees. ``None``/NaN yields ``None``.
        date: Day as a ``'YYYY-MM-DD'`` string. Missing or unparsable values
            yield ``None`` without any fetch attempt.
        retries: Extra fetch attempts after the first failure (negative is
            treated as 0, so at least one attempt is always made).
        retry_delay: Seconds to sleep between attempts.

    Returns:
        Dict with keys ``temperature_avg_C``, ``temperature_min_C``,
        ``temperature_max_C``, ``precipitation_mm``, ``wind_speed_kph`` and
        ``snow_mm``; or ``None`` when inputs are missing/invalid, meteostat
        returns no row for the day, or every attempt raised.
    """
    # Missing coordinates arise from airports absent in the lookup table (left
    # merge) and missing dates from NaT. Bail out before building a cache key:
    # NaN never compares equal, so such keys would never hit the cache and every
    # call would otherwise spend all retries (and their sleeps) on a lost cause.
    if pd.isna(lat) or pd.isna(long) or pd.isna(date):
        return None

    try:
        date_obj = datetime.strptime(date, '%Y-%m-%d')
    except (TypeError, ValueError):
        # A malformed date will never parse; retrying would only add sleeps.
        logger.warning("Skipping weather lookup for unparsable date %r", date)
        return None

    # Fetch with the same rounded coordinates used as the cache key, so the
    # cached value is a function of the key alone rather than of whichever
    # nearby point happened to be requested first.
    lat, long = round(lat, 2), round(long, 2)
    key = (lat, long, date)
    if key in weather_cache:
        return weather_cache[key]

    attempts = max(retries, 0) + 1
    result = None
    for attempt in range(attempts):
        try:
            result = _fetch_daily_weather(lat, long, date_obj)
            break
        except Exception as exc:  # best-effort: network/meteostat errors must not abort the batch
            if attempt < attempts - 1:
                time.sleep(retry_delay)
            else:
                logger.warning(
                    "Weather lookup failed for %s after %d attempt(s): %s",
                    key, attempts, exc,
                )
    weather_cache[key] = result
    return result

# ---- Per-row weather enrichment ----
def process_row(index, row):
    """Look up weather at a flight's origin and destination.

    Args:
        index: Row identifier; returned unchanged so the caller can match the
            result back to its row.
        row: Flight record indexable by ``Latitude_origin``, ``Longitude_origin``,
            ``Latitude_dest``, ``Longitude_dest`` and ``DATE``.

    Returns:
        ``(index, weather)``, where ``weather`` maps ``origin_<field>`` and
        ``dest_<field>`` to the values returned by ``get_todays_forecast``.
        An endpoint whose lookup returned a falsy value contributes no keys.
    """
    endpoints = (
        ('origin_', 'Latitude_origin', 'Longitude_origin'),
        ('dest_', 'Latitude_dest', 'Longitude_dest'),
    )
    # Perform both lookups first (origin, then destination), then merge.
    lookups = [
        (prefix, get_todays_forecast(row[lat_col], row[lon_col], row['DATE']))
        for prefix, lat_col, lon_col in endpoints
    ]

    weather = {}
    for prefix, found in lookups:
        if not found:
            continue
        weather.update((f'{prefix}{field}', value) for field, value in found.items())
    return index, weather

# ---- Parallel weather fetch ----
def parallel_weather_fetch(df, max_workers=10, row_fn=None):
    """Append per-row weather columns to ``df``, processing rows on a thread pool.

    Args:
        df: Flight DataFrame; each row is passed to ``row_fn``.
        max_workers: Thread-pool size.
        row_fn: Callable ``(position, row) -> (position, dict)`` that returns
            the new columns for one row. Defaults to ``process_row``.

    Returns:
        A new DataFrame: ``df`` with a fresh 0..n-1 index, followed by one
        column per key produced by ``row_fn``. Every input row is kept; rows
        for which ``row_fn`` returned an empty dict get NaN in those columns.
    """
    if row_fn is None:
        row_fn = process_row

    rows = [row for _, row in df.iterrows()]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map yields results in submission order, so the i-th result
        # belongs to the i-th row regardless of which thread finishes first.
        # Rows are identified by position rather than index label, so duplicate
        # or unsorted labels in df cannot collide or reorder results.
        per_row = [result for _, result in executor.map(row_fn, range(len(rows)), rows)]

    # Build from a list so an empty dict still yields an (all-NaN) row.
    # DataFrame.from_dict(..., orient='index') on a dict of dicts drops such
    # rows entirely, which would shift later weather onto the wrong flights
    # in the positional concat below.
    weather_df = pd.DataFrame(per_row, index=range(len(per_row)))
    return pd.concat([df.reset_index(drop=True), weather_df], axis=1)

# ---- Pipeline entry point ----
def _attach_airport_coords(flights, airports, iata_col, lat_name, lon_name):
    """Left-join airport coordinates onto ``flights`` on ``iata_col``.

    ``airports`` is matched through its ``Airport`` column. Its ``Latitude``
    and ``Longitude`` columns are renamed to ``lat_name``/``lon_name``, and the
    ``Airport`` join column is dropped afterwards.
    """
    joined = pd.merge(flights, airports, how="left", left_on=iata_col, right_on="Airport")
    joined = joined.rename(columns={"Latitude": lat_name, "Longitude": lon_name})
    return joined.drop(columns=["Airport"])


def process_flight_data(flight_csv, airport_csv, max_workers=10):
    """Load flights and airports, attach coordinates, then add per-airport weather.

    Args:
        flight_csv: Path to the flights CSV; read for ``ORIGIN_IATA``,
            ``DEST_IATA`` and ``DATE`` columns.
        airport_csv: Path to the airports CSV; read for ``Airport``,
            ``Latitude`` and ``Longitude`` columns.
        max_workers: Thread count passed to ``parallel_weather_fetch``.

    Returns:
        The enriched DataFrame produced by ``parallel_weather_fetch``.
    """
    flights = pd.read_csv(flight_csv)
    airports = pd.read_csv(airport_csv)

    # Origin and destination coordinates come from the same airport table.
    flights = _attach_airport_coords(
        flights, airports, "ORIGIN_IATA", "Latitude_origin", "Longitude_origin"
    )
    flights = _attach_airport_coords(
        flights, airports, "DEST_IATA", "Latitude_dest", "Longitude_dest"
    )

    # Normalise DATE to 'YYYY-MM-DD' strings, the format the weather lookup parses.
    flights['DATE'] = pd.to_datetime(flights['DATE']).dt.strftime('%Y-%m-%d')

    began = time.time()
    enriched_df = parallel_weather_fetch(flights, max_workers=max_workers)
    print(f"Weather data enrichment completed in {time.time() - began:.2f} seconds")
    return enriched_df


# In [None]:
if __name__ == "__main__":
    # Run the whole pipeline on the May 2024 extract and persist the result.
    flights_path, airports_path = "processed_flights_May2024.csv", "airports_info_.csv"
    output_path = "flights_with_weather.csv"
    processed_df = process_flight_data(flights_path, airports_path, max_workers=10)
    processed_df.to_csv(output_path, index=False)
    print("Saved enriched flight data.")