In [2]:
import cdsapi
import cfgrib
import pandas as pd
import os
import numpy as np
import xarray as xr
from functools import reduce
import datetime as dt
import multiprocessing
from datetime import datetime, timedelta
from sqlalchemy import create_engine, inspect
import csv  
from scipy.stats import zscore
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
client = cdsapi.Client()
grib_file = f"prueba_era5.grib"
dataset = "reanalysis-era5-single-levels"
request = {
    "product_type": ["reanalysis"],
    "variable": [
        "2m_dewpoint_temperature",
        "2m_temperature",
        "sea_surface_temperature",
        "total_column_water_vapour",
        "skin_temperature",
        "mean_total_precipitation_rate",
        "total_cloud_cover",
        "runoff",
        "surface_runoff"
    ],
    "year": ["1940"],
    "month": ["01"],
    "day": ["01"],
    "time": [
        "00:00", "01:00", "02:00",
        "03:00", "04:00", "05:00",
        "06:00", "07:00", "08:00",
        "09:00", "10:00", "11:00",
        "12:00", "13:00", "14:00",
        "15:00", "16:00", "17:00",
        "18:00", "19:00", "20:00",
        "21:00", "22:00", "23:00"
    ],
    "data_format": "grib",
    "download_format": "unarchived",
    "area": [90, -180, -90, 180],
    'grid:': [1.0, 1.0],
}
# Download GRIB file
print(f"Downloading {grib_file}...")
client.retrieve(dataset, request).download(grib_file)
# Open GRIB file
dataset = cfgrib.open_datasets(grib_file)
    # Initialize a list to store DataFrames

2025-05-29 15:11:49,853 INFO [2024-09-26T00:00:00] Watch our [Forum](https://forum.ecmwf.int/) for Announcements, news and other discussed topics.


Downloading prueba_era5.grib...


2025-05-29 15:11:50,585 INFO Request ID is ebecea7e-8c6a-40ed-8985-f0888e81073b
2025-05-29 15:11:50,673 INFO status has been updated to accepted
2025-05-29 15:11:59,178 INFO status has been updated to running
2025-05-29 15:12:04,368 INFO status has been updated to successful
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(
  vars, attrs, coord_names = xr.conventions.decode_cf_variables(


In [4]:

dfs = []

for ds in dataset:
    # Try to drop 'step' and other multi-dim coords if needed
    if "step" in ds.dims:
        ds = ds.mean(dim="step")  # Or choose step=0 or another value

    if "valid_time" in ds.coords:
        ds = ds.drop_vars("valid_time")

    if "number" in ds.coords:
        ds = ds.drop_vars("number")

    try:
        df = ds.to_dataframe().reset_index()
        dfs.append(df)
    except Exception as e:
        print(f"Failed converting a dataset: {e}")

# Merge all DataFrames on ['time', 'latitude', 'longitude']
if dfs:
    merged_df = reduce(lambda left, right: pd.merge(
        left, right,
        on=["time", "latitude", "longitude"],
        how="outer",
        suffixes=('', '_dup')  # Prevent merge error on duplicate names
    ), dfs)

    # Drop duplicated columns if created
    merged_df = merged_df.loc[:, ~merged_df.columns.str.endswith('_dup')]

else:
    merged_df = pd.DataFrame()
    print("No dataframes were created from the datasets.")


In [5]:
dfm = merged_df.copy()
dfm['coordinates'] = dfm['latitude'].astype(str) + ',' + dfm['longitude'].astype(str)
dfm['time'] = pd.to_datetime(dfm['time'], format='%Y-%m-%dT%H:%M:%S.%fZ')
dfm['day'] = dfm['time'].dt.day
dfm['month'] = dfm['time'].dt.month
dfm['year'] = dfm['time'].dt.year
dfm['hour'] = dfm['time'].dt.hour
dfm['day of the year'] = dfm['time'].dt.dayofyear
new_order = ['time', 'day of the year', 'day', 'month', 'year', 'hour', 'latitude', 'longitude', 'coordinates'] + [col for col in dfm.columns if col not in ['time', 'day', 'month', 'year', 'hour', 'latitude', 'longitude', 'coordinates']]
dfm = dfm[new_order]
#drop date if it comes from different day
dfm.sort_values(by='time')
if dfm['day'].iloc[0] != dfm['day'].iloc[-1]:
    dfm.dropna(subset=['t2m'], inplace=True)


In [6]:
dfm.head()

Unnamed: 0,time,day of the year,day,month,year,hour,latitude,longitude,coordinates,surface,...,ro,avg_tprate,step,sst,tcwv,tcc,t2m,d2m,skt,day of the year.1
0,1940-01-01,1,1,1,1940,0,-90.0,-180.0,"-90.0,-180.0",,...,,,0 days,,0.756644,0.0,241.756897,237.667786,240.96019,1
1,1940-01-01,1,1,1,1940,0,-90.0,-179.0,"-90.0,-179.0",,...,,,0 days,,0.756644,0.0,241.756897,237.667786,240.96019,1
2,1940-01-01,1,1,1,1940,0,-90.0,-178.0,"-90.0,-178.0",,...,,,0 days,,0.756644,0.0,241.756897,237.667786,240.96019,1
3,1940-01-01,1,1,1,1940,0,-90.0,-177.0,"-90.0,-177.0",,...,,,0 days,,0.756644,0.0,241.756897,237.667786,240.96019,1
4,1940-01-01,1,1,1,1940,0,-90.0,-176.0,"-90.0,-176.0",,...,,,0 days,,0.756644,0.0,241.756897,237.667786,240.96019,1


In [7]:
dfm['sst'].describe()

count    1.033176e+06
mean     2.864469e+02
std      1.155671e+01
min      2.714600e+02
25%      2.734619e+02
50%      2.874014e+02
75%      2.981240e+02
max      3.048613e+02
Name: sst, dtype: float64

In [24]:
# Define log path
log_path = r"C:\Users\dmoli\Documents\Coding\Weathercast_project\date_log_global.csv"
dfm['time'] = pd.to_datetime(dfm['time'])
dfm = dfm.sort_values(['coordinates', 'time'])

# Start tracking broken columns
broken_columns = []

# Start log list
log_entries = []

# Check missing data
missing_data = dfm.isnull()

for column in dfm.columns:
    missing_count = missing_data[column].sum()
    if isinstance(missing_count, pd.Series):
        missing_count = missing_count.sum()
    missing_count = int(missing_count)

    if missing_count > 0:
        timestamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Columns that can tolerate up to 3 full missing days
        relaxed_cols = [
            'tcwv','t2m', 'd2m', 'skt', 'tcc' 
                                    ]

        if column not in relaxed_cols:
            if missing_count > 1433520:
                log_entries.append([timestamp, column, missing_count, 'too many missing values'])
                broken_columns.append(column)
        else:
            ratio = round(missing_count / 65160, 2)
            if ratio > 3:
                log_entries.append([timestamp, column, missing_count, f'missing > 3 days ({ratio})'])
                broken_columns.append(column)

# Save log if needed
if log_entries:
    log_df = pd.DataFrame(log_entries, columns=["timestamp", "variable", "missing_count", "note"])
    if os.path.exists(log_path):
        log_df.to_csv(log_path, mode='a', header=False, index=False)
    else:
        log_df.to_csv(log_path, index=False)

# ------- Now: Handle 2-3 hours missing data for specific columns ------- #

# Columns that need special NaN handling
target_columns = [
            'tcwv','t2m', 'd2m', 'skt', 'tcc'
                                    ]



# Group by coordinate
for coord, group in dfm.groupby('coordinates'):
    for col in target_columns:
        # Select time and the target variable
        sub_df = group[['time', col]].set_index('time')

        # Find missing values
        nan_mask = sub_df[col].isna()
        if nan_mask.sum() > 0:
            nan_times = sub_df[nan_mask].index

            for t in nan_times:
                # Define +/- 2 hours window
                prev_2h = t - pd.Timedelta(hours=2)
                next_2h = t + pd.Timedelta(hours=2)

                window = sub_df.loc[prev_2h:next_2h][col]

                # If 5 expected timestamps and max 2 missing, proceed
                if len(window) == 5 and window.isna().sum() <= 2:
                    # Check if missing hours are consecutive
                    missing_consec = window.isna().astype(int).diff().abs().sum() == 2

                    if missing_consec:
                        # Interpolate over window
                        dfm.loc[(dfm['coordinates'] == coord) & (dfm['time'].between(prev_2h, next_2h)), col] = \
                            dfm.loc[(dfm['coordinates'] == coord) & (dfm['time'].between(prev_2h, next_2h)), col].interpolate(method='linear', limit_direction='both')
                    else:
                        # Take average of 2 hours before and 2 hours after
                        try:
                            before = sub_df.loc[[prev_2h, t - pd.Timedelta(hours=1)]][col].dropna()
                            after = sub_df.loc[[t + pd.Timedelta(hours=1), next_2h]][col].dropna()
                            if len(before) == 2 and len(after) == 2:
                                avg_val = pd.concat([before, after]).mean()
                                dfm.loc[(dfm['coordinates'] == coord) & (dfm['time'] == t), col] = avg_val
                        except Exception as e:
                            print(f"Error filling value for {coord} at {t}:", e)
                            continue
low_freq_columns = [
    'sro','ro', 'sst', 'avg_tprate'
]

# Sort first to ensure time continuity
dfm = dfm.sort_values(['coordinates', 'time'])

# Interpolate low-frequency columns within each coordinate group
for col in low_freq_columns:
    dfm[col] = dfm.groupby('coordinates')[col].transform(
        lambda group: group.interpolate(method='linear', limit_direction='both'))


In [None]:
    
    # process information to be stored in the database
    

    dfm['t2m'] = (dfm['t2m'] - 273.15).round(2)
    dfm['d2m'] = (dfm['d2m'] - 273.15).round(2)
    dfm['skt'] = (dfm['skt'] - 273.15).round(2)
    
    #identify outliers and store in DB
    # Initialize list to collect outliers
    outliers_list = []

    # Exclude metadata columns
    meta_cols = ['time', 'day of the year', 'day', 'month', 'year', 'hour', 'coordinates']
    data_cols = [col for col in dfm.columns if col not in meta_cols]

    # Group by coordinate
    for coord, group in dfm.groupby('coordinates'):
        for col in data_cols:
            if group[col].isnull().all():
                continue  # skip all-NaN columns

            try:
                # Compute z-scores
                col_data = group[col].dropna()
                if col_data.std() < 1e-8:
                    continue  # skip nearly constant series``
                z_scores = zscore(col_data)
                outlier_mask = np.abs(z_scores) > 3

                # Get times and values
                outlier_values = group.loc[group[col].dropna().index[outlier_mask], ['time', col]]
                for _, row in outlier_values.iterrows():
                    outliers_list.append({
                        'coordinates': coord,
                        'date': row['time'].date(),
                        'time': row['time'].time(),
                        'value': row[col],
                        'variable': col
                    })
            except Exception as e:
                print(f"Error computing outliers for {coord}, {col}: {e}")
                continue

    # Convert to DataFrame
    outliers_df = pd.DataFrame(outliers_list)

    # Save to DB
    
    # MySQL Connection
    user = 'root'
    password = 'Hamilton1186!'
    host = '127.0.0.1'
    port = '3306'
    db = 'weatherdb'
    table_name = 'weather_summary_global_outliers'

    # Create SQLAlchemy engine
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}")

    # Check if table exists
    inspector = inspect(engine)
    if not inspector.has_table(table_name):
        # Create table by writing empty df with same schema
        outliers_df.head(0).to_sql(name=table_name, con=engine, if_exists='replace', index=False)
        print(f"Table '{table_name}' created.")
    else:
        print(f"Table '{table_name}' already exists. Skipping creation.")

    # Append data
    outliers_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
    print(f"Data inserted into '{table_name}'.")

    # Dispose of the engine to close the connection
    engine.dispose()

In [20]:
weather = [ 't2m', 'd2m', 'sst', 'skt', 'tcc', 'sro', 'ro', 'avg_tprate', 'tcwv']

common_stats = ['mean', 'min', 'max', 'std', 'median', lambda x: x.max() - x.min()]

summary_data = {}
for weather_var in weather:
    for stat in common_stats:
        weather_cast = dfm[weather_var].agg(stat)
        key = f"{weather_var}_{stat.__name__}" if callable(stat) else f"{weather_var}_{stat}"
        summary_data[key] = weather_cast
            

summary_df = pd.DataFrame([summary_data])
summary_df['Date'] = dfm['time'].iloc[0]
summary_df['Date'] = pd.to_datetime(summary_df['Date'])
summary_df['day_of_year'] = summary_df['Date'].dt.dayofyear
summary_df['coord_min_temp'] = dfm.loc[dfm['t2m'].idxmin(), 'coordinates']
summary_df['coord_max_temp'] = dfm.loc[dfm['t2m'].idxmax(), 'coordinates']
summary_df['coord_min_time'] = dfm.loc[dfm['t2m'].idxmin(), 'hour']
summary_df['coord_max_time'] = dfm.loc[dfm['t2m'].idxmax(), 'hour']
summary_df['ocean_min_temp'] = dfm.loc[dfm['sst'].idxmin(), 'coordinates']
summary_df['ocean_max_temp'] = dfm.loc[dfm['sst'].idxmax(), 'coordinates']
summary_df['ocean_min_time'] = dfm.loc[dfm['sst'].idxmin(), 'hour']
summary_df['ocean_max_time'] = dfm.loc[dfm['sst'].idxmax(), 'hour']
new_order = ['Date', 'day_of_year'] + [col for col in summary_df.columns if col not in ['Date', 'day_of_year']]
summary_df = summary_df[new_order]

print(summary_df)


        Date  day_of_year  tcwv_mean  tcwv_min   tcwv_max   tcwv_std  \
0 1940-01-01            1  15.919818  0.148484  66.949211  15.349381   

   tcwv_median  tcwv_<lambda>  t2m_mean  t2m_min  ...  avg_tprate_median  \
0    10.187007      66.800728  2.791903   -49.16  ...           0.000002   

   avg_tprate_<lambda>  coord_min_temp  coord_max_temp  coord_min_time  \
0             0.004132       73.0,97.0     -21.0,119.0              21   

   coord_max_time  ocean_min_temp  ocean_max_temp  ocean_min_time  \
0               6     -78.0,-44.0     -15.0,136.0               0   

   ocean_max_time  
0              22  

[1 rows x 64 columns]


In [22]:
# Save to DB
# MySQL Connection
user = 'root'
password = 'Hamilton1186!'
host = '127.0.0.1'
port = '3306'
db = 'weatherdb'
table_name = 'weather_summary_global'

# Create SQLAlchemy engine
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}")

# Check if table exists
inspector = inspect(engine)
if not inspector.has_table(table_name):
    # Create table by writing empty df with same schema
    summary_df.head(0).to_sql(name=table_name, con=engine, if_exists='replace', index=False)
    print(f"Table '{table_name}' created.")
else:
    print(f"Table '{table_name}' already exists. Skipping creation.")

# Append data
summary_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
print(f"Data inserted into '{table_name}'.")

# Dispose of the engine to close the connection
engine.dispose()
# Delete GRIB file if it exists
if os.path.exists(grib_file):
    try:
        os.remove(grib_file)
        print(f"Deleted {grib_file}")
    except Exception as e:
        print(f"Error deleting {grib_file}: {e}")

print("DataFrame successfully stored in the MySQL database!")


Table 'weather_summary_global' already exists. Skipping creation.
Data inserted into 'weather_summary_global'.
DataFrame successfully stored in the MySQL database!


In [23]:
# Save to DB
# MySQL Connection
user = 'root'
password = 'Hamilton1186!'
host = '127.0.0.1'
port = '3306'
db = 'weatherdb'
table_name = 'weather_summary_global'

# Create SQLAlchemy engine
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}")

# Check if table exists
inspector = inspect(engine)
if not inspector.has_table(table_name):
    # Create table by writing empty df with same schema
    summary_df.head(0).to_sql(name=table_name, con=engine, if_exists='replace', index=False)
    print(f"Table '{table_name}' created.")
else:
    print(f"Table '{table_name}' already exists. Skipping creation.")

# Append data
summary_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
print(f"Data inserted into '{table_name}'.")

# Dispose of the engine to close the connection
engine.dispose()
# Delete GRIB file if it exists
if os.path.exists(grib_file):
    try:
        os.remove(grib_file)
        print(f"Deleted {grib_file}")
    except Exception as e:
        print(f"Error deleting {grib_file}: {e}")

print("DataFrame successfully stored in the MySQL database!")


Table 'weather_summary_global' already exists. Skipping creation.
Data inserted into 'weather_summary_global'.
DataFrame successfully stored in the MySQL database!
