In [59]:
from meteostat import Stations, Monthly
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap
import numpy as np
from sqlalchemy import create_engine, text
from dotenv import dotenv_values

print_distribution = False
push_stations = False

In [60]:
# Retrieve all Meteostat weather stations
stations = Stations()

# Fetch the list of all stations
# Convert the result to a DataFrame
df_stations = stations.fetch()
df_stations = df_stations.reset_index()

In [61]:
print(f'data contains {df_stations.shape[0]} stations')
print(f'oft {df_stations.shape[1]} parameters each')
print('----------------')
display(df_stations.head(5))

data contains 15791 stations
oft 16 parameters each
----------------


Unnamed: 0,id,name,country,region,wmo,icao,latitude,longitude,elevation,timezone,hourly_start,hourly_end,daily_start,daily_end,monthly_start,monthly_end
0,00FAY,Holden Agdm,CA,AB,71227.0,CXHD,53.19,-112.25,688.0,America/Edmonton,2020-01-01,2024-10-28,2002-11-01,2024-03-13,2003-01-01,2022-01-01
1,00TG6,Athabasca 1,CA,AB,,,54.72,-113.29,515.0,America/Edmonton,NaT,NaT,2000-01-01,2022-07-12,2000-01-01,2010-01-01
2,01001,Jan Mayen,NO,,1001.0,ENJA,70.9333,-8.6667,10.0,Europe/Oslo,1931-01-01,2024-08-20,1921-12-31,2024-10-21,1922-01-01,2022-01-01
3,01002,Grahuken,NO,SJ,1002.0,,79.7833,14.4667,0.0,Europe/Oslo,1986-11-09,2024-08-20,2010-10-07,2020-08-17,NaT,NaT
4,01003,Hornsund,NO,,1003.0,,77.0,15.5,10.0,Europe/Oslo,1985-06-01,2024-08-20,2009-11-26,2020-08-31,2016-01-01,2017-01-01


In [62]:
# Create the Basemap figure
plt.figure(figsize=(14, 8))
m = Basemap(projection='mill', llcrnrlat=-60, urcrnrlat=85, llcrnrlon=-180, urcrnrlon=180, resolution='c')

# Draw map details
m.drawcoastlines()
m.drawcountries()
m.drawparallels(range(-90, 91, 30), labels=[1, 0, 0, 0])
m.drawmeridians(range(-180, 181, 60), labels=[0, 0, 0, 1])

# Extract latitude and longitude from the DataFrame
lats = df_stations['latitude'].values
lons = df_stations['longitude'].values

# Convert latitude and longitude to map projection coordinates
x, y = m(lons, lats)

# Plot the stations as red dots
m.scatter(x, y, s=1, color='darkred', alpha=1, marker='.', label='Weather Stations')

# Add a legend and title
plt.legend(loc='lower left')
plt.title('Global Distribution of Meteostat Weather Stations')

# Show the plot
if print_distribution:
    plt.show();
else: 
    plt.close();

In [63]:
from datetime import datetime

def adjust_dates_for_full_years(start, end):
    # Check for NaN values and handle them
    if pd.isna(start) or pd.isna(end):
        return None, None

    # Convert to datetime if not already
    start = pd.to_datetime(start)
    end = pd.to_datetime(end)

    # Adjust start to the next January 1st if not already on that date
    if start.month != 1 or start.day != 1:
        start = datetime(start.year + 1, 1, 1)

    # Adjust end to the previous January 1st if not already on that date
    if end.month != 12 or end.day != 31:
        end = datetime(end.year-1, 12, 31)

    # Ensure start is before or the same as end
    if start > end:
        return None, None

    return start, end


In [64]:
# Fetch all the bulk data for all the stations and calculate the yearly averages
fetch_data = False
custom_start = pd.to_datetime("2020-01-01")
custom_end = pd.to_datetime("2020-12-31")
df_meta = pd.DataFrame()

# Loop through individual stations
for i, row in df_stations.iterrows():
    station_id = row["id"]
    name = row["name"]
    country = row["country"]

    print(f"{station_id} | {name} | {country}:")

    # Get station parameters
    start_i = row["monthly_start"]
    end_i = row["monthly_end"]
    
    # If there is a custom start date check if the startion start is before that 
    if custom_start != [] and start_i <= custom_start:
        start_i= custom_start
        print(f'START earlier than CUSTOM START: setting to custom value')
    if custom_start != [] and start_i > custom_start:
        print(f'START after CUSTOM START: using earliest possible recording')
        
    # If there is a custom end date check if the startion end is after that
    if custom_end != [] and end_i >= custom_end:
        end_i = custom_end
        print(f'END later than CUSTOM END: setting to custom value')
    if custom_end != [] and end_i < custom_end:
        print(f'END before CUSTOM END: using latest possible recording')       


    # Adjust the dates to only take full years into account
    start_i_adj, end_i_adj = adjust_dates_for_full_years(start_i, end_i)       
  
  
   # Log adjustments, if any
    if start_i != start_i:
        print(f"Recording START was adjusted from {start_i} to {start_i_adj}")
    else:
        print(f"Recording START {start_i_adj}")
        
      # Log adjustments, if any
    if end_i != end_i_adj:
        print(f"Recording END was adjusted from {end_i} to {end_i_adj}")
    else:
        print(f"Recording END {end_i_adj}")

    # Check if the adjusted dates are valid (not None)
    if start_i_adj is None or end_i_adj is None:
        print(f"Error1: Invalid date range for station {station_id}. Skipping...")
        print('------')
        continue
    
    # Check if the adjusted dates are valid (logical range)
    if start_i_adj > end_i_adj:
        print(f"Error02: Invalid date range for station {station_id}. Skipping...")
        print('------')
        continue

    
    # Calculate the number of extracted months
    extracted_years = (1 + end_i_adj.year - start_i_adj.year)
    print(f"Extracting {extracted_years} years")

    # Fetch data if the flag is set to True
    if fetch_data:
        # Fetch data for station 'station_id' from 'start_i_adj' to 'end_i_adj'
        data_i = Monthly(
            station_id,
            start=pd.to_datetime(start_i_adj, format='%Y-%m-%d'),
            end=pd.to_datetime(end_i_adj, format='%Y-%m-%d')
        )

    print('------')

    # Create a DataFrame for the station metadata
    df_i = pd.DataFrame({
        "id": [station_id],
        "name": [name],
        "country": [country],
        "start": [start_i_adj],
        "end": [end_i_adj],
        "n_years": [extracted_years]
    })

    # Concatenate the station metadata to the main DataFrame
    df_meta = pd.concat([df_meta, df_i], ignore_index=True)
    
    #delete loop variables:
    del [station_id, name, country, start_i, start_i_adj, end_i, end_i_adj, extracted_years, row]


00FAY | Holden Agdm | CA:
START earlier than CUSTOM START: setting to custom value
END later than CUSTOM END: setting to custom value
Recording START 2020-01-01 00:00:00
Recording END 2020-12-31 00:00:00
Extracting 1 years
------
00TG6 | Athabasca 1 | CA:
START earlier than CUSTOM START: setting to custom value
END before CUSTOM END: using latest possible recording
Recording START None
Recording END was adjusted from 2010-01-01 00:00:00 to None
Error1: Invalid date range for station 00TG6. Skipping...
------
01001 | Jan Mayen | NO:
START earlier than CUSTOM START: setting to custom value
END later than CUSTOM END: setting to custom value
Recording START 2020-01-01 00:00:00
Recording END 2020-12-31 00:00:00
Extracting 1 years
------
01002 | Grahuken | NO:
Recording START was adjusted from NaT to None
Recording END was adjusted from NaT to None
Error1: Invalid date range for station 01002. Skipping...
------
01003 | Hornsund | NO:
START earlier than CUSTOM START: setting to custom value


In [65]:
#print out results
print(f'total amound of stations = {df_meta.shape[0]}')
print(f'total amount of years = {sum(df_meta["n_years"])}')
print(f'min starting {min(df_meta["start"])}')
print(f'max starting {max(df_meta["start"])}')
display(df_meta.head(5))

#ask for user input whether the data shall be extracted and aggregated
def continue_or_exit():
    while True:
        user_input = input("Do you want to extract the data now? (y/n): ").strip().lower()
        if user_input == 'y':
            print("Continuing the script...")
            return True
        elif user_input == 'n':
            print("Exiting the script...")
            return False
        else:
            print("Invalid input. Please enter 'y' for yes or 'n' for no.")

# Usage in script
if continue_or_exit():
    # Proceed with the script
    data_extraction = True
else:
    data_extraction = False

total amound of stations = 9448
total amount of years = 9448
min starting 2020-01-01 00:00:00
max starting 2020-01-01 00:00:00


Unnamed: 0,id,name,country,start,end,n_years
0,00FAY,Holden Agdm,CA,2020-01-01,2020-12-31,1
1,01001,Jan Mayen,NO,2020-01-01,2020-12-31,1
2,01007,New Alesund,NO,2020-01-01,2020-12-31,1
3,01008,Svalbard Lufthavn,NO,2020-01-01,2020-12-31,1
4,01015,Hekkingen Lighthouse,NO,2020-01-01,2020-12-31,1


Continuing the script...


In [66]:
#define conditional aggreagations to only take years into account that lack one months but not more
def conditional_aggregate_sum(x):
    if x.dropna().count() >= 11:  # Count only non-NaN values
        return x.sum()  # Or use another aggregation, like mean, etc.
    else:
        return float('nan')  # Return NaN if less than 11 non-NaN records

def conditional_aggregate_mean(x):
    if x.dropna().count() >= 11:  # Count only non-NaN values
        return x.mean() 
    else:
        return float('nan')  # Return NaN if less than 11 non-NaN records
    
def conditional_aggregate_min(x):
    if x.dropna().count() >= 11:  # Count only non-NaN values
        return x.min() 
    else:
        return float('nan')  # Return NaN if less than 11 non-NaN records
    
def conditional_aggregate_max(x):
    if x.dropna().count() >= 11:  # Count only non-NaN values
        return x.max() 
    else:
        return float('nan')  # Return NaN if less than 11 non-NaN records

#define conditional aggreagations to only take years into account that lack one months but not more
def conditional_aggregate_sd(x):
    if x.dropna().count() >= 11:  # Count only non-NaN values
        return np.std(x.dropna())  
    else:
        return float('nan')  # Return NaN if less than 11 non-NaN records



In [None]:
if data_extraction:
    df_agg = pd.DataFrame()
    total_rows = len(df_meta)
    bar_length = 30
    #iterrate through dataframe
    for i, row in df_meta.iterrows():
        station_id = row["id"]
        name = row["name"]
        country = row["country"]
        
        # Calculate and print progress percentage
        percent_complete = (i + 1) / total_rows
        filled_length = int(bar_length * percent_complete)
        bar = '█' * filled_length + '-' * (bar_length - filled_length)

        # Display the progress bar and the current station info
        print(f"Progress: |{bar}| {percent_complete * 100:.2f}% \t\t Extracting data for: {station_id} | {name} | {country} |                           ", end="\r")
        
        #fetch data for respective station with the previously defined start and end date
        df_i = Monthly(row["id"], #station 
                    start = row["start"], #start
                    end= row["end"] #end
                    )
        df_i = df_i.fetch().reset_index()
        #aggregate the data for each year
        df_yearly = df_i.groupby(df_i['time'].dt.year).agg({
        'tavg': conditional_aggregate_mean, 
        'tmin': conditional_aggregate_min,
        'tmax': conditional_aggregate_max,
        'prcp': conditional_aggregate_sum,
        'wspd': conditional_aggregate_mean,
        'pres': conditional_aggregate_mean,
        'tsun': conditional_aggregate_sum
        })
        df_yearly = df_yearly.reset_index()
        df_yearly["id"] = station_id
        # Concatenate the station metadata to the main DataFrame
        df_agg = pd.concat([df_agg, df_yearly], ignore_index=True)
    

Progress: |------------------------------| 1.48% 		 Extracting data for: 03026 | Stornoway | GB |                                  |                           

In [28]:
#print out results
print(f'total amound of stations = {df_agg.shape[0]}')
display(df_meta.head(5))

#ask for user input whether the data shall be extracted and aggregated
def continue_or_exit():
    while True:
        user_input = input("Do you want to push the data to sql? (y/n): ").strip().lower()
        if user_input == 'y':
            print("Continuing the script...")
            return True
        elif user_input == 'n':
            print("Exiting the script...")
            return False
        else:
            print("Invalid input. Please enter 'y' for yes or 'n' for no.")

# Usage in script
if continue_or_exit():
    # Proceed with the script
    data_2_sql = True
else:
    data_2_sql = False

#df_meta

total amound of stations = 9056


Unnamed: 0,id,name,country,start,end,n_years
0,00FAY,Holden Agdm,CA,2020-01-01,2020-12-31,1
1,01001,Jan Mayen,NO,2020-01-01,2020-12-31,1
2,01007,New Alesund,NO,2020-01-01,2020-12-31,1
3,01008,Svalbard Lufthavn,NO,2020-01-01,2020-12-31,1
4,01015,Hekkingen Lighthouse,NO,2020-01-01,2020-12-31,1


Continuing the script...


In [47]:
#Setup SQL connection

# Let's load values from the .env file
config = dotenv_values()

# define variables for the login
pg_user = config['POSTGRES_USER']  # align the key label with your .env file !
pg_host = config['POSTGRES_HOST']
pg_port = config['POSTGRES_PORT']
pg_db = config['POSTGRES_DB']
pg_pass = config['POSTGRES_PASS']
pg_schema = config['POSTGRES_SCHEMA']

# Now building the URL with the values from the .env file

url = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}'

engine = create_engine(url, echo=False)

with engine.begin() as conn: 
    result = conn.execute(text(f'SET search_path TO {pg_schema};'))  #possible to cascade with ,public; then it will search in my_schema first and then in public

In [48]:
if push_stations:
    df_stations.to_sql("meteostat_stations", engine, if_exists='replace', index=True)

791

In [49]:
df_agg.to_sql('meteostat_2020', engine, if_exists='replace', index=True)



56