-
-
Notifications
You must be signed in to change notification settings - Fork 52
/
fileindex.py
136 lines (113 loc) · 5.43 KB
/
fileindex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2021, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
from typing import List, Optional
import pandas as pd
from pytz import timezone
from wetterdienst.metadata.extension import Extension
from wetterdienst.metadata.period import Period
from wetterdienst.metadata.resolution import Resolution
from wetterdienst.provider.dwd.index import _create_file_index_for_dwd_server
from wetterdienst.provider.dwd.metadata.column_names import DwdColumns
from wetterdienst.provider.dwd.metadata.constants import DWDCDCBase
from wetterdienst.provider.dwd.metadata.datetime import DatetimeFormat
from wetterdienst.provider.dwd.observation.metadata.dataset import (
DWD_URBAN_DATASETS,
DwdObservationDataset,
)
from wetterdienst.provider.dwd.observation.metadata.resolution import HIGH_RESOLUTIONS
# Matches a run of exactly five digits that is not adjacent to any other digit.
# Used below to pull the station id out of a file name.
STATION_ID_REGEX = r"(?<!\d)\d{5}(?!\d)"
# Matches two eight-digit groups joined by an underscore, not adjacent to any
# other digit. Used below to pull the "<from>_<to>" date range out of a file name.
DATE_RANGE_REGEX = r"(?<!\d)\d{8}_\d{8}(?!\d)"
def create_file_list_for_climate_observations(
    station_id: str,
    dataset: DwdObservationDataset,
    resolution: Resolution,
    period: Period,
    date_range: Optional[str] = None,
) -> List[str]:
    """
    Select the data files (links to archives) available for a single station.

    The file index for the given dataset, resolution and period is built and
    then narrowed down to the rows belonging to the requested station and,
    optionally, to one specific date range.

    Args:
        station_id: station id of the weather station to ask for data
        dataset: observation dataset
        resolution: frequency/granularity of measurement interval
        period: recent or historical files
        date_range: optional date range string; when given, only files whose
            date range column equals this value are returned

    Returns:
        List of file names from the index that match the selection
    """
    index = create_file_index_for_climate_observations(dataset, resolution, period)

    selected = index[DwdColumns.STATION_ID.value] == station_id
    if date_range:
        # The date range column is only touched when a date range is requested.
        selected = selected & (index[DwdColumns.DATE_RANGE.value] == date_range)

    filenames = index.loc[selected, DwdColumns.FILENAME.value]
    return filenames.to_list()
def create_file_index_for_climate_observations(
    dataset: DwdObservationDataset,
    resolution: Resolution,
    period: Period,
) -> pd.DataFrame:
    """
    Create a file index of the DWD station data for one set of parameters.

    The raw server listing is reduced to zip archives, and the station id is
    parsed from each file name. For historical files of high resolutions the
    date range encoded in the file name is additionally parsed into from/to
    timestamps (Europe/Berlin) and an interval column.

    Args:
        dataset: observation dataset
        resolution: time resolution of the data
        period: period type of the data

    Returns:
        file index in a pandas.DataFrame with file names and station ids
        (plus date range columns for high resolution historical data),
        sorted by station id and file name
    """
    timezone_germany = timezone("Europe/Berlin")

    filename_col = DwdColumns.FILENAME.value
    station_id_col = DwdColumns.STATION_ID.value

    if dataset in DWD_URBAN_DATASETS:
        base = DWDCDCBase.CLIMATE_URBAN_OBSERVATIONS
    else:
        base = DWDCDCBase.CLIMATE_OBSERVATIONS
    file_index = _create_file_index_for_dwd_server(dataset, resolution, period, base)

    # Explicit copy: the filtered frame is modified below, and writing into a
    # boolean-indexed selection would otherwise trigger SettingWithCopy issues.
    is_zip = file_index[filename_col].str.endswith(Extension.ZIP.value)
    file_index = file_index.loc[is_zip, :].copy()

    # Plain column assignment is used throughout instead of `.loc[:, col] = ...`,
    # because the latter writes in place and may keep a stale (object) dtype.
    file_index[station_id_col] = file_index[filename_col].str.findall(STATION_ID_REGEX).str[0]
    # Rows without a parsable station id end up as NaN and are dropped here.
    file_index = file_index.dropna().reset_index(drop=True)
    file_index[station_id_col] = file_index[station_id_col].astype(str)

    if resolution in HIGH_RESOLUTIONS and period == Period.HISTORICAL:
        date_range_col = DwdColumns.DATE_RANGE.value
        from_date_col = DwdColumns.FROM_DATE.value
        to_date_col = DwdColumns.TO_DATE.value

        # Date range string for additional filtering of historical files
        file_index[date_range_col] = file_index[filename_col].str.findall(DATE_RANGE_REGEX).str[0]

        # Element-wise access keeps working for an empty index and for rows
        # without a date range (which simply stay missing).
        date_parts = file_index[date_range_col].str.split("_")
        file_index[from_date_col] = pd.to_datetime(
            date_parts.str[0],
            format=DatetimeFormat.YMD.value,
        ).dt.tz_localize(timezone_germany)
        # The end date is shifted by one day before localizing
        # (presumably so the final day is fully covered - TODO confirm).
        file_index[to_date_col] = (
            pd.to_datetime(
                date_parts.str[1],
                format=DatetimeFormat.YMD.value,
            )
            + pd.Timedelta(days=1)
        ).dt.tz_localize(timezone_germany)

        # Temporary fix for filenames with wrong ordered/faulty dates
        # Fill those cases with minimum/maximum date to ensure that they are loaded as
        # we don't know what exact date range the included data has
        wrong_date_order = file_index[from_date_col] > file_index[to_date_col]
        earliest_from = file_index[from_date_col].min()
        latest_to = file_index[to_date_col].max()
        file_index.loc[wrong_date_order, from_date_col] = earliest_from
        file_index.loc[wrong_date_order, to_date_col] = latest_to

        # Built pairwise rather than via row-wise apply, which returns a
        # DataFrame instead of a Series when the index is empty.
        file_index[DwdColumns.INTERVAL.value] = [
            pd.Interval(left=left, right=right, closed="both")
            for left, right in zip(file_index[from_date_col], file_index[to_date_col])
        ]

    return file_index.sort_values(by=[station_id_col, filename_col])