Initialisierung von Variablen und Ordnern

In [54]:
import time
import pandas as pd
import os
import concurrent.futures
import requests
import json
from datetime import datetime
import zipfile
import io
import re
import concurrent.futures

In [55]:
# Stations to query. One row per location: (review/observation id, forecast id, place label).
# The three derived lists are index-aligned and are zipped together later in the notebook.
_station_table = [
    ("01262", "10870", "Muenchen"),
    ("01975", "10147", "Hamburg"),
    ("02667", "10513", "KoelnBonn"),
]
# Flag kept for configuration; it is not read by the code in this section.
combine_historicforecast_bool = False
station_ids_r = [review_id for review_id, _, _ in _station_table]
station_ids_f = [forecast_id for _, forecast_id, _ in _station_table]
station_place = [place for _, _, place in _station_table]

In [56]:
# Folder layout for intermediate computations and outputs.
# Everything below output_folder is derived from it; the two final output folders live
# one level up, outside ./weather. (The spelling "forecas_folder" is used by callers.)
output_folder = "./weather/"
station_folder = os.path.join(output_folder, "stations")
computing_folder = os.path.join(output_folder, "computing_folder")
stations_combined = os.path.join(output_folder, "stations_combined")
data_collection_folder = "../data_collection"
forecas_folder = "../forecast"

In [57]:
# Base URLs: DWD open-data archive of hourly observations (review) and the forecast API proxy.
_dwd_cdc_root = "https://opendata.dwd.de/climate_environment/CDC/"
base_url_review = _dwd_cdc_root + "observations_germany/climate/hourly/"
url_forecast = "https://dwd.api.proxy.bund.dev/v30/stationOverviewExtended"

In [58]:
# Columns that are not needed and are dropped from each data set.
columns_remove_clouds = ["STATIONS_ID", "eor", "QN_8", "V_N_I"]
columns_remove_pressure = ["STATIONS_ID", "eor", "QN_8"]
columns_remove_sun = ["STATIONS_ID", "eor", "QN_7"]
columns_remove_temp = ["STATIONS_ID", "QN_9", "eor"]
columns_remove_wind = ["STATIONS_ID", "eor", "QN_3"]
columns_remove_precipitation = ["STATIONS_ID", "eor", "QN_8", "WRTR", "RS_IND"]

# The forecast CSV stores the day flag under the column name "isDay_bool"
# (see get_weather_data_for_station_forecast). "isDay" alone never matched that
# column, so both spellings are listed.
columns_remove_forecast = ['isDay', 'isDay_bool', 'dewPoint2m']

In [59]:
# URL suffixes (relative to base_url_review) per data type and period.
# The insertion order is the order in which the data sets are downloaded.
data_types = dict([
    ("temperature_historical", "air_temperature/historical/"),
    ("temperature_recent", "air_temperature/recent/"),
    ("cloudiness_historical", "cloudiness/historical/"),
    ("cloudiness_recent", "cloudiness/recent/"),
    ("pressure_historical", "pressure/historical/"),
    ("pressure_recent", "pressure/recent/"),
    ("sun_historical", "sun/historical/"),
    ("sun_recent", "sun/recent/"),
    ("wind_historical", "wind/historical/"),
    ("wind_recent", "wind/recent/"),
    ("precipitation_recent", "precipitation/recent/"),
    ("precipitation_historical", "precipitation/historical/"),
])

In [60]:
# HTTP headers for the forecast API request: ask for a JSON response.
headers = dict(accept="application/json")

Definitionen der Funktionen vom Basiswetterskript

In [61]:
def combine_historic(station_r, place):
  """Copy one station's combined review CSV to stations_combined as ``<place>_review.csv``.

  Reads ``station_folder/<station_r>/<station_r>_data_combined.csv`` and writes its
  content unchanged (without the pandas index). A missing input file or any other
  error is printed instead of raised.

  Args:
    station_r: review station id (sub-folder and file-name prefix).
    place: place label used for the output file name.
  """
  try:
    source_path = os.path.join(station_folder, station_r, f"{station_r}_data_combined.csv")
    review_frame = pd.read_csv(source_path)
    target_path = os.path.join(stations_combined, f"{place}_review.csv")
    review_frame.to_csv(target_path, index=False)
    print(f"Kombiniert: {station_r} -> {target_path}")
  except FileNotFoundError as missing:
    print(f"Datei nicht gefunden: {missing}")
  except Exception as failure:
    print(f"Fehler beim Verarbeiten von {station_r}: {failure}")

In [62]:
def combine_all_stations(source_folder=None, target_folder=None):
  """Merge all per-place CSV files side by side on their ``date`` column.

  Step 1: every ``*.csv`` in ``source_folder`` gets all columns except the first
  suffixed with the file name (e.g. ``T_temperature_C`` -> ``T_temperature_C_Muenchen_review``).
  The renamed file is written back in place.
  Step 2: the files are outer-joined on ``date``, sorted, de-duplicated on ``date``
  and saved as ``weather.csv`` in ``target_folder``.

  Args:
    source_folder: folder with the per-place CSV files; None uses ``stations_combined``.
    target_folder: output folder, created if missing; None uses ``data_collection_folder``.
  """
  if source_folder is None:
    source_folder = stations_combined
  if target_folder is None:
    target_folder = data_collection_folder

  # Sorted so the column order of the result does not depend on os.listdir order.
  files = sorted(f for f in os.listdir(source_folder) if f.endswith('.csv'))

  # Rename the columns after the file (place) name.
  for file in files:
    file_path = os.path.join(source_folder, file)
    file_name = os.path.splitext(file)[0]
    suffix = f'_{file_name}'
    try:
      df = pd.read_csv(file_path)
    except Exception as e:
      print(f"Fehler beim Laden der Datei {file}: {e}")
      continue
    # Columns that already carry the suffix are left alone. The renamed file is written
    # back to disk, so without this check a re-run would append the suffix a second time.
    df.columns = [df.columns[0]] + [
      col if str(col).endswith(suffix) else f'{col}{suffix}' for col in df.columns[1:]
    ]
    print(f'Spalten umbennant für {file_name}')
    df.to_csv(file_path, index=False)

  # Load all files again with parsed dates.
  all_data_frames = []
  for file in files:
    file_path = os.path.join(source_folder, file)
    try:
      df = pd.read_csv(file_path, delimiter=",", parse_dates=["date"], date_format="%Y%m%d%H")
      all_data_frames.append(df)
      print(f"Daten hinzugefügt von: {file_path}")
    except Exception as e:
      print(f"Fehler beim Laden der Datei {file}: {e}")

  # Nothing loaded: previously this fell through to a NameError on `combined_data`.
  if not all_data_frames:
    print(f"Keine Daten zum Kombinieren in {source_folder} gefunden.")
    return

  # Bring every merge key (including the first frame's) to datetime. This is a no-op
  # for columns read_csv already parsed.
  for df in all_data_frames:
    df["date"] = pd.to_datetime(df["date"], format="%Y%m%d%H", errors="coerce")

  combined_data = all_data_frames[0]
  for df in all_data_frames[1:]:
    combined_data = pd.merge(combined_data, df, on=["date"], how="outer")

  # Sort and drop duplicate timestamps.
  combined_data = combined_data.sort_values(by=["date"]).drop_duplicates(subset=["date"], keep='last')

  os.makedirs(target_folder, exist_ok=True)
  final_filename = os.path.join(target_folder, "weather.csv")
  combined_data.to_csv(final_filename, index=False)
  print(f"Alle kombinierten Daten gespeichert in: {final_filename}")

In [63]:
def start_combine_historic(station_ids=None, places=None):
    """Run combine_historic for every (station id, place) pair in a thread pool.

    Errors are printed per station and do not stop the other stations.

    Args:
        station_ids: review station ids; None uses the module-level ``station_ids_r``.
        places: place labels index-aligned with ``station_ids``; None uses ``station_place``.
    """
    if station_ids is None:
        station_ids = station_ids_r
    if places is None:
        places = station_place
    pairs = list(zip(station_ids, places))
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0,
    # so clamp to at least one worker.
    max_workers = max(1, min(os.cpu_count() or 1, len(pairs)))
    print(f"Starte die Verknüfung aller Daten für {len(pairs)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each station is combined in its own worker.
        future_to_station = {
            executor.submit(combine_historic, station_r, place): (station_r, place)
            for station_r, place in pairs
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_r, place = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Dateien verknüpft aller Daten für Station {station_r} ({place}).")
            except Exception as e:
                print(f"Fehler beim Verknüfung aller Daten für Station {station_r} ({place}): {e}")

In [64]:
def combine_forecast(source_folder=None, target_folder=None):
  """Merge all forecast CSV files side by side on their ``date`` column.

  Step 1: every ``*.csv`` directly in ``source_folder`` (by default ``station_folder``,
  where get_weather_data_for_station_forecast writes ``weather_forecast_<place>.csv``)
  gets all columns except the first suffixed with its file name. The renamed file is
  written back in place.
  Step 2: the files are outer-joined on ``date``, sorted, de-duplicated and saved as
  ``weather_forecast.csv`` in ``target_folder``.

  Args:
    source_folder: folder with the forecast CSV files; None uses ``station_folder``.
    target_folder: output folder, created if missing; None uses ``forecas_folder``.
  """
  if source_folder is None:
    source_folder = station_folder
  if target_folder is None:
    target_folder = forecas_folder

  # Sorted so the column order of the result does not depend on os.listdir order.
  files = sorted(f for f in os.listdir(source_folder) if f.endswith('.csv'))

  # Rename the columns after the file (place) name.
  for file in files:
    file_path = os.path.join(source_folder, file)
    file_name = os.path.splitext(file)[0]
    suffix = f'_{file_name}'
    try:
      df = pd.read_csv(file_path)
    except Exception as e:
      print(f"Fehler beim Laden der Datei {file}: {e}")
      continue
    # Skip columns that already carry the suffix so a re-run stays idempotent.
    df.columns = [df.columns[0]] + [
      col if str(col).endswith(suffix) else f'{col}{suffix}' for col in df.columns[1:]
    ]
    print(f'Spalten umbennant für {file_name}')
    df.to_csv(file_path, index=False)

  # Load all files again with parsed dates.
  all_data_frames = []
  for file in files:
    file_path = os.path.join(source_folder, file)
    try:
      df = pd.read_csv(file_path, delimiter=",", parse_dates=["date"], date_format="%Y%m%d%H")
      all_data_frames.append(df)
      print(f"Daten hinzugefügt von: {file_path}")
    except Exception as e:
      print(f"Fehler beim Laden der Datei {file}: {e}")

  # Nothing loaded: previously this fell through to a NameError on `combined_data`.
  if not all_data_frames:
    print(f"Keine Forecast-Daten zum Kombinieren in {source_folder} gefunden.")
    return

  # Bring every merge key (including the first frame's) to datetime.
  for df in all_data_frames:
    df["date"] = pd.to_datetime(df["date"], format="%Y%m%d%H", errors="coerce")

  combined_data = all_data_frames[0]
  for df in all_data_frames[1:]:
    combined_data = pd.merge(combined_data, df, on=["date"], how="outer")

  # Sort and drop duplicate timestamps.
  combined_data = combined_data.sort_values(by=["date"]).drop_duplicates(subset=["date"], keep='last')

  os.makedirs(target_folder, exist_ok=True)
  final_filename = os.path.join(target_folder, "weather_forecast.csv")
  combined_data.to_csv(final_filename, index=False)
  print(f"Kombinierter Forecast gespeichert in: {final_filename}")

In [65]:
def create_folder():
  """Create the working folder tree used by the pipeline (existing folders are kept).

  Creates ``computing_folder``, ``stations_combined`` and ``station_folder``. For every
  review station in ``station_ids_r`` it also creates one sub-folder in
  ``computing_folder`` and one in ``station_folder``.
  """
  os.makedirs(computing_folder, exist_ok=True)
  os.makedirs(stations_combined, exist_ok=True)
  # Forecast CSVs are written directly into station_folder. Before, it only existed as a
  # side effect of the per-station folders, i.e. not when station_ids_r is empty.
  os.makedirs(station_folder, exist_ok=True)
  for station in station_ids_r:
    os.makedirs(os.path.join(computing_folder, station), exist_ok=True)
    # Same path as output_folder/stations/<id>. A differently named local avoids shadowing
    # the module-level station_folder.
    station_subfolder = os.path.join(station_folder, station)
    os.makedirs(station_subfolder, exist_ok=True)


Wetter Reviewfunktionen:

In [66]:
# Search and download the review (observation) data per station.

# Per data category (the part of a data_types key before the last "_"): the product code
# used in the TXT file name inside each ZIP, and the prefix of the locally saved file.
_REVIEW_PRODUCTS = {
    "temperature": ("tu", "temp"),
    "cloudiness": ("n", "clouds"),
    "pressure": ("p0", "pressure"),
    "sun": ("sd", "sun"),
    "wind": ("ff", "wind"),
    "precipitation": ("rr", "precipitation"),
}
# Local file suffix per period (the part of a data_types key after the last "_").
_REVIEW_PERIOD_SUFFIX = {"historical": "hist", "recent": "recent"}
# (connect, read) timeout in seconds. Without it, a stalled server would block forever.
_REVIEW_REQUEST_TIMEOUT = (10, 120)
_HREF_PATTERN = re.compile(r'href="(.*?)"')


def _save_review_zip(file_url, txt_pattern, output_filename, station_id):
    """Download one ZIP and save the first member matching ``txt_pattern`` as ``output_filename``."""
    print(f"Lade herunter: {file_url}")
    file_response = requests.get(file_url, timeout=_REVIEW_REQUEST_TIMEOUT)
    file_response.raise_for_status()
    output_filepath = os.path.dirname(output_filename)

    with zipfile.ZipFile(io.BytesIO(file_response.content)) as z:
        txt_files = [name for name in z.namelist() if txt_pattern.match(name)]
        if not txt_files:
            print(f"Keine TXT-Datei im erwarteten Format für Station {station_id} gefunden.")
            return

        txt_filename = txt_files[0]
        with z.open(txt_filename) as f:
            try:
                df = pd.read_csv(f, sep=";", encoding="utf-8")
                if df.empty:
                    print(f"Warnung: Die Datei {txt_filename} ist leer.")
                    return
                print("Daten geladen für:", txt_filename)
                df.to_csv(output_filename, sep=";", encoding="utf-8", index=False)
                print(f"Wetterdaten gespeichert unter: {output_filepath}")
                print(f"Die Datei wurde erfolgreich gespeichert unter: {os.path.abspath(output_filepath)}")
            except Exception as e:
                print(f"Fehler beim Laden der Datei {txt_filename}: {e}")


def station_folderget_weather_data_for_station_review(station_id):
    """Download every configured review data set for one station into ``computing_folder/<station_id>``.

    The unusual name is kept because callers reference it.

    For each entry of ``data_types`` the directory listing under ``base_url_review`` is
    fetched. Every ZIP whose file name contains ``_<station_id>_`` is downloaded, and its
    ``produkt_<code>_stunde_<8 digits>_<8 digits>_<station_id>.txt`` member is stored as
    ``<prefix>_<station_id>_<hist|recent>.txt``. Afterwards the chained pipeline step
    ``cut_historic_bevor_2015(station_id)`` runs.

    Raises:
        requests.HTTPError: if a listing or ZIP request returns an error status.
    """
    output_filepath = os.path.join(computing_folder, station_id)
    print(f"Speicherort {output_filepath}, computing_folder {computing_folder}, station_id {station_id}")
    # Match "_<id>_" inside the linked file name instead of the id anywhere on the listing
    # line, which could also hit other stations' lines that merely contain those digits.
    station_token = f"_{station_id}_"

    for data_type, endpoint in data_types.items():
        category, _, period = data_type.rpartition("_")
        if category not in _REVIEW_PRODUCTS or period not in _REVIEW_PERIOD_SUFFIX:
            print(f"Unbekannter Datentyp übersprungen: {data_type}")
            continue
        product_code, file_prefix = _REVIEW_PRODUCTS[category]
        txt_pattern = re.compile(
            r'produkt_' + product_code + r'_stunde_\d{8}_\d{8}_' + re.escape(station_id) + r'\.txt'
        )
        output_filename = os.path.join(
            output_filepath, f"{file_prefix}_{station_id}_{_REVIEW_PERIOD_SUFFIX[period]}.txt"
        )

        # Fetch the directory listing of this data type.
        url = base_url_review + endpoint
        response = requests.get(url, timeout=_REVIEW_REQUEST_TIMEOUT)
        response.raise_for_status()

        # Find the matching ZIP file(s) in the listing.
        for line in response.text.splitlines():
            href = _HREF_PATTERN.search(line)
            if href is None:
                continue
            filename = href.group(1)
            if filename.endswith(".zip") and station_token in filename:
                _save_review_zip(url + filename, txt_pattern, output_filename, station_id)

    cut_historic_bevor_2015(station_id)

In [67]:
# Download the review weather data for all given stations.
def download_weather_data_for_all_stations_review(station_ids):
    """Run station_folderget_weather_data_for_station_review for every station in a thread pool.

    Errors are printed per station and do not stop the other stations.
    """
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0
    # (empty station list), so clamp to at least one worker.
    max_workers = max(1, min(os.cpu_count() or 1, len(station_ids)))
    print(f"Starte den Download für {len(station_ids)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each station is downloaded in its own worker.
        future_to_station = {
            executor.submit(station_folderget_weather_data_for_station_review, station_id): station_id
            for station_id in station_ids
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_id = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Download abgeschlossen für Station {station_id}.")
            except Exception as e:
                print(f"Fehler beim Herunterladen von Daten für Station {station_id}: {e}")

In [68]:
def cut_historic_bevor_2015(station_id, min_year=2015):
    """Drop rows older than ``min_year`` from every ``*_hist.txt`` file of one station.

    The files in ``computing_folder/<station_id>`` are ';'-separated. The header line is
    kept, and a data line is kept when the first four characters of its second field
    (MESS_DATUM) form a year >= ``min_year``. Lines whose date cannot be read are dropped
    and counted instead of aborting the whole station. Each file is rewritten in place.
    Afterwards the chained step ``remove_columns_review(station_id)`` runs.

    Args:
        station_id: review station id (sub-folder of computing_folder).
        min_year: first year to keep (default 2015).
    """
    computing_folder_station = os.path.join(computing_folder, station_id)
    station_files = [f for f in os.listdir(computing_folder_station) if re.match(r'(.+)_hist\.txt', f)]
    for file in station_files:
        file_path = os.path.join(computing_folder_station, file)
        with open(file_path, 'r', encoding='utf-8') as infile:
            lines = infile.readlines()

        filtered_lines = lines[:1]  # header line
        skipped = 0
        for line in lines[1:]:
            columns = line.strip().split(';')
            if len(columns) <= 1:
                continue
            # strip(): a leading blank would otherwise shift the 4-character year slice.
            year_text = columns[1].strip()[:4]
            if not year_text.isdigit():
                skipped += 1
                continue
            if int(year_text) >= min_year:
                filtered_lines.append(line)

        # Write the kept lines back to the file.
        with open(file_path, 'w', encoding='utf-8') as outfile:
            outfile.writelines(filtered_lines)
        if skipped:
            print(f"{skipped} Zeilen ohne gültiges Datum verworfen: {file}")
        print(f"Historisch bis {min_year} gekürzt: {file}")

    # Chained next pipeline step. Per the original note, keep this call only when the
    # separate start_* launcher for this step is not run as well.
    remove_columns_review(station_id)

In [69]:
def start_cut_historic_bevor_2015(station_ids):
    """Run cut_historic_bevor_2015 for every station in a thread pool.

    Errors are printed per station and do not stop the other stations.
    """
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0.
    max_workers = max(1, min(os.cpu_count() or 1, len(station_ids)))
    print(f"Starte die Kürzung bis 2015 für {len(station_ids)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_station = {
            executor.submit(cut_historic_bevor_2015, station_id): station_id
            for station_id in station_ids
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_id = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Dateien bis 2015 gekürzt für Station {station_id}.")
            except Exception as e:
                print(f"Fehler beim Kürzen bis 2015 für Station {station_id}: {e}")

In [70]:
def remove_columns_review(station_id):
    """Drop the unneeded columns from every downloaded TXT file of one station.

    Files in ``computing_folder/<station_id>`` are selected by their type prefix
    (``clouds_``, ``pressure_``, ``sun_``, ``temp_``, ``wind_``, ``precipitation_``).
    For each, the matching ``columns_remove_*`` list is dropped; columns that are absent
    are ignored. Each file is rewritten in place, and errors are printed per file.
    Afterwards the chained step ``combine_historic_recent(station_id)`` runs.
    """
    print('Start Remove Columns')
    computing_folder_station = os.path.join(computing_folder, station_id)
    # List the folder once instead of once per data type.
    txt_files = [f for f in os.listdir(computing_folder_station) if f.endswith(".txt")]

    # One entry per data type, in the order the types were processed before.
    removal_plan = (
        ("clouds_", columns_remove_clouds),
        ("pressure_", columns_remove_pressure),
        ("sun_", columns_remove_sun),
        ("temp_", columns_remove_temp),
        ("wind_", columns_remove_wind),
        ("precipitation_", columns_remove_precipitation),
    )
    for prefix, columns_to_remove in removal_plan:
        for file in txt_files:
            if not file.startswith(prefix):
                continue
            file_path = os.path.join(computing_folder_station, file)
            try:
                # skipinitialspace: ignore blanks directly after the ';' delimiter.
                df = pd.read_csv(file_path, delimiter=";", skipinitialspace=True)
                df = df.drop(columns=[col for col in columns_to_remove if col in df.columns])
                df.to_csv(file_path, sep=";", index=False)
                print(f"Spalten aus {file} entfernt.")
            except Exception as e:
                print(f"Fehler beim Verarbeiten der Datei {file}: {e}")

    # Chained next pipeline step. Per the original note, keep this call only when the
    # separate start_* launcher for this step is not run as well.
    combine_historic_recent(station_id)

In [71]:
def start_remove_columns_review(station_ids):
    """Run remove_columns_review for every station in a thread pool.

    Errors are printed per station and do not stop the other stations.
    """
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0.
    max_workers = max(1, min(os.cpu_count() or 1, len(station_ids)))
    print(f"Starte die Löschen von Spalten für {len(station_ids)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_station = {
            executor.submit(remove_columns_review, station_id): station_id
            for station_id in station_ids
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_id = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Spalten gelöscht für Station {station_id}.")
            except Exception as e:
                print(f"Fehler beim Löschen von Spalten für Station {station_id}: {e}")

In [72]:
def combine_historic_recent(station_id):
    """Merge the historical and recent TXT file of each data type for one station.

    For every ``<type>_<station_id>_hist.txt`` / ``..._recent.txt`` pair in
    ``computing_folder/<station_id>``:
    - the rows are concatenated,
    - duplicate ``MESS_DATUM`` values are dropped (the recent row wins),
    - the result is sorted by time and saved as ``<type>_<station_id>_combined.txt``.
    Types with only one of the two files are reported and skipped. Afterwards the
    chained step ``combine_all_station_data_review(station_id)`` runs.
    """
    computing_folder_station = os.path.join(computing_folder, station_id)
    # fullmatch rather than match, so names like "temp_01262_hist.txt.bak" are ignored.
    file_pattern = re.compile(r'(.+)_' + re.escape(station_id) + r'_(hist|recent)\.txt')

    # Group files by weather type: {"temp_<id>": {"hist": path, "recent": path}, ...}
    file_pairs = {}
    for file in os.listdir(computing_folder_station):
        match = file_pattern.fullmatch(file)
        if match is None:
            continue
        wettertyp, period = match.groups()  # weather type and period
        key = f"{wettertyp}_{station_id}"
        file_pairs.setdefault(key, {})[period] = os.path.join(computing_folder_station, file)

    # Merge historical and recent data.
    for key, file_pair in file_pairs.items():
        if 'hist' not in file_pair or 'recent' not in file_pair:
            print(f"Fehlende Datei für {key}: entweder historische oder aktuelle Datei fehlt.")
            continue

        hist_df = pd.read_csv(file_pair['hist'], delimiter=";")
        recent_df = pd.read_csv(file_pair['recent'], delimiter=";")
        for df in (hist_df, recent_df):
            df["MESS_DATUM"] = pd.to_datetime(df["MESS_DATUM"], format="%Y%m%d%H", errors="coerce")

        # keep='last': for overlapping timestamps the recent file's row is used.
        combined_df = pd.concat([hist_df, recent_df], ignore_index=True).drop_duplicates(subset=["MESS_DATUM"], keep='last')
        combined_df = combined_df.sort_values(by=["MESS_DATUM"])
        combined_df["MESS_DATUM"] = combined_df["MESS_DATUM"].dt.strftime("%Y%m%d%H")

        combined_filename = os.path.join(computing_folder_station, f"{key}_combined.txt")
        combined_df.to_csv(combined_filename, sep=";", index=False)
        print(f"Kombinierte Datei gespeichert: {combined_filename}")

    # Chained next pipeline step. Per the original note, keep this call only when the
    # separate start_* launcher for this step is not run as well.
    combine_all_station_data_review(station_id)

In [73]:
def start_combine_historic_recent(station_ids):
    """Run combine_historic_recent for every station in a thread pool.

    Errors are printed per station and do not stop the other stations.
    """
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0.
    max_workers = max(1, min(os.cpu_count() or 1, len(station_ids)))
    print(f"Starte die Verknüfung Historsich mit Aktuell für {len(station_ids)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_station = {
            executor.submit(combine_historic_recent, station_id): station_id
            for station_id in station_ids
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_id = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Dateien verknüpft Historsich mit Aktuell für Station {station_id}.")
            except Exception as e:
                print(f"Fehler beim Verknüfung Historsich mit Aktuell für Station {station_id}: {e}")

In [74]:
def combine_all_station_data_review(station_id):
    """Join all ``*_<station_id>_combined.txt`` files of one station into a single CSV.

    The files are outer-joined on ``MESS_DATUM``, sorted, de-duplicated, and their columns
    are renamed via ``header_mapping`` (e.g. ``MESS_DATUM`` -> ``date``). The result is
    written to ``station_folder/<station_id>/<station_id>_data_combined.csv``, creating
    the folder if needed.
    """
    computing_folder_station = os.path.join(computing_folder, station_id)
    station_folder_station = os.path.join(station_folder, station_id)
    # Sorted so the column order of the result is reproducible (os.listdir order is arbitrary).
    combined_files = sorted(
        f for f in os.listdir(computing_folder_station) if f.endswith(f"_{station_id}_combined.txt")
    )

    all_data_frames = []
    for file in combined_files:
        file_path = os.path.join(computing_folder_station, file)
        try:
            df = pd.read_csv(file_path, delimiter=";", parse_dates=["MESS_DATUM"], date_format="%Y%m%d%H")
            all_data_frames.append(df)
            print(f"Daten hinzugefügt von: {file_path}")
        except Exception as e:
            print(f"Fehler beim Laden der Datei {file}: {e}")

    if not all_data_frames:
        print(f"Keine kombinierten Dateien für Station {station_id} gefunden.")
        return

    # Coerce MESS_DATUM on every frame, including the first (previously skipped), so all
    # merge keys are datetimes. This is a no-op for columns read_csv already parsed.
    for df in all_data_frames:
        df["MESS_DATUM"] = pd.to_datetime(df["MESS_DATUM"], format="%Y%m%d%H", errors="coerce")

    combined_data = all_data_frames[0]
    for df in all_data_frames[1:]:
        combined_data = pd.merge(combined_data, df, on=["MESS_DATUM"], how="outer")

    # Sort and drop duplicate timestamps, then back to the YYYYMMDDHH text format.
    combined_data = combined_data.sort_values(by=["MESS_DATUM"]).drop_duplicates(subset=["MESS_DATUM"], keep='last')
    combined_data["MESS_DATUM"] = combined_data["MESS_DATUM"].dt.strftime("%Y%m%d%H")

    # Rename the source column codes to readable names.
    header_mapping = {
        "STATIONS_ID": "STATIONS_ID",
        "MESS_DATUM": "date",
        "V_N_I": "Wolken_Interp",
        "V_N": "clouds",
        "P": "stationPressure_hPa",
        "P0": "surfacePressure_hPa",
        "SD_SO": "sunshine_min",
        "TT_TU": "T_temperature_C",
        "RF_TU": "humidity_Percent",
        "F": "wind_speed_ms",
        "D": "wind_direction_degree",
        "R1": "precipitationTotal_mm",
        "RS_IND": "precipitation_indicator"
    }
    combined_data = combined_data.rename(columns=header_mapping)

    os.makedirs(station_folder_station, exist_ok=True)
    final_filename = os.path.join(station_folder_station, f"{station_id}_data_combined.csv")
    combined_data.to_csv(final_filename, sep=",", index=False)
    print(f"Alle kombinierten Daten für Station {station_id} gespeichert in: {final_filename}")

In [75]:
def start_combine_all_station_data_review(station_ids):
    """Run combine_all_station_data_review for every station in a thread pool.

    Errors are printed per station and do not stop the other stations.
    """
    # os.cpu_count() may return None, and ThreadPoolExecutor rejects max_workers=0.
    max_workers = max(1, min(os.cpu_count() or 1, len(station_ids)))
    print(f"Starte die Verknüfung aller Daten für {len(station_ids)} Stationen mit {max_workers} Threads.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_station = {
            executor.submit(combine_all_station_data_review, station_id): station_id
            for station_id in station_ids
        }
        for future in concurrent.futures.as_completed(future_to_station):
            station_id = future_to_station[future]
            try:
                future.result()  # re-raises an exception from the worker, if any
                print(f"Dateien verknüpft aller Daten für Station {station_id}.")
            except Exception as e:
                print(f"Fehler beim Verknüfung aller Daten für Station {station_id}: {e}")

Wetter Forecastfunktionen:

In [76]:
def get_weather_data_for_station_forecast(station_id, station_place):
    """Download the forecast for one station and store it as JSON and CSV.

    Args:
        station_id: id sent as ``stationIds`` to ``url_forecast``.
        station_place: label used in the output file names. Inside this function it
            shadows the module-level list of the same name.

    Side effects:
        Writes the raw response to ``computing_folder/weather_forecast_<place>.json`` and the
        table built from ``forecast1`` to ``station_folder/weather_forecast_<place>.csv``.
        On a non-200 response only an error message is printed.
    """
    from datetime import timezone  # only `datetime` itself is imported at the top

    # requests.get closes its session. The timeout keeps a stalled request from blocking forever.
    response = requests.get(url_forecast, headers=headers, params={"stationIds": station_id}, timeout=60)
    if response.status_code != 200:
        print(f"Fehler bei der Anfrage: {response.status_code}")
        return

    data = response.json()
    filename = os.path.join(computing_folder, f"weather_forecast_{station_place}.json")
    with open(filename, "w") as file:
        json.dump(data, file, indent=4)
    print(f"Die Wettervorhersage wurde in {filename} gespeichert.")

    # Output column -> key in the forecast1 object of the response.
    source_keys = {
        "T_temperature_C": "temperature",
        "T_temperature_standarddeviation_C": "temperatureStd",
        "precipitationTotal_mm": "precipitationTotal",
        "sunshine_min": "sunshine",
        "dewPoint2m": "dewPoint2m",
        "surfacePressure_hPa": "surfacePressure",
        "humidity_Percent": "humidity",
        "isDay_bool": "isDay",
    }

    for returned_id, station_data in data.items():
        forecast_data = station_data.get("forecast1")
        if not forecast_data:
            print(f"Keine Vorhersagedaten (forecast1) für Station {returned_id}.")
            continue
        start_time = forecast_data["start"]
        time_step = forecast_data["timeStep"]

        # Missing or null series become empty lists (previously a missing "temperature" raised KeyError).
        variables = {column: list(forecast_data.get(key) or []) for column, key in source_keys.items()}

        # Bring all series to the same length by padding with None.
        max_length = max(len(values) for values in variables.values())
        for values in variables.values():
            values.extend([None] * (max_length - len(values)))

        # start/timeStep are milliseconds (hence /1000), interpreted as UTC. A timestamp
        # exists for every step, so generate it for the full padded length. Padding the
        # dates with None made the later strftime fail on NaT.
        date = [
            datetime.fromtimestamp((start_time + i * time_step) / 1000, tz=timezone.utc).strftime("%Y%m%d%H")
            for i in range(max_length)
        ]

        df = pd.DataFrame({"date": date, **variables})

        # Temperature arrives in tenths of a degree. Pressure and humidity are scaled the
        # same way. NOTE(review): confirm the API units for pressure/humidity.
        for column in ("T_temperature_C", "T_temperature_standarddeviation_C", "surfacePressure_hPa", "humidity_Percent"):
            df[column] = pd.to_numeric(df[column], errors="coerce") / 10

        df.to_csv(os.path.join(station_folder, f"weather_forecast_{station_place}.csv"), index=False)
        print(f"Die Wettervorhersage wurde in weather_forecast_{station_place}.csv konvertiert")

In [77]:
def download_weatherforecast_data_for_all_stations_forecast(station_ids, station_places):
    """Download and convert the DWD forecast for every configured station.

    Calls ``get_weather_data_for_station_forecast(station_id, station_place)``
    once per station, in order, printing a progress line before each call.

    Args:
        station_ids: Iterable of DWD forecast station ids (e.g. ``station_ids_f``).
        station_places: Iterable of place names, parallel to ``station_ids``;
            presumably used by the per-station function to name the output file.

    Raises:
        ValueError: if ``station_ids`` and ``station_places`` differ in length.
            ``zip`` would otherwise silently skip the surplus entries, so a typo
            in the configuration lists would go unnoticed.
    """
    station_ids = list(station_ids)
    station_places = list(station_places)
    if len(station_ids) != len(station_places):
        raise ValueError(
            f"station_ids ({len(station_ids)}) und station_places "
            f"({len(station_places)}) müssen gleich lang sein"
        )

    for station_id, place in zip(station_ids, station_places):
        print(f"Starte den Download für Station {station_id}...")
        get_weather_data_for_station_forecast(station_id, place)
        print()

In [78]:
def remove_columns_forecast(folder=None, columns=None):
    """Drop unwanted columns from every forecast CSV in the station folder.

    Every file in ``folder`` whose name starts with ``weather_forecast_`` and
    ends with ``.csv`` is read, stripped of those entries of ``columns`` that it
    actually contains, and written back in place. A file containing none of the
    columns is left untouched. Errors on a single file are printed and the
    remaining files are still processed (best effort).

    Args:
        folder: Directory to scan. Defaults to the global ``station_folder``.
        columns: Column names to drop. Defaults to the global
            ``columns_remove_forecast``.

    NOTE(review): ``columns_remove_forecast`` lists ``'isDay'``, but the forecast
    writer stores that column as ``isDay_bool``, so it is currently NOT dropped.
    Confirm which name is intended.
    """
    # Resolve defaults at call time, so later edits to the config cell are honoured.
    if folder is None:
        folder = station_folder
    if columns is None:
        columns = columns_remove_forecast

    print("Starte die Spaltenentfernung")
    # Only CSVs: other files sharing the prefix must not be parsed and rewritten.
    # Sorted so the processing order is deterministic (os.listdir order is arbitrary).
    forecast_files = sorted(
        f for f in os.listdir(folder)
        if f.startswith("weather_forecast_") and f.endswith(".csv")
    )
    print(f"File: {forecast_files}...")

    for file in forecast_files:
        print(f"Starte die Spaltenentfernung für {file}...")
        file_path = os.path.join(folder, file)

        try:
            df = pd.read_csv(file_path, delimiter=",", skipinitialspace=True)
            print(f"Spalten im DataFrame: {list(df.columns)}")

            to_drop = [col for col in columns if col in df.columns]
            if not to_drop:
                # Nothing to remove: skip the read/write round-trip of the file.
                print(f"Keine zu entfernenden Spalten in {file}.")
                continue

            # Overwrite the file with the reduced frame.
            df.drop(columns=to_drop).to_csv(file_path, sep=",", index=False)
            print(f"Spalten aus {file} entfernt.")

        except Exception as e:
            # Best effort: report and continue with the next file.
            print(f"Fehler beim Verarbeiten der Datei {file}: {e}")

Starte Download und Verarbeitung der Wetterdaten

In [79]:
def _print_elapsed(since):
    """Print and return the seconds elapsed since ``since``.

    ``since`` must be a value returned by ``time.perf_counter()``. The printed
    time is cumulative from that point, not the duration of the last stage.
    """
    elapsed = time.perf_counter() - since
    print(f'Ausführungszeit: {elapsed:.2f} Sekunden')
    return elapsed


# perf_counter is monotonic and high-resolution; time.time() is wall-clock and
# can jump (e.g. on NTP sync), which would corrupt the measured durations.
start = time.perf_counter()

# Create the output/working folder structure.
create_folder()

# Download the historical ("review") observations.
download_weather_data_for_all_stations_review(station_ids_r)
verstrichene_zeit = _print_elapsed(start)

# Download the forecasts, then strip the unwanted forecast columns.
download_weatherforecast_data_for_all_stations_forecast(station_ids_f, station_place)
remove_columns_forecast()
verstrichene_zeit = _print_elapsed(start)

# Combine historical and forecast data.
# NOTE(review): combine_historicforecast_bool (config cell) is not checked here;
# presumably start_combine_historic() evaluates it itself. Confirm.
start_combine_historic()
verstrichene_zeit = _print_elapsed(start)

# Combine all stations into the final datasets.
combine_all_stations()
combine_forecast()
verstrichene_zeit = _print_elapsed(start)

Starte den Download für 3 Stationen mit 3 Threads.
Speicherort ./weather/computing_folder\01262, computing_folder ./weather/computing_folder, station_id 01262
Speicherort ./weather/computing_folder\01975, computing_folder ./weather/computing_folder, station_id 01975
Speicherort ./weather/computing_folder\02667, computing_folder ./weather/computing_folder, station_id 02667
Lade herunter: https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/historical/stundenwerte_TU_01975_19490101_20231231_hist.zip
Lade herunter: https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/historical/stundenwerte_TU_01262_19920517_20231231_hist.zip
Lade herunter: https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/historical/stundenwerte_TU_02667_19600101_20231231_hist.zip
Daten geladen für: produkt_tu_stunde_19920517_20231231_01262.txt
Daten geladen für: produkt_tu_stund