In [10]:
import numpy as np, time
from sklearn.impute import SimpleImputer
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_predict
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report

from IPython.core.display import display, HTML                                    
display(HTML("<style>.container { width:100% !important; }</style>"))                                  #change width of Jupyer Notebook to use the whole window resolution availa

In [None]:
#https://github.com/alankrantas/sefr_multiclass_classifier
#based on https://github.com/sefr-classifier/sefr/blob/master/SEFR.py
#research paper: https://arxiv.org/abs/2006.04620
class SEFR:
    def __init__(self):
        """
        Initialize model class.
        """
        
        self.labels = np.array([])
        self.weights = np.array([])
        self.bias = np.array([])
        self.training_time = 0


    def fit(self, data_train, target_train):
        """
        Train the model.
        """
        
        self.labels = np.unique(target_train) # get all labels
        self.weights = []
        self.bias = []
        self.training_time = 0
        
        start_time = time.monotonic_ns()
        data_train = np.array(data_train, dtype='float32')
        target_train = np.array(target_train, dtype='int32')
        
        for label in self.labels: # train binary classifiers on each labels
            
            pos_labels = (target_train != label) # use "not the label" as positive class
            neg_labels = np.invert(pos_labels) # use the label as negative class
            
            pos_indices = data_train[pos_labels]
            neg_indices = data_train[neg_labels]
            
            avg_pos = np.mean(pos_indices, axis=0)
            avg_neg = np.mean(neg_indices, axis=0)
            
            weight = np.nan_to_num((avg_pos - avg_neg) / (avg_pos + avg_neg)) # calculate model weight of "not the label"
            weighted_scores = np.dot(data_train, weight)
            
            pos_score_avg = np.mean(weighted_scores[pos_labels])
            neg_score_avg = np.mean(weighted_scores[neg_labels])
            
            bias = -(neg_indices.size * pos_score_avg + # calculate weighted average of bias
                     pos_indices.size * neg_score_avg) / (neg_indices.size + pos_indices.size)
            
            self.weights.append(weight) # label weight
            self.bias.append(bias) # label bias
        
        self.weights = np.array(self.weights, dtype='float32')
        self.bias = np.array(self.bias, dtype='float32')
        self.training_time = time.monotonic_ns() - start_time


    def predict(self, new_data):
        """
        Predict labels of the new data.
        """
        
        new_data = np.array(new_data, dtype='float32')

        # calculate weighted score + bias on each labels
        weighted_score = np.add(np.dot(self.weights, new_data.T).T, self.bias)
        return self.labels[np.argmin(weighted_score, axis=1)]


    def get_params(self, deep=True): # for cross-validation
        return {}

In [11]:
if __name__ == '__main__':
    from sklearn.impute import SimpleImputer
    import pandas as pd
    from sklearn.model_selection import train_test_split, cross_val_predict
    from sklearn.preprocessing import LabelEncoder
    from sklearn.metrics import accuracy_score, classification_report
    
    #dataset selection and loading
    train_set_name = "weather_data_2000_2019"
    train_dataset = pd.read_csv(("./datasets/" + train_set_name + ".csv"),header=None)
    test_set_name = "weather_data_2020_2021"
    test_dataset = pd.read_csv(("./datasets/" + test_set_name + ".csv"),header=None) 

    print("Your Train Dataset is: ", train_set_name)                                                                       #display dataset name to user
    print("Your Test Dataset is: ", test_set_name)                                                                       #display dataset name to user

    features_selected = 5
    #training features
    temperature_train = train_dataset.iloc[:, 4:5]
    feels_like_train = train_dataset.iloc[:, 7:8]
    dew_point_train = train_dataset.iloc[:, 8:9]
    humidity_train = train_dataset.iloc[:, 9:10]
    pressure_train = train_dataset.iloc[:, 19:20] 
    #uv_index_train = train_dataset.iloc[:, 24:25] 

    #testing data
    temperature_test = test_dataset.iloc[:, 4:5]
    feels_like_test = test_dataset.iloc[:, 7:8]
    dew_point_test = test_dataset.iloc[:, 8:9]
    humidity_true = test_dataset.iloc[:, 9:10]
    pressure_test = test_dataset.iloc[:, 19:20]
    #uv_index_test = test_dataset.iloc[:, 24:25] 

    X_train = pd.concat([temperature_train, feels_like_train, dew_point_train, humidity_train, pressure_train], axis=1)   #, uv_index_train], axis=1)
    X_train.replace(([np.inf, -np.inf], np.nan), inplace=True)                                                            #replace any infinite values with nan
    X_train = X_train.to_numpy()  

    y_train = train_dataset.iloc[:, 30:31].to_numpy()
    y_true = test_dataset.iloc[:, 30:31].to_numpy()

    #https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html
    test_data = pd.concat([temperature_test, feels_like_test, dew_point_test, humidity_true, pressure_test], axis=1)       #, uv_index_test], axis=1)       
    test_data.replace(([np.inf, -np.inf], np.nan), inplace=True)                                                           #replace any infinite values with nan
    X_test = test_data.to_numpy()

    #change all nan values in all datasets with the most frequent value of the dataset
    imp = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
    imp.fit(X_train)
    imp.fit(y_train)
    imp.fit(X_test)
    imp.fit(y_true)
    
    disp = int(input("Please choose 1 to display the dataset or any button to cotinue without displaying!"))
    if disp == 1:
        display(X_train)
        display(y_train.ravel())

        display(X_test)
    else: print("Not displaying dataset!")   
    
    # encode target
    le = LabelEncoder()
    y_train = le.fit_transform(y_train.ravel())
    class_names = le.classes_


    # train model and predict labels
    clf = SEFR()
    clf.fit(X_train, y_train.ravel())
    predicted = clf.predict(X_test)
    cv_predicted = cross_val_predict(clf, X_train, y_train.ravel(), cv=5)

    # view prediction results
    print('Training time:', clf.training_time, 'ns')
    print('Training CV score:', accuracy_score(y_train.ravel(), cv_predicted).round(3))
    print('Test accuracy:', accuracy_score(y_true, predicted).round(3))

Your Train Dataset is:  weather_data_2000_2019
Your Test Dataset is:  weather_data_2020_2021
Please choose 1 to display the dataset or any button to cotinue without displaying!1


array([[   7.9 ,    7.5 ,    6.9 ,   93.72, 1024.5 ],
       [   8.4 ,    7.7 ,    6.7 ,   89.02, 1025.3 ],
       [   9.1 ,    8.7 ,    7.8 ,   91.37, 1018.6 ],
       ...,
       [   7.4 ,    5.1 ,    5.2 ,   85.75, 1033.6 ],
       [   6.8 ,    5.3 ,    5.7 ,   93.15, 1029.7 ],
       [   7.3 ,    5.6 ,    6.5 ,   94.87, 1031.6 ]])

array([1, 1, 1, ..., 2, 2, 2], dtype=int64)

array([[   5.4 ,    4.  ,    3.9 ,   90.37, 1029.3 ],
       [   8.8 ,    7.2 ,    7.1 ,   89.27, 1020.4 ],
       [   8.3 ,    6.4 ,    5.2 ,   81.85, 1021.4 ],
       ...,
       [  11.2 ,   10.6 ,    9.1 ,   87.68, 1004.2 ],
       [  14.  ,   14.  ,   12.  ,   87.59, 1012.3 ],
       [  13.6 ,   13.6 ,   11.  ,   84.72, 1016.5 ]])

Training time: 0 ns
Training CV score: 0.419
Test accuracy: 0.435
