# Data Warehouse

In [None]:
import pandas as pd
import pymysql
from datetime import datetime
from tqdm import tqdm
from pathlib import Path
import sys
from config import DatenbankWetter, DatenbankUmsatz, DatenbankUmsatzGewerbe

## Erstellen der Tabellen

In [None]:
# Gastgewerbe (hospitality trade) revenue schema: a lookup table of trade
# codes with their labels, and a table of revenue per date and trade code.
connection_mysql = pymysql.connect(
    host=DatenbankUmsatz.HOSTNAME,
    user=DatenbankUmsatz.USER,
    password=DatenbankUmsatz.PASSWORD,
    database=DatenbankUmsatz.DBNAME,
    # Use the same port as the data loader for this database so the DDL and
    # the inserts are guaranteed to reach the same server.
    port=DatenbankUmsatz.PORT,
)

try:
    with connection_mysql.cursor() as cursor:
        # Parent (lookup) table. The column names match the foreign key below
        # and the INSERT statements that populate this table (Gastgewerbe,
        # Name); previously the columns were named GastgewerbeCode /
        # GastgewerbeBezeichner, so the foreign key pointed at a column that
        # did not exist and the second CREATE TABLE failed.
        # NOTE: CREATE TABLE IF NOT EXISTS does not alter a table that already
        # exists with different column names.
        create_bezeichner_gastgewerbe_table = """
        CREATE TABLE IF NOT EXISTS Bezeichner_Gastgewerbe (
            Gastgewerbe VARCHAR(50) PRIMARY KEY,
            Name VARCHAR(100) NOT NULL
        );
        """
        cursor.execute(create_bezeichner_gastgewerbe_table)

        # Child table: one revenue value per (date, trade code). It must be
        # created after the parent table because of the foreign key.
        create_gastro_umsatz_table = """
        CREATE TABLE IF NOT EXISTS Gastro_Umsatz (
            Datum DATE NOT NULL,
            Gastgewerbe VARCHAR(50) NOT NULL,
            Umsatz FLOAT NOT NULL,
            PRIMARY KEY (Datum, Gastgewerbe),
            FOREIGN KEY (Gastgewerbe) REFERENCES Bezeichner_Gastgewerbe(Gastgewerbe)
        );
        """
        cursor.execute(create_gastro_umsatz_table)

    connection_mysql.commit()

finally:
    # Always release the connection, even when a statement fails.
    connection_mysql.close()


In [None]:
# Weather schema: creates every table of the weather database in one pass.
connection = pymysql.connect(
    host=DatenbankWetter.HOSTNAME,
    user=DatenbankWetter.USER,
    password=DatenbankWetter.PASSWORD,
    database=DatenbankWetter.DBNAME
)

# DDL statements in execution order. station_history has to come first:
# every other table declares a foreign key that references its Stations_ID.
weather_table_ddl = [
    # station_history table
    """
        CREATE TABLE IF NOT EXISTS station_history (
            Stations_ID INT PRIMARY KEY,
            Stationsname VARCHAR(255),
            Von_Datum DATE,
            Bis_Datum DATE
        );
        """,
    # operator_history table
    """
        CREATE TABLE IF NOT EXISTS operator_history (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_ID INT,
            Betreibername VARCHAR(255),
            Von_Datum DATE,
            Bis_Datum DATE,
            CONSTRAINT fk_operator_station_id
                FOREIGN KEY (Stations_ID)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # weather_data table
    """
        CREATE TABLE IF NOT EXISTS weather_data (
            id INT AUTO_INCREMENT PRIMARY KEY,
            MESS_DATUM DATE,
            STATIONS_ID INT,
            QN_3 INT,
            FX FLOAT,
            FM FLOAT,
            QN_4 INT,
            RSK FLOAT,
            RSKF FLOAT,
            SDK FLOAT,
            SHK_TAG FLOAT,
            NM FLOAT,
            VPM FLOAT,
            PM FLOAT,
            TMK FLOAT,
            UPM FLOAT,
            TXK FLOAT,
            TNK FLOAT,
            TGK FLOAT,
            CONSTRAINT fk_weather_station_id
                FOREIGN KEY (STATIONS_ID)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # metadata table
    """
        CREATE TABLE IF NOT EXISTS metadata (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_id INT,
            Von_Datum DATE,
            Bis_Datum DATE,
            Parameter VARCHAR(255),
            Einheit VARCHAR(255),
            Nullwert INT,
            Herkunft VARCHAR(255),
            CONSTRAINT fk_metadata_station_id
                FOREIGN KEY (Stations_id)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # missing_data table
    """
        CREATE TABLE IF NOT EXISTS missing_data (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_id INT,
            Datum DATE,
            Parameter VARCHAR(255),
            Fehldaten VARCHAR(255),
            CONSTRAINT fk_missing_data_station_id
                FOREIGN KEY (Stations_id)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # incorrect_data table
    """
        CREATE TABLE IF NOT EXISTS incorrect_data (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_id INT,
            Datum DATE,
            Parameter VARCHAR(255),
            Fehlwerte VARCHAR(255),
            CONSTRAINT fk_incorrect_data_station_id
                FOREIGN KEY (Stations_id)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # geography_metadata table
    """
        CREATE TABLE IF NOT EXISTS geography_metadata (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_id INT,
            von_datum DATE,
            bis_datum DATE,
            geoBreite FLOAT,
            geoLaenge FLOAT,
            Stationshoehe INT,
            CONSTRAINT fk_geography_metadata_station_id
                FOREIGN KEY (Stations_id)
                REFERENCES station_history(Stations_ID)
        );
        """,
    # parameter_metadata table
    """
        CREATE TABLE IF NOT EXISTS parameter_metadata (
            id INT AUTO_INCREMENT PRIMARY KEY,
            Stations_id INT,
            von_datum DATE,
            bis_datum DATE,
            Parameter VARCHAR(255),
            Einheit VARCHAR(255),
            Wert INT,
            CONSTRAINT fk_parameter_metadata_station_id
                FOREIGN KEY (Stations_id)
                REFERENCES station_history(Stations_ID)
        );
        """,
]

try:
    # Run all statements on a single cursor, in list order.
    with connection.cursor() as cursor:
        for ddl_statement in weather_table_ddl:
            cursor.execute(ddl_statement)

    connection.commit()

finally:
    # The connection is closed whether or not table creation succeeded.
    connection.close()


In [None]:
# Gewerbe (business) revenue schema: a single table with one revenue value
# per date.
connection_mysql = pymysql.connect(
    host=DatenbankUmsatzGewerbe.HOSTNAME,
    user=DatenbankUmsatzGewerbe.USER,
    password=DatenbankUmsatzGewerbe.PASSWORD,
    database=DatenbankUmsatzGewerbe.DBNAME
)

try:
    with connection_mysql.cursor() as cursor:
        # The data loader writes rows with
        # INSERT ... (Datum, Umsatz) ... ON DUPLICATE KEY UPDATE, which only
        # detects a duplicate through a PRIMARY or UNIQUE key on the inserted
        # columns. The AUTO_INCREMENT ID alone never collides, so without the
        # UNIQUE key on Datum every re-run of the loader would append a second
        # copy of each date instead of updating it.
        # NOTE: CREATE TABLE IF NOT EXISTS does not add the key to a table
        # that already exists.
        create_Umsaetze_table = """
        CREATE TABLE IF NOT EXISTS Umsätze (
            ID INT AUTO_INCREMENT PRIMARY KEY,
            Datum DATE,
            Umsatz FLOAT NOT NULL,
            UNIQUE KEY uq_umsaetze_datum (Datum)
        )
        """
        cursor.execute(create_Umsaetze_table)

    connection_mysql.commit()
finally:
    # Always release the connection, even when the statement fails.
    connection_mysql.close()


# Daten in die Datenbank laden

In [None]:
def update_and_populate_tables(umsatz_csv_file, gastgewerbe_csv_file):
    """Load the Gastgewerbe labels and revenue figures from CSV into MySQL.

    Rows are upserted: an existing row with the same key gets its value
    overwritten instead of causing an error. All rows are committed together
    at the end; if anything fails, nothing is committed.

    Args:
        umsatz_csv_file: Path to a CSV with the columns ``Datum``,
            ``Gastgewerbe`` and ``Umsatz``. Rows with a missing value in any
            of these columns are skipped.
        gastgewerbe_csv_file: Path to a CSV with the columns ``Gastgewerbe``
            and ``Name``.

    Returns:
        None. Progress and errors are reported on stdout; exceptions are
        caught and printed rather than propagated.
    """
    # Bound before the try block so the finally clause can tell "never
    # connected" apart from "connected" instead of hitting an unbound name.
    connection = None
    try:
        # Read and validate the inputs first, so a bad file never costs a
        # database connection.
        umsatz_data = pd.read_csv(umsatz_csv_file)
        umsatz_data = umsatz_data[['Datum', 'Gastgewerbe', 'Umsatz']].dropna()

        gastgewerbe_data = pd.read_csv(gastgewerbe_csv_file)
        if not {'Gastgewerbe', 'Name'}.issubset(gastgewerbe_data.columns):
            print("Gastgewerbe CSV file does not contain required columns.")
            return

        connection = pymysql.connect(
            host=DatenbankUmsatz.HOSTNAME,
            user=DatenbankUmsatz.USER,
            password=DatenbankUmsatz.PASSWORD,
            database=DatenbankUmsatz.DBNAME,
            port=DatenbankUmsatz.PORT
        )

        with connection.cursor() as cursor:
            # Parent rows first: Gastro_Umsatz.Gastgewerbe is a foreign key
            # into Bezeichner_Gastgewerbe, so a revenue row is rejected while
            # its trade code is still unknown.
            print("Populating Bezeichner_Gastgewerbe table...")
            bezeichner_sql = """
                INSERT INTO Bezeichner_Gastgewerbe (Gastgewerbe, Name)
                VALUES (%s, %s)
                ON DUPLICATE KEY UPDATE Name = VALUES(Name);
                """
            bezeichner_rows = gastgewerbe_data[['Gastgewerbe', 'Name']]
            for values in bezeichner_rows.itertuples(index=False, name=None):
                cursor.execute(bezeichner_sql, values)

            print("Updating Gastro_Umsatz table...")
            umsatz_sql = """
                INSERT INTO Gastro_Umsatz (Datum, Gastgewerbe, Umsatz)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE Umsatz = VALUES(Umsatz);
                """
            # Plain tuples in column order (Datum, Gastgewerbe, Umsatz).
            umsatz_rows = umsatz_data.itertuples(index=False, name=None)
            for values in tqdm(umsatz_rows, total=len(umsatz_data), desc="Updating rows"):
                cursor.execute(umsatz_sql, values)

        # Commit changes to the database
        connection.commit()
        print("Update and population successful.")

    except Exception as error:
        # Best-effort notebook helper: report the failure and carry on.
        print("Update and population failed:", error)

    finally:
        # Close the database connection if one was opened
        if connection is not None:
            connection.close()

# Load the revenue CSV and the trade-label CSV into the database.
update_and_populate_tables(
    umsatz_csv_file='data/Umsatzdaten/Umsatz.csv',
    gastgewerbe_csv_file='data/Umsatzdaten/Gastgewerbe.csv',
)


In [None]:
# Upload of the Gewerbe revenue: total order price per date.
gewerbe_umsatz = pd.read_csv("data/Gewerbeumsatzdaten/orders.csv")
# Select 'price' before aggregating. Summing every column and discarding all
# but 'price' afterwards does needless work, and on pandas >= 2.0 a
# non-numeric column is either concatenated or makes sum() raise.
gewerbe_umsatz = gewerbe_umsatz.groupby('date', as_index=False)['price'].sum()
# Last expression of the cell: displays the resulting (date, price) frame.
gewerbe_umsatz


def update_db(df):
    """Write one revenue row per entry of ``df`` into the ``Umsätze`` table.

    Args:
        df: DataFrame with the columns ``date`` and ``price``; they are
            written to the ``Datum`` and ``Umsatz`` columns respectively.

    Returns:
        None. Progress and errors are reported on stdout; exceptions are
        caught and printed rather than propagated.
    """
    # Bound before the try block so the finally clause can tell "never
    # connected" apart from "connected" instead of hitting an unbound name.
    connection = None
    try:
        print("Updating db table...")

        # Only these two columns are written, in this order.
        rows = df[['date', 'price']]

        # The connection settings come from the imported config class
        # DatenbankUmsatzGewerbe (the previous lowercase name was undefined).
        connection = pymysql.connect(
            host=DatenbankUmsatzGewerbe.HOSTNAME,
            user=DatenbankUmsatzGewerbe.USER,
            password=DatenbankUmsatzGewerbe.PASSWORD,
            database=DatenbankUmsatzGewerbe.DBNAME,
            port=DatenbankUmsatzGewerbe.PORT
        )

        # NOTE: the ON DUPLICATE KEY UPDATE clause only takes effect when the
        # table has a PRIMARY or UNIQUE key covering Datum; otherwise every
        # call inserts new rows.
        sql = """
                INSERT INTO Umsätze (Datum, Umsatz)
                VALUES (%s, %s)
                ON DUPLICATE KEY UPDATE Umsatz = VALUES(Umsatz);
                """
        with connection.cursor() as cur:
            for values in tqdm(rows.itertuples(index=False, name=None),
                               total=len(rows), desc="Updating rows"):
                cur.execute(sql, values)

        connection.commit()
        print("Update was successful.")

    except Exception as error:
        # Best-effort notebook helper: report the failure and carry on.
        print("Update failed:", error)

    finally:
        if connection is not None:
            connection.close()


# Write the per-date revenue totals to the database.
update_db(df=gewerbe_umsatz)
