In [1]:
import os
import sys
import openmeteo_requests
import requests_cache
import pandas as pd
import dask.dataframe as dd
from retry_requests import retry
from datetime import datetime, timedelta
import logging
import json
# from tqdm.notebook import trange, tqdm
import pyarrow

## Define Functions

In [2]:
# TODO: Add error handling
def fetch_weather(latitudes, longitudes, start_date, end_date):
    """
    Fetch hourly archive weather from Open-Meteo for a batch of coordinates.

    Requests the hourly variables temperature_2m, rain and relative_humidity_2m
    (in that order) with "timezone": "auto".

    Args:
        latitudes (iterable): Latitudes [1st_lat, 2nd_lat, ...]
        longitudes (iterable): Longitudes corresponding to the latitudes [1st_long, 2nd_long, ...]
        start_date (str): Starting date in the ISO 8601 format (i.e. YYYY-MM-DD)
        end_date (str): Ending date in the same format as start_date

    Returns:
        list: An empty list when no coordinates are given; otherwise whatever
            openmeteo_requests' weather_api returns for the request
            (presumably one response per coordinate pair, in request order;
            the rest of the notebook relies on that -- TODO confirm).

    Raises:
        ValueError: If latitudes and longitudes do not have the same length.
    """

    latitudes = list(latitudes)
    longitudes = list(longitudes)

    # Validate before opening a session so bad input never costs an API call
    if len(latitudes) != len(longitudes):
        raise ValueError(
            f"latitudes and longitudes must have the same length "
            f"(got {len(latitudes)} and {len(longitudes)})"
        )
    if not latitudes:
        return []

    url = "https://archive-api.open-meteo.com/v1/archive"
    # Setup the Open-Meteo API client with cache and retry on error
    cache_session = requests_cache.CachedSession('.cache', expire_after = 3600)
    retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
    openmeteo = openmeteo_requests.Client(session = retry_session)

    params = {
        "latitude": latitudes,
        "longitude": longitudes,
        "start_date": start_date,
        "end_date": end_date,
        # process_response reads these by position, so keep the order stable
        "hourly": ["temperature_2m", "rain", "relative_humidity_2m"],
        "timezone": "auto"
    }

    responses = openmeteo.weather_api(url, params=params)
    return responses

In [3]:
# Function for processing the responses
def process_response(response):
    """
    Turn one Open-Meteo response into a per-day summary DataFrame.

    Args:
        response: A single response object exposing Hourly() and
            UtcOffsetSeconds(). Its hourly variables are read by position:
            0 = temperature, 1 = rain, 2 = relative humidity (the order
            requested in fetch_weather).

    Returns:
        pandas.DataFrame: One row per day with the columns Datetime,
            TemperatureMean, TemperatureMax, TemperatureMin, RainSum,
            RelativeHumidityMean, RelativeHumidityMax, RelativeHumidityMin.
            Datetime holds local calendar days (wall-clock time at the
            location), although the column is still labelled as UTC.
    """
    # Hourly
    hourly = response.Hourly()

    # Open-Meteo unix timestamps are GMT+0 even when a timezone is requested.
    # fetch_weather asks for "timezone": "auto", so shift by the response's
    # UTC offset; otherwise the daily buckets below would follow UTC days
    # and split every local day across two rows.
    utc_offset = response.UtcOffsetSeconds()
    hourly_index = pd.date_range(
        start = pd.to_datetime(hourly.Time() + utc_offset, unit = "s", utc = True),
        end = pd.to_datetime(hourly.TimeEnd() + utc_offset, unit = "s", utc = True),
        freq = pd.Timedelta(seconds = hourly.Interval()),
        inclusive = "left"
        )

    hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
    rain = hourly.Variables(1).ValuesAsNumpy()
    relative_humidity_2m = hourly.Variables(2).ValuesAsNumpy()

    # Turn response into df
    hourly_data = {
        "Datetime" : hourly_index,
        "Temperature" : hourly_temperature_2m,
        "Rain" : rain,
        "RelativeHumidity" : relative_humidity_2m
    }

    hourly_df = pd.DataFrame(hourly_data)

    # Aggregate for daily data
    daily_df = hourly_df.groupby(pd.Grouper(key='Datetime', freq='D')).agg({
        'Temperature': ['mean', 'max', 'min'],
        'Rain': ['sum'],
        'RelativeHumidity' :['mean', 'max', 'min']
    })

    # Flatten the (variable, statistic) column pairs, e.g. ('Rain', 'sum') -> 'RainSum'
    daily_df.columns = [x + y.capitalize() for x,y in daily_df.columns.values]
    daily_df = daily_df.reset_index()

    return daily_df

In [4]:
def read_or_create_json(file_path, default_data):
    """
    Load the JSON stored at file_path, seeding the file with default_data if it is missing.

    Args:
        file_path (str): Location of the JSON file
        default_data: JSON-serialisable value written when the file does not exist

    Returns:
        The parsed file contents, or default_data itself when the file had to be created
    """
    # Nothing on disk yet: persist the defaults and hand them straight back
    if not os.path.exists(file_path):
        with open(file_path, 'w') as handle:
            json.dump(default_data, handle, indent=4)
        return default_data

    # File already there: its contents take precedence over the defaults
    with open(file_path, 'r') as handle:
        return json.load(handle)

def update_json(file_path, update_data):
    """
    Merge update_data into the JSON object stored at file_path and save the result.

    Args:
        file_path (str): Location of the JSON file (created as {} if it does not exist)
        update_data (dict): Entries to add; keys already stored are overwritten
    """
    # Start from whatever is on disk
    stored = read_or_create_json(file_path, {})

    # New values win over stored ones
    stored.update(update_data)

    # Persist the merged object
    with open(file_path, 'w') as out_file:
        json.dump(stored, out_file, indent=4)

In [5]:
def get_logger(logname="APIcalls.log"):
    """
    Return the root logger, set to DEBUG and writing to the file logname.

    The log file is opened in append mode, so an existing log is extended and
    a missing one is created. Calling this again for the same file reuses the
    handler that is already attached instead of adding a duplicate.

    Args:
        logname (str): Path of the log file. Defaults to "APIcalls.log".

    Returns:
        logging.Logger: The root logger.
    """
    log_path = os.path.abspath(logname)
    # Checked before the handler is built, because FileHandler creates the file
    existed = os.path.exists(log_path)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # Attach the handler directly: logging.basicConfig does nothing once the
    # root logger already has a handler, which would leave the file unused
    attached = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
        for handler in logger.handlers
    )
    if not attached:
        fhandler = logging.FileHandler(filename=log_path, mode="a", encoding="utf-8")
        # One format for new and existing logs, so a file never mixes layouts
        formatter = logging.Formatter(
            '%(asctime)s [%(levelname)s] - %(name)s:%(filename)s > %(message)s'
        )
        fhandler.setFormatter(formatter)
        logger.addHandler(fhandler)

    if existed:
        print(f"Loaded log {logname}")
    else:
        print(f"Created log {logname}")

    return logger

In [6]:
# insert(0, "ClusterID", )

In [7]:
def collect_data(df):
    """Placeholder that is not implemented yet: ignores df and returns None."""
    return None

## Collect Data

In [8]:
# Load the cluster table; later cells read its Latitude, Longitude and ClusterID columns
df_cluster = pd.read_csv('../Datasets/Clusters.csv')

In [9]:
# Checkpoint of the last failed iteration: the fetch loop writes the loop index
# it stopped at into this JSON file. The file is created with iteration 0 if missing.
# Note that `log` is this checkpoint dict, not the logging logger.
log_path  = 'data.json'
default_log = {'iteration': 0}
log = read_or_create_json(log_path, default_log)

#update_json(file_path, update_data)

In [36]:
# Accumulator for API responses. The fetch loop extends it with +=, so re-run
# this cell first whenever the collection should start from an empty list.
responses = []

In [37]:
start = 0
end = len(df_cluster)
step = 100
start_date = '2000-01-01'
end_date = '2024-07-31'
logger = get_logger("APIcalls.log")

# NOTE(review): the checkpoint in `log` is only written here, never read back;
# `start` stays 0, so resuming after a failure means editing `start` by hand.
for i in range(start, end, step):
    # The last chunk may be shorter than `step`; clamp so the log reports real rows
    stop = min(i + step, end)
    try:
        # .iloc keeps the slice positional regardless of df_cluster's index labels
        latitudes = df_cluster.Latitude.iloc[i:stop]
        longitudes = df_cluster.Longitude.iloc[i:stop]
        responses += fetch_weather(latitudes, longitudes, start_date, end_date)
        logger.debug(f"Responses fetched for rows {i}-{stop}")

    except Exception as err:
        # Stop on the first failure (e.g. API rate limit) and record where it happened
        s = f"{err} Encountered at iteration {i}"
        print(s)
        logger.error(s)
        log['iteration'] = i
        # Persist the failing iteration to the checkpoint file
        update_json(log_path, log)
        break

Loaded log APIcalls.log
{'reason': 'Hourly API request limit exceeded. Please try again in the next hour.', 'error': True} Encountered at iteration 100


## Process Response and Add to Dataset

In [39]:
# How many responses were collected before the fetch loop finished or broke
len(responses)

100

In [40]:
# Cluster identifiers; response i is labelled with clusterID[i] in the cells below
clusterID = df_cluster.ClusterID

In [41]:
# Create the file
# The first response seeds the CSV and is the only write that includes the header row.
# NOTE(review): if the CSV already exists nothing is written here, but the append
# loop in the next cell still adds responses[1:], so a re-run duplicates rows -- confirm.
# parquet_path = '../Datasets/WeatherData.parquet'
csv_path = "../Datasets/WeatherData.csv"
if not os.path.exists(csv_path):
    df_out = process_response(responses[0])
    # Tag every daily row with the cluster this response belongs to
    df_out.insert(0, 'ClusterID', clusterID[0])
    df_out.to_csv(csv_path, index=False)


In [43]:
# responses[0] is written by the file-creating cell, so appending starts at 1
start = 1
# Follow the number of responses actually fetched instead of a fixed 100
end = len(responses)
clusterID = df_cluster.ClusterID

for i in range(start, end):
    res = responses[i]
    df_out = process_response(res)
    # Positional lookup: response i belongs to the i-th row of df_cluster
    df_out.insert(0, 'ClusterID', clusterID.iloc[i])
    # dd.from_pandas(df_out, npartitions=1).to_parquet(parquet_path, append=True,ignore_divisions = True)
    # Append without a header; the header row was written when the file was created
    df_out.to_csv(csv_path, mode='a', index=False, header=False)
    print(i, end=" ")
print("YIPPPEEE")

YIPPPEEE


In [44]:
# Read the accumulated CSV back to verify what was written
df_weather = pd.read_csv(csv_path)

In [46]:
# Sanity check: number of distinct clusters present in the CSV
df_weather.ClusterID.nunique()

100