# Datenquellen und Datenbank
In diesem Notebook werden die Datenquellen beschrieben sowie deren Herkunft. Aufgrund der grossen Datenmengen wird ein Data Warehouse angelegt. Dazu werden die Daten in eine SQLite Datenbank geladen. Dies erlaubt in der späteren Auswertungen vordefinierte Views zu erstellen und diese für die weitere Verwendung zu nutzen.

> !! Dieses Notebook muss zuerst ausgeführt werden, damit die Datenbank erstellt wird. Sie ist zu gross um diese direkt in Github hochzuladen. !!

## Eingesetzte Module
Für dieses Notebook werden folgende Module eingesetzt:
- __sqlite3__
    Wird für die Datenbank verwendet
- __pandas__ 
    Wird gebraucht um die CSV Dateien zu lesen und als Dataframe in die SQLite Datenbank zu laden
- __numpy__
    Wird benötigt um die NaN Werte in den Dataframes mit NULL zu ersetzen
- __glob__
    Wird eingesetzt um gesamte Folders in einem Loop in die SQLite Datenbank zu laden

In [30]:
#Import Modules
import os, sqlite3
import pandas as pd
import numpy as np
import glob

## Datenbank
Zur Untersuchung, welche Einflussfaktoren den Stromverbrauch der Gemeinden im Kanton Luzern beeinflussen können, wird auf mehrere Datenquellen zurückgegriffen. Die Datenquellen sind nachfolgend aufgelistet und werden im weiteren Verlauf beschrieben:
- Smartmeter Daten der CKW AG
- Demografische Daten des Bundes
- Solarkraftwerke Daten des Bundes
- Historische Meteo Daten des Bundes
- Gemeindenamen und BFS-ID von Swisstopo

Die genannten Datenquellen werden mit folgendem ERD in die SQLite Datenbank geladen.

> ![ERD-Diagramm](./DATA/ERD_DB_v3.png)

Die Erläuterungen, zu den einzelnen Tabellen, folgt bei den jeweiligen Datenimporten.

### Eigenheiten der SQLite Datenbank
Die SQLite Datenbank hat gewisse Einschränkungen und Eigenheiten die beachtet werden müssen. Dies weil SQLite nicht alle Formate oder Datentypen unterstützt. Die relevanten Aspekte werden in diesem Kapitel kurz erläutert.

__Strings__
Damit SQLite die Daten als String erkennt müssen diese in "" eingebettet werden. Ansonsten wird eine Fehlermeldung ausgegeben. Die Datentypen aus den Pandas Dataframes werden nicht in die SQLite Datenbank mitübergeben.

__Datum__
SQLite kennt keinen Datum-Datentyp. Dementsprechend müssen die Datum-/Zeitstempel in einem spezifischen String-Format gespeichert werden. Eine entsprechende Umformung muss daher bereits vor dem Import erfolgen, ansonsten funktionieren die SQLite Datum-/Zeitfunktionen nicht bei den abfragen. Die unterstützten Formate sind in dieser [Dokumentation](https://www.sqlite.org/lang_datefunc.html) gelistet. Für diese Arbeit sind folgende Formate relevant:
- YYYY-MM-DDTHH:MM:SS.SSS 
- YYYY-MM-DD HH:MM

__Automatische ID__
Damit automatisch ein eindeutiger Primary Key generiert wird muss beim Import der Daten, der Platzhalter für das ID-Attribut leergelassen werden und mit NULL gefüllt werden. So wird automatisch ein Primary Key vergeben.

__No Value Handling__
Leere Werte in den Datenquellen welche mit nan, NaN, na, etc. mit NULL ersetzt werden. Sonst wird der Wert als String interpretiert und gibt einen Fehler aus, wenn es sich um numerische Werte handelt.

### Erstellen des Datenbank Files
Hier wird die Datenbank angelegt. Ist das File nicht vorhanden, wird dieses automatisch angelegt.

In [31]:
# creating file (path) name
dbfile = './DATA/BINA_DATA.db' 

# Test if the database file is available in the colab workspace
if os.path.exists(dbfile):
    # Create database (file) and Open a (SQL) connection 
    connection = sqlite3.connect(dbfile)
    # Create a data cursor to exchange information between Python and SQLite
    cursor = connection.cursor()
else:
    print("Angegebene Database wurde nicht gefunden")
    # Create database (file) and Open a (SQL) connection 
    connection = sqlite3.connect(dbfile)
    # Create a data cursor to exchange information between Python and SQLite
    cursor = connection.cursor()
    #sys.exit(0)


Angegebene Database wurde nicht gefunden


### Tabellen erstellen
Basierend auf dem ERD werden die Tabellen in der Datenbank angelegt. Entsprechend werden die Primary Keys definierte und die Abhängigkeiten referenziert.

In [32]:
# Tabelle erstellen
sql = ["CREATE TABLE city (id INTEGER PRIMARY KEY, plz string, cityName string, bfsID string, lawCityName string, kantonkuerzel string)",
"CREATE TABLE smartmeter (id INTEGER PRIMARY KEY, plz REFERENCES city(plz) ON UPDATE CASCADE, timestamp string, anzMeter int,valueKwh float)",
"CREATE TABLE solarPlants (id INTEGER PRIMARY KEY, xtfID string, plz string, totalPower float, mainCategory string, subCategory string, plantCategory string,  _x int,  _y int, FOREIGN KEY (plz) REFERENCES city(plz) ON UPDATE CASCADE, FOREIGN KEY (subCategory) REFERENCES subCategory(id) ON UPDATE CASCADE, FOREIGN KEY (mainCategory) REFERENCES mainCategory(id) ON UPDATE CASCADE, FOREIGN KEY (plantCategory) REFERENCES plantCategory(id) ON UPDATE CASCADE)",
"CREATE TABLE mainCategory (id string PRIMARY KEY, de string, fr string, it string, en string)",
"CREATE TABLE plantCategory (id string PRIMARY KEY, de string, fr string, it string, en string)",
"CREATE TABLE subCategory (id string PRIMARY KEY, de string, fr string, it string, en string)",
"CREATE TABLE indicator (id string PRIMARY KEY, descr string)",
"CREATE TABLE unit (id string,  mes string)",
"CREATE TABLE demoValue (id INTEGER PRIMARY KEY, bfsID string, period string, indicator string, unit string, value float, FOREIGN KEY (bfsID) REFERENCES city(bfsID) ON UPDATE CASCADE, FOREIGN KEY (indicator) REFERENCES indicator (id) ON UPDATE CASCADE, FOREIGN KEY (unit) REFERENCES unit(id) ON UPDATE CASCADE)",
"CREATE TABLE meteoParameter (parameterID string PRIMARY KEY , measure string, description string)",
"CREATE TABLE meteoStations (stn string PRIMARY KEY, stnName string, lawCityName string,  datasource string, bfsID string, coEast string, coNorth string, coLength string, coWide string, FOREIGN KEY (lawCityName) REFERENCES city (lawCityName) ON UPDATE CASCADE, FOREIGN KEY (bfsID) REFERENCES city (bfsID) ON UPDATE CASCADE)",
"CREATE TABLE meteoData (id INTEGER PRIMARY KEY, meteoStation string, meteoParameter string, dataTime string, value float, FOREIGN KEY (meteoStation) REFERENCES meteoStations (stn) ON UPDATE CASCADE, FOREIGN KEY (meteoParameter) REFERENCES meteoParameter (parameterID) ON UPDATE CASCADE)",
"CREATE TABLE meteoParamBfs (id INTEGER PRIMARY KEY, bfsID string, meteoParameter string, meteoStation string, FOREIGN KEY (bfsID) REFERENCES city (bfsID) ON UPDATE CASCADE, FOREIGN KEY (meteoParameter) References meteoParameter (parameterID) ON UPDATE CASCADE, FOREIGN KEY (meteoStation) REFERENCES meteoStations (stn) ON UPDATE CASCADE)",
"CREATE TABLE plzBfsMapping (id INTEGER PRIMARY KEY, plz string, bfsID string, cityName string, lawCityName string)"]

for code in sql:
    cursor.execute(code)
    

### Views erstellen
Diese Views helfen in der späteren Datenanalyse und können direkt in ein Dataframe geladen werden für die weitere Verwendung.

> <font color=red>Beschreibungen der Views</font>

In [33]:
# Views erstellen
sql = [ "CREATE VIEW population AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_01_01';",
        "CREATE VIEW population_density AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_01_03';",
        "CREATE VIEW areaTotal AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_04_01';",
        "CREATE VIEW areaSettlement AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_04_02';",
        "CREATE VIEW areaAgricultural AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_04_04';",
        "CREATE VIEW areaUnproductive AS SELECT id, bfsID, period, value FROM demoValue WHERE demoValue.indicator == 'Ind_04_07';",
        "CREATE VIEW keyFiguresPopulation as SELECT p.bfsID, p.period, p.value as population, pd.value as populationDensity FROM population p LEFT JOIN population_density pd ON p.bfsID = pd.bfsID AND p.period = pd.period;",
        "CREATE VIEW keyFiguresArea as SELECT a.bfsID, a.period as periodTotal, a.value as total,ase.period as periodSettlement, ase.value as settlement, aa.period as periodAgricultural, aa.value as agricultural, au.period as periodUnproductive, au.value as unproductive FROM areaTotal a LEFT JOIN areaSettlement ase ON a.bfsID = ase.bfsID LEFT JOIN areaAgricultural aa ON a.bfsID = aa.bfsID LEFT JOIN areaUnproductive au ON a.bfsID = au.bfsID;",
        "CREATE VIEW sumSmartmeter as SELECT s.plz as plz, bfsID, SUM(valueKwh) as 'kWh' FROM smartmeter as s LEFT JOIN (SELECT plz, bfsID FROM city GROUP BY plz) as c ON s.plz = c.plz GROUP BY bfsID;",
        "CREATE VIEW meteoStationsParameter as SELECT DISTINCT meteoStations.bfsID, meteoStation, meteoParameter FROM meteoData LEFT JOIN meteoStations on meteoStations.stn = meteoData.meteoStation;"
       ]

for code in sql:
    cursor.execute(code)

In [34]:
# City Daten einlesen als Dataframe und in DB sichern
city = pd.read_csv("./DATA/city_directory/AMTOVZ_CSV_LV95.csv", delimiter=";")

for index, row in city.iterrows():
    ort = '"' + row["Ortschaftsname"] + '"'
    lawCityName = '"' + row["Gemeindename"] + '"'
    if isinstance(row["Kantonskürzel"], str):
        kantonkuerzel = '"' + row["Kantonskürzel"] + '"'
    else:
        kantonkuerzel = "NULL"
    sql = "INSERT INTO city VALUES(NULL,{},{},{},{},{})".format(row["PLZ"], ort, row["BFS-Nr"], lawCityName, kantonkuerzel)
    cursor.execute(sql)

connection.commit()

In [35]:
# Solar Anlagen Daten einlesen als Dataframe und in DB sichern
plantCategory = pd.read_csv("./DATA/solar_powerplants/PlantCategoryCatalogue.csv", delimiter=",")
plantCategory.head()

for index, row in plantCategory.iterrows():
    catalogueId = '"' + row["Catalogue_id"] + '"'
    de = '"' + row["de"] + '"'
    fr = '"' + row["fr"] + '"'
    it = '"' + row["it"] + '"'
    en = '"' + row["en"] + '"'
    sql = "INSERT INTO plantCategory VALUES({},{},{},{},{})".format(catalogueId, de, fr, it, en)
    cursor.execute(sql)

connection.commit()

In [36]:
# Importieren der Hauptkategorien
mainCategory = pd.read_csv("./DATA/solar_powerplants/MainCategoryCatalogue.csv", delimiter=",")
mainCategory.head()

for index, row in mainCategory.iterrows():
    catalogueId = '"' + row["Catalogue_id"] + '"'
    de = '"' + row["de"] + '"'
    fr = '"' + row["fr"] + '"'
    it = '"' + row["it"] + '"'
    en = '"' + row["en"] + '"'
    sql = "INSERT INTO mainCategory VALUES({},{},{},{},{})".format(catalogueId, de, fr, it, en)
    cursor.execute(sql)

connection.commit()

In [37]:
# Importieren der Unterkategorien
subCategory = pd.read_csv("./DATA/solar_powerplants/SubCategoryCatalogue.csv", delimiter=",")
subCategory.head()

for index, row in subCategory.iterrows():
    catalogueId = '"' + row["Catalogue_id"] + '"'
    de = '"' + row["de"] + '"'
    fr = '"' + row["fr"] + '"'
    it = '"' + row["it"] + '"'
    en = '"' + row["en"] + '"'
    sql = "INSERT INTO subCategory VALUES({},{},{},{},{})".format(catalogueId, de, fr, it, en)
    cursor.execute(sql)

connection.commit()

In [38]:
# Importieren der Solar Kraftanlagen
electricityProductionPlant = pd.read_csv("./DATA/solar_powerplants/ElectricityProductionPlant.csv", delimiter=",")
electricityProductionPlant.head()
electricityProductionPlant.replace(np.nan, "NULL", inplace=True)

for index, row in electricityProductionPlant.iterrows():
    mainCategoryId = '"' + row["MainCategory"] + '"'
    subCategoryId = '"' + row["SubCategory"] + '"'
    xCor = row["_x"]
    yCor = row["_y"]
    
    if isinstance(row["PlantCategory"], str):
        plantCategoryId = '"' + row["PlantCategory"] + '"'
    else:
        plantCategoryId = "NULL"
        
    sql = "INSERT INTO solarPlants VALUES(NULL,{},{},{},{},{},{},{},{})".format(row["xtf_id"], row["PostCode"], row["TotalPower"], mainCategoryId, subCategoryId, plantCategoryId, xCor, yCor)
    cursor.execute(sql)
    
connection.commit()

In [39]:
#Importieren Demografie Daten Indikator
demoIndicator = pd.read_csv("./DATA/key_figures_communities/Indicator_DemoData.csv", delimiter=";")

for index, row in demoIndicator.iterrows():
    indicatorId = '"' + row["INDICATORS"] + '"'
    de = '"' + row["DE"] + '"'
    
    sql = "INSERT INTO indicator VALUES({},{})".format(indicatorId, de)
    cursor.execute(sql)
    
connection.commit()

In [40]:
#Importieren Unit Messeinheit
unitMeas = pd.read_csv("./DATA/key_figures_communities/unit_mes_DemoData.csv", delimiter=";")

for index, row in unitMeas.iterrows():
    unitMeasId = '"' + row["UNIT_MES"] + '"'
    unit = '"' + row["Einheit"] + '"'
    
    sql = "INSERT INTO unit VALUES({}, {})".format(unitMeasId, unit)
    cursor.execute(sql)

connection.commit()

In [41]:
#Importieren demoValue
demoValue = pd.read_csv("./DATA/key_figures_communities/ts-x-21.03.01.csv", delimiter=";", dtype={'PERIOD_REF': str, 'PERIOD_COMP': str, 'REGION': str})
demoValue.replace("CH", 0, inplace=True)
demoValue.replace(np.nan, "NULL", inplace=True)

for index, row in demoValue.iterrows():
    period = row["PERIOD_REF"]
    if isinstance(period, str):
        period = '"' + period + '"'
    else:
        period = str(period)
        period = '"' + period + '"'
        
    indicator = '"' + row["INDICATORS"] + '"'
    unit = '"' + row["UNIT_MES"] + '"'
    
    sql = "INSERT INTO demoValue VALUES(NULL,{},{},{},{},{})".format(row["CODE_REGION"], period, indicator, unit, row["VALUE"])
    cursor.execute(sql)
    
connection.commit()

In [42]:
# Importieren SmartMeter DATA
path = './DATA/smartmeter'
csv_files = glob.glob(path + '/*.csv.gz')
df_list = (pd.read_csv(file) for file in csv_files)

smartmeter_df = pd.concat(df_list, ignore_index=True)
smartmeter_df["timestamp"] = '"' + smartmeter_df["timestamp"] + '"'

for index, row in smartmeter_df.iterrows():
    sql = "INSERT INTO smartmeter VALUES(NULL,{},{},{},{})".format(row["area_code"], row["timestamp"], row["num_meter"], row["value_kwh"])
    cursor.execute(sql)
    
connection.commit()


In [43]:
#Test Date Time and manipulation

#input = '202001240000'
#output = '%Y%m%d%H%M'

#date = input.strftime('%Y-%m-%d %H:%M')
#date = datetime.datetime.strptime('202001240000', '%Y%m%d%H%M').strftime('%Y-%m-%d %H:%M')

#df = pd.DataFrame({'date':['2020012400', '2020012400']})

#print(df)

#df['date'] += '00'

#print(df)

In [44]:
#Import Meteo Parameter
meteoParameter = pd.read_csv("./DATA/meteo/parameter.csv", delimiter=";")

meteoParameter.head()
meteoParameter["parameterID"] = '"' + meteoParameter["parameterID"] + '"'
meteoParameter["measure"] = '"' + meteoParameter["measure"] + '"'
meteoParameter["description"] = '"' + meteoParameter["description"] + '"'

for index, row in meteoParameter.iterrows():
    sql = "INSERT INTO meteoParameter VALUES({}, {}, {})".format(row["parameterID"], row["measure"], row["description"])
    cursor.execute(sql)

connection.commit()

In [45]:
#Import Meteo Stations
meteoStations = pd.read_csv("./DATA/meteo/meteoStation.csv", delimiter=";")

meteoStations["stn"] = '"' + meteoStations["stn"] + '"'
meteoStations["stnName"] = '"' + meteoStations["stnName"] + '"'
meteoStations["lawCityName"] = '"' + meteoStations["lawCityName"] + '"'
meteoStations["datasource"] = '"' + meteoStations["datasource"] + '"'
meteoStations["coLength"] = '"' + meteoStations["coLength"] + '"'
meteoStations["coWide"] = '"' + meteoStations["coWide"] + '"'

for index, row in meteoStations.iterrows():
    sql = "INSERT INTO meteoStations VALUES({}, {}, {}, {}, {}, {}, {}, {}, {})".format(row["stn"], row["stnName"], row["lawCityName"], row["datasource"], row["bfsId"], row["coEast"], row["coNorth"], row["coLength"], row["coWide"])
    cursor.execute(sql)

connection.commit()

In [46]:
#Import MeteoData of parameter rre150h0
#Files bereinigen mit den ersten zwei Zeilen jeder Datei überspringen
#Datum umformatieren damit SQLite dies interpretieren kann
path = './DATA/meteo/data'
csv_files = glob.glob(path + '/*rre150h0*data.txt')
df_list = (pd.read_csv(file, delimiter=';', na_values= '-') for file in csv_files)

meteo_df = pd.concat(df_list, ignore_index=True)
meteo_df.replace(np.nan, "NULL", inplace=True)
meteo_df = meteo_df.astype({'time': str})
meteo_df['time'] = meteo_df['time'] + '00'
meteo_df['time'] = pd.to_datetime(meteo_df.time)
meteo_df['time'] = meteo_df['time'].dt.strftime('%Y-%m-%d %H:%M')

meteo_df['time'] = '"' + meteo_df['time'] + '"'
meteo_df['stn'] = '"' + meteo_df['stn'] + '"'


for index, row in meteo_df.iterrows():
    sql = "INSERT INTO meteoData VALUES(NULL,{},{},{},{})".format(row["stn"], '"rre150h0"', row["time"], row["rre150h0"])
    cursor.execute(sql)

connection.commit()

In [47]:
#Import MeteoData of parameter tre200h0
path = './DATA/meteo/data'
csv_files = glob.glob(path + '/*tre200h0*data.txt')
df_list = (pd.read_csv(file, delimiter=';', na_values= '-') for file in csv_files)

meteo_df = pd.concat(df_list, ignore_index=True)
meteo_df.replace(np.nan, "NULL", inplace=True)
meteo_df = meteo_df.astype({'time': str})
meteo_df['time'] = meteo_df['time'] + '00'
meteo_df = meteo_df.astype({'time': str})
meteo_df['time'] = meteo_df['time'] + '00'
meteo_df['time'] = pd.to_datetime(meteo_df.time)
meteo_df['time'] = meteo_df['time'].dt.strftime('%Y-%m-%d %H:%M')

meteo_df['time'] = '"' + meteo_df['time'] + '"'
meteo_df['stn'] = '"' + meteo_df['stn'] + '"'

for index, row in meteo_df.iterrows():
    sql = "INSERT INTO meteoData VALUES(NULL,{},{},{},{})".format(row["stn"], '"tre200h0"', row["time"], row["tre200h0"])
    cursor.execute(sql)

connection.commit()

In [48]:
#Import MeteoData of parameter sre000h0
path = './DATA/meteo/data'
csv_files = glob.glob(path + '/*sre000h0*data.txt')
df_list = (pd.read_csv(file, delimiter=';', na_values= '-') for file in csv_files)

meteo_df = pd.concat(df_list, ignore_index=True)
meteo_df.replace(np.nan, "NULL", inplace=True)
meteo_df = meteo_df.astype({'time': str})
meteo_df['time'] = meteo_df['time'] + '00'
meteo_df = meteo_df.astype({'time': str})
meteo_df['time'] = meteo_df['time'] + '00'
meteo_df['time'] = pd.to_datetime(meteo_df.time)
meteo_df['time'] = meteo_df['time'].dt.strftime('%Y-%m-%d %H:%M')

meteo_df['time'] = '"' + meteo_df['time'] + '"'
meteo_df['stn'] = '"' + meteo_df['stn'] + '"'

for index, row in meteo_df.iterrows():
    sql = "INSERT INTO meteoData VALUES(NULL,{},{},{},{})".format(row["stn"], '"sre000h0"', row["time"], row["sre000h0"])
    cursor.execute(sql)

connection.commit()

In [51]:
#Import MeteoParamBFS
meteoParams = pd.read_csv("./DATA/meteo/meteoStationBfsParameter.csv", delimiter=";")

meteoParams["meteoParameter"] = '"' + meteoParams["meteoParameter"] + '"'
meteoParams["meteoStation"] = '"' + meteoParams["meteoStation"] + '"'

for index, row in meteoParams.iterrows():
    sql = "INSERT INTO meteoParamBfs VALUES(NULL, {}, {}, {})".format(row["bfsID"], row["meteoParameter"], row["meteoStation"])
    cursor.execute(sql)

connection.commit()

In [50]:
mapping = pd.read_csv("./DATA/city_directory/mapping_plz_bfsid_lu.csv", delimiter=",")

for index, row in mapping.iterrows():
    cityName = '"' + row["cityName"] + '"'
    lawCityName = '"' + row["lawCityName"] + '"'
    
    sql = "INSERT INTO plzBfsMapping VALUES(NULL,{},{},{},{})".format(row["plz"], row["bfsId"], cityName, lawCityName)
    cursor.execute(sql)

connection.commit()