# Libraries

In [1]:
#pip install mysql-connector-python

In [2]:
#pip install PyYAML

In [3]:
import os
import mysql.connector
import yaml
from glob import iglob

from pandas import DataFrame

from math import floor
from numpy import array, split

# Global Parameters

In [4]:
yaml_filename = "mysql_credentials.yaml"

# Look for the credentials file in the working directory and in up to four
# parent directories ("../", "../../", ...); the first existing path wins.
# If none of them exists, yaml_filename keeps its bare default value.
for depth in range(5):
    candidate = "../" * depth + yaml_filename
    if os.path.exists(candidate):
        yaml_filename = candidate
        break

In [5]:
# For privacy reasons, user credentials are located in a separate .yaml file

try:
    with open(yaml_filename) as credentials_file:
        mysql_credentials = yaml.safe_load(credentials_file)
except (OSError, yaml.YAMLError):
    print('Login information is not available!!!')
    # Nothing below can run without credentials: re-raise the real cause instead
    # of failing a few lines later with a NameError on `mysql_credentials`.
    raise

# The [0] indexing implies the file holds a list (or similar) whose first entry
# carries these keys.
mysql_user = mysql_credentials[0]['mysql_user']
mysql_pwd = mysql_credentials[0]['mysql_pwd']
mysql_database_name = mysql_credentials[0]['mysql_database_name']

# NOTE(review): the connection uses the hard-coded database 'HDL_Project' while the
# generated SQL scripts use `mysql_database_name`; confirm they are meant to match.
mydb = mysql.connector.connect(
    host="localhost",
    user=mysql_user,
    password=mysql_pwd,
    database='HDL_Project',
)

mycursor = mydb.cursor()

# Report success only once the connection has actually been opened.
print('Connection with MySQL database is ready!')

Connection with MySQL database is ready!


# User-Defined Functions / Classes

In [6]:
def tablename_from_sqlq(sqlq):
    """
    Extract the table name that follows the first FROM keyword of a SQL query.

    Backticks are stripped first, so "from `db`.`tbl`" yields "db.tbl". The
    keyword is matched case-insensitively and must be followed by whitespace,
    so identifiers such as "from_date" are not mistaken for it.

    Input:
    * `sqlq`: SQL query text (e.g. "Select * from table where col1 = 1")

    Output:
    * The table name as written in the query (without backticks).

    Raises:
    * ValueError if the query has no FROM clause.
    """
    import re

    txt = sqlq.replace("`", "")
    # The name ends at whitespace, a statement terminator, a comma or a parenthesis.
    match = re.search(r"\bfrom\s+([^\s;,()]+)", txt, flags=re.IGNORECASE)
    if match is None:
        raise ValueError("No FROM clause found in query: {}".format(sqlq))
    return match.group(1)

In [7]:
def cols_from_sqlq(sqlq):
    """
    Return the list of column names selected by a SQL query.

    * "select * from tbl ...": the names are fetched from the database with
      "show columns from tbl" (first field of every returned row).
    * Otherwise the comma-separated select list is split and each item stripped.

    Aliases ("col as alias") and expressions are not interpreted; every select
    item is returned verbatim.

    Raises:
    * ValueError if the query does not have the shape "SELECT ... FROM ...".
    """
    import re

    # Non-greedy select list up to the first standalone FROM keyword; DOTALL lets
    # the list span several lines.
    match = re.match(r"\s*select\s+(.*?)\s+from\b", sqlq,
                     flags=re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("Cannot find a SELECT ... FROM list in query: {}".format(sqlq))
    select_list = match.group(1).strip()

    # Only a bare "*" select list means "all columns"; a '*' elsewhere in the
    # query (e.g. count(*) or arithmetic in WHERE) does not.
    if select_list == "*":
        sql_table = tablename_from_sqlq(sqlq)
        col_list = aux_qdata("show columns from {}".format(sql_table))
        return [row[0] for row in col_list]

    return [col.strip() for col in select_list.split(",")]

In [8]:
def where_from_sqlq(sqlq):
    """
    Return the WHERE clause of a SQL query, from the WHERE keyword to the end.

    The keyword is matched case-insensitively as a whole word, so identifiers
    such as "somewhere" are ignored. Everything after WHERE (e.g. ORDER BY or
    LIMIT) is returned too.

    Returns an empty string when the query has no WHERE clause, so the result
    can be pasted directly into a new query.
    """
    import re

    match = re.search(r"\bwhere\b", sqlq, flags=re.IGNORECASE)
    return "" if match is None else sqlq[match.start():]

In [9]:
#https://dev.mysql.com/doc/connector-python/en/connector-python-example-cursor-transaction.html

def aux_qdata(sqlq, params=None):
    """
    UDF to query data in raw format from a local MySQL RDBMS.

    Input:
    "sqlq": Query (e.g. Select * from table where col1 = %s)
    "params": Optional values bound to the query placeholders by the driver.
              Prefer this over formatting values into the SQL string.
              Default None executes `sqlq` unchanged.

    Output:
    All rows returned through the module-level cursor `mycursor` (fetchall()).
    """
    mycursor.execute(sqlq, params)
    return mycursor.fetchall()

In [10]:
def qdata(sqlq):
    """
    qdata queries data from MySQL RDBMS and returns it in a dataframe format,
    along with its corresponding column names.

    Input:
    * `sqlq`: Complete query (e.g. Select * from table where col1 = "val1")
    (!) Don't rename columns here (e.g. "Select col1 as colA ..."): column names
    are taken verbatim from the select list.

    Output:
    * DataFrame with one row per result row. If the query returns no rows, the
      frame is empty but still carries the expected column names.
    """
    col_names = cols_from_sqlq(sqlq)

    # Passing the names to the constructor also works for an empty result set,
    # whereas assigning .columns on a column-less empty frame raises.
    return DataFrame(aux_qdata(sqlq), columns=col_names)

In [11]:
class data_restructuring():
    """
    Prepare the raw SIMA csv files and the MySQL scripts that load them.

    * Restructure_Raw_Data(): rewrite the raw SIMA csv files with a MySQL-ready
      `datetime` column and normalized file names ("sima_<feature>.csv").
    * sql_tables_structures(): write the CREATE TABLE statements for a list of tables.
    * upload_csv_script(): write a bash script that loads each csv into its table.
    * creating_views(): write one CREATE VIEW per station joining all feature tables.

    The script generators read the module-level `mysql_database_name`
    (and `mysql_user` / `mysql_pwd` in upload_csv_script).
    """

    def __init__(self, file_location = "Raw_SIMA_Data"):
        """
        Input:
        * file_location: folder, relative to the working directory, holding the raw csv files.
        """
        # Stored as "/<folder>/". The leading "/" is stripped (or prefixed with ".")
        # wherever the path is used, so both folders are effectively relative paths.
        self.file_location = "/" + file_location + "/"
        self.new_file_location = "/Restructured_" + self.file_location[1:]

        # Raw files whose names end in "22.csv" (e.g. "<feature>_2015_2022.csv").
        self.files_list = list(iglob('.{}*22.csv'.format(self.file_location)))

        # Column layout shared by every SIMA table: datetime followed by one
        # column per station code.
        self.cols = ['datetime','SE','NE','CE','NO','SO','NO2','NTE','NE2','SE2','SO2','SE3','SUR','NTE2','NE3']
        # Per-column type code: 'F' selects F_types, anything else selects I_types.
        self.data_type = ['F'] * len(self.cols)

        # SQL column definitions, index-aligned with self.cols.
        self.F_types = ['DATETIME NOT NULL'] + ['FLOAT NULL'] * (len(self.cols) - 1)
        self.I_types = ['DATETIME NOT NULL'] + ['INT NULL'] * (len(self.cols) - 1)

    def Restructure_Raw_Data(self):
        """
        Rewrite every raw csv in self.files_list into self.new_file_location.

        The 'date' column is read as DD/MM/YYYY followed, from character 11 on,
        by a time with an " a.m."/" p.m." marker. It is rewritten as
        "YYYY-MM-DD <time>:00" with the marker removed, and renamed to 'datetime'.
        Each file is saved as "sima_<feature>.csv", where <feature> is the
        lower-cased raw name without "_2015_2022.csv" and "2.5" becomes "25".
        """
        from pandas import read_csv

        new_dir = self.new_file_location[1:]
        # to_csv does not create missing folders.
        os.makedirs(new_dir, exist_ok=True)

        for file_to_edit in self.files_list:
            df = read_csv(file_to_edit)

            # datetime is originally structured as DD/MM/YYYY and will be transformed to YYYY-MM-DD
            day = df['date'].str.slice(0, 2)
            month = df['date'].str.slice(3, 5)
            year = df['date'].str.slice(6, 10)
            time = df['date'].str[11:] + ":00"

            # NOTE(review): the a.m./p.m. marker is dropped without adding 12 hours
            # to p.m. times; this is only correct if the raw hour is already
            # 24-hour. Confirm against the raw files.
            datetime_edit = year + "-" + month + "-" + day + " " + time
            datetime_edit = datetime_edit.replace(" a.m.", "", regex=True)
            datetime_edit = datetime_edit.replace(" p.m.", "", regex=True)

            df["date"] = datetime_edit
            df = df.rename(columns={"date": "datetime"})

            # Build "<new_dir>sima_<feature>.csv" from the raw file name.
            base_name = file_to_edit.replace("./", "").replace(self.file_location[1:], "")
            feature = base_name.replace("_2015_2022.csv", "").lower()
            # Case-specific edit: "2.5" -> "25" (e.g. pm2.5 -> pm25) so the name
            # can be used as a table name.
            out_path = (new_dir + "sima_" + feature + ".csv").replace("2.5", "25")

            df.to_csv(out_path, encoding='utf-8', index=False)

    def sql_tables_structures(self, files_list, directory, prefix):
        """
        Write "<prefix>_sql_tables_structure.sql" with one CREATE TABLE per name.

        Input:
        * files_list: table names (also the csv base names inside `directory`)
        * directory: folder recorded in each table COMMENT
        * prefix: prefix of the output file name
        """
        with open("{}_sql_tables_structure.sql".format(prefix), "w") as file:
            # Query commands to create SQL tables.
            for table in files_list:
                file.write("CREATE TABLE {}.`{}` (\n".format(mysql_database_name, table))
                # Every column in self.cols is written, regardless of how many
                # tables are listed.
                for j, (col, type_code) in enumerate(zip(self.cols, self.data_type)):
                    dtypes = self.F_types if type_code == "F" else self.I_types
                    separator = "" if j == 0 else ", "
                    file.write("{}`{}` {}\n".format(separator, col, dtypes[j]))
                file.write(") COMMENT = \"Source: ./{}/{}.csv\"\n;".format(directory, table))
                file.write("\n")

    def upload_csv_script(self, files_list, directory, prefix):
        """
        Write "<prefix>_upload_csv.sh", a bash script that runs, for every name in
        files_list, LOAD DATA LOCAL INFILE './<directory>/<name>.csv' into table
        <name>, skipping the header row.
        """
        # NOTE(review): the MySQL password is written in plain text into the script
        # and passed on the command line (-p...), where it may be visible to other
        # local users; consider a MySQL option file instead.
        with open("{}_upload_csv.sh".format(prefix), "w") as file:
            file.write("#!/bin/bash\n")

            for i in files_list:
                file.write("mysql -u{} -p{} --local-infile {} -e \"LOAD DATA LOCAL INFILE \'./{}/{}.csv\'  INTO TABLE {} FIELDS TERMINATED BY \',\' LINES TERMINATED BY \'\\n\' IGNORE 1 ROWS\"; \n".format(mysql_user, mysql_pwd, mysql_database_name, directory, i, i))

    def creating_views(self, files_list_trim, mvi_method_name, prefix):
        """
        Write "<prefix>_views_creation.sql" with one CREATE VIEW per station.

        Each view selects the station's column from every "<prefix>_sima_<feature>"
        table, LEFT JOINed on datetime against "<prefix>_sima_co".
        """
        # NOTE(review): files_list_trim is unused, and mvi_method_name only decides
        # whether an "_" precedes prefix in the view name; confirm this is intended.
        mvi_underscore = "" if mvi_method_name == "" else "_"
        features = ['co', 'no', 'no2', 'nox', 'o3', 'pm10', 'pm25', 'so2', 'tout', 'wdr', 'wsr', 'rh', 'sr', 'rainf', 'prs']

        stations = ['SE', 'NE', 'CE', 'NO', 'SO', 'NO2', 'NTE', 'NE2', 'SE2', 'SO2', 'SE3', 'SUR', 'NTE2', 'NE3']

        with open("{}_views_creation.sql".format(prefix), "w") as file:
            for s in stations:
                file.write("CREATE VIEW {}.sima_station{}{}_{} AS \n".format(mysql_database_name, mvi_underscore, prefix, s))
                file.write("SELECT l0.datetime as datetime \n")

                for ix, f in enumerate(features):
                    file.write(", l{}.{} as {} \n".format(ix, s, f))

                file.write("FROM {}.{}_sima_co l0 \n".format(mysql_database_name, prefix))

                for ix, f in enumerate(features[1:], start=1):
                    file.write("LEFT JOIN {}.{}_sima_{} l{} ON l0.datetime = l{}.datetime \n".format(mysql_database_name, prefix, f, ix, ix))

                file.write(";\n")

In [None]:
class sample_creation_time_metadata():
    """
    Build windowed (X, y) train/test arrays from the rows selected by a SQL query.

    Pipeline: query rows (plus month-name and hour columns derived from
    `datetime`) -> drop columns -> encode month/hour -> scale -> cut into
    look-back / look-forward windows -> split into train and test.
    """

    def __init__(self, target, look_back, look_forward, test_frac_split, sqlq, colums_to_drop, n_batches):
        """
        Input:
        * target: name of the target column (moved to the last position)
        * look_back: number of rows in each input window
        * look_forward: number of target values in each output window
        * test_frac_split: fraction of the queried rows that sets the split point;
          windows before it go to TRAIN (NOTE(review): despite the name; confirm)
        * sqlq: query selecting the data; it must select a `datetime` column
        * colums_to_drop: columns removed from the final dataset
        * n_batches: only the first n_batches * (row_count // n_batches) rows are used
        """
        # Parameters
        self.target = target
        self.look_back = look_back
        self.look_forward = look_forward
        self.test_frac_split = test_frac_split

        # Columns to drop from final dataset
        self.colums_to_drop = colums_to_drop

        # Recognizing parameters from sql command
        self.sqlq = sqlq
        self.where_sql = where_from_sqlq(sqlq)
        self.table_sql = tablename_from_sqlq(sqlq)
        self.cols_sql = cols_from_sqlq(sqlq)

        # Number of batches
        self.n_batches = n_batches

        # Counting total number of observations from sql command
        self.count_obj = aux_qdata("select count(*) from ({}) s1".format(sqlq))[0][0]

        # Observations per batch; the remainder (count_obj % n_batches) is not queried.
        self.n_observations = int(self.count_obj // n_batches)

    @staticmethod
    def _time_columns(cols):
        """Return (select expressions, column names) with month/hour added right after `datetime`."""
        select_exprs, col_names = [], []
        for col in cols:
            select_exprs.append(col)
            col_names.append(col)
            if col == "datetime":
                select_exprs += ["monthname(datetime)", "hour(datetime)"]
                col_names += ["month", "hour"]
        return select_exprs, col_names

    def updated_table_time_metadata(self):
        """
        Query the selected rows in datetime order, adding 'month' (monthname) and
        'hour' columns after 'datetime'. Drop colums_to_drop, move the target
        to the last column and index the result by datetime.

        NOTE(review): where_sql holds everything after WHERE; if sqlq already has
        ORDER BY or LIMIT, the generated query will likely be invalid.
        """
        if "datetime" not in self.cols_sql:
            raise ValueError("The query must select a `datetime` column.")

        # Build new lists instead of inserting into self.cols_sql, so repeated
        # calls keep working.
        select_exprs, col_names = self._time_columns(self.cols_sql)

        # Table with added time metadata
        time_sqlq = """ Select {}
        from {} 
        {}
        ORDER BY DATETIME ASC
        LIMIT {}
        """.format(", ".join(select_exprs), self.table_sql, self.where_sql,
                   self.n_batches * self.n_observations)

        time_df = DataFrame(aux_qdata(time_sqlq), columns=col_names)

        # Update output removing undesired columns
        time_df = time_df.loc[:, ~time_df.columns.isin(self.colums_to_drop)]

        # Rearrange target to be at the end of the dataset
        target_col = time_df.pop(self.target)
        time_df[self.target] = target_col

        return time_df.set_index("datetime")

    def encoding_df(self):
        """
        Encode the 'month' and 'hour' columns with category_encoders'
        LeaveOneOutEncoder, fitted against the target column.

        NOTE(review): the encoder is fitted on all rows, including those that
        train_test_sets later assigns to the test set.
        """
        from category_encoders import LeaveOneOutEncoder

        time_df = self.updated_table_time_metadata()

        encoder = LeaveOneOutEncoder(cols = ["month", "hour"])
        return encoder.fit_transform(time_df, time_df[self.target])

    def normalizing_df(self):
        """
        Scale every column of the encoded dataset with RobustScaler(with_centering=False),
        keeping the original index and column names.

        NOTE(review): the scaler is fitted on all rows, including future test rows.
        """
        from sklearn.preprocessing import RobustScaler

        # Calling encoded dataset
        enc_df = self.encoding_df()

        transformer = RobustScaler(with_centering = False).fit(enc_df)
        norm_np = transformer.transform(enc_df)
        return DataFrame(norm_np, columns = enc_df.columns).set_index(enc_df.index)

    def X_y_sets(self):
        """
        Cut the normalized dataset into sliding windows.

        For each start row i:
        * X[i] = rows i .. i+look_back-1, all columns except the last (target)
        * y[i] = target values of rows i+look_back-1 .. i+look_back+look_forward-2,
          i.e. starting at the last input row

        Output:
        * X with shape (n_windows, look_back, n_columns - 1)
        * y with shape (n_windows, look_forward)
        where n_windows = n_rows - look_back - look_forward + 2.
        """
        df_values = self.normalizing_df().values
        look_back = self.look_back
        look_forward = self.look_forward

        X, y = list(), list()
        limit = len(df_values)

        for i in range(limit):
            # Updated indices delimiting end of series.
            end_ix = i + look_back
            out_end_ix = end_ix + look_forward - 1

            # Stop once the output window would run past the dataset
            if out_end_ix > limit:
                break

            # gather input and output parts of the pattern
            X.append(df_values[i:end_ix, :-1])
            y.append(df_values[end_ix-1:out_end_ix, -1])

        return array(X), array(y)

    def train_test_sets(self):
        """
        Split the windows at floor(n_observations * n_batches * test_frac_split):
        windows before that index are train, the rest are test.

        NOTE(review): the split index is computed from the number of queried rows,
        while there are fewer windows than rows; confirm this is intended.
        """
        X, y = self.X_y_sets()

        split_point = floor(self.n_observations * self.n_batches * self.test_frac_split)

        train_X = X[:split_point, :]
        train_y = y[:split_point, :]
        test_X = X[split_point:, :]
        test_y  = y[split_point:, :]

        print("train_X.shape", train_X.shape)
        print("train_y.shape", train_y.shape)
        print("test_X.shape", test_X.shape)
        print("test_y.shape", test_y.shape)

        return train_X , train_y, test_X , test_y

In [12]:
#del mysql_credentials, mysql_user, mysql_pwd, mysql_database_name