In [2]:
# We are extracting 2024 weather data for our 2024 sample data set
# to test whether the weather data extraction can be done successfully.
# We are going to use the dataset "2024_weather_data_with_coords_complete.csv".

In [5]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
import time
import json
import sqlite3
from pathlib import Path
from tqdm import tqdm
import asyncio
import aiohttp
import nest_asyncio
nest_asyncio.apply()  # this allows nested async loops

class WeatherDataProcessor:
    """Attach OpenWeatherMap historical weather to dated, geolocated records.

    Rows are processed in batches; within a batch, requests run concurrently
    (bounded by ``max_concurrent``) over one shared aiohttp session. Every
    successfully parsed result is stored in a SQLite table keyed by
    ``"{lat}_{lon}_{date}"`` so repeated runs can skip the network call.
    """

    # SECURITY NOTE(review): the default `api_key` below is a credential committed
    # to source. It is kept only so existing `WeatherDataProcessor()` calls keep
    # working; rotate it and pass `api_key=` explicitly (e.g. read from an
    # environment variable) instead of relying on the default.
    def __init__(self, api_key='023826bb6ba471680c4069db1a508424', cache_db='weather_cache.db', max_concurrent=5):
        """Store the settings and make sure the cache table exists.

        Args:
            api_key: Value sent as the ``appid`` query parameter.
            cache_db: Path of the SQLite file used as the response cache.
            max_concurrent: Maximum number of rows processed at once per batch.
        """
        self.api_key = api_key
        self.cache_db = cache_db
        self.max_concurrent = max_concurrent
        self.setup_cache()
        # Created lazily by process_batch, closed by process_all_data.
        self.session = None
        # HTTP status codes already reported, so each one is printed only once.
        self._reported_statuses = set()

    def setup_cache(self):
        """Create the ``weather_cache`` table and its index if they are missing."""
        conn = sqlite3.connect(self.cache_db)
        try:
            # `with conn` commits (or rolls back) the transaction; it does not
            # close the connection, hence the explicit close() below.
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS weather_cache (
                        location_date TEXT PRIMARY KEY,
                        weather_data TEXT,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_location_date ON weather_cache(location_date)')
        finally:
            conn.close()

    @staticmethod
    def _day_start_timestamp(date_str):
        """Return the Unix timestamp of 00:00 UTC on ``date_str`` (``YYYY-MM-DD``).

        The date is anchored to UTC rather than the machine's local timezone so
        the requested window is the same wherever the notebook runs.

        Raises:
            ValueError: if ``date_str`` does not match ``%Y-%m-%d``.
        """
        day = datetime.strptime(date_str, '%Y-%m-%d')
        return int((day - datetime(1970, 1, 1)).total_seconds())

    def _report_http_error(self, status):
        """Print a warning the first time a given non-200 status is seen."""
        if status in self._reported_statuses:
            return
        self._reported_statuses.add(status)
        print(f"Weather API returned HTTP {status}; affected rows are left without weather data "
              "(further responses with this status are not reported)")

    async def get_weather_data(self, lat, lon, date_str):
        """Return the weather record for one location and day, or ``None``.

        The cache is consulted first. On a miss, a 24-hour window starting at
        00:00 UTC of ``date_str`` is requested (``units=metric``) and the parsed
        result is cached. Requires ``self.session`` to be an open aiohttp
        session (``process_batch`` creates it).

        Args:
            lat: Latitude sent to the API and used in the cache key.
            lon: Longitude sent to the API and used in the cache key.
            date_str: Day to fetch, formatted ``YYYY-MM-DD``.

        Returns:
            The dict built by ``_process_weather_data``, or ``None`` when the
            request fails, the status is not 200, or the response has no data.

        Raises:
            ValueError: if ``date_str`` is not ``YYYY-MM-DD``.
        """
        cache_key = f"{lat}_{lon}_{date_str}"

        # check cache first
        cached_weather = self._get_cached_weather(cache_key)
        if cached_weather:
            return cached_weather

        timestamp = self._day_start_timestamp(date_str)

        url = "https://history.openweathermap.org/data/2.5/history/city"
        params = {
            'lat': lat,
            'lon': lon,
            'type': 'hour',
            'start': timestamp,
            'end': timestamp + 86400,  # one day after `start`
            'appid': self.api_key,
            'units': 'metric'
        }

        # Best effort: any failure (network, JSON decoding, cache write) is
        # reported and yields None so one bad row cannot abort the whole batch.
        try:
            async with self.session.get(url, params=params) as response:
                if response.status != 200:
                    self._report_http_error(response.status)
                    return None
                data = await response.json()
                if not data.get('list'):
                    return None
                weather_data = self._process_weather_data(data)
                if weather_data:
                    self._cache_weather(cache_key, weather_data)
                return weather_data
        except Exception as e:
            print(f"Error fetching weather data: {str(e)}")
            return None

    async def process_batch(self, batch_df):
        """Fetch weather for every row of ``batch_df`` and write it into the frame.

        ``batch_df`` is modified in place and also returned. Each row must
        provide ``date`` (an object with ``strftime``), ``latitude`` and
        ``longitude``. Rows for which no weather is obtained are left untouched.
        """
        if self.session is None:
            self.session = aiohttp.ClientSession()

        weather_columns = ['temperature', 'humidity', 'wind_speed',
                         'clouds', 'precipitation', 'pressure',
                         'weather_description', 'data_type']

        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_row(row):
            # The semaphore is held during the sleep as well, so the delay
            # throttles each of the `max_concurrent` slots.
            async with semaphore:
                date_str = row['date'].strftime('%Y-%m-%d')
                weather_data = await self.get_weather_data(row['latitude'], row['longitude'], date_str)
                await asyncio.sleep(0.2)  # Rate limiting
                return row.name, weather_data

        tasks = [process_row(row) for _, row in batch_df.iterrows()]
        results = await asyncio.gather(*tasks)

        # `row.name` is the row's index label, used here to write results back.
        for idx, weather_data in results:
            if weather_data:
                for col in weather_columns:
                    batch_df.loc[idx, col] = weather_data.get(col)

        return batch_df

    async def process_all_data(self, df, batch_size):
        """Process ``df`` in consecutive batches and return the combined frame.

        After every tenth batch, all results so far are written to
        ``weather_data_interim_<n>.csv``. The HTTP session is closed when
        processing ends, whether it succeeded or raised.

        Args:
            df: Frame with ``date``, ``latitude`` and ``longitude`` columns.
            batch_size: Number of rows per batch; must be at least 1.

        Returns:
            Concatenation of the processed batches; an empty copy of ``df``
            when ``df`` has no rows.

        Raises:
            ValueError: if ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        if len(df) == 0:
            # pd.concat([]) would raise; an empty input gives an empty result.
            return df.copy()

        processed_dfs = []

        try:
            with tqdm(total=len(df), desc="Processing", unit="records") as pbar:
                for start_idx in range(0, len(df), batch_size):
                    end_idx = min(start_idx + batch_size, len(df))
                    batch = df.iloc[start_idx:end_idx].copy()

                    processed_batch = await self.process_batch(batch)
                    processed_dfs.append(processed_batch)

                    pbar.update(len(batch))

                    # save interim results in case anything unexpected happens
                    if len(processed_dfs) % 10 == 0:
                        interim_df = pd.concat(processed_dfs)
                        interim_df.to_csv(f'weather_data_interim_{len(processed_dfs)}.csv', index=False)
        finally:
            # Close the session even when a batch raises, so it is not leaked.
            if self.session:
                await self.session.close()
                self.session = None

        return pd.concat(processed_dfs)

    def process_by_date_range(self, input_file, start_date='2024-02-02', end_date='2025-02-01', batch_size=50):
        """Load a CSV, keep rows within the date range, and fetch their weather.

        Args:
            input_file: CSV path; must contain a ``date`` column (plus the
                ``latitude``/``longitude`` columns that ``process_batch`` reads).
            start_date: Lower bound, compared with ``>=`` against ``date``.
            end_date: Upper bound, compared with ``<=`` against ``date``.
            batch_size: Rows per batch, forwarded to ``process_all_data``.

        Returns:
            The filtered frame with weather columns added where data was found.
        """
        print(f"\nProcessing date range: {start_date} to {end_date}")

        # read and prepare data
        df = pd.read_csv(input_file)
        df['date'] = pd.to_datetime(df['date'])

        # filter date range
        mask = (df['date'] >= start_date) & (df['date'] <= end_date)
        df = df[mask].copy()

        total_records = len(df)
        print(f"\nTotal records to process: {total_records}")

        # Rough figure only: counts the 0.2 s per-row delay as if rows were
        # handled one after another; it ignores concurrency and network time.
        estimated_hours = (total_records * 0.2) / 3600
        print(f"Estimated processing time: {estimated_hours:.1f} hours\n")

        # process the data using asyncio
        loop = asyncio.get_event_loop()
        result_df = loop.run_until_complete(self.process_all_data(df, batch_size))

        return result_df

    def _process_weather_data(self, api_response):
        """Flatten the first entry of ``api_response['list']`` into a plain dict.

        Only the first entry of the list is used; later entries are ignored.
        Missing fields become ``None`` (``precipitation`` defaults to ``0``).

        Returns:
            The flattened dict, or ``None`` when ``list`` is missing/empty or
            the payload cannot be parsed.
        """
        try:
            if not api_response.get('list'):
                return None

            first_entry = api_response['list'][0]
            # `or [{}]` also covers a present-but-empty 'weather' list, which
            # would otherwise raise IndexError and discard the whole record.
            weather_info = (first_entry.get('weather') or [{}])[0]
            main = first_entry.get('main', {})

            return {
                'temperature': main.get('temp'),
                'feels_like': main.get('feels_like'),
                'humidity': main.get('humidity'),
                'pressure': main.get('pressure'),
                'wind_speed': first_entry.get('wind', {}).get('speed'),
                'clouds': first_entry.get('clouds', {}).get('all'),
                'precipitation': first_entry.get('rain', {}).get('1h', 0),
                'weather_description': weather_info.get('description'),
                'data_type': 'historical'
            }
        except Exception as e:
            print(f"Error processing weather data: {str(e)}")
            return None

    def _get_cached_weather(self, cache_key):
        """Return the cached dict stored under ``cache_key``, or ``None`` if absent."""
        conn = sqlite3.connect(self.cache_db)
        try:
            result = conn.execute(
                "SELECT weather_data FROM weather_cache WHERE location_date = ?",
                (cache_key,)
            ).fetchone()
        finally:
            # sqlite3's context manager does not close connections; do it here.
            conn.close()
        return json.loads(result[0]) if result else None

    def _cache_weather(self, cache_key, weather_data):
        """Insert or overwrite the JSON-encoded ``weather_data`` under ``cache_key``."""
        conn = sqlite3.connect(self.cache_db)
        try:
            with conn:  # commit on success, roll back on error
                conn.execute(
                    "INSERT OR REPLACE INTO weather_cache (location_date, weather_data) VALUES (?, ?)",
                    (cache_key, json.dumps(weather_data))
                )
        finally:
            conn.close()

In [7]:
# Build the processor with its default API key, cache database and concurrency.
processor = WeatherDataProcessor()

# Arguments for this run: source CSV, date bounds for the filter, rows per batch.
run_options = dict(
    input_file='2024_weather_data_with_coords_complete.csv',
    start_date='2024-02-05',
    end_date='2025-02-01',
    batch_size=50,
)

# Filter the CSV to the date range and fetch weather for the remaining rows.
result_df = processor.process_by_date_range(**run_options)

# Write the enriched frame to disk without the index column.
result_df.to_csv('2024_weather_data_final.csv', index=False)


Processing date range: 2024-02-05 to 2025-02-01

Total records to process: 28314
Estimated processing time: 1.6 hours



Processing: 100%|████████████████████| 28314/28314 [44:18<00:00, 10.65records/s]
