# OpenMeteo DuckDB Cache
# Claude 3.7 Sonnet

# In [3]:
import requests
import pandas as pd
import numpy as np
import time
import os
import hashlib
from datetime import datetime, timedelta
import duckdb

class OpenMeteoDuckDBCache:
    """
    A class to efficiently query the OpenMeteo API by avoiding redundant calls
    for locations within the same grid cell/pixel, using DuckDB for storage.

    Coordinates are snapped to a regular grid of ``grid_resolution`` degrees,
    so all locations in one cell share a single API call. Responses are kept
    both in an in-process dict (``memory_cache``) and in two DuckDB tables:
    ``cache_metadata`` (one row per cached query) and ``cache_data`` (one row
    per timestamp/variable value).

    The instance can be used as a context manager; ``close()`` runs on exit.
    """

    # Default timeout (seconds) for OpenMeteo HTTP requests. requests applies
    # no timeout by default, so a stalled connection could block forever.
    request_timeout = 60

    # Decimal places kept after snapping to the grid. This strips binary float
    # noise (403 * 0.1 == 40.300000000000004) from cache keys and API params.
    _GRID_DECIMALS = 10

    def __init__(self, db_path='openmeteo_cache.duckdb', grid_resolution=0.1,
                 request_timeout=60):
        """
        Initialize the OpenMeteo cache system with DuckDB storage.

        Parameters:
        -----------
        db_path : str
            Path to the DuckDB database file
        grid_resolution : float
            The resolution of the OpenMeteo grid in degrees (default is 0.1°)
        request_timeout : float
            Seconds to wait for an API response before ``requests`` raises a
            timeout error (default 60)
        """
        self.db_path = db_path
        self.grid_resolution = grid_resolution
        self.request_timeout = request_timeout
        self.memory_cache = {}

        # Initialize the DuckDB connection
        self.conn = duckdb.connect(db_path)

        # Create necessary tables if they don't exist
        self._initialize_database()

    def __enter__(self):
        """Return self so the cache can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the connection when leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def _initialize_database(self):
        """
        Set up the database schema for the cache (idempotent: every statement
        uses IF NOT EXISTS).
        """
        # One row per cached query: which grid cell, date range and variables
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS cache_metadata (
                cache_key VARCHAR PRIMARY KEY,
                grid_lat DOUBLE,
                grid_lon DOUBLE,
                start_date DATE,
                end_date DATE,
                variables VARCHAR,
                created_at TIMESTAMP,
                last_accessed TIMESTAMP
            )
        """)

        # Long-format values: one row per (query, timestamp, variable)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS cache_data (
                cache_key VARCHAR,
                timestamp TIMESTAMP,
                variable VARCHAR,
                value DOUBLE,
                PRIMARY KEY (cache_key, timestamp, variable),
                FOREIGN KEY (cache_key) REFERENCES cache_metadata(cache_key)
            )
        """)

        # Supports the location/date lookup done in _check_cache
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_grid_location 
            ON cache_metadata(grid_lat, grid_lon, start_date, end_date)
        """)

    @staticmethod
    def _normalize_variables(variables):
        """
        Return ``variables`` as a sorted, de-duplicated list of names.

        Accepts a list/tuple of names or one comma-separated string, so that
        the cache key, the metadata lookup, the stored rows and the API request
        all see the same canonical form.
        """
        if isinstance(variables, str):
            variables = [name.strip() for name in variables.split(',')]
        return sorted({name for name in variables if name})

    def _snap_to_grid(self, value):
        """
        Round one coordinate to the nearest grid point, then trim float noise
        to ``_GRID_DECIMALS`` places.
        """
        snapped = round(value / self.grid_resolution) * self.grid_resolution
        return round(snapped, self._GRID_DECIMALS)

    def _get_grid_cell(self, lat, lon):
        """
        Convert exact coordinates to grid cell coordinates by rounding to 
        the nearest grid point based on the resolution.

        Returns:
        --------
        tuple of (float, float)
            The snapped (grid_lat, grid_lon)
        """
        return (self._snap_to_grid(lat), self._snap_to_grid(lon))

    def _create_cache_key(self, grid_cell, start_date, end_date, variables):
        """
        Create a unique cache key (MD5 hex digest) from location and query
        parameters. Variable order does not affect the key.
        """
        variables = self._normalize_variables(variables)

        # Create a string that represents all query parameters
        params_str = f"{grid_cell}_{start_date}_{end_date}_{','.join(variables)}"

        # Create a hash for the parameters
        return hashlib.md5(params_str.encode()).hexdigest()

    def _check_cache(self, grid_lat, grid_lon, start_date, end_date, variables):
        """
        Return the key of a cached entry for this grid cell and variable set
        whose date range covers [start_date, end_date], or None.

        When several entries qualify, the most recently created one wins.
        """
        variables_str = ','.join(self._normalize_variables(variables))

        result = self.conn.execute("""
            SELECT cache_key 
            FROM cache_metadata 
            WHERE grid_lat = ? AND grid_lon = ? 
                AND start_date <= ? AND end_date >= ?
                AND variables = ?
            ORDER BY created_at DESC
            LIMIT 1
        """, [grid_lat, grid_lon, start_date, end_date, variables_str]).fetchone()

        return result[0] if result else None

    def _store_in_cache(self, cache_key, grid_lat, grid_lon, start_date, end_date, variables, data):
        """
        Store an API response in the DuckDB cache and in the memory cache.

        Any existing entry under ``cache_key`` is replaced. Otherwise a forced
        refresh of an already-cached query would violate the primary keys.
        """
        variables = self._normalize_variables(variables)
        variables_str = ','.join(variables)
        now = datetime.now()

        # Flatten the hourly block into (key, timestamp, variable, value) rows
        hourly = data.get('hourly') or {}
        timestamps = [
            datetime.fromisoformat(time_str.replace('Z', '+00:00'))
            for time_str in hourly.get('time', [])
        ]
        insert_data = [
            (cache_key, timestamp, var, value)
            for var in variables
            if var in hourly
            for timestamp, value in zip(timestamps, hourly[var])
        ]

        # Remove any previous copy of this key. Child rows go first because
        # cache_data references cache_metadata. These deletes run outside the
        # transaction below: DuckDB's constraint checks can reject re-inserting
        # a key that was deleted earlier in the same transaction.
        self.conn.execute("DELETE FROM cache_data WHERE cache_key = ?", [cache_key])
        self.conn.execute("DELETE FROM cache_metadata WHERE cache_key = ?", [cache_key])

        # Write metadata and data together, so a failure cannot leave a
        # metadata row that points at missing or partial data.
        self.conn.begin()
        try:
            self.conn.execute("""
                INSERT INTO cache_metadata (
                    cache_key, grid_lat, grid_lon, start_date, end_date, 
                    variables, created_at, last_accessed
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, [cache_key, grid_lat, grid_lon, start_date, end_date,
                  variables_str, now, now])

            if insert_data:
                self.conn.executemany("""
                    INSERT INTO cache_data (cache_key, timestamp, variable, value)
                    VALUES (?, ?, ?, ?)
                """, insert_data)
        except Exception:
            self.conn.rollback()
            raise
        self.conn.commit()

        # Store in memory cache too for faster access
        self.memory_cache[cache_key] = data

    @staticmethod
    def _slice_response(response, start_date=None, end_date=None):
        """
        Return a copy of ``response`` whose hourly lists keep only entries
        dated within [start_date, end_date] (inclusive; None = unbounded).

        Dates are compared on the 'YYYY-MM-DD' prefix of each time string.
        The input dict is never modified.
        """
        if response is None:
            return None

        sliced = dict(response)
        hourly = response.get('hourly')
        if not isinstance(hourly, dict) or 'time' not in hourly:
            return sliced

        low = str(start_date)[:10] if start_date is not None else None
        high = str(end_date)[:10] if end_date is not None else None
        keep = [
            idx for idx, time_str in enumerate(hourly['time'])
            if (low is None or time_str[:10] >= low)
            and (high is None or time_str[:10] <= high)
        ]

        sliced['hourly'] = {
            name: [values[idx] for idx in keep if idx < len(values)]
            if isinstance(values, list) else values
            for name, values in hourly.items()
        }
        return sliced

    def _retrieve_from_cache(self, cache_key, variables, start_date=None, end_date=None):
        """
        Retrieve data from the cache and format it like the OpenMeteo API
        response (``{"hourly": {"time": [...], <variable>: [...]}}``).

        The stored entry may cover a wider date range than requested. When
        ``start_date``/``end_date`` are given, the result is trimmed to them.
        Null values come back as None and times as 'YYYY-MM-DDTHH:MM', matching
        the API. Returns None if no data rows exist for the key.
        """
        # Update last accessed timestamp
        self.conn.execute("""
            UPDATE cache_metadata 
            SET last_accessed = ? 
            WHERE cache_key = ?
        """, [datetime.now(), cache_key])

        # Check memory cache first (faster)
        if cache_key in self.memory_cache:
            return self._slice_response(self.memory_cache[cache_key], start_date, end_date)

        result = self.conn.execute("""
            SELECT timestamp, variable, value
            FROM cache_data
            WHERE cache_key = ?
            ORDER BY timestamp, variable
        """, [cache_key]).fetchdf()

        if result.empty:
            return None

        # One row per timestamp, one column per variable; the primary key
        # guarantees each (timestamp, variable) pair appears at most once.
        wide = result.pivot(index='timestamp', columns='variable', values='value')

        hourly = {"time": [ts.strftime('%Y-%m-%dT%H:%M') for ts in wide.index]}
        for var in wide.columns:
            hourly[var] = [None if pd.isna(val) else val for val in wide[var].tolist()]
        response = {"hourly": hourly}

        # Store in memory cache for faster future access
        self.memory_cache[cache_key] = response

        return self._slice_response(response, start_date, end_date)

    def get_weather_data(self, lat, lon, start_date, end_date, variables=None, force_refresh=False):
        """
        Get weather data for a specific location and time range.
        Checks cache first to avoid redundant API calls.

        Parameters:
        -----------
        lat : float
            Latitude of the location
        lon : float
            Longitude of the location
        start_date : str
            Start date in YYYY-MM-DD format
        end_date : str
            End date in YYYY-MM-DD format
        variables : list or str
            Weather variables to retrieve, as a list or a comma-separated
            string (default: temperature and precipitation)
        force_refresh : bool
            If True, force a fresh API call even if data is in cache

        Returns:
        --------
        dict or None
            Weather data from OpenMeteo API or cache, or None when the API
            answers with a non-200 status

        Raises:
        -------
        requests.RequestException
            On network errors, including the ``request_timeout`` expiring
        """
        if variables is None:
            variables = ["temperature_2m", "precipitation"]
        # Canonical form shared by the cache key, the lookup and the request
        variables = self._normalize_variables(variables)

        # Determine which grid cell this location falls into
        grid_cell = self._get_grid_cell(lat, lon)
        grid_lat, grid_lon = grid_cell

        cache_key = self._create_cache_key(grid_cell, start_date, end_date, variables)

        if not force_refresh:
            existing_key = self._check_cache(grid_lat, grid_lon, start_date, end_date, variables)

            if existing_key:
                print(f"Using cache for {grid_cell} from {start_date} to {end_date}")
                # The matched entry may span more days; trim to the request
                return self._retrieve_from_cache(existing_key, variables, start_date, end_date)

        print(f"Calling OpenMeteo API for {grid_cell} from {start_date} to {end_date}")

        # Add a small delay to avoid rate limiting
        time.sleep(0.1)

        base_url = "https://archive-api.open-meteo.com/v1/archive"
        params = {
            "latitude": grid_lat,
            "longitude": grid_lon,
            "start_date": start_date,
            "end_date": end_date,
            "hourly": ",".join(variables),
            "timezone": "auto",
        }

        response = requests.get(base_url, params=params, timeout=self.request_timeout)

        if response.status_code != 200:
            print(f"API request failed with status code {response.status_code}")
            print(f"Response: {response.text}")
            return None

        data = response.json()
        self._store_in_cache(
            cache_key, grid_lat, grid_lon,
            start_date, end_date, variables, data
        )
        return data

    def process_mobility_dataset(self, df, date_column='timestamp', lat_column='latitude', 
                                lon_column='longitude', variables=None, batch_size=1000):
        """
        Process a mobility dataset, adding weather data to each point efficiently.

        One API/cache lookup is made per unique (grid cell, date). Each row
        then receives the values for its grid cell, date and hour. Rows with no
        matching data get NaN.

        Parameters:
        -----------
        df : pandas.DataFrame
            Mobility dataset with timestamp and coordinates
        date_column : str
            Name of the column containing timestamps
        lat_column : str
            Name of the column containing latitude
        lon_column : str
            Name of the column containing longitude
        variables : list or str
            Weather variables to retrieve
        batch_size : int
            Number of grid cell/date combinations per progress batch
            (values below 1 are treated as 1)

        Returns:
        --------
        pandas.DataFrame
            Copy of the input (same index and other columns) with one float
            column per weather variable
        """
        if variables is None:
            variables = ["temperature_2m", "precipitation"]
        elif isinstance(variables, str):
            variables = [name.strip() for name in variables.split(',') if name.strip()]

        # Create a copy to avoid modifying the original
        df_result = df.copy()

        # Ensure timestamp is datetime
        df_result[date_column] = pd.to_datetime(df_result[date_column])

        # Join keys live in local Series, not helper columns, so input columns
        # named 'date', 'hour', 'grid_lat' or 'grid_lon' are left untouched.
        grid_lats = df_result[lat_column].map(self._snap_to_grid)
        grid_lons = df_result[lon_column].map(self._snap_to_grid)
        dates = df_result[date_column].dt.date.astype(str)
        hours = df_result[date_column].dt.hour

        unique_cells = pd.DataFrame(
            {'grid_lat': grid_lats, 'grid_lon': grid_lons, 'date': dates}
        ).drop_duplicates()

        print(f"Processing {len(unique_cells)} unique grid cell/date combinations")

        # (grid_lat, grid_lon, 'YYYY-MM-DD', hour) -> {variable: value}
        lookup = {}

        batch_size = max(1, int(batch_size))
        total_batches = (len(unique_cells) + batch_size - 1) // batch_size

        for batch_num in range(total_batches):
            batch = unique_cells.iloc[batch_num * batch_size:(batch_num + 1) * batch_size]
            print(f"Processing batch {batch_num+1}/{total_batches}")

            for grid_lat, grid_lon, date in batch.itertuples(index=False, name=None):
                # Each API call covers one day
                weather_data = self.get_weather_data(
                    grid_lat, grid_lon, date, date, variables
                )
                if not weather_data or 'hourly' not in weather_data:
                    continue

                hourly = weather_data['hourly']
                for hour_idx, time_str in enumerate(hourly.get('time', [])):
                    # Key on each entry's own date, so values from another
                    # day can never be applied to this one.
                    day, _, clock = time_str.partition('T')
                    hour = int(clock.split(':')[0])
                    lookup[(grid_lat, grid_lon, day, hour)] = {
                        var: hourly[var][hour_idx]
                        for var in variables
                        if var in hourly and hour_idx < len(hourly[var])
                    }

        # One dictionary lookup per row instead of one boolean mask per hour
        row_keys = list(zip(grid_lats, grid_lons, dates, hours))
        for var in variables:
            # dtype=float turns missing (None) values into NaN
            df_result[var] = np.array(
                [lookup.get(key, {}).get(var) for key in row_keys], dtype=float
            )

        return df_result

    def optimize_database(self):
        """
        Optimize the database by removing duplicates and orphaned data rows,
        then running VACUUM and ANALYZE.
        """
        # Keep one row per (cache_key, timestamp, variable). This matters for
        # tables that may have been created without the primary key.
        self.conn.execute("""
            DELETE FROM cache_data 
            WHERE rowid NOT IN (
                SELECT MAX(rowid) 
                FROM cache_data 
                GROUP BY cache_key, timestamp, variable
            )
        """)

        # Find orphaned data (without metadata) and remove it
        self.conn.execute("""
            DELETE FROM cache_data 
            WHERE cache_key NOT IN (
                SELECT cache_key FROM cache_metadata
            )
        """)

        self.conn.execute("VACUUM")
        self.conn.execute("ANALYZE")

    def clear_cache(self, older_than_days=None):
        """
        Clear the cache, optionally only removing entries older than specified days.

        Parameters:
        -----------
        older_than_days : int or None
            If specified, only clear entries whose ``created_at`` is older than
            this many days; otherwise clear everything
        """
        if older_than_days is not None:
            cutoff_date = datetime.now() - timedelta(days=older_than_days)

            # Collect the keys first so they can also be evicted from memory
            keys_to_remove = self.conn.execute("""
                SELECT cache_key 
                FROM cache_metadata 
                WHERE created_at < ?
            """, [cutoff_date]).fetchall()

            # Child rows first: cache_data references cache_metadata
            self.conn.execute("""
                DELETE FROM cache_data 
                WHERE cache_key IN (
                    SELECT cache_key 
                    FROM cache_metadata 
                    WHERE created_at < ?
                )
            """, [cutoff_date])

            self.conn.execute("""
                DELETE FROM cache_metadata 
                WHERE created_at < ?
            """, [cutoff_date])

            for (key,) in keys_to_remove:
                self.memory_cache.pop(key, None)
        else:
            # Clear all data (child table first)
            self.conn.execute("DELETE FROM cache_data")
            self.conn.execute("DELETE FROM cache_metadata")
            self.memory_cache = {}

        self.conn.execute("VACUUM")

    def get_cache_stats(self):
        """
        Get statistics about the cache.

        Returns:
        --------
        dict
            ``total_entries`` (metadata rows), ``data_points`` (data rows),
            ``unique_grid_cells``, ``date_range`` as a (min start, max end)
            tuple, and ``popular_locations``. The last is a DataFrame of the
            5 grid cells with the most cache entries.
        """
        total_entries = self.conn.execute("""
            SELECT COUNT(*) FROM cache_metadata
        """).fetchone()[0]

        data_points = self.conn.execute("""
            SELECT COUNT(*) FROM cache_data
        """).fetchone()[0]

        # Count distinct (lat, lon) pairs directly; concatenating DOUBLE values
        # with strings would depend on implicit casts to VARCHAR.
        unique_grid_cells = self.conn.execute("""
            SELECT COUNT(*)
            FROM (SELECT DISTINCT grid_lat, grid_lon FROM cache_metadata) AS cells
        """).fetchone()[0]

        date_range = self.conn.execute("""
            SELECT MIN(start_date), MAX(end_date) 
            FROM cache_metadata
        """).fetchone()

        # Grid cells with the most cached entries
        popular_locations = self.conn.execute("""
            SELECT grid_lat, grid_lon, COUNT(*) AS query_count
            FROM cache_metadata
            GROUP BY grid_lat, grid_lon
            ORDER BY query_count DESC
            LIMIT 5
        """).fetchdf()

        return {
            "total_entries": total_entries,
            "data_points": data_points,
            "unique_grid_cells": unique_grid_cells,
            "date_range": date_range,
            "popular_locations": popular_locations
        }

    def close(self):
        """
        Optimize and then close the database connection.

        Safe to call more than once. The connection is closed even if
        optimization fails, and the attribute is cleared afterwards.
        """
        conn = getattr(self, 'conn', None)
        if conn is None:
            return
        try:
            self.optimize_database()
        finally:
            conn.close()
            self.conn = None

# Example usage
# if __name__ == "__main__":
#     # Create a sample mobility dataset
#     data = {
#         'user_id': range(1, 101),
#         'timestamp': pd.date_range(start='2023-01-01', periods=100, freq='H'),
#         'latitude': np.random.uniform(40.0, 41.0, 100),  # Sample coordinates for Madrid
#         'longitude': np.random.uniform(-3.8, -3.6, 100),
#         'activity': np.random.choice(['walking', 'cycling', 'driving'], 100)
#     }
    
#     mobility_df = pd.DataFrame(data)
    
#     # Initialize the cache with DuckDB
#     cache = OpenMeteoDuckDBCache(db_path='openmeteo_cache.duckdb', grid_resolution=0.1)
    
#     try:
#         # Process the mobility dataset to add weather data
#         print("\nProcessing mobility dataset...")
#         result_df = cache.process_mobility_dataset(
#             mobility_df,
#             variables=["temperature_2m", "precipitation", "windspeed_10m"]
#         )
        
#         # Show the result
#         print("\nSample of processed data:")
#         print(result_df.head())
        
#         # Get cache statistics
#         stats = cache.get_cache_stats()
#         print("\nCache statistics:")
#         print(f"Total cache entries: {stats['total_entries']}")
#         print(f"Total data points stored: {stats['data_points']}")
#         print(f"Unique grid cells: {stats['unique_grid_cells']}")
#         print(f"Date range: {stats['date_range']}")
#         print("\nMost queried locations:")
#         print(stats['popular_locations'])
        
#         # Show efficiency statistics
#         unique_points = len(mobility_df)
#         unique_cells = stats['unique_grid_cells']
        
#         print("\nEfficiency statistics:")
#         print(f"Total data points: {unique_points}")
#         print(f"API calls made: {unique_cells}")
#         print(f"API calls saved: {unique_points - unique_cells}")
#         if unique_points > 0:
#             print(f"Efficiency: {(1 - unique_cells/unique_points) * 100:.1f}% reduction in API calls")
        
#     finally:
#         # Always close the database connection properly
#         cache.close()