In [26]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
from bokeh.models import ColumnDataSource
from bokeh.transform import dodge
import matplotlib.pyplot as plt
import os
from sqlalchemy import create_engine
import unittest

class DataAnalysis:
    """Load train/ideal/test CSV data, fit polynomial models and report on them.

    The training and test frames are addressed through an ``x`` column; the
    test frame additionally through a ``y`` column.  In the training and ideal
    frames every column after the first is treated as a y-series.
    """

    # Colours assigned to successive training columns; reused cyclically when
    # there are more columns than colours.
    _TRAIN_COLORS = ('blue', 'red', 'green', 'orange')

    def __init__(self, train_file, ideal_file, test_file):
        """Remember the three CSV paths; no file is read until ``read_data``."""
        self.train_file = train_file
        self.ideal_file = ideal_file
        self.test_file = test_file
        self.train_data = None
        self.ideal_data = None
        self.test_data = None

    def read_data(self):
        """Read the three CSV files into ``train_data``, ``ideal_data`` and ``test_data``."""
        self.train_data = pd.read_csv(self.train_file)
        self.ideal_data = pd.read_csv(self.ideal_file)
        self.test_data = pd.read_csv(self.test_file)

    def visualize_training_data(self):
        """Show a Bokeh scatter plot with one series per training y-column."""
        p = figure(title='Training Data', x_axis_label='x', y_axis_label='y', width=500, height=300)
        palette = self._TRAIN_COLORS
        # Iterate the same columns that fit_polynomial_models uses instead of
        # hard-coding y1..y4, so the plot always matches what gets fitted.
        for position, col in enumerate(self.train_data.columns[1:]):
            # scatter() replaces the size-based circle() call, which newer
            # Bokeh releases deprecate.
            p.scatter(self.train_data['x'], self.train_data[col], marker='circle',
                      legend_label=str(col), size=4, color=palette[position % len(palette)])
        show(p)

    def visualize_test_data(self):
        """Show a Bokeh scatter plot of the test data (``y`` against ``x``)."""
        p = figure(title='Test Data', x_axis_label='x', y_axis_label='y', width=500, height=300)
        p.scatter(self.test_data['x'], self.test_data['y'], marker='circle',
                  legend_label='y', size=4, color='purple')
        show(p)

    def fit_polynomial_models(self, degree=3):
        """Fit one least-squares polynomial per training y-column.

        Args:
            degree: Degree of the fitted polynomials (defaults to 3).

        Returns:
            dict mapping each column name to its ``np.polyfit`` coefficient
            array (highest power first).
        """
        x_values = self.train_data['x']
        return {
            col: np.polyfit(x_values, self.train_data[col], degree)
            for col in self.train_data.columns[1:]
        }

    def predict(self, models):
        """Evaluate every model at the test ``x`` values.

        Args:
            models: dict of column name -> polynomial coefficients, as returned
                by ``fit_polynomial_models``.

        Returns:
            dict mapping each column name to the predicted values.
        """
        x_values = self.test_data['x']
        return {col: np.poly1d(coefficients)(x_values) for col, coefficients in models.items()}

    def calculate_performance_metrics(self, models):
        """Score every model's predictions against the test ``y`` column.

        Args:
            models: dict of column name -> polynomial coefficients.

        Returns:
            dict mapping each column name to a dict with the keys ``'MAE'``,
            ``'MSE'`` and ``'R2 Score'``.
        """
        y_true = self.test_data['y']
        metrics = {}
        # Predict once for all models rather than once per column.
        for col, y_pred in self.predict(models).items():
            metrics[col] = {
                'MAE': mean_absolute_error(y_true, y_pred),
                'MSE': mean_squared_error(y_true, y_pred),
                'R2 Score': r2_score(y_true, y_pred),
            }
        return metrics

    def plot_performance_metrics(self, metrics):
        """Show a grouped bar chart of MAE, MSE and R2 Score for every column.

        Args:
            metrics: dict as returned by ``calculate_performance_metrics``.
        """
        names = list(metrics.keys())
        data = dict(
            names=names,
            mae_values=[metrics[name]['MAE'] for name in names],
            mse_values=[metrics[name]['MSE'] for name in names],
            r2_values=[metrics[name]['R2 Score'] for name in names],
        )
        # All three bar groups read from the same table, so share one source.
        source = ColumnDataSource(data)

        # Each category is one unit wide.  Three bars of width 0.2 centred at
        # -0.25, 0 and +0.25 stay inside their own category and are laid out
        # symmetrically around the tick.
        bar_width = 0.2
        spacing = 0.25

        #Creating the figure
        p = figure(x_range=names, width=600, height=400, title='Performance Metrics')
        p.xaxis.axis_label = 'Columns'
        p.yaxis.axis_label = 'Metric Value'

        #Plotting the performance metrics (MAE, MSE, R2 Score)
        bar_groups = (
            ('mae_values', 'MAE', 'blue'),
            ('mse_values', 'MSE', 'green'),
            ('r2_values', 'R2 Score', 'orange'),
        )
        for position, (field, label, color) in enumerate(bar_groups):
            shift = (position - 1) * spacing
            p.vbar(x=dodge('names', shift, range=p.x_range), top=field, width=bar_width,
                   source=source, color=color, legend_label=label)

        #Show figure
        p.legend.location = 'top_right'
        show(p)

    def save_to_database(self, metrics, db_url='sqlite:///metrics.db'):
        """Append the metrics to the ``metrics`` table using SQLAlchemy.

        Args:
            metrics: dict as returned by ``calculate_performance_metrics``.
            db_url: SQLAlchemy database URL; defaults to the local SQLite file
                ``metrics.db``.

        Nothing is written when ``metrics`` is empty.
        """
        if not metrics:
            return
        # One row per fitted column, indexed by the column name, written with a
        # single to_sql call instead of one call per row.
        frame = pd.DataFrame(list(metrics.values()), index=list(metrics.keys()))
        engine = create_engine(db_url)
        try:
            with engine.begin() as conn:
                frame.to_sql('metrics', conn, if_exists='append')
        finally:
            # Release the pooled connection (and the SQLite file handle).
            engine.dispose()

    def find_best_ideal_functions(self):
        """Pick, for every training y-column, the closest ideal column.

        Closeness is the sum of absolute differences between the training
        column and the ideal column, compared row by row.

        Returns:
            dict mapping each training column name to the name of the ideal
            column with the smallest summed absolute difference.
        """
        ideal_functions = {}
        for y_col in self.train_data.columns[1:]:
            train_values = self.train_data[y_col]
            deviations = {
                ideal_col: np.sum(np.abs(train_values - self.ideal_data[ideal_col]))
                for ideal_col in self.ideal_data.columns[1:]
            }
            ideal_functions[y_col] = min(deviations, key=deviations.get)
        return ideal_functions

    def print_ideal_columns(self):
        """Print the best ideal column found for every training column."""
        ideal_functions = self.find_best_ideal_functions()
        print("Best Ideal Columns:")
        for y_col, ideal_col in ideal_functions.items():
            print(f"{y_col}: {ideal_col}")

class DataAnalysisWrapper:
    """Drive a DataAnalysis instance through its complete workflow.

    The DataAnalysis object is created by ``load_data`` and kept in
    ``self.analysis``; every other step delegates to it.
    """

    def __init__(self, train_file, ideal_file, test_file):
        """Store the three CSV paths; ``analysis`` stays None until ``load_data``."""
        self.analysis = None
        self.train_file = train_file
        self.ideal_file = ideal_file
        self.test_file = test_file

    def perform_data_analysis(self):
        """Run all steps in order: load, visualize, fit, predict, score, plot, save, print."""
        self.load_data()
        self.visualize_data()
        fitted = self.fit_models()
        # The predictions themselves are not used further here; the step is
        # still executed (and reports its success message).
        self.predict_values(fitted)
        scores = self.calculate_metrics(fitted)
        self.plot_metrics(scores)
        self.save_metrics(scores)
        self.print_ideal_columns()

    def load_data(self):
        """Build a DataAnalysis for the stored paths, read its data and keep it."""
        # Only publish the instance once its CSV files have been read.
        loader = DataAnalysis(self.train_file, self.ideal_file, self.test_file)
        loader.read_data()
        self.analysis = loader

    def visualize_data(self):
        """Show the training-data plot followed by the test-data plot."""
        self.analysis.visualize_training_data()
        self.analysis.visualize_test_data()

    def fit_models(self):
        """Fit the polynomial models, report success and return them."""
        fitted = self.analysis.fit_polynomial_models()
        print("Polynomial models fitted successfully!")
        return fitted

    def predict_values(self, models):
        """Predict test values with ``models``, report success and return them."""
        predicted = self.analysis.predict(models)
        print("Predictions generated successfully!")
        return predicted

    def calculate_metrics(self, models):
        """Compute the performance metrics for ``models``, report success and return them."""
        scores = self.analysis.calculate_performance_metrics(models)
        print("Performance metrics calculated successfully!")
        return scores

    def plot_metrics(self, metrics):
        """Plot the given performance metrics."""
        self.analysis.plot_performance_metrics(metrics)

    def save_metrics(self, metrics):
        """Store the given performance metrics in the database and report success."""
        self.analysis.save_to_database(metrics)
        print("Performance metrics saved to the database!")

    def print_ideal_columns(self):
        """Print the best ideal column for every training column."""
        self.analysis.print_ideal_columns()

#Default CSV file names for the training, ideal and test data sets
train_file, ideal_file, test_file = 'train.csv', 'ideal.csv', 'test.csv'

class TestDataAnalysis(unittest.TestCase):
    """End-to-end smoke test of the analysis pipeline on the default CSV files."""

    def setUp(self):
        self.train_file = 'train.csv'
        self.ideal_file = 'ideal.csv'
        self.test_file = 'test.csv'

    def test_data_analysis(self):
        wrapper = DataAnalysisWrapper(self.train_file, self.ideal_file, self.test_file)
        # Runs every step, including showing the plots and appending to the
        # metrics database.
        wrapper.perform_data_analysis()

        # The wrapper must have created and kept its DataAnalysis instance.
        analysis = wrapper.analysis
        self.assertIsNotNone(analysis)

        # A bare "is not None" check on the metrics can never fail, because a
        # dict is always returned; verify the actual structure instead.
        models = analysis.fit_polynomial_models()
        self.assertTrue(models)
        metrics = analysis.calculate_performance_metrics(models)
        self.assertEqual(set(metrics), set(models))
        for col, values in metrics.items():
            with self.subTest(column=col):
                self.assertEqual(set(values), {'MAE', 'MSE', 'R2 Score'})
                # Absolute and squared errors cannot be negative.
                self.assertGreaterEqual(values['MAE'], 0)
                self.assertGreaterEqual(values['MSE'], 0)


#Collect the tests of TestDataAnalysis into a suite (using unittest's shared default loader)
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDataAnalysis)

#Execute the suite with the plain-text runner; the call's result object is the cell's value
unittest.TextTestRunner().run(test_suite)



Polynomial models fitted successfully!
Predictions generated successfully!
Performance metrics calculated successfully!


.
----------------------------------------------------------------------
Ran 1 test in 1.458s

OK


Performance metrics saved to the database!
Best Ideal Columns:
y1: y48
y2: y31
y3: y30
y4: y40


<unittest.runner.TextTestResult run=1 errors=0 failures=0>