In [1]:
# Enable IPython's autoreload extension in mode 2 so edits to imported
# project modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# Kill dask clusters

In [15]:
from distributed.client import _global_clients

In [16]:
# Print each entry of the client registry as "<key> <client>".
for entry in _global_clients.items():
    print(*entry)

In [13]:
# Left disabled on purpose: uncomment to close the cluster attached to the
# client stored under key 0 of `_global_clients`.
# _global_clients[0].cluster.close()

# Explore processed results

In [40]:
import pandas as pd
import dask.dataframe as dd
from pathlib import Path
import plotly.express as px

In [29]:
# Collect every parquet file found one directory level below the processed
# "oper" folder. glob() yields entries in arbitrary order, so the paths are
# sorted to keep the partition order reproducible between runs; keeping them
# in a list (rather than a one-shot generator) also lets `files` be inspected
# after the cell has run.
files = sorted(Path("../data/ecmwf/processed/oper/").glob("*/*.parquet"))
if not files:
    # Fail early with an actionable message instead of handing an empty list
    # to the parquet reader.
    raise FileNotFoundError("No parquet files found under ../data/ecmwf/processed/oper/")
ddf = dd.read_parquet(files)

In [39]:
# Mean of the "t2m" column for each (run_time, timestamp, country_name) group,
# computed eagerly and flattened so the group keys become regular columns.
group_keys = ["run_time", "timestamp", "country_name"]
df_tmp = (
    ddf.groupby(group_keys)
    .agg({"t2m": "mean"})
    .compute()
    .reset_index()
)

In [45]:
# Restrict the aggregated frame to two countries, then draw one line per
# run_time with one facet row per country.
country_filter = "country_name in ['Norway', 'France']"
df_plot = df_tmp.query(country_filter)
fig = px.line(
    df_plot,
    x="timestamp",
    y="t2m",
    color="run_time",
    facet_row="country_name",
)
fig.show()

# Using service

In [None]:
import datetime as dt
from weather_weaver.services.service import WeatherConsumerService

from weather_weaver.outputs.localfs.client import LocalClient
from weather_weaver.inputs.ecmwf import constants as ecmwf_constants
from weather_weaver.inputs.ecmwf.open_data.fetcher import ECMWFOpenDataFetcher
from weather_weaver.inputs.ecmwf.open_data.request import ECMWFOpenDataRequestBuilder
from weather_weaver.inputs.ecmwf.open_data.processor import EMCWFOpenDataProcessor

In [None]:
# Instantiate the four collaborators the service is wired with, each with its
# default constructor arguments (same construction order as before).
fetcher, request_builder, processor, storer = (
    ECMWFOpenDataFetcher(),
    ECMWFOpenDataRequestBuilder(),
    EMCWFOpenDataProcessor(),
    LocalClient(),
)

In [None]:
# Wire the consumer service: the collaborators created in the previous cell
# plus the raw / processed directories taken from the ECMWF constants module.
service = WeatherConsumerService(
    fetcher=fetcher,
    request_builder=request_builder,
    processor=processor,
    storer=storer,
    raw_dir=ecmwf_constants.RAW_DIR,
    processed_dir=ecmwf_constants.PROCESSED_DIR,
)

In [None]:
# Window passed to the service calls below: first date and its date offset.
start, date_offset = dt.date(2024, 1, 4), 1

In [None]:
# Every request the service builds by default for the configured window.
all_requests = service._build_default_requests(start=start, date_offset=date_offset)

# Keep only the requests whose processed output is not in storage yet.
# NOTE(review): the existence check uses the bare `file_name` under
# `processed_dir` — confirm this matches how processed outputs are named.
all_new_requests = [
    req
    for req in all_requests
    if not service.storer.exists(path=service.processed_dir / req.file_name)
]

# No new request means there is nothing to build: `pipeline` is then None.
if all_new_requests:
    pipeline = service._build_dask_pipeline(all_new_requests)
else:
    pipeline = None

In [None]:
# `pipeline` is None when no new request had to be processed, so only render
# the graph when a pipeline exists. Keeping this a single expression preserves
# the notebook's rich display of the returned value.
pipeline.visualize() if pipeline is not None else None

In [None]:
# Reuse the configured `date_offset` instead of a literal so this call cannot
# drift from the window the requests were built for.
processed_files = service.download_datasets(start=start, date_offset=date_offset)

In [None]:
from pathlib import Path

In [None]:
# Raw GRIB2 file, addressed relative to the notebook like the processed data
# directory used earlier, so the notebook is not tied to one user's checkout.
path = Path("../data/ecmwf/raw/oper/20240104_12z_0-90_fc.grib2")

In [None]:
import datetime as dt

import dask.dataframe as dd
import pandas as pd
import plotly.express as px
import xarray as xr
import structlog

from weather_weaver.constants import load_world_countries
from weather_weaver.inputs.ecmwf.constants import ENTSO_E_ISO3_LIST, PROCESSED_DIR
from weather_weaver.outputs.localfs.client import LocalClient as LocalfsClient

from weather_weaver.inputs.ecmwf.open_data import (
    ECMWFOpenDataFetcher,
    ECMWFOpenDataRequest,
    EMCWFOpenDataProcessor,
    GeoFilterModel,
    StreamType,
    RunTime,
    RequestType,
)

# Let pandas display up to 500 columns before truncating wide frames.
pd.set_option("display.max_columns", 500)

# Logger used by the cells below to report progress.
logger = structlog.getLogger()

In [None]:
# Describe a single request: run date, run time, stream and request type.
run_date, run_time = dt.date(2024, 1, 1), RunTime.H00
stream, request_type = StreamType.OPER, RequestType.FORECAST

data_request = ECMWFOpenDataRequest(
    request_type=request_type,
    stream=stream,
    run_time=run_time,
    run_date=run_date,
)

In [None]:
# Storage backend and fetcher used by the manual, step-by-step run below.
storage_client, fetcher = LocalfsClient(), ECMWFOpenDataFetcher()

In [None]:
# Destination of the processed output for this request.
output_path = PROCESSED_DIR / f"{data_request.file_name}.parquet"

# Report when the output is already present. This cell only logs: nothing here
# prevents the following cells from downloading and processing again.
if storage_client.exists(path=output_path):
    logger.info(
        event="NWP data already downloaded, skipping!",
        request=data_request,
    )

In [None]:
# Download the raw files for `data_request`; the fetcher's return value is kept
# as `out_path` and later passed to the processor as `raw_path`.
out_path = fetcher.download_raw_files(data_request)

In [None]:
# Processor used to transform the downloaded raw file.
# NOTE(review): the class name is spelled "EMCWF" (not "ECMWF"); it matches the import, so it is presumably the real name — confirm.
processor = EMCWFOpenDataProcessor()

In [None]:
# Run the processor on the downloaded raw file for this request.
# NOTE(review): `geo_filter` is not defined in any visible cell, so this raises
# NameError on a fresh kernel; presumably it is a `GeoFilterModel` instance —
# confirm and define it before this cell.
# NOTE(review): this rebinds `ddf`, which an earlier cell also assigns.
ddf = processor.transform(raw_path=out_path, request=data_request, filter_model=geo_filter)

In [None]:
# Persist the transformed dataframe at `output_path` through the storage client.
storage_client.store(ddf=ddf, destination_path=output_path)

In [None]:
# Read back the saved parquet file.

In [None]:
# Load the parquet output written to `output_path` as a dask dataframe.
test_df = dd.read_parquet(output_path)

In [None]:
# Bare expression: the notebook shows the dataframe's repr as the cell output.
test_df

In [None]:
# Mean of every weather variable for each (timestamp, country_name) pair.
# Nothing is evaluated here; the result is computed in a later cell.
mean_of = dict.fromkeys(["u10", "v10", "t2m", "tp"], "mean")
tt = test_df.groupby(["timestamp", "country_name"]).agg(mean_of)

In [None]:
# Trigger the computation of the lazy aggregation defined in the previous cell.
ttt = tt.compute()

In [None]:
# `ttt` is wide: one column per variable (u10, v10, t2m, tp), indexed by
# (timestamp, country_name). It has no "value" / "variable" columns, so reshape
# it to long form first; each variable then gets its own facet column and each
# country its own line.
ttt_long = ttt.reset_index().melt(
    id_vars=["timestamp", "country_name"],
    var_name="variable",
    value_name="value",
)
fig = px.line(ttt_long, x="timestamp", y="value", color="country_name", facet_col="variable")
fig.show()

In [None]:
# Sample file expected in the current user's Downloads folder. Built from the
# home directory so no user name is hardcoded; kept as a plain string.
path = str(Path.home() / "Downloads" / "ECMWF_HRES_06Z_20190101.nc")

In [None]:
# Open the file at `path` with xarray and end the cell on the dataset so its
# summary is displayed (a dangling `dds.` would be a syntax error).
dds = xr.open_dataset(path)
dds

In [None]:
# Date window for the init times (both bounds are plain dates).
start = dt.date(2023, 12, 26)
end = dt.date(2023, 12, 27)

# Settings of the request built in the cells below.
param_group: str = "basic"
hours: int = 48
area: str = "uk"

In [None]:
# Choose the parameter codes to request for the selected group.
if param_group == "basic":
    parameters = ["167.128/169.128"]  # 2 Metre Temperature, Dswrf
else:
    # Any other group requests every known parameter code.
    # NOTE(review): `PARAMETER_ECMWFCODE_MAP` is not defined or imported in the
    # visible cells — confirm where it comes from.
    parameters = list(PARAMETER_ECMWFCODE_MAP.keys())

In [None]:
# Init times to request: every hour from `start` (inclusive) up to, but not
# including, the day after `end`, in UTC, keeping only 00, 06, 12 and 18.
# The lowercase "h" alias is used because the uppercase "H" spelling is
# deprecated in recent pandas releases.
all_init_times: list[dt.datetime] = [
    pdt.to_pydatetime()
    for pdt in pd.date_range(
        start=start,
        end=end + dt.timedelta(days=1),
        inclusive="left",
        freq="h",
        tz=dt.UTC,
    )
    if pdt.hour in (0, 6, 12, 18)
]

In [None]:
# Build the request for the first init time only.
# Fail loudly when there is no init time, so `mars_request` can never be
# missing (or stale from an earlier run) when the next cells use it.
if not all_init_times:
    raise ValueError("all_init_times is empty: no init time to build a MARS request for")
init_time = all_init_times[0]
# NOTE(review): `buildMarsRequest` is not defined or imported in the visible
# cells — confirm where it comes from.
mars_request = buildMarsRequest(
    parameters=parameters,
    hours=hours,
    area=area,
    it=init_time,
    list_only=True,
    target="test.txt",
)

In [None]:
# Inspect the request built in the previous cell before submitting it.
print(mars_request)

In [None]:
# Submit the request, with "test.txt" as the target.
# NOTE(review): `server` is not defined in any visible cell — confirm how it is
# created before running this cell.
server.execute(mars_request, target="test.txt")