# Read DWD CDC Precipitation Time Series

The main idea behind this activity is to see if there is a correlation between precipitation and sales of the stores.

In [3]:
import datetime as dt
import pandas as pd
import os
import geopandas as gpd

## FTP Connection

### Connection Parameters

In [4]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Directory Definition and Station Description Filename Pattern

In [5]:
# The topic of interest.
topic_dir = "hourly/precipitation/historical/"

# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

### Local Directories

In [6]:
local_ftp_dir         = "data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

local_generated_dir   = "data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallel merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

In [7]:
print(local_ftp_dir)
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

data/original/DWD/
data/original/DWD/hourly/precipitation/historical/
data/original/DWD/hourly/precipitation/historical/

data/generated/DWD/
data/generated/DWD/hourly/precipitation/historical/
data/generated/DWD/hourly/precipitation/historical/
data/generated/DWD/hourly/precipitation/historical/


In [8]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

### FTP Connect

In [9]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [10]:
ret = ftp.cwd(".")

In [11]:
#ftp.quit()

### FTP Grab File Function

In [12]:
def grabFile(ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") # A dummy action to chack the connection and to provoke an exception if necessary.
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")



### Generate Pandas Dataframe from FTP Directory Listing

In [13]:
import pandas as pd
import os

def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:    
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
#        itemlist = [line[0:1], int(line[31:42]), line[56:]]
#        flist.append(itemlist)
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
        

    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)

In [14]:
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)

In [15]:
df_ftpdir.tail(10)

Unnamed: 0,station_id,name,ext,size,type
1031,15444,stundenwerte_RR_15444_20140901_20201231_hist.zip,.zip,171026,-
1032,15478,stundenwerte_RR_15478_20150201_20201231_hist.zip,.zip,151417,-
1033,15490,stundenwerte_RR_15490_20161201_20201231_hist.zip,.zip,106256,-
1034,15512,stundenwerte_RR_15512_20160901_20201231_hist.zip,.zip,110995,-
1035,15514,stundenwerte_RR_15514_20171101_20201231_hist.zip,.zip,82992,-
1036,15555,stundenwerte_RR_15555_20160501_20201231_hist.zip,.zip,120197,-
1037,15810,stundenwerte_RR_15810_20180601_20201231_hist.zip,.zip,69532,-
1038,19140,stundenwerte_RR_19140_20201101_20201231_hist.zip,.zip,9708,-
1039,19171,stundenwerte_RR_19171_20200901_20201231_hist.zip,.zip,12514,-
1040,19172,stundenwerte_RR_19172_20200901_20201231_hist.zip,.zip,14631,-


### Dataframe with Temp Zip Files

In [16]:
#df_ftpdir["ext"]==".zip"
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.tail(10)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
15444,stundenwerte_RR_15444_20140901_20201231_hist.zip,.zip,171026,-
15478,stundenwerte_RR_15478_20150201_20201231_hist.zip,.zip,151417,-
15490,stundenwerte_RR_15490_20161201_20201231_hist.zip,.zip,106256,-
15512,stundenwerte_RR_15512_20160901_20201231_hist.zip,.zip,110995,-
15514,stundenwerte_RR_15514_20171101_20201231_hist.zip,.zip,82992,-
15555,stundenwerte_RR_15555_20160501_20201231_hist.zip,.zip,120197,-
15810,stundenwerte_RR_15810_20180601_20201231_hist.zip,.zip,69532,-
19140,stundenwerte_RR_19140_20201101_20201231_hist.zip,.zip,9708,-
19171,stundenwerte_RR_19171_20200901_20201231_hist.zip,.zip,12514,-
19172,stundenwerte_RR_19172_20200901_20201231_hist.zip,.zip,14631,-


### Download the Station Description File

In [17]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print(station_fname)

# ALternative
#station_fname2 = df_ftpdir[df_ftpdir["name"].str.match("^.*Beschreibung_Stationen.*txt$")]["name"].values[0]
#print(station_fname2)

RR_Stundenwerte_Beschreibung_Stationen.txt


In [18]:
print("grabFile: ")
print("From: " + ftp_dir + station_fname)
print("To:   " + local_ftp_station_dir + station_fname)
grabFile(ftp_dir + station_fname, local_ftp_station_dir + station_fname)

grabFile: 
From: /climate_environment/CDC/observations_germany/climate/hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt
To:   data/original/DWD/hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt


In [19]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    file = codecs.open(txtfile,"r","utf8")#"utf8", iso8859_2
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(txtfile,skiprows=2,names=colnames_en,encoding='iso8859_2', parse_dates=["date_from","date_to"],index_col = 0)
    
    # write csv
    df.to_csv(csvfile, sep = ";")
    return(df)

In [20]:
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")


df_stations['date_from']=pd.to_datetime(df_stations['date_from'],infer_datetime_format=True) 
df_stations['date_to']=pd.to_datetime(df_stations['date_to'],infer_datetime_format=True)
df_stations.tail()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
19319,2021-11-01,2022-03-03,340,50.2891,7.0888,Anschau,Rheinland-Pfalz
19361,2021-10-29,2022-03-03,540,50.4159,11.0437,Frankenblick-Rauenstein,Thüringen
19362,2021-10-29,2022-03-03,720,50.4602,11.0858,Neuhaus-Steinheid (Thür.),Thüringen
19366,2021-11-29,2022-03-03,242,49.1505,7.9174,Annweiler am Trifels-Stein,Rheinland-Pfalz
19367,2021-11-29,2022-03-03,174,49.2133,7.98,Annweiler am Trifels/Pfalz,Rheinland-Pfalz


### Select Stations Located in NRW from Station Description Dataframe

In [21]:
prep_s_df=gpd.read_file('./data/generated/DWD/hourly/precipitation/historical/Prep_df_stations_cologne.geojson', index_col='station_id')\
.set_crs(4326, allow_override=True).to_crs(5676).set_index('station_id')
prep_s_df

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state,field_9,ID_0,ISO,...,NAME_1,ID_2,NAME_2,ID_3,NAME_3,TYPE_3,ENGTYPE_3,NL_NAME_3,VARNAME_3,geometry
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1024,2006-08-01,2022-03-02,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2559632.490 5664862.623 37.000)
2667,1995-09-01,2022-03-02,92,50.8646,7.1575,Köln-Bonn,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2581532.425 5637222.637 92.000)
2968,2008-12-01,2022-03-02,43,50.9894,6.9777,Köln-Stammheim,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2568690.662 5650922.640 43.000)
6047,2019-02-14,2022-03-02,78,50.995,6.7111,Buesdorf,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2549966.231 5651331.126 78.000)
6058,2019-02-14,2022-03-02,180,51.0877,7.1005,Burscheid,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2577149.147 5661979.532 180.000)
6067,2019-02-14,2022-03-02,73,50.8825,6.6989,Kerpen,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2549228.395 5638807.907 73.000)
13671,2007-12-01,2022-03-02,221,50.9655,7.2753,Overath-Böke,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2589631.310 5648583.441 221.000)
14158,2012-06-13,2022-03-02,46,51.1189,6.9322,Langenfeld,Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2565313.654 5665287.537 46.000)
14180,2016-06-01,2022-02-25,43,51.0469,6.9729,Leverkusen (Klärwerk),Nordrhein-Westfalen,,86,DEU,...,Nordrhein-Westfalen,29,Köln,289,Cologne Städte,Kreisfreie Städte,Urban district,,Koln,MULTIPOINT Z (2568269.170 5657314.744 43.000)


In [22]:
df_zips.tail()

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
15555,stundenwerte_RR_15555_20160501_20201231_hist.zip,.zip,120197,-
15810,stundenwerte_RR_15810_20180601_20201231_hist.zip,.zip,69532,-
19140,stundenwerte_RR_19140_20201101_20201231_hist.zip,.zip,9708,-
19171,stundenwerte_RR_19171_20200901_20201231_hist.zip,.zip,12514,-
19172,stundenwerte_RR_19172_20200901_20201231_hist.zip,.zip,14631,-


In [23]:
#print(dfNRW.index)
station_ids_selected = list(prep_s_df.index)
set(station_ids_selected).issubset(set(df_zips)) # we have some missing stations

False

### Download TS Data from FTP Server

Problem: Not all stations listed in the station description file are associated with a time series (zip file)! The stations in the description file and the set of stations whoch are TS data provided for (zip files) do not match perfectly.  

In [24]:
# Add the names of the zip files only to a list. 
local_zip_list = []

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        #print(fname)
        grabFile(ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)



### Concat the  Time Series dfs




In [25]:
# column names
#STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor

In [26]:
def temp_ts_to_df(fname):
    
    df = pd.read_csv(fname, delimiter=";", encoding="iso8859_2")
   
    
    # https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd

    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')
    df['mess_datum']=pd.to_datetime(df['mess_datum'], format='%Y%m%d%H', utc=True, errors='ignore')
    #begin_date=dt.datetime.strptime('01-08-2019',"%d-%m-%Y")
    begin_date=dt.datetime(2019, 8, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
    df=df[df['mess_datum']>begin_date]
    return(df)

In [27]:
from zipfile import ZipFile

In [28]:
def ts_merge():
    # Very compact code.
    df = pd.DataFrame()
    for elt in local_zip_list:
        ffname = local_ftp_ts_dir + elt
        print("Zip archive: " + ffname)
        with ZipFile(ffname) as myzip:
            # read the time series data from the file starting with "produkt"
            prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
            print("Extract product file: %s" % prodfilename)
            print()
            with myzip.open(prodfilename) as myfile:
                dftmp = temp_ts_to_df(myfile)
                df = pd.concat([df, dftmp])

    
    return(df)

In [29]:
df_merged_ts = ts_merge()

Zip archive: data/original/DWD/hourly/precipitation/historical/stundenwerte_RR_01024_20060801_20201231_hist.zip
Extract product file: produkt_rr_stunde_20060801_20201231_01024.txt

Zip archive: data/original/DWD/hourly/precipitation/historical/stundenwerte_RR_02667_19950901_20201231_hist.zip
Extract product file: produkt_rr_stunde_19950901_20201231_02667.txt

Zip archive: data/original/DWD/hourly/precipitation/historical/stundenwerte_RR_02968_20081201_20201231_hist.zip
Extract product file: produkt_rr_stunde_20081201_20201231_02968.txt

Zip archive: data/original/DWD/hourly/precipitation/historical/stundenwerte_RR_13671_20071201_20201231_hist.zip
Extract product file: produkt_rr_stunde_20071201_20201231_13671.txt



### Description

Stations_ID;Von_Datum;Bis_Datum;Stationsname;Parameter;Parameterbeschreibung;Einheit;Datenquelle (Strukturversion=SV);Zusatz-Info;Besonderheiten;Literaturhinweis;eor;
3;19950901;20010331;Aachen;R1;stdl. Niederschlagshoehe;mm;Daten aus SYNOP-Meldungen (FM 12 1981-31.03.2001);in UTC;;;eor;
3;20010401;20110401;Aachen;R1;stdl. Niederschlagshoehe;mm;Daten aus SYNOP-Meldungen (FM 12 ab 01.04.2001 mit neuen nationalen Schl¸sselgruppen);in UTC;;;eor;
3;19950901;20010331;Aachen;RS_IND;Indikator Niederschlag ja/nein;numerischer Code;Daten aus SYNOP-Meldungen (FM 12 1981-31.03.2001);in UTC;;;eor;
3;20010401;20110401;Aachen;RS_IND;Indikator Niederschlag ja/nein;numerischer Code;Daten aus SYNOP-Meldungen (FM 12 ab 01.04.2001 mit neuen nationalen Schl¸sselgruppen);in UTC;;;eor;
3;20010403;20110401;Aachen;WRTR;stdl. Niederschlagsform (=Niederschlagshoehe_ind);numerischer Code;Daten aus SYNOP-Meldungen (FM 12 ab 01.04.2001 mit neuen nationalen Schl¸sselgruppen);01,02,04,05,07,08,10,11,13,14,16,17,19,20,22,23 UTC=1h Bezugszeitraum;;;eor;
Legende: FT  = Folgetag; GZ = Gesetzliche Zeit
generiert: 12.04.2021 --  Deutscher Wetterdienst  --


In [30]:
df_merged_ts.head()

Unnamed: 0,stations_id,mess_datum,qn_8,r1,rs_ind,wrtr,eor
113806,1024,2019-08-01 01:00:00+00:00,3,0.0,0,-999,eor
113807,1024,2019-08-01 02:00:00+00:00,3,0.0,0,-999,eor
113808,1024,2019-08-01 03:00:00+00:00,3,0.0,0,-999,eor
113809,1024,2019-08-01 04:00:00+00:00,3,0.0,0,-999,eor
113810,1024,2019-08-01 05:00:00+00:00,3,0.0,0,-999,eor


In [31]:
df_merged_ts.shape

(49768, 7)

In [32]:
df_merged_ts['mess_datum']=pd.to_datetime(df_merged_ts['mess_datum'], format='%Y%m%d%H', utc=True, errors='ignore')
df_merged_ts.head()

Unnamed: 0,stations_id,mess_datum,qn_8,r1,rs_ind,wrtr,eor
113806,1024,2019-08-01 01:00:00+00:00,3,0.0,0,-999,eor
113807,1024,2019-08-01 02:00:00+00:00,3,0.0,0,-999,eor
113808,1024,2019-08-01 03:00:00+00:00,3,0.0,0,-999,eor
113809,1024,2019-08-01 04:00:00+00:00,3,0.0,0,-999,eor
113810,1024,2019-08-01 05:00:00+00:00,3,0.0,0,-999,eor


In [33]:
df_all=df_merged_ts.set_index('stations_id').join(df_stations, how='left')

In [34]:
df_merged_ts.to_csv(local_ts_merged_dir + "ts_merged.csv",sep=";")

In [35]:
df_stations.to_csv(local_ts_merged_dir + "df_stations.csv",sep=";")

In [36]:
df_all.head()

Unnamed: 0,mess_datum,qn_8,r1,rs_ind,wrtr,eor,date_from,date_to,altitude,latitude,longitude,name,state
1024,2019-08-01 01:00:00+00:00,3,0.0,0,-999,eor,2006-08-01,2022-03-03,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen
1024,2019-08-01 02:00:00+00:00,3,0.0,0,-999,eor,2006-08-01,2022-03-03,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen
1024,2019-08-01 03:00:00+00:00,3,0.0,0,-999,eor,2006-08-01,2022-03-03,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen
1024,2019-08-01 04:00:00+00:00,3,0.0,0,-999,eor,2006-08-01,2022-03-03,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen
1024,2019-08-01 05:00:00+00:00,3,0.0,0,-999,eor,2006-08-01,2022-03-03,37,51.1157,6.851,Dormagen-Zons,Nordrhein-Westfalen


In [37]:
df_all.to_csv(local_ts_merged_dir + "df_all.csv",sep=";")

In [38]:
df_all.shape

(49768, 13)