#### Connection Parameters

In [None]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

#### FTP Directory Definition and Station Description Filename Pattern

In [None]:
topic_dir = "/hourly/precipitation/recent/"
station_desc_pattern = "_Beschreibung_Stationen.txt"
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

#### Creating local Directories

In [None]:
local_ftp_dir         = "../data_pp/original/"      # Local directory to store local ftp data copies
local_ftp_station_dir = local_ftp_dir + topic_dir  # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir  # Local directory where time series downloaded from ftp are located

local_generated_dir   = "../data_pp/generated/"          # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallelly merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin


In [None]:
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

In [None]:
import os
os.makedirs(local_ftp_dir,exist_ok = True)
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

#### FTP Connection

In [None]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

ret = ftp.cwd(".")

#ftp.quit()

#### Generate Pandas Dataframe from FTP Directory Listing

In [None]:
import pandas as pd
def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:
        # issue the command LIST in the FTP connection 
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
df_ftpdir

#### Download and Process the Station Description File

In [None]:
import pandas as pd
import os
import ftplib

encoding = "latin1" # German Umlaute

def grabFile(ftp,ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") 
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")


In [None]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print("Station description file name:\n%s" % (station_fname))


In [None]:
src = ftp_dir + station_fname
dest = local_ftp_station_dir + station_fname
print("grabFile(ftp, src, dest):")
print("FTP source: " + src)
print("Local dest:   " + dest)
grabFile(ftp, src, dest)

#### Rename the Column Headers

In [None]:
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    import pandas as pd
    
    file = codecs.open(txtfile,"r",encoding)
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    # German-English dictionary
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(txtfile,skiprows=2,names=colnames_en, parse_dates=["date_from","date_to"],index_col = 0,encoding=encoding)
    
    # write CSV file with field separator semicolon
    df.to_csv(csvfile, sep = ";")
    return(df)

basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

### Select only Stations Located in NRW and being Operational 

In [None]:
isNRW = df_stations['state'].str.contains("Nordrhein")

isOperational = df_stations['date_to'] == df_stations.date_to.max() 

isBefore2021 = df_stations['date_from'] < '2021'

dfNRW = df_stations[isNRW & isOperational & isBefore2021]

print("Number of stations in NRW: \n", dfNRW.count())
dfNRW

### Creating a Geo Data Frame - Geopandas 


In [None]:
import os

conda_prefix = os.environ['conda_prefix']
print(f"CONDA_PREFIX: {conda_prefix:s}")
os.environ['proj_lib'] = conda_prefix + r"\Library\share\proj"
proj_lib = os.environ['proj_lib']
print(f"New env var value: \nPROJ_LIB={proj_lib:s}")

import pyproj
print(f"pyproj.datadir.get_data_dir() -> {pyproj.datadir.get_data_dir():s}") 


In [None]:
import pandas as pd
from geopandas import GeoDataFrame
from shapely.geometry import Point
import fiona
from pyproj import CRS

#df = pd.read_csv('data.csv')
df = dfNRW

geometry = [Point(xy) for xy in zip(df.longitude, df.latitude)]
crs = CRS("epsg:4326") #http://www.spatialreference.org/ref/epsg/2263/
stations_gdf = GeoDataFrame(df, crs=crs, geometry=geometry)

stations_gdf.head()

#### Connecting to the PostGIS database

In [None]:
param_dic = {
  "user" : "geo_master",
  "pw"   : "xxxxxx",
  "host" : "localhost",
  "db"   : "geo"
}

template = "postgresql://{user}:{pw}@{host}:5432/{db}"

db_connection_url = template.format(**param_dic)
print("Connection URL: ", db_connection_url) 

### Write Geopandas Data Frame directly into PostGIS Database


In [None]:

from sqlalchemy import create_engine
from sqlalchemy import Numeric, Float, Date, REAL
#import psycopg2

engine = create_engine(db_connection_url)

In [None]:
dtypes = {"station_id": Numeric(6,0), "altitude" : REAL, "date_from" : Date, "date_to" : Date, "longitude" : REAL, "latitude" : REAL}

engine.execute("DROP VIEW IF EXISTS dwd.v_stations_prec")

stations_gdf.to_postgis(name="stations", schema="dwd", if_exists = "replace", index = "station_id", index_label=True, con=engine, dtype=dtypes)

engine.execute('alter table dwd.stations add primary key (station_id)')

## Download and Process the Time Series Zip Archives
### Dataframe with TS Zip Files from FTP Directory Listing 


In [None]:
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.tail(30)

### Download TS Data from FTP Server


In [None]:
local_zip_list = []

station_ids_selected = list(dfNRW.index)

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname)
        grabFile(ftp, ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)

### Write the time series to the database

In [None]:
from zipfile import ZipFile
def prec_ts_to_df(fname):

    import pandas as pd
    import pytz
    from datetime import datetime
    
    dateparse = lambda dates: [datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding=encoding, index_col="MESS_DATUM", parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999])

    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    
    # TIME ZONES: https://stackoverflow.com/questions/22800079/converting-time-zone-pandas-dataframe
    # DWD Prec Data is given in UTC
    
    df.index = df.index.tz_localize(pytz.utc)
    
    return(df)

#### The database writer (SQL)

In [None]:
first = True

dtypes = {"station_id": Numeric(6,0), "val" : REAL}

#for elt in local_zip_list[0:1]:
for elt in local_zip_list:
    ffname = local_ftp_ts_dir + elt
    #print("Zip archive: " + ffname)
    with ZipFile(ffname) as myzip:
        # read the time series data from the file starting with "produkt"
        prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
        print("Extract product file: %s" % prodfilename)
        # print()
        with myzip.open(prodfilename) as myfile:
            dftmp = prec_ts_to_df(myfile)[["stations_id","r1"]]
            # df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'}, inplace=True)
            dftmp.rename(columns={'stations_id': 'station_id', 'r1': 'val', 'mess_datum': 'ts'}, inplace = True)
            dftmp.rename_axis('ts', inplace = True)
            # dftmp.to_csv(f, header=f.tell()==0)
            if (first):
                first = False
                # dftmp.to_csv(csvfname, mode = "w", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "replace", index = ["ts"], index_label=True, con=engine, dtype=dtypes)
            else:
                # dftmp.to_csv(csvfname, mode = "a", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "append",  index = ["ts"], index_label=True, con=engine, dtype=dtypes)

# After insert completed: ceate index
print("create index")
engine.execute("ALTER TABLE dwd.prec ADD PRIMARY KEY (ts, station_id)")