In [2]:
import os
import tempfile
import shutil
import pandas as pd
import numpy as np
from scipy.optimize import curve_fit
from sqlalchemy import create_engine, inspect
from bokeh.plotting import figure, show
from bokeh.io import push_notebook, output_notebook
from ipywidgets import interact
from IPython.display import display
import unittest
import warnings  # Import the warnings module

# Suppress FutureWarnings: from here on every FutureWarning is ignored,
# regardless of which library emits it.
warnings.simplefilter(action='ignore', category=FutureWarning)

# Initialize Bokeh for Jupyter Notebook so that show() renders plots inline.
# NOTE(review): output_notebook is already imported from bokeh.io at the top of
# the file; this import rebinds the same name and looks redundant - confirm
# before removing.
from bokeh.plotting import output_notebook
output_notebook()

class DataMappingError(Exception):
    """Signals that test data could not be mapped onto a chosen function."""

class DataVisualizationError(Exception):
    """Signals a failure while visualizing data."""

class DeviationCalculationError(Exception):
    """Signals a failure while calculating a deviation."""

class DataLoadingError(Exception):
    """Signals a failure while loading data or writing it to the database."""


class DataLoader:
    """Base class that reads a CSV file into a pandas DataFrame."""

    def __init__(self, file_path):
        """
        Initialize the DataLoader instance.

        Parameters:
        - file_path (str): The path to the CSV data file.
        """
        self.file_path = file_path

    def load_data(self):
        """
        Load data from ``self.file_path`` with ``pandas.read_csv``.

        Returns:
        - pandas.DataFrame: A DataFrame containing the loaded data.
        Raises:
        - DataLoadingError: If reading the file fails for any reason. The
          original exception is attached as ``__cause__``.
        """
        try:
            return pd.read_csv(self.file_path)
        except Exception as e:
            # Chain explicitly so the traceback shows the underlying pandas/OS error
            # as the direct cause instead of an incidental "during handling" context.
            raise DataLoadingError(f"Error loading data from {self.file_path}: {e}") from e


def my_ideal_function(x, a, b):
    """
    Evaluate the linear model ``a * x + b`` used as the example ideal function.

    Parameters:
    - x: Input variable (anything supporting ``*`` and ``+`` with a and b).
    - a: Multiplier applied to x (fitted parameter).
    - b: Offset added afterwards (fitted parameter).

    Returns:
    - The value of ``a * x + b``.
    """
    scaled = a * x
    return scaled + b


class TrainingDataLoader(DataLoader):
    """Class for loading training data from a CSV file into an SQLite database."""

    def __init__(self, file_path, db_path):
        """
        Initialize the TrainingDataLoader instance.

        Parameters:
        - file_path (str): The path to the training data file.
        - db_path (str): The path to the SQLite database file.
        """
        super().__init__(file_path)
        self.db_path = db_path

    def compile_database(self):
        """
        Write the training data into the ``training_data`` table.

        An existing ``training_data`` table is replaced. The DataFrame index
        is not written.

        Raises:
        - DataLoadingError: If loading the file or writing the table fails. The
          original exception is attached as ``__cause__``.
        """
        engine = create_engine(f'sqlite:///{self.db_path}')
        try:
            df = self.load_data()
            df.to_sql('training_data', engine, index=False, if_exists='replace')
        except Exception as e:
            # Explicit chaining keeps the root cause visible in the traceback.
            raise DataLoadingError(f"Error compiling database: {e}") from e
        finally:
            engine.dispose()  # Release pooled connections whether or not the write succeeded


class IdealFunctionsLoader(DataLoader):
    """Class for loading ideal functions from a CSV file into an SQLite database."""

    def __init__(self, file_path, db_path):
        """
        Initialize the IdealFunctionsLoader instance.

        Parameters:
        - file_path (str): The path to the ideal functions file.
        - db_path (str): The path to the SQLite database file.
        """
        super().__init__(file_path)
        self.db_path = db_path

    def compile_database(self):
        """
        Write the ideal functions into the ``ideal_functions`` table.

        An existing ``ideal_functions`` table is replaced. The DataFrame index
        is not written.

        Raises:
        - DataLoadingError: If loading the file or writing the table fails. The
          original exception is attached as ``__cause__``.
        """
        engine = create_engine(f'sqlite:///{self.db_path}')
        try:
            df = self.load_data()
            df.to_sql('ideal_functions', engine, index=False, if_exists='replace')
        except Exception as e:
            # Explicit chaining keeps the root cause visible in the traceback.
            raise DataLoadingError(f"Error compiling ideal functions database: {e}") from e
        finally:
            engine.dispose()  # Release pooled connections whether or not the write succeeded


class FitIdealFunctions:
    """Class for fitting the ideal function to each training data column."""

    def __init__(self, train_data, ideal_functions):
        """
        Initialize the FitIdealFunctions instance.

        Parameters:
        - train_data (pandas.DataFrame): Must contain an 'x' column; every
          column whose name starts with 'y' is fitted.
        - ideal_functions: Stored on the instance; not read by fit_functions.
        """
        self.train_data = train_data
        self.ideal_functions = ideal_functions

    def fit_functions(self):
        """
        Fit ``my_ideal_function`` to every 'y*' column of the training data.

        Columns whose fit raises, or whose covariance matrix contains NaN, are
        reported with ``print`` and skipped.

        Returns:
        - pandas.DataFrame: Columns 'Function' (``'y<n>'``, where n is the
          1-based position of the column among the 'y*' columns) and
          'Parameters' (the fitted parameter array), one row per successful fit.
        """
        # Dynamically get the column names starting with 'y'
        function_columns = [col for col in self.train_data.columns if col.startswith('y')]

        # Collect results in plain lists and build the frame once at the end;
        # concatenating onto a growing (initially empty) DataFrame inside the loop
        # copies all previous rows on every iteration.
        function_names = []
        fitted_parameters = []

        for i, col in enumerate(function_columns, start=1):
            x_data = self.train_data['x']
            y_data = self.train_data[col]

            initial_guess = [1.0, 0.0]  # Initial guess for parameters (a, b)

            try:
                parameters, covariance = curve_fit(my_ideal_function, x_data, y_data, p0=initial_guess)
            except RuntimeError as e:
                # curve_fit raises RuntimeError when the optimisation does not converge
                print(f"Fit failed for {col}: {str(e)}")
                continue
            except Exception as e:
                print(f"An error occurred during fitting for {col}: {str(e)}")
                continue

            if np.any(np.isnan(covariance)):
                print(f"Covariance matrix contains NaN values for {col}. Fit may be unreliable.")
                continue

            # Use 'y' followed by a number as the function name. The number keeps
            # counting across skipped columns, so it always reflects column position.
            function_names.append(f'y{i}')
            fitted_parameters.append(parameters)

        # dtype=object keeps each parameter array as a single cell and yields the
        # same column dtypes whether or not any fit succeeded.
        return pd.DataFrame(
            {'Function': function_names, 'Parameters': fitted_parameters},
            columns=['Function', 'Parameters'],
            dtype=object,
        )



class AssignAndCalculateDeviation:
    """Class for assigning each test point to its closest chosen function."""

    def __init__(self, test_data, chosen_functions):
        """
        Initialize the AssignAndCalculateDeviation instance.

        Parameters:
        - test_data (pandas.DataFrame): Must contain 'x' and 'y' columns.
        - chosen_functions (pandas.DataFrame): Must contain 'Function' (name)
          and 'Parameters' (arguments unpacked into my_ideal_function).
        """
        self.test_data = test_data
        self.chosen_functions = chosen_functions

    def assign_and_calculate_deviation(self):
        """
        Find, for every test point, the chosen function with the smallest
        absolute deviation ``|f(x) - y|``.

        Returns:
        - pandas.DataFrame: Columns 'x', 'y', 'Deviation', 'ChosenFunction',
          one row per test point. When no function is available (or no
          deviation is comparable), 'Deviation' is ``inf`` and
          'ChosenFunction' is ``None``.
        """
        rows = []
        for x, y in zip(self.test_data['x'], self.test_data['y']):
            min_deviation = np.inf
            chosen_function = None

            for _, function_row in self.chosen_functions.iterrows():
                predicted_y = my_ideal_function(x, *function_row['Parameters'])
                deviation = np.abs(predicted_y - y)

                # Strict '<' keeps the first function on ties
                if deviation < min_deviation:
                    min_deviation = deviation
                    chosen_function = function_row['Function']

            rows.append({'x': x, 'y': y, 'Deviation': min_deviation, 'ChosenFunction': chosen_function})

        # DataFrame.append was removed in pandas 2.0; build the frame once from the rows.
        return pd.DataFrame(rows, columns=['x', 'y', 'Deviation', 'ChosenFunction'])

class TestDataLoader(DataLoader):
    """Class for loading test data from a CSV file into an SQLite database."""

    def __init__(self, file_path, db_path):
        """
        Initialize the TestDataLoader instance.

        Parameters:
        - file_path (str): The path to the test data file.
        - db_path (str): The path to the SQLite database file.
        """
        super().__init__(file_path)
        self.db_path = db_path

    def compile_database(self):
        """
        Write the test data into the ``test_data`` table.

        An existing ``test_data`` table is replaced. The DataFrame index is
        not written.

        Raises:
        - DataLoadingError: If loading the file or writing the table fails. The
          original exception is attached as ``__cause__``.
        """
        engine = create_engine(f'sqlite:///{self.db_path}')
        try:
            df = self.load_data()
            df.to_sql('test_data', engine, index=False, if_exists='replace')
        except Exception as e:
            # Explicit chaining keeps the root cause visible in the traceback.
            raise DataLoadingError(f"Error compiling test data database: {e}") from e
        finally:
            engine.dispose()  # Release pooled connections whether or not the write succeeded

class TestDataMapper:
    """Class for mapping test data to chosen ideal functions and calculating deviation."""

    def __init__(self, test_data, chosen_functions):
        """
        Initialize the TestDataMapper instance.

        Parameters:
        - test_data (pandas.DataFrame): Must contain 'x' and 'y' columns.
        - chosen_functions (pandas.DataFrame): Must contain 'Function' (name)
          and 'Parameters' (arguments unpacked into my_ideal_function).
        """
        self.test_data = test_data
        self.chosen_functions = chosen_functions

    def map_and_calculate_deviation(self):
        """
        Map every test point to the chosen function with the smallest
        absolute deviation ``|f(x) - y|``.

        Returns:
        - pandas.DataFrame: Columns 'x', 'y', 'Deviation', 'ChosenFunction',
          one row per test point. When no function is available (or no
          deviation is comparable), 'Deviation' is ``inf`` and
          'ChosenFunction' is ``None``.
        Raises:
        - DataMappingError: If evaluating a chosen function fails. The original
          exception is attached as ``__cause__``.
        """
        rows = []
        for x, y in zip(self.test_data['x'], self.test_data['y']):
            min_deviation = np.inf
            chosen_function = None

            for _, function_row in self.chosen_functions.iterrows():
                function_name = function_row['Function']
                function_parameters = function_row['Parameters']

                try:
                    predicted_y = my_ideal_function(x, *function_parameters)
                except Exception as e:
                    raise DataMappingError(f"Error mapping data for {function_name}: {e}") from e

                deviation = np.abs(predicted_y - y)

                # Strict '<' keeps the first function on ties
                if deviation < min_deviation:
                    min_deviation = deviation
                    chosen_function = function_name

            rows.append({'x': x, 'y': y, 'Deviation': min_deviation, 'ChosenFunction': chosen_function})

        # DataFrame.append was removed in pandas 2.0; build the frame once from the rows.
        return pd.DataFrame(rows, columns=['x', 'y', 'Deviation', 'ChosenFunction'])

    
    
class Visualization:
    """Class for data visualization with Bokeh inside a notebook."""

    def __init__(self, train_data, test_data, results, chosen_functions):
        """
        Initialize the Visualization instance.

        Parameters:
        - train_data (pandas.DataFrame): Read for 'x' and every column starting with 'y'.
        - test_data (pandas.DataFrame): Read for 'x' and 'y'.
        - results (pandas.DataFrame): Read for 'x', 'Deviation' and 'ChosenFunction'.
        - chosen_functions (pandas.DataFrame): Read for 'Function'.
        """
        self.train_data = train_data
        self.test_data = test_data
        self.results = results
        self.chosen_functions = chosen_functions
        # One tuple of glyph renderers per plotted function; the last two items
        # are always (test renderer, deviation renderer).
        self.plot_handles = []
        # Function name belonging to each entry of plot_handles (same order).
        self._handle_functions = []
        # Handles returned by show(..., notebook_handle=True); needed by push_notebook.
        self._notebook_handles = []

    def _results_for(self, function_name):
        """Return the rows of ``results`` that were assigned to ``function_name``."""
        return self.results[self.results['ChosenFunction'] == function_name]

    def plot_data(self):
        """
        Show one figure per training column with training points, test points
        and the deviation line of the matching chosen function.
        """
        function_columns = [col for col in self.train_data.columns if col.startswith('y')]
        for i, col in enumerate(function_columns, start=1):
            # Fitted functions are named 'y<i>' (see FitIdealFunctions), not 'Function<i>';
            # filtering on the latter always produced an empty deviation line.
            function_name = f'y{i}'

            p = figure(title=f"Function {i} Visualization", x_axis_label='x', y_axis_label='y')
            train_handle = p.circle(self.train_data['x'], self.train_data[col],
                                    legend_label=f'Training Function {i}', size=8, color='blue')

            test_handle = p.circle(self.test_data['x'], self.test_data['y'], legend_label='Test Data', size=8,
                                   color='green')

            chosen_function_data = self._results_for(function_name)

            deviation_handle = p.line(chosen_function_data['x'], chosen_function_data['Deviation'],
                                      legend_label=f'Deviation Function {i}', line_width=2, line_color='red')

            self.plot_handles.append((train_handle, test_handle, deviation_handle))
            self._handle_functions.append(function_name)
            self._notebook_handles.append(show(p, notebook_handle=True))

    def plot_test_data(self):
        """
        Show a single figure with the test points and one deviation line per
        chosen function.
        """
        p = figure(title="Test Data Visualization", x_axis_label='x', y_axis_label='y')
        test_handle = p.circle(self.test_data['x'], self.test_data['y'], legend_label='Test Data', size=8, color='green')

        for _, function_row in self.chosen_functions.iterrows():
            function_name = function_row['Function']
            chosen_function_data = self._results_for(function_name)
            deviation_handle = p.line(chosen_function_data['x'], chosen_function_data['Deviation'],
                                      legend_label=f'Deviation {function_name}', line_width=2)

            # Add both handles to the list
            self.plot_handles.append((test_handle, deviation_handle))
            self._handle_functions.append(function_name)

        self._notebook_handles.append(show(p, notebook_handle=True))

    def update_plots(self, i):
        """
        Refresh the test and deviation glyphs from the current ``test_data`` and
        ``results`` and push the changes to every figure already shown.

        Parameters:
        - i: Unused; presumably accepted so the method can serve as a widget
          callback (e.g. ipywidgets.interact) - confirm against callers.
        """
        for function_name, handles in zip(self._handle_functions, self.plot_handles):
            # plot_data stores 3-tuples and plot_test_data 2-tuples; the test and
            # deviation renderers are the last two items in both cases.
            *_, test_handle, deviation_handle = handles
            chosen_function_data = self._results_for(function_name)

            # Replace each data dict in one assignment so 'x' and 'y' never have
            # mismatched lengths in between.
            test_handle.data_source.data = {'x': self.test_data['x'], 'y': self.test_data['y']}
            deviation_handle.data_source.data = {'x': chosen_function_data['x'],
                                                 'y': chosen_function_data['Deviation']}

        # push_notebook needs the handle returned by show(), not a glyph renderer.
        for notebook_handle in self._notebook_handles:
            push_notebook(handle=notebook_handle)


# Combined unit tests
class CombinedTestFunctions(unittest.TestCase):
    """Unit tests for the CSV loader and the SQLite-backed loaders."""

    def setUp(self):
        # Create a temporary directory for testing
        self.test_dir = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the temporary directory
        shutil.rmtree(self.test_dir)

    def _write_file(self, name, content):
        """Write ``content`` to ``name`` inside the temporary directory and return its path."""
        file_path = os.path.join(self.test_dir, name)
        with open(file_path, 'w') as f:
            f.write(content)
        return file_path

    def _read_table(self, db_path, table_name):
        """Assert that ``table_name`` exists in the database and return it as a DataFrame."""
        engine = create_engine(f'sqlite:///{db_path}')
        try:
            self.assertTrue(inspect(engine).has_table(table_name))
            return pd.read_sql(table_name, engine)
        finally:
            # Dispose before tearDown removes the directory; an open connection
            # would leak and can keep the database file locked.
            engine.dispose()

    def test_data_loader(self):
        file_path = self._write_file("test.csv", "x,y\n1,2\n3,4\n")
        loaded_data = DataLoader(file_path).load_data()
        expected_data = pd.DataFrame({'x': [1, 3], 'y': [2, 4]})
        pd.testing.assert_frame_equal(loaded_data, expected_data)

    def test_training_data_loader(self):
        train_file_path = self._write_file("train.csv", "x,y1,y2,y3,y4\n1,2,3,4,5\n3,4,5,6,7\n")
        db_path = os.path.join(self.test_dir, 'train_test.db')
        TrainingDataLoader(train_file_path, db_path).compile_database()

        loaded_data = self._read_table(db_path, 'training_data')
        expected_data = pd.DataFrame({'x': [1, 3], 'y1': [2, 4], 'y2': [3, 5], 'y3': [4, 6], 'y4': [5, 7]})
        pd.testing.assert_frame_equal(loaded_data, expected_data)

    def test_ideal_functions_loader(self):
        ideal_file_path = self._write_file("ideal.csv", "x,y1,y2,y3,y4,y5\n1,2,3,4,5,6\n3,4,5,6,7,8\n")
        db_path = os.path.join(self.test_dir, 'ideal_test.db')
        IdealFunctionsLoader(ideal_file_path, db_path).compile_database()

        loaded_data = self._read_table(db_path, 'ideal_functions')
        expected_data = pd.DataFrame({'x': [1, 3], 'y1': [2, 4], 'y2': [3, 5], 'y3': [4, 6], 'y4': [5, 7], 'y5': [6, 8]})
        pd.testing.assert_frame_equal(loaded_data, expected_data)


# Run the combined unit tests in-process; only a failing run raises SystemExit
def run_tests():
    """
    Run every test in CombinedTestFunctions with a text runner.

    The suite is run directly instead of through ``unittest.main()``, so a
    successful run returns normally.

    Raises:
    - SystemExit: With exit code 1 if any test fails or errors.
    """
    suite = unittest.TestLoader().loadTestsFromTestCase(CombinedTestFunctions)
    result = unittest.TextTestRunner().run(suite)

    if not result.wasSuccessful():
        print("Tests failed.")
        # Equivalent to sys.exit(1); `sys` is never imported in this file, so the
        # original call raised NameError instead of exiting.
        raise SystemExit(1)  # Exit with a non-zero status code


if __name__ == "__main__":
    # Example usage. train.csv, ideal.csv and test.csv are read relative to the
    # current working directory; all tables go into example.db.

    train_data_loader = TrainingDataLoader("train.csv", "example.db")
    train_data_loader.compile_database()

    ideal_functions_loader = IdealFunctionsLoader("ideal.csv", "example.db")
    ideal_functions_loader.compile_database()

    engine = create_engine("sqlite:///example.db")
    try:
        train_data = pd.read_sql('training_data', engine)
        ideal_functions = pd.read_sql('ideal_functions', engine)

        chosen_functions = FitIdealFunctions(train_data, ideal_functions).fit_functions()

        test_data_loader = TestDataLoader("test.csv", "example.db")
        test_data_loader.compile_database()

        test_data = pd.read_sql('test_data', engine)
    finally:
        # Everything needed is in memory now; release the database connections.
        engine.dispose()

    test_data_mapper = TestDataMapper(test_data, chosen_functions)
    test_results = test_data_mapper.map_and_calculate_deviation()

    visualization = Visualization(train_data, test_data, test_results, chosen_functions)
    visualization.plot_data()
    visualization.plot_test_data()

    # Run combined unit tests; a failing run ends the process with SystemExit
    run_tests()


...
----------------------------------------------------------------------
Ran 3 tests in 0.022s

OK
