# Read DWD Time Series for Recent Annual Temperature, Merge with Station Description and Append 

  * Temperature at climate stations in **Bayern** only
  * **Annual mean temperature** data from the **historical** data set
  * Time interval **from 2017 to 2019**  


## DWD Subdirectory, Topic of Interest: annual - KL - historical!

In [None]:
# The topic of interest
topic_dir = "/annual/kl/historical/"
print("Subdirectory on FTP Server:", topic_dir)

## Local Directories

In [None]:
#local_ftp_dir         = "../data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_dir         = "data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

#local_generated_dir   = "../data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_generated_dir   = "data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallel merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

In [None]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

In [None]:
print(local_ftp_dir)
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

## FTP Connection

### Connection Parameters

In [None]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Directory Definition and Station Description Filename Pattern

In [None]:
# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"

# The absolute ftp directory with the data (topic) of concern
ftp_dir =  ftp_climate_data_dir + topic_dir
print("Absolte FTP directory path with data of concern:", ftp_dir)

### FTP Connect

In [None]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

In [None]:
ret = ftp.cwd(".")

In [None]:
#ftp.quit()

### FTP Grab File Function

In [None]:
def grabFile(ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") # A dummy action to chack the connection and to provoke an exception if necessary.
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")



## Generate Pandas Dataframe from FTP Directory Listing

In [None]:
import pandas as pd
import os

def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:    
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
#        itemlist = [line[0:1], int(line[31:42]), line[56:]]
#        flist.append(itemlist)
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
        

    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)

In [None]:
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)

In [None]:
df_ftpdir.head(10)

### Dataframe with TS Zip Files

In [None]:
#df_ftpdir["ext"]==".zip"
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.head(10)

## Download the Station Description File

In [None]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print(station_fname)

# ALternative
#station_fname2 = df_ftpdir[df_ftpdir["name"].str.match("^.*Beschreibung_Stationen.*txt$")]["name"].values[0]
#print(station_fname2)

In [None]:
print("grabFile: ")
print("From: " + ftp_dir + station_fname)
print("To:   " + local_ftp_station_dir + station_fname)
grabFile(ftp_dir + station_fname, local_ftp_station_dir + station_fname)

In [None]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    file = codecs.open(txtfile, "r", encoding="latin1")
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(\
        txtfile, skiprows=2, names=colnames_en, \
        parse_dates=["date_from","date_to"], index_col = 0, \
        encoding="latin1")
    
    # write csv
    df.to_csv(csvfile, sep = ";")
    return(df)

In [None]:
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

### Select Stations Located in Bayern from Station Description Dataframe

In [None]:
# Create variable with TRUE if state is Bayern
isNRW = (df_stations['state'] == "Bayern").values

# Create variable with TRUE if date_to is latest date (indicates operation up to now)
isActive2021 = (df_stations['date_to'] > "2021").values 

# select on both conditions
station_ids_selected = df_stations[isNRW & isActive2021].index

print(f"Stations located in Bayern and still active in 2021: \n{list(station_ids_selected)}")


In [None]:
df_stations.loc[station_ids_selected].head()

## Download TS Data from FTP Server

Problem: Not all stations listed in the station description file are associated with a time series (zip file)! The stations in the description file and the set of stations whoch are TS data provided for (zip files) do not match perfectly.  

In [None]:
# Add the names of the zip files only to a list. 
local_zip_list = []

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname)
        grabFile(ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)

## Selected Period for Time Series in Data Frames

These parameters are used to limit the time period of the series added to the Pandas data frames. 
<br>To select all dates in the series you can set a very broad interval, e.g.

```python
date_from = '2017-01-01'
date_to   = '2019-12-31'
```

In [None]:
date_from = '2017-01-01'
date_to   = '2019-12-31'

## Join (Merge) the Time Series Columns

The goal is to create a data frame with column oriented time series. Each column contuins a time series for one station. Column name is the station number.

More on merge and join: <br>
https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd



### Precipitation Time Series Data (text file) to Data Frame 
Read a text file with precipitation time series (CSV file) and make it a data frame. <br>
Only add measuerments which lie within the given time period.

In [None]:
import datetime as dt

In [None]:
def prec_ts_to_df(fname, date_from='1700-01-01', date_to='2100-12-31'):
    import datetime as dt
    
    dateparse = lambda dates: [dt.datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="MESS_DATUM", parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999])

    # Attention: Selecting df with given dates may lead to empty result!
    df = df[(df.index >= date_from) & (df.index <= date_to)]
    
    # Code inspired by: https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd
    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.

    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    return(df)

### Climate Time Series Data (text file) to Data Frame 
The KL data format for annual temperatures differs significantly from the RR format for hourly precipitation.

Tha annual data uses an interval `[MESS_DATUM_BEGINN, MESS_DATUM_ENDE]`, e.g. ['20180101','20181231'], as time reference for the measurements whereas the hourly data provides a unique time stamp in hourly resolution, e.g. '2018052113'.

In [None]:
def kl_ts_to_df(fname, date_from='1700-01-01', date_to='2100-12-31'):
    import datetime as dt
    
    dateparse = lambda dates: [dt.datetime.strptime(str(d), '%Y%m%d') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="MESS_DATUM_BEGINN", parse_dates = ["MESS_DATUM_BEGINN", "MESS_DATUM_ENDE"], date_parser = dateparse, na_values = [-999.0, -999])

    # Attention: Selecting df with given dates may lead to empty result!
    df = df[(df.index >= date_from) & (df.index <= date_to)]
    
    # Code inspired by: https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd
    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.

    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    return(df)

### Merge Columnwise

In [None]:
from zipfile import ZipFile
def ts_merge(date_from='2017-01-01', date_to='2019-12-31'):
    # Very compact code.
    df = pd.DataFrame()
    for elt in local_zip_list:
        ffname = local_ftp_ts_dir + elt
        print("Zip archive: " + ffname)
        with ZipFile(ffname) as myzip:
            # read the time series data from the file starting with "produkt"
            prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
            print("Extract product file: %s" % prodfilename)
            print()
            with myzip.open(prodfilename) as myfile:
                dftmp = kl_ts_to_df(myfile, date_from, date_to)

                if len(dftmp) > 0: # Check if cropped df is empty, i.e. no values in given period.
                    s = dftmp["ja_tt"].rename(dftmp["stations_id"][0]).to_frame()
                    df = pd.merge(df, s, left_index=True, right_index=True, how='outer')
                else:
                    print("WARNING: data file", prodfilename, "does not contain data for given period.")

    #df.index.names = ["year"]
    df.index.rename(name = "time", inplace = True)
    return(df)

In [None]:
 df_merged_ts = ts_merge(date_from, date_to)
df_merged_ts = ts_merge(date_from, date_to)

In [None]:
print('df_merged_ts.shape:', df_merged_ts.shape)
df_merged_ts.head()

In [None]:
df_merged_ts.to_csv(local_ts_merged_dir + "ts_merged.csv",sep=";")

## Plotting a few Data Series for a first Check

Plot for example the Series of Stations Essen-Bredeney, Düsseldorf, and Kahler Asten


In [None]:
df_plt = df_stations[ ( ( df_stations['name'].str.contains("Dillingen/Donau-Fristingen") ) | (df_stations['name'].str.contains("Eichstätt-Landershofen") | ( df_stations['name'].str.contains("Kitzingen") ) ) ) & (df_stations['date_to'] > '2019') ]
df_plt

In [None]:
idx = list(df_plt.index)
#idx = [1078,1303]
print("Station_ID to be plotted:", idx)

In [None]:
df_merged_ts[idx].head()

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
fig1, ax1 = plt.subplots(dpi=136, figsize=(6,4))
df_merged_ts[idx].plot(ax=ax1)
#ax1.set_xlim(pd.Timestamp('2019-05-01'), pd.Timestamp('2019-05-30'))
plt.show()

## Append Time Series 

Append the time series one below the other. The station number is added as an additional column to identify the time series. 

This format is necessary for the **QGIS Time Manager**.

In [None]:
def ts_append(date_from='2017-01-01', date_to='2019-12-31'):
    # Very compact code.
    df = pd.DataFrame()
    for elt in local_zip_list:
        ffname = local_ftp_ts_dir + elt
        print("Zip archive: " + ffname)
        with ZipFile(ffname) as myzip:
            # read the time series data from the file starting with "produkt"
            prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
            print("Extract product file: %s" % prodfilename)
            print()
            with myzip.open(prodfilename) as myfile:
                dftmp = kl_ts_to_df(myfile, date_from, date_to)

                if len(dftmp) > 0: # Check if cropped df is empty, i.e. no values in given period.
                    dftmp = dftmp.merge(df_stations,how="inner",left_on="stations_id",right_on="station_id",right_index=True)
                    df = df.append(dftmp)
                else:
                    print("WARNING: data file", prodfilename, "does not contain data for given period.")
                
    df.index.rename(name = "time", inplace = True)
    return(df)

In [None]:
df_appended_ts = ts_append(date_from, date_to)

In [None]:
df_appended_ts.head(20)

In [None]:
df_appended_ts.to_csv(local_ts_appended_dir + "ts_appended.csv",sep=";")

## Plot Temperature vs. Altitude

In [None]:
df_plot = df_appended_ts[ (df_appended_ts.index == '2017-01-01') & (df_appended_ts['state'].str.contains("Bayern"))  ]
print("df_plot.shape:", df_plot.shape)

In [None]:
fig2, ax2 = plt.subplots(dpi=136, figsize=(8,4))
#df_appended_ts[idx].plot(ax=ax1)
ax2.plot(df_plot['altitude'],df_plot['ja_tt'],".")
#ax1.set_xlim(pd.Timestamp('2019-05-01'), pd.Timestamp('2019-05-30'))
#ax2.set_ylim(4,14)
ax2.set_ylabel("Annual Mean Temperature")
ax2.set_xlabel("Altitude from Station Description")
ax2.set_title("Annual Mean Temperature in Year 2017 at DWD Stations in Bayern")
ax2.grid(True)
plt.show()

In [None]:
fig2.savefig("fig1.png")

## Data from Zugspitze, just for curiosity ...

In [None]:
df_stations[df_stations['name'].str.contains("Zug")]