
# Weather Data for DPPD Covid-19
In this notebook we download temperature, precipitation, sunshine duration and wind data from _Deutscher Wetterdienst_ (DWD) in order to include them in the Covid-19 Positive Deviance Analysis.

Unfortunately, DWD does not provide access via an API, so we'll have to download the measurement data from each weather station for the last 500 days until yesterday and extract the desired information manually.

Exact specifications of the data sets that are being used can be found here:
- [temperature, hourly averages in Germany](https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/recent/BESCHREIBUNG_obsgermany_climate_hourly_tu_recent_de.pdf)
- [precipitation, hourly averages in Germany](https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/BESCHREIBUNG_obsgermany_climate_hourly_precipitation_recent_de.pdf)
- [sunshine duration, hourly in Germany](https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/sun/recent/BESCHREIBUNG_obsgermany_climate_hourly_sun_recent_de.pdf)



In [1]:
# web scraping
import requests
from bs4 import BeautifulSoup

# file handling
from pathlib import Path
from zipfile import ZipFile
import os
import shutil

# progress bar
from tqdm import tqdm
import time


import pandas as pd

## URLs and Paths

In [2]:
# DWD open-data server directories that hold the hourly "recent" zip archives
url_temp = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/recent/"
url_prec = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/"
url_sun = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/sun/recent/"
url_wind = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/wind/recent/"

# station description files, one per measurement type
url_temp_stations = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/recent/TU_Stundenwerte_Beschreibung_Stationen.txt"
url_prec_stations = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/RR_Stundenwerte_Beschreibung_Stationen.txt"
url_sun_stations = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/sun/recent/SD_Stundenwerte_Beschreibung_Stationen.txt"
url_wind_stations = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/wind/recent/FF_Stundenwerte_Beschreibung_Stationen.txt"

# all local folders live below the current working directory
path_base = Path.cwd()

# download targets, one sub-folder per measurement type (created if missing)
path_downloads_temp = path_base / "downloads" / "temp"
path_downloads_temp.mkdir(parents=True, exist_ok=True)

path_downloads_prec = path_base / "downloads" / "prec"
path_downloads_prec.mkdir(parents=True, exist_ok=True)

path_downloads_sun = path_base / "downloads" / "sun"
path_downloads_sun.mkdir(parents=True, exist_ok=True)

path_downloads_wind = path_base / "downloads" / "wind"
path_downloads_wind.mkdir(parents=True, exist_ok=True)

# target folder for the exported pickles
path_export = path_base / "exports"
path_export.mkdir(parents=True, exist_ok=True)

## Functions

In [3]:
def stations_description_parser(url):
    """Download and parse a DWD station description file.

    The file lists all weather stations that participated in measuring the
    corresponding climate property. According to the original notes there are
    around 700 stations that measure temperature and sun, and about 1000
    stations that measure precipitation.

    Each data line is split on whitespace: the first six tokens are the
    station id, start date, end date, altitude, latitude and longitude, the
    last token is the state, and everything in between is the station name
    (which may contain spaces).

    Args:
        url: URL of the station description text file.

    Returns:
        pandas.DataFrame with the columns station_id, start_date, end_date,
        altitude, latitude, longitude, name and state. The id, altitude and
        coordinates are numeric, the two date columns are datetimes.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    req = requests.get(url)
    # fail loudly instead of trying to parse an error page as station data
    req.raise_for_status()

    data = []
    # the first two lines are skipped: they hold no station data
    for line in req.text.splitlines()[2:]:
        fields = line.split()
        # blank or truncated lines cannot be unpacked into the fixed columns
        if len(fields) < 7:
            continue

        station_id, start_date, end_date, altitude, latitude, longitude = fields[:6]
        state = fields[-1]
        station_name = " ".join(fields[6:-1])

        data.append([station_id, start_date, end_date, altitude, latitude, longitude, station_name, state])

    columns = ["station_id", "start_date", "end_date", "altitude", "latitude", "longitude", "name", "state"]
    df = pd.DataFrame(data, columns=columns)

    # convert the numeric columns explicitly; name and state stay text
    # (pd.to_numeric(errors="ignore") is deprecated in recent pandas versions)
    for column in ("station_id", "altitude", "latitude", "longitude"):
        df[column] = pd.to_numeric(df[column])

    # convert dates into datetime objects
    for column in ("start_date", "end_date"):
        df[column] = pd.to_datetime(df[column], format="%Y%m%d")

    return df





def scrape_file_urls(url, prefix, suffix):
    """Find all zip files that are available in a DWD server directory.

    Downloads the HTML directory listing at `url` and collects the `href`
    of every anchor whose target starts with `prefix` and ends with `suffix`.

    Args:
        url: URL of the server directory listing.
        prefix: required beginning of the file name, e.g. "stundenwerte_TU_".
        suffix: required ending of the file name, e.g. "_akt.zip".

    Returns:
        List of matching href strings in document order.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    req = requests.get(url)
    # do not scrape an error page as if it were the directory listing
    req.raise_for_status()
    soup = BeautifulSoup(req.content, "html.parser")

    links = []
    for anchor in soup.find_all("a"):
        ref = anchor.get("href")
        # anchors without an href attribute yield None and are skipped
        if ref and ref.startswith(prefix) and ref.endswith(suffix):
            links.append(ref)
    return links
    
    
    
    
    
def download_zips(filenames, url_server, path_destination):
    """Download zip files from a DWD server directory and extract each one.

    Every archive is stored in `path_destination` and extracted into a
    sibling folder named like the archive without its extension. A folder
    left over from a previous run is replaced. The downloaded zip file itself
    is kept next to the extracted folder.

    The download is best effort: a file that cannot be downloaded or
    extracted is reported on stdout and skipped, so one bad archive does not
    abort the whole batch.

    Args:
        filenames: iterable of archive file names relative to `url_server`.
        url_server: URL of the server directory; file names are appended to it.
        path_destination: existing local directory (str or Path).

    Returns:
        None. Prints "Invalid directory" and returns early if
        `path_destination` is not a directory.
    """
    # accept plain strings as well as Path objects
    path_destination = Path(path_destination)
    if not path_destination.is_dir():
        print("Invalid directory")
        return

    print(f"Downloading files from {url_server} to {path_destination}...")
    time.sleep(0.5) # otherwise progress bar gets messed up
    for file in tqdm(filenames):
        try:
            req = requests.get(url_server + file)
            req.raise_for_status()
        except OSError as e:
            # requests' exceptions derive from OSError; nothing is written for this file
            print(f"Download of {file} failed: {e}")
            continue

        filename = path_destination / file
        filename.write_bytes(req.content)

        # extraction folder: archive name without the ".zip" extension
        dirname = filename.with_suffix("")

        try:
            with ZipFile(filename, "r") as zippy:
                # remove already-existing directory from previous run of this notebook
                if dirname.is_dir():
                    try:
                        shutil.rmtree(dirname)
                    except OSError as e:
                        print(e)
                        print("Old downloads could not be removed by the program. " +
                              "This might happen if you are working via SSH. " +
                              "Try removing the corresponding folders manually.")
                        continue

                dirname.mkdir()
                zippy.extractall(dirname)
        except Exception as e:
            # e.g. a corrupt archive; report it and carry on with the next file
            print(f"Could not extract {file}: {e}")
                
                
                
                
def scan_downloaded_folders(directory, prefix, suffix):
    """Collect the sub-folders of `directory` whose names match a pattern.

    Only direct children that are directories are considered. A folder
    matches when its name starts with `prefix` and ends with `suffix`.
    Returns the matching folders as a list of paths below `directory`.
    """
    return [
        directory.joinpath(entry.name)
        for entry in os.scandir(directory)
        if entry.is_dir()
        and entry.name.startswith(prefix)
        and entry.name.endswith(suffix)
    ]

In [4]:
# list the station archives that the server currently offers per measurement type
temp_filenames = scrape_file_urls(url=url_temp, prefix="stundenwerte_TU_", suffix="_akt.zip")
prec_filenames = scrape_file_urls(url=url_prec, prefix="stundenwerte_RR_", suffix="_akt.zip")
sun_filenames = scrape_file_urls(url=url_sun, prefix="stundenwerte_SD_", suffix="_akt.zip")
wind_filenames = scrape_file_urls(url=url_wind, prefix="stundenwerte_FF_", suffix="_akt.zip")

In [5]:
# wind_filenames

In [6]:
# download all the zip files (about 960 MB); only runs after explicit confirmation
answer = input("Do you want to download 1GB? (y/n)")
if answer == "y":
    download_zips(filenames=temp_filenames, url_server=url_temp, path_destination=path_downloads_temp)
    download_zips(filenames=prec_filenames, url_server=url_prec, path_destination=path_downloads_prec)
    download_zips(filenames=sun_filenames, url_server=url_sun, path_destination=path_downloads_sun)
    download_zips(filenames=wind_filenames, url_server=url_wind, path_destination=path_downloads_wind)

Do you want to download 1GB? (y/n)y
Downloading files from https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/recent/ to C:\Users\joshu\OneDrive\Desktop\giz_dppd_covid19\siemens_hackathon_stuff\weather_4_hackathon\downloads\temp...


100%|████████████████████████████████████████████████████████████████████████████████| 512/512 [01:53<00:00,  4.52it/s]


Downloading files from https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/ to C:\Users\joshu\OneDrive\Desktop\giz_dppd_covid19\siemens_hackathon_stuff\weather_4_hackathon\downloads\prec...


100%|████████████████████████████████████████████████████████████████████████████████| 972/972 [03:19<00:00,  4.86it/s]


Downloading files from https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/sun/recent/ to C:\Users\joshu\OneDrive\Desktop\giz_dppd_covid19\siemens_hackathon_stuff\weather_4_hackathon\downloads\sun...


100%|████████████████████████████████████████████████████████████████████████████████| 303/303 [01:01<00:00,  4.91it/s]


Downloading files from https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/wind/recent/ to C:\Users\joshu\OneDrive\Desktop\giz_dppd_covid19\siemens_hackathon_stuff\weather_4_hackathon\downloads\wind...


100%|████████████████████████████████████████████████████████████████████████████████| 294/294 [01:08<00:00,  4.30it/s]


In [4]:
# locate the extracted station folders for every measurement type
downloaded_temp_folders = scan_downloaded_folders(directory=path_downloads_temp, prefix="stundenwerte_TU_", suffix="_akt")
downloaded_prec_folders = scan_downloaded_folders(directory=path_downloads_prec, prefix="stundenwerte_RR_", suffix="_akt")
downloaded_sun_folders = scan_downloaded_folders(directory=path_downloads_sun, prefix="stundenwerte_SD_", suffix="_akt")
downloaded_wind_folders = scan_downloaded_folders(directory=path_downloads_wind, prefix="stundenwerte_FF_", suffix="_akt")

In [5]:
# downloaded_wind_folders

In [6]:
def find_product_file(directory, prefix, suffix):
    """Locate the single product file inside an extracted station folder.

    The product file holds the measurements we are interested in; the other
    files in the folder carry meta data and are ignored. A file matches when
    its name starts with `prefix` and ends with `suffix`.

    Returns the path of the product file. Raises an Exception if the folder
    contains no matching file or more than one.
    """
    matches = [
        directory.joinpath(entry.name)
        for entry in os.scandir(directory)
        if entry.is_file()
        and entry.name.startswith(prefix)
        and entry.name.endswith(suffix)
    ]

    # exactly one product file is expected per station folder
    if not matches:
        raise Exception("No product file found!")
    if len(matches) > 1:
        raise Exception("There seem to exist two product files for the same station!")
    return matches[0]

        
        

def _product_file_to_dataframe(file_path, colnames):
    """Parse a semicolon-separated product text file into a dataframe of strings.

    The first line of the file (its own column header) is discarded and
    replaced by `colnames`. Every remaining line is split on ";", each field
    is stripped of surrounding whitespace and the last field of the line is
    dropped (presumably DWD's end-of-record marker -- TODO confirm against
    the data set description). Blank lines are skipped.

    Args:
        file_path: pathlib.Path of the product file.
        colnames: column names for the resulting dataframe; their number must
            match the number of fields kept per line.

    Returns:
        pandas.DataFrame whose values are all still strings; type conversion
        is left to a later cleaning step.
    """
    data = []
    for line in file_path.read_text().splitlines()[1:]:
        if not line.strip():
            # a blank line would otherwise become a row of missing values
            continue
        values = [value.strip() for value in line.split(";")]
        values.pop()
        data.append(values)

    return pd.DataFrame(data, columns=colnames)


def temp_to_dataframe(file_path):
    """Converts the downloaded temperature text file into a pandas dataframe
       with the columns station_id, date, quality, temperature and humidity.
    """
    return _product_file_to_dataframe(
        file_path, ["station_id", "date", "quality", "temperature", "humidity"]
    )


def prec_to_dataframe(file_path):
    """Converts the downloaded precipitation text file into a pandas dataframe
       with the columns station_id, date, quality, R1, R1_IND and WRTR.
    """
    return _product_file_to_dataframe(
        file_path, ["station_id", "date", "quality", "R1", "R1_IND", "WRTR"]
    )


def sun_to_dataframe(file_path):
    """Converts the downloaded sunshine duration text file into a pandas dataframe
       with the columns station_id, date, quality and SD_SO.
    """
    return _product_file_to_dataframe(
        file_path, ["station_id", "date", "quality", "SD_SO"]
    )


def wind_to_dataframe(file_path):
    """Converts the downloaded wind text file into a pandas dataframe
       with the columns station_id, date, quality, velocity and direction.
    """
    return _product_file_to_dataframe(
        file_path, ["station_id", "date", "quality", "velocity", "direction"]
    )

def collect_all_product_files(dirlist, prefix, suffix):
    """Parse the product file of every station folder and stack the results.

    The parser is chosen from `prefix`:
    "produkt_tu_stunde_" (temperature), "produkt_rr_stunde_" (precipitation),
    "produkt_sd_stunde_" (sunshine duration) or "produkt_ff_stunde_" (wind).
    Folders without a usable product file are reported on stdout and skipped.

    Args:
        dirlist: iterable of extracted station folders (str or Path).
        prefix: product file name prefix; also selects the parser.
        suffix: product file name suffix, e.g. ".txt".

    Returns:
        pandas.DataFrame with one frame per station concatenated row-wise.
        Each station's rows appear exactly once; the per-file row index is
        kept, so index values repeat across stations.

    Raises:
        Exception: if `prefix` is not one of the supported prefixes.
        ValueError: if no product file could be parsed at all.
    """
    parsers = {
        "produkt_tu_stunde_": temp_to_dataframe,
        "produkt_rr_stunde_": prec_to_dataframe,
        "produkt_sd_stunde_": sun_to_dataframe,
        "produkt_ff_stunde_": wind_to_dataframe,
    }
    if prefix not in parsers:
        raise Exception(f"Unsupported prefix {prefix}!")
    to_dataframe = parsers[prefix]

    frames = []
    for folder_name in dirlist:
        # find product file name
        try:
            product_file = find_product_file(Path(folder_name), prefix, suffix)
        except Exception:
            print("Product file not found")
            continue

        # extract the data from this file; every station is appended once
        frames.append(to_dataframe(product_file))

    if not frames:
        raise ValueError(f"No product files matching {prefix}*{suffix} could be read")

    # concatenate all the frames into one frame
    return pd.concat(frames)

In [7]:
# extract all data
df_temp = collect_all_product_files(downloaded_temp_folders, "produkt_tu_stunde_", ".txt")
df_prec = collect_all_product_files(downloaded_prec_folders, "produkt_rr_stunde_", ".txt")

In [8]:
# same extraction for the sunshine duration and wind measurements
df_sun = collect_all_product_files(dirlist=downloaded_sun_folders, prefix="produkt_sd_stunde_", suffix=".txt")
df_wind = collect_all_product_files(dirlist=downloaded_wind_folders, prefix="produkt_ff_stunde_", suffix=".txt")

In [9]:
# station lists (id, operating period, location, name, state) per measurement type
df_temp_stations = stations_description_parser(url=url_temp_stations)
df_prec_stations = stations_description_parser(url=url_prec_stations)
df_sun_stations = stations_description_parser(url=url_sun_stations)
df_wind_stations = stations_description_parser(url=url_wind_stations)


## Data Cleaning

In [10]:
def clean_data(df):
    """Convert a parsed product dataframe to proper dtypes and mark missing values.

    All columns except "date" are converted to numeric, "date" is parsed as
    YYYYMMDDHH into datetimes, and the marker -999 (missing data) is replaced
    by NaN in the measurement columns that are present.

    The input is not modified, and the function is idempotent: cleaning an
    already cleaned frame returns an equal frame, so re-running the notebook
    cell does not corrupt the date column.

    Args:
        df: dataframe as returned by the *_to_dataframe parsers; must contain
            a "date" column.

    Returns:
        A new, cleaned pandas.DataFrame.

    Raises:
        KeyError: if `df` has no "date" column.
        ValueError: if a value cannot be converted.
    """
    # -999 = missing data
    missing_marker = -999
    measurement_columns = (
        "temperature", "humidity",
        "R1", "R1_IND", "WRTR",
        "SD_SO",
        "velocity", "direction",
    )

    cleaned = df.copy()

    # "date" is excluded here: converting an already parsed datetime column
    # to numeric would turn it into nanosecond integers
    for column in list(cleaned.columns):
        if column != "date":
            cleaned[column] = pd.to_numeric(cleaned[column])
    cleaned["date"] = pd.to_datetime(cleaned["date"], format="%Y%m%d%H")

    # mask() returns a float column with NaN instead of writing NaN into an
    # integer column in place, which recent pandas versions reject
    for column in measurement_columns:
        if column in cleaned.columns:
            cleaned[column] = cleaned[column].mask(cleaned[column] == missing_marker)

    return cleaned

# replace the raw string frames by their cleaned counterparts
df_temp = clean_data(df=df_temp)
df_prec = clean_data(df=df_prec)
df_sun = clean_data(df=df_sun)
df_wind = clean_data(df=df_wind)

In [11]:
# eyeball a random selection of 20 temperature rows
df_temp.sample(n=20)

Unnamed: 0,station_id,date,quality,temperature,humidity
6616,5871,2020-01-04 16:00:00,3,2.3,91.0
4228,5014,2019-09-27 04:00:00,3,14.1,96.0
4461,2201,2019-10-06 21:00:00,3,7.4,87.0
11714,4480,2020-08-04 02:00:00,1,10.3,95.0
7365,4371,2020-02-04 21:00:00,3,3.7,89.0
7722,102,2020-02-20 12:00:00,3,8.1,90.0
6612,2578,2020-01-04 12:00:00,3,5.4,83.0
3706,954,2020-06-09 01:00:00,3,11.5,67.0
3477,3362,2019-08-26 21:00:00,3,18.8,89.0
5439,3679,2019-11-16 15:00:00,3,5.0,86.0


In [None]:
# display the dtype of every column in df_wind
df_wind.dtypes

In [12]:
# Discard all data from before 2020
df_temp = df_temp[df_temp['date'].dt.year == 2020]
df_prec = df_prec[df_prec['date'].dt.year == 2020]
df_sun = df_sun[df_sun['date'].dt.year == 2020]
df_wind = df_wind[df_wind['date'].dt.year == 2020]

In [13]:
# export as pickle
df_temp.to_pickle(Path.joinpath(path_export, "temp.pkl"))
df_temp_stations.to_pickle(Path.joinpath(path_export, "temp_stations.pkl"))

df_prec.to_pickle(Path.joinpath(path_export, "prec.pkl"))
df_prec_stations.to_pickle(Path.joinpath(path_export, "prec_stations.pkl"))

df_sun.to_pickle(Path.joinpath(path_export, "sun.pkl"))
df_prec_stations.to_pickle(Path.joinpath(path_export, "sun_stations.pkl"))

df_wind.to_pickle(Path.joinpath(path_export, "wind.pkl"))
df_wind_stations.to_pickle(Path.joinpath(path_export, "wind_stations.pkl"))

In [None]:
# pd.merge(df_wind, df_wind_stations, on='station_id')