# 🦭 Setup MariaDB.

### In this Notebook, we'll Set Up our MariaDB Instance and Create Databases and Tables for our Data.

### P.S.: Since we already Formatted our Code that we previously wrote, we'll use some Functions to speed things up a bit.

### Import Necessary Libraries.

In [1]:
import json
import time

import numpy as np
import pandas as pd

from datetime import datetime

import mariadb as mdb

import sys

import os
from dotenv import load_dotenv

load_dotenv("../.env")

sys.path.append("../Scripts/")
sys.path.append("../")

import paths
import config
import sourcing
import featureengineering

### We'll probably need a single Database, but Multiple Tables.
### Specifically, We'll need One Table for Raw Data, one for Transformed Data and one for Predictions (might need more, but will figure out later).
### Now, let's connect to our MariaDB Instance (Running on the Container) and create our DB.

In [2]:
def ConnectMariaDB():
    # Connect to MariaDB Platform
    try:
        connection = mdb.connect(
            user = "dev",
            password = "development",
            host = "172.17.0.3",    #<-MariaDB Container IP Address (Running Locally, otherwise, you'd need to put the IP Address of the Machine hosting the Container)
            port = 3306,
            database = "AQIPredictions"
        )
        
    except mdb.Error as e:    # the module is imported as "mdb", so "mariadb" is undefined here
        print(f"Error connecting to MariaDB Platform: {e}")
        sys.exit(1)

    # Get Cursor
    cursor = connection.cursor()
    
    return cursor, connection

In [3]:
cursor, connection = ConnectMariaDB()
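
Since the notebook already calls `load_dotenv("../.env")`, the connection settings hard-coded in `ConnectMariaDB` could also be read from environment variables. The following is a minimal sketch under that assumption: the `MARIADB_*` variable names and the `build_connection_kwargs` helper are hypothetical (they do not appear in the original code), and the defaults simply mirror the values used above.

```python
import os


def build_connection_kwargs(env=os.environ):
    """Collect keyword arguments for mdb.connect() from environment variables.

    The MARIADB_* names are hypothetical; the fallback values mirror the
    ones hard-coded in ConnectMariaDB.
    """
    return {
        "user": env.get("MARIADB_USER", "dev"),
        "password": env.get("MARIADB_PASSWORD", "development"),
        "host": env.get("MARIADB_HOST", "172.17.0.3"),
        "port": int(env.get("MARIADB_PORT", "3306")),  # connect() expects an int
        "database": env.get("MARIADB_DATABASE", "AQIPredictions"),
    }


# Usage (needs a reachable MariaDB instance):
# connection = mdb.connect(**build_connection_kwargs())
```

Keeping the settings in `.env` would let the same notebook run against a different host without editing the cell.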


### Now, let's Fetch our Raw Data from Disk, create a Table and insert the Data.
### NOTE: it can also be done with SQLAlchemy, but I preferred this way.

In [4]:
GigaDF = sourcing.FetchFromDisk()

Fetching Raw Data from Disk in Raw Data Dir for Perugia
Got City ID PG
Fetching Raw Data from Disk in Raw Data Dir for Roma
Got City ID RM
Fetching Raw Data from Disk in Raw Data Dir for Bologna
Got City ID BO
Fetching Raw Data from Disk in Raw Data Dir for Trento
Got City ID TN
Fetching Raw Data from Disk in Raw Data Dir for Potenza
Got City ID PZ
Fetching Raw Data from Disk in Raw Data Dir for Firenze
Got City ID FI
Fetching Raw Data from Disk in Raw Data Dir for Ancona
Got City ID AN
Fetching Raw Data from Disk in Raw Data Dir for Catanzaro
Got City ID CZ
Fetching Raw Data from Disk in Raw Data Dir for Cagliari
Got City ID CA
Fetching Raw Data from Disk in Raw Data Dir for Venezia
Got City ID VE
Fetching Raw Data from Disk in Raw Data Dir for Genova
Got City ID GE
Fetching Raw Data from Disk in Raw Data Dir for Milano
Got City ID MI
Fetching Raw Data from Disk in Raw Data Dir for Bari
Got City ID BA
Fetching Raw Data from Disk in Raw Data Dir for Campobasso
Got City ID CB
Fetching R

In [5]:
GigaDF

Unnamed: 0,CityID,Date_GMT+1_Europe/Berlin,Temperature_2m,Relative_Humidity_2m,Dew_Point_2m,Precipitation,Pressure_msl,Surface_Pressure,Cloud_Cover,Wind_Speed_10m,...,Wind_Wirection_10m,Wind_Direction_100m,Soil_Temperature_0-7cm,Soil_Temperature_7-28cm,Soil_Temperature_28-100cm,Soil_Temperature_100-255cm,Soil_Moisture_0-7cm,Soil_Moisture_7-28cm,Soil_Moisture_28-100cm,EuropeanAQI
0,PG,2022-09-01T00:00,18.8,84,15.9,0.0,1012.6,958.8,77,5.4,...,127,165,21.5,24.0,22.7,20.5,0.254,0.201,0.144,29
1,PG,2022-09-01T01:00,19.1,83,16.0,0.0,1012.2,958.5,9,3.4,...,32,328,22.9,24.1,22.7,20.5,0.255,0.203,0.145,33
2,PG,2022-09-01T02:00,18.8,82,15.6,0.0,1011.8,958.0,80,4.2,...,31,29,22.1,24.0,22.7,20.5,0.255,0.203,0.145,33
3,PG,2022-09-01T03:00,18.2,85,15.6,0.0,1011.8,957.9,53,9.8,...,163,156,21.5,23.8,22.7,20.5,0.255,0.203,0.145,34
4,PG,2022-09-01T04:00,16.8,97,16.3,7.9,1012.2,958.1,100,1.6,...,27,256,20.7,23.6,22.7,20.5,0.358,0.203,0.145,34
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13123,,2024-02-29T19:00,10.5,90,8.9,0.0,1006.2,1003.9,88,5.1,...,315,305,13.7,14.5,13.4,14.4,0.391,0.393,0.357,26
13124,,2024-02-29T20:00,10.4,90,8.8,0.0,1006.0,1003.7,99,4.2,...,340,320,13.0,14.4,13.4,14.4,0.391,0.392,0.358,28
13125,,2024-02-29T21:00,10.2,90,8.7,0.1,1005.8,1003.5,100,4.3,...,336,324,12.6,14.3,13.4,14.4,0.391,0.392,0.358,28
13126,,2024-02-29T22:00,10.5,92,9.3,1.4,1006.3,1004.0,100,6.0,...,115,113,12.3,14.1,13.4,14.4,0.405,0.392,0.358,29


In [6]:
GigaDF.info()

<class 'pandas.core.frame.DataFrame'>
Index: 262560 entries, 0 to 13127
Data columns (total 21 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   CityID                      262560 non-null  object 
 1   Date_GMT+1_Europe/Berlin    262560 non-null  object 
 2   Temperature_2m              262560 non-null  float64
 3   Relative_Humidity_2m        262560 non-null  int64  
 4   Dew_Point_2m                262560 non-null  float64
 5   Precipitation               262560 non-null  float64
 6   Pressure_msl                262560 non-null  float64
 7   Surface_Pressure            262560 non-null  float64
 8   Cloud_Cover                 262560 non-null  int64  
 9   Wind_Speed_10m              262560 non-null  float64
 10  Wind_Speed_100m             262560 non-null  float64
 11  Wind_Wirection_10m          262560 non-null  int64  
 12  Wind_Direction_100m         262560 non-null  int64  
 13  Soil_Temperature_0-7

In [7]:
#Keeping in mind that all Dates are in GMT+1 we get rid of the 'T' Character.
GigaDF["Date_GMT+1_Europe/Berlin"] = GigaDF["Date_GMT+1_Europe/Berlin"].apply(lambda x: x.replace("T", " "))
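
As an alternative to a plain string replacement, the timestamp could be parsed and re-serialized, which also rejects malformed values. This is only a sketch: `to_sql_timestamp` is a hypothetical helper, and it assumes every value has the `YYYY-MM-DDTHH:MM` shape seen in the output above.

```python
from datetime import datetime


def to_sql_timestamp(iso_string):
    """Turn 'YYYY-MM-DDTHH:MM' into 'YYYY-MM-DD HH:MM'.

    datetime.fromisoformat raises ValueError when the input is malformed,
    so bad rows surface here instead of inside the database.
    """
    return datetime.fromisoformat(iso_string).strftime("%Y-%m-%d %H:%M")


print(to_sql_timestamp("2022-09-01T00:00"))
```

It could be applied with `GigaDF["Date_GMT+1_Europe/Berlin"].apply(to_sql_timestamp)`; the result is still 16 characters long, so it fits the `VARCHAR(16)` column defined later.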

### Now we want to define our "Schema" for the DataBase Table.

In [8]:
'''
#First 2 are Strings, and so, defined Manually
DataFormat = {"CityID": "VARCHAR(2)", "Date_GMT1": "VARCHAR(16)"}

for x in GigaDF.columns:
    formattedx = x.replace("-", "_")
    
    if GigaDF[x].dtype != "object":
        if GigaDF[x].dtype  == "int64":
            DataFormat[formattedx] = "INTEGER"
        elif GigaDF[x].dtype  == "float64":
            DataFormat[formattedx] = "DOUBLE"
        else:
            DataFormat[formattedx] = "VARCHAR(50)"
'''

#First 2 are Strings, and so, defined Manually
DataFormat = {"CityID": "VARCHAR(2)", "Date_GMT1": "VARCHAR(16)"}

DataFormat.update({column.replace("-", "_"):  "INTEGER" if GigaDF[column].dtype == "int64"
                                                        else "DOUBLE" if GigaDF[column].dtype == "float64"
                                                        else "VARCHAR(50)"
                                                        for column in GigaDF.columns if GigaDF[column].dtype != "object"})

#Quick Check
if len(DataFormat) != len(GigaDF.columns):
    raise ValueError("There's been an Error in Setting Columns and DataTypes for MariaDB!!")    #DataIntegrityError is not defined anywhere in this Notebook
    
else:
    print("Check Passed, Data correctly Transformed in MariaDB Table!")

DataFormat

Check Passed, Data correctly Transformed in MariaDB Table!


{'CityID': 'VARCHAR(2)',
 'Date_GMT1': 'VARCHAR(16)',
 'Temperature_2m': 'DOUBLE',
 'Relative_Humidity_2m': 'INTEGER',
 'Dew_Point_2m': 'DOUBLE',
 'Precipitation': 'DOUBLE',
 'Pressure_msl': 'DOUBLE',
 'Surface_Pressure': 'DOUBLE',
 'Cloud_Cover': 'INTEGER',
 'Wind_Speed_10m': 'DOUBLE',
 'Wind_Speed_100m': 'DOUBLE',
 'Wind_Wirection_10m': 'INTEGER',
 'Wind_Direction_100m': 'INTEGER',
 'Soil_Temperature_0_7cm': 'DOUBLE',
 'Soil_Temperature_7_28cm': 'DOUBLE',
 'Soil_Temperature_28_100cm': 'DOUBLE',
 'Soil_Temperature_100_255cm': 'DOUBLE',
 'Soil_Moisture_0_7cm': 'DOUBLE',
 'Soil_Moisture_7_28cm': 'DOUBLE',
 'Soil_Moisture_28_100cm': 'DOUBLE',
 'EuropeanAQI': 'INTEGER'}
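
The same dtype-to-SQL mapping can be written as a small reusable function, which may be handy because the mapping is repeated later for the Transformed Data. A minimal sketch, assuming the dtypes are passed in as plain strings; `SQL_TYPE_BY_DTYPE` and `build_schema` are hypothetical names, not part of the project's scripts.

```python
# Lookup table for the two numeric dtypes handled in the cell above.
SQL_TYPE_BY_DTYPE = {"int64": "INTEGER", "float64": "DOUBLE"}


def build_schema(dtypes, manual=None):
    """Map column names to SQL types.

    dtypes: dict of column name -> dtype name (e.g. "int64", "object").
    manual: optional dict of hand-written definitions for string columns.
    Object columns are skipped, as in the cell above; any other unknown
    dtype falls back to VARCHAR(50).
    """
    schema = dict(manual or {})
    for column, dtype_name in dtypes.items():
        if dtype_name == "object":
            continue
        sql_type = SQL_TYPE_BY_DTYPE.get(dtype_name, "VARCHAR(50)")
        schema[column.replace("-", "_")] = sql_type
    return schema


example = build_schema(
    {"CityID": "object", "Temperature_2m": "float64", "Soil_Temperature_0-7cm": "float64"},
    manual={"CityID": "VARCHAR(2)"},
)
print(example)
```

With a DataFrame, the `dtypes` argument could be obtained with `GigaDF.dtypes.astype(str).to_dict()`.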

In [9]:
#Formatting as Single String
NameFormats = ["".join(f'{ColName} {ColFormat}, ' for ColName, ColFormat in DataFormat.items())]

In [10]:
#Taking out last ", " characters
NameFormats = NameFormats[-1][:-2]

In [11]:
NameFormats

'CityID VARCHAR(2), Date_GMT1 VARCHAR(16), Temperature_2m DOUBLE, Relative_Humidity_2m INTEGER, Dew_Point_2m DOUBLE, Precipitation DOUBLE, Pressure_msl DOUBLE, Surface_Pressure DOUBLE, Cloud_Cover INTEGER, Wind_Speed_10m DOUBLE, Wind_Speed_100m DOUBLE, Wind_Wirection_10m INTEGER, Wind_Direction_100m INTEGER, Soil_Temperature_0_7cm DOUBLE, Soil_Temperature_7_28cm DOUBLE, Soil_Temperature_28_100cm DOUBLE, Soil_Temperature_100_255cm DOUBLE, Soil_Moisture_0_7cm DOUBLE, Soil_Moisture_7_28cm DOUBLE, Soil_Moisture_28_100cm DOUBLE, EuropeanAQI INTEGER'
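
The column definitions can also be produced in one step by joining with `", "`, which avoids trimming the trailing separator afterwards. Because the resulting string is interpolated directly into a `CREATE TABLE` statement, the sketch below also checks that every column name is a plain identifier. `columns_clause` and the `IDENTIFIER` pattern are hypothetical additions, not part of the original notebook.

```python
import re

# Letters, digits and underscores only, not starting with a digit.
IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def columns_clause(schema):
    """Build 'name TYPE, name TYPE, ...' from a dict of name -> SQL type."""
    for name in schema:
        if not IDENTIFIER.match(name):
            raise ValueError(f"Unsafe column name: {name!r}")
    return ", ".join(f"{name} {sql_type}" for name, sql_type in schema.items())


print(columns_clause({"CityID": "VARCHAR(2)", "Temperature_2m": "DOUBLE"}))
```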

In [12]:
MariaDBTableName = "TestRawData"
    
cursor.execute(f'CREATE TABLE IF NOT EXISTS {MariaDBTableName} ({NameFormats})')
connection.commit()

In [13]:
cursor.execute('SHOW TABLES')

Tables = []

for x in cursor:
    Tables.append(x[0])
    
if MariaDBTableName not in Tables:
    raise RuntimeError("There's been an Error in Creating the RawData Table for MariaDB!!")    #TableCreationError is not defined anywhere in this Notebook
    
else:
    print("Check Passed, MariaDB RawData Table Correctly Created!")

Check Passed, MariaDB RawData Table Correctly Created!


In [14]:
#Generating our Query for inserting Values into the Table.
query = f'INSERT INTO {MariaDBTableName} VALUES ({", ".join(["%s"]*len(list(DataFormat.keys())))})'
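
The query above relies on the tuples having exactly the same column order as the table. One way to make that dependency explicit is to name the columns in the statement. A minimal sketch, where `build_insert_query` is a hypothetical helper and the column names are taken from the schema shown earlier:

```python
def build_insert_query(table_name, column_names):
    """Build a parameterized INSERT that names its target columns."""
    columns = ", ".join(column_names)
    placeholders = ", ".join(["%s"] * len(column_names))
    return f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"


print(build_insert_query("TestRawData", ["CityID", "Date_GMT1", "EuropeanAQI"]))
```

In the notebook it could be called as `build_insert_query(MariaDBTableName, list(DataFormat))`, since iterating a dict yields its keys in insertion order.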

In [15]:
#Now, this is Inefficient, and I'm well aware of that.
#Here's what the Numbers show: 
#Base machine configuration -
#2.7 GHz Dual-Core Intel Core i5
#16 GB 1867 MHz DDR3
#Flash Storage
#Results:
#cursor.execute: 250 Inserts to max 315 Inserts in one second
#cursor.executemany: 25,000 Inserts in one second
#However, what the Numbers do not show is that executemany is unstable, and so, likely to break.
#As this is a Batch Scoring System, between Speed and Stability I pick Stability.

DataToInsert = []

for x in GigaDF.iterrows():
    DataToInsert.append(tuple(x[1]))

In [16]:
DataToInsert[0]

('PG',
 '2022-09-01 00:00',
 18.8,
 84,
 15.9,
 0.0,
 1012.6,
 958.8,
 77,
 5.4,
 12.7,
 127,
 165,
 21.5,
 24.0,
 22.7,
 20.5,
 0.254,
 0.201,
 0.144,
 29)

In [17]:
#Actually Inserting.
for x in DataToInsert:
    cursor.execute(query, x)

In [18]:
connection.commit()
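
A possible middle ground between one `execute` per row and a single huge `executemany` is to send the rows in moderately sized batches and commit after each one, so a failure only affects the current batch. This is a sketch, not a benchmark, and it does not claim to remove the instability noted above; `chunked`, `insert_in_batches` and the batch size of 1000 are all assumptions.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def insert_in_batches(cursor, connection, query, rows, batch_size=1000):
    """Insert rows batch by batch, committing after every batch.

    cursor and connection are DB-API objects such as the ones returned by
    ConnectMariaDB; returns the number of rows sent to the database.
    """
    inserted = 0
    for batch in chunked(rows, batch_size):
        cursor.executemany(query, batch)
        connection.commit()
        inserted += len(batch)
    return inserted


# Usage: insert_in_batches(cursor, connection, query, DataToInsert)
```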

In [19]:
#Checking the Data
cursor.execute(f'SELECT * FROM {MariaDBTableName}')

In [20]:
#Checking only the First Row of Fetched Data from Table.
#NOTE: Data is Returned as a list of Tuples.
[[*x] for x in cursor][0]

['PG',
 '2022-09-01 00:00',
 18.8,
 84,
 15.9,
 0.0,
 1012.6,
 958.8,
 77,
 5.4,
 12.7,
 127,
 165,
 21.5,
 24.0,
 22.7,
 20.5,
 0.254,
 0.201,
 0.144,
 29]
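
The check above pulls every row of the table into Python just to look at the first one. With a `LIMIT 1` clause and `fetchone()` only a single row is transferred. The sketch below uses the standard-library `sqlite3` module with an in-memory database purely as a stand-in so that it runs without a MariaDB instance; the `execute` and `fetchone` calls are standard DB-API, and the two-column table is a reduced, made-up version of the real one.

```python
import sqlite3

# In-memory stand-in for the MariaDB connection (sqlite3 uses "?" placeholders).
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE TestRawData (CityID VARCHAR(2), EuropeanAQI INTEGER)")
cursor.executemany("INSERT INTO TestRawData VALUES (?, ?)", [("PG", 29), ("PG", 33)])
connection.commit()

# Fetch a single row instead of materializing the whole table.
cursor.execute("SELECT * FROM TestRawData LIMIT 1")
first_row = cursor.fetchone()

# Count the rows on the database side.
cursor.execute("SELECT COUNT(*) FROM TestRawData")
row_count = cursor.fetchone()[0]

print(first_row, row_count)
```

Without an `ORDER BY` clause the database is free to return any row, so this is a sanity check rather than a guarantee about which row comes first.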

### Now, let's insert the Transformed Data.
### NOTE: A Feature Store would be more suitable in Production Environments, but we'll do everything in a Database here.

In [21]:
#Fetching Transformed Data From Disk.
#Later on this will be done in a Pipeline.

TransformedGigaDF = pd.read_parquet(paths.TRANSFORMED_DATA_DIR / "TRANSFORMED_All_Cities_HistoricalData_01092022_29022024.parquet")

TransformedGigaDF

Unnamed: 0,Temperature_2m,Relative_Humidity_2m,Dew_Point_2m,Precipitation,Pressure_msl,Surface_Pressure,Cloud_Cover,Wind_Speed_10m,Wind_Speed_100m,Wind_Wirection_10m,...,IsHour_14,IsHour_15,IsHour_16,IsHour_17,IsHour_18,IsHour_19,IsHour_20,IsHour_21,IsHour_22,IsHour_23
0,18.8,84,15.9,0.0,1012.6,958.8,77,5.4,12.7,127,...,0,0,0,0,0,0,0,0,0,0
1,19.1,83,16.0,0.0,1012.2,958.5,9,3.4,3.4,32,...,0,0,0,0,0,0,0,0,0,0
2,18.8,82,15.6,0.0,1011.8,958.0,80,4.2,6.6,31,...,0,0,0,0,0,0,0,0,0,0
3,18.2,85,15.6,0.0,1011.8,957.9,53,9.8,15.8,163,...,0,0,0,0,0,0,0,0,0,0
4,16.8,97,16.3,7.9,1012.2,958.1,100,1.6,1.5,27,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13123,10.5,90,8.9,0.0,1006.2,1003.9,88,5.1,8.8,315,...,0,0,0,0,0,1,0,0,0,0
13124,10.4,90,8.8,0.0,1006.0,1003.7,99,4.2,9.4,340,...,0,0,0,0,0,0,1,0,0,0
13125,10.2,90,8.7,0.1,1005.8,1003.5,100,4.3,9.3,336,...,0,0,0,0,0,0,0,1,0,0
13126,10.5,92,9.3,1.4,1006.3,1004.0,100,6.0,4.7,115,...,0,0,0,0,0,0,0,0,1,0


In [22]:
TransformedGigaDF.info()

<class 'pandas.core.frame.DataFrame'>
Index: 262560 entries, 0 to 13127
Data columns (total 50 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   Temperature_2m              262560 non-null  float64
 1   Relative_Humidity_2m        262560 non-null  int64  
 2   Dew_Point_2m                262560 non-null  float64
 3   Precipitation               262560 non-null  float64
 4   Pressure_msl                262560 non-null  float64
 5   Surface_Pressure            262560 non-null  float64
 6   Cloud_Cover                 262560 non-null  int64  
 7   Wind_Speed_10m              262560 non-null  float64
 8   Wind_Speed_100m             262560 non-null  float64
 9   Wind_Wirection_10m          262560 non-null  int64  
 10  Wind_Direction_100m         262560 non-null  int64  
 11  Soil_Temperature_0-7cm      262560 non-null  float64
 12  Soil_Temperature_7-28cm     262560 non-null  float64
 13  Soil_Temperature_28-

In [23]:
#Defining the Schema
DataFormat = {column.replace("-", "_"):  "INTEGER" if TransformedGigaDF[column].dtype == "int64"
                                                        else "DOUBLE" if TransformedGigaDF[column].dtype == "float64"
                                                        else "VARCHAR(50)"
                                                        for column in TransformedGigaDF.columns if TransformedGigaDF[column].dtype != "object"}

#Quick Check
if len(DataFormat) != len(TransformedGigaDF.columns):
    raise ValueError("There's been an Error in Setting Columns and DataTypes for MariaDB!!")    #DataIntegrityError is not defined anywhere in this Notebook
    
else:
    print("Check Passed, Data correctly Transformed in MariaDB Table!")

DataFormat

Check Passed, Data correctly Transformed in MariaDB Table!


{'Temperature_2m': 'DOUBLE',
 'Relative_Humidity_2m': 'INTEGER',
 'Dew_Point_2m': 'DOUBLE',
 'Precipitation': 'DOUBLE',
 'Pressure_msl': 'DOUBLE',
 'Surface_Pressure': 'DOUBLE',
 'Cloud_Cover': 'INTEGER',
 'Wind_Speed_10m': 'DOUBLE',
 'Wind_Speed_100m': 'DOUBLE',
 'Wind_Wirection_10m': 'INTEGER',
 'Wind_Direction_100m': 'INTEGER',
 'Soil_Temperature_0_7cm': 'DOUBLE',
 'Soil_Temperature_7_28cm': 'DOUBLE',
 'Soil_Temperature_28_100cm': 'DOUBLE',
 'Soil_Temperature_100_255cm': 'DOUBLE',
 'Soil_Moisture_0_7cm': 'DOUBLE',
 'Soil_Moisture_7_28cm': 'DOUBLE',
 'Soil_Moisture_28_100cm': 'DOUBLE',
 'EuropeanAQI': 'INTEGER',
 'IsSubRegion_Center': 'INTEGER',
 'IsSubRegion_North': 'INTEGER',
 'IsSubRegion_South': 'INTEGER',
 'IsSeason_Autumn': 'INTEGER',
 'IsSeason_Spring': 'INTEGER',
 'IsSeason_Summer': 'INTEGER',
 'IsSeason_Winter': 'INTEGER',
 'IsHour_0': 'INTEGER',
 'IsHour_1': 'INTEGER',
 'IsHour_2': 'INTEGER',
 'IsHour_3': 'INTEGER',
 'IsHour_4': 'INTEGER',
 'IsHour_5': 'INTEGER',
 'IsHour_6

In [24]:
#Formatting DataTypes as a Single String and taking out last 2 Characters (", ")
NameFormats = ["".join(f'{ColName} {ColFormat}, ' for ColName, ColFormat in DataFormat.items())]

NameFormats = NameFormats[-1][:-2]

NameFormats

'Temperature_2m DOUBLE, Relative_Humidity_2m INTEGER, Dew_Point_2m DOUBLE, Precipitation DOUBLE, Pressure_msl DOUBLE, Surface_Pressure DOUBLE, Cloud_Cover INTEGER, Wind_Speed_10m DOUBLE, Wind_Speed_100m DOUBLE, Wind_Wirection_10m INTEGER, Wind_Direction_100m INTEGER, Soil_Temperature_0_7cm DOUBLE, Soil_Temperature_7_28cm DOUBLE, Soil_Temperature_28_100cm DOUBLE, Soil_Temperature_100_255cm DOUBLE, Soil_Moisture_0_7cm DOUBLE, Soil_Moisture_7_28cm DOUBLE, Soil_Moisture_28_100cm DOUBLE, EuropeanAQI INTEGER, IsSubRegion_Center INTEGER, IsSubRegion_North INTEGER, IsSubRegion_South INTEGER, IsSeason_Autumn INTEGER, IsSeason_Spring INTEGER, IsSeason_Summer INTEGER, IsSeason_Winter INTEGER, IsHour_0 INTEGER, IsHour_1 INTEGER, IsHour_2 INTEGER, IsHour_3 INTEGER, IsHour_4 INTEGER, IsHour_5 INTEGER, IsHour_6 INTEGER, IsHour_7 INTEGER, IsHour_8 INTEGER, IsHour_9 INTEGER, IsHour_10 INTEGER, IsHour_11 INTEGER, IsHour_12 INTEGER, IsHour_13 INTEGER, IsHour_14 INTEGER, IsHour_15 INTEGER, IsHour_16 INTEG

In [25]:
#Creating the TransformedTable
MariaDBTableName = "TestTransformedData"
    
cursor.execute(f'CREATE TABLE IF NOT EXISTS {MariaDBTableName} ({NameFormats})')
connection.commit()

cursor.execute('SHOW TABLES')

Tables = []

for x in cursor:
    Tables.append(x[0])
    
if MariaDBTableName not in Tables:
    raise RuntimeError("There's been an Error in Creating the TransformedData Table for MariaDB!!")    #TableCreationError is not defined anywhere in this Notebook
    
else:
    print("Check Passed, MariaDB TransformedData Table Correctly Created!")

Check Passed, MariaDB TransformedData Table Correctly Created!
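
The create-then-verify steps are identical for the Raw and Transformed tables, so they could be folded into one function. A minimal sketch, assuming a DB-API cursor that can be iterated after `SHOW TABLES` as in the cells above; `create_table_checked` is a hypothetical name and `RuntimeError` is used only as a generic built-in exception.

```python
def create_table_checked(cursor, connection, table_name, columns_sql):
    """Create the table if needed, then confirm it appears in SHOW TABLES.

    Returns the list of table names found; raises RuntimeError when the
    requested table is missing.
    """
    cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_sql})")
    connection.commit()

    cursor.execute("SHOW TABLES")
    tables = [row[0] for row in cursor]

    if table_name not in tables:
        raise RuntimeError(f"Table {table_name} was not created")
    return tables


# Usage: create_table_checked(cursor, connection, "TestTransformedData", NameFormats)
```

The membership test compares names exactly; whether MariaDB reports them with the original casing depends on the server's `lower_case_table_names` setting, so the comparison may need adjusting on some platforms.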


In [26]:
#Generating our Query for inserting Values into the Table.
query = f'INSERT INTO {MariaDBTableName} VALUES ({", ".join(["%s"]*len(list(DataFormat.keys())))})'

In [27]:
DataToInsert = []

for x in TransformedGigaDF.iterrows():
    DataToInsert.append(tuple(x[1]))

In [28]:
#Actually Inserting - Inefficient but Safe and Stable.
for x in DataToInsert:
    cursor.execute(query, x)

In [29]:
connection.commit()

In [30]:
#Checking the Data
cursor.execute(f'SELECT * FROM {MariaDBTableName}')

In [31]:
#Checking only the First Row of Fetched Data from Table.
#NOTE: Data is Returned as a list of Tuples.
[[*x] for x in cursor][0]

[18.8,
 84,
 15.9,
 0.0,
 1012.6,
 958.8,
 77,
 5.4,
 12.7,
 127,
 165,
 21.5,
 24.0,
 22.7,
 20.5,
 0.254,
 0.201,
 0.144,
 29,
 1,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0]

In [32]:
#cursor.execute("DROP TABLE RawData")
#cursor.execute("DROP TABLE TransformedData")
#connection.commit()

In [33]:
#cursor.execute("SHOW TABLES")