In [1]:
import numpy as np
import pandas as pd
import scipy as sp
import matplotlib.pyplot as plt
import pip
# pip.main(["install", "openpyxl"])
%matplotlib inline

In [29]:
"""
DataPrep: Data pre-process and preperation for manipulation and process

parameters:
    arg df_logXXXX:  data log with entry numbers (.xlsx file)

    arg df_dataXXXX: data set with entry numbers (.xlsx file)

return:
    df_logXXXX:  Cleaned up test data sets( data frame)

    df_log:      Concatinated data logs (data frame)

    df_data:     Concatinated data sets (data frame)
"""

import numpy as np
import pandas as pd

class DataPrep:

    """
    function: runlog_cleanup

        clean up and align variables

    function: data_cleanup

        check data sets and concatinate them into single data set
    """
    def __init__(self):
        """
        function:
            initialization of variables

        parameters:
            arg df_log:  Data log from data_prep output (data frame)

            arg df_data: Data set from data_prep output (data frame)

            arg test:    Test entries (list)

            arg run_num: Run numbers correspondign to test entires (list)

        return:
            data logs and sets
        """
        # import data log
        self.df_log2298 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2298.xlsx')
        self.df_log2320 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2320.xlsx')
        self.df_log2326 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2326.xlsx')
        self.df_log2331 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2331.xlsx')
        # import data sets
        self.df_data2298 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2298.csv')
        self.df_data2320 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2320.csv')
        self.df_data2326 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2326.csv')
        self.df_data2331 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2331.csv')


    def runlog_cleanup(self, df_log2298, df_log2320, df_log2326, df_log2331):
        """
        function:
            runlog_cleanup

        parameters:
            arg df_logXXXX = data log with entry numbers (.xlsx file)

        return:
            df_log:     Concatinate data log from inputs

            df_logXXXX: Clean up individual data logs
        """
#         df_log2298 = self.df_log2298
#         df_log2320 = self.df_log2320
#         df_log2326 = self.df_log2326
#         df_log2331 = self.df_log2331
    # clean up uw2298
        # delete unncessary column
        del df_log2298["Riley's Stress Level"]
        # rename entries titles
        df_log2298.rename(columns = {'FLAP L/R':'IB FLAP L/R'}, inplace = True)
        df_log2298.rename(columns = {'AIL L/R':'OB AIL L/R'}, inplace = True)
        df_log2298.rename(columns = {'LE DEF':'LE IB/OB'}, inplace = True)
        df_log2298.rename(columns = {'TRIP DEF':'TRIP DOTS'}, inplace = True)
        # add enties
        df_log2298.insert(1,'TEST', 2298)
        df_log2298['Nacelle Blockage L/R']= np.nan
        df_log2298['Spoiler L/R']= np.nan
        temp = df_log2298['DATE']
        del df_log2298['DATE']
        df_log2298['DATE']= temp
        df_log2298.columns.tolist()

    # clean up uw2320
        # rename entries titles
        df_log2320.rename(columns = {'FLAP L/R':'IB FLAP L/R'}, inplace = True)
        df_log2320.rename(columns = {'AIL L/R':'OB AIL L/R'}, inplace = True)
        df_log2320.rename(columns = {'TRIP DEF':'TRIP DOTS'}, inplace = True)
        # add enties
        df_log2320.insert(1,'TEST', 2320)
        df_log2320['Nacelle Blockage L/R']= np.nan
        df_log2320['Spoiler L/R']= np.nan
        temp = df_log2320['DATE']
        del df_log2320['DATE']
        df_log2320['DATE']= temp

    # clean up uw2326
        # add enties
        df_log2326.insert(1,'TEST', 2326)
        df_log2326['Nacelle Blockage L/R']= np.nan
        df_log2326['Spoiler L/R']= np.nan
        temp = df_log2326['DATE']
        del df_log2326['DATE']
        df_log2326['DATE']= temp

    # clean up uw2331
        df_log2331.insert(1,'TEST', 2331)

        # column title are consistant
        if df_log2298.columns.tolist() !=  df_log2320.columns.tolist():
            raise ValueError("Either 2298 or 2320 data is not right!")

        if df_log2298.columns.tolist() !=  df_log2326.columns.tolist():
            raise ValueError("Either 2298 or 2326 data is not right!")

        if df_log2298.columns.tolist() !=  df_log2331.columns.tolist():
            raise ValueError("Either 2298 or 2331 data is not right!")

        # Concatinate all run logs into single data frame
        df_log = pd.concat([df_log2298,
                            df_log2320,
                            df_log2326,
                            df_log2331],
                           ignore_index=True,axis=0)

        return df_log, df_log2298, df_log2320, df_log2326, df_log2331

    def data_cleanup(self, df_data2298, df_data2320, df_data2326, df_data2331):
        """
        function:
            data_cleanup

        parameters:
            arg df_dataXXXX = data set with entry numbers (.xlsx file)

        return:
            df_set:     Concatinate data sets from inputs
        """
#         df_data2298 = self.df_data2298
#         df_data2320 = self.df_data2320
#         df_data2326 = self.df_data2326
#         df_data2331 = self.df_data2331

        if df_data2298.columns.tolist() !=  df_data2320.columns.tolist():
            raise ValueError("Either 2298 or 2320 data is not right!")

        if df_data2298.columns.tolist() !=  df_data2326.columns.tolist():
            raise ValueError("Either 2298 or 2326 data is not right!")

        if df_data2298.columns.tolist() !=  df_data2331.columns.tolist():
            raise ValueError("Either 2298 or 2331 data is not right!")

        #  Concatinate all data into single data frame
        df_data = pd.concat([df_data2298,
                             df_data2320,
                             df_data2326,
                             df_data2331],
                            ignore_index=True,axis=0)

        return df_data


In [5]:
class DataProcess:

    """
    function: data_extract

        Extract data from input test entries with corresponding run numbers

    function: data_interp_derivatives

        Truncate data within max min and min max range (no exterpolation)
        Linear interpolate data on with integer pitch (P) or yaw (Y) run
        Compute derivaties with respect to alpha or beta
    """
    def __init__(self, df_log, df_data, test, run_num):
        """
        function:
            initialization of variables

        parameters:
            arg df_log:  Data log from data_prep output (data frame)

            arg df_data: Data set from data_prep output (data frame)

            arg test:    Test entries (list)

            arg run_num: Run numbers correspondign to test entires (list)

        return:
            self
        """

        self.df_log = df_log
        self.df_data = df_data
        self.test = test
        self.run_num = run_num

    def data_extract(self):
        """
        function:
            Data extraction

        parameters:
            arg df_log:  Data log from data_prep output (data frame)

            arg df_data: Data set from data_prep output (data frame)

            arg test:    Test entries (list)

            arg run_num: Run numbers correspondign to test entires (list)

        return:
            df_log_sub:  Extracted data log from test and run numbers (data frame)

            df_data_sub: Extracted data set from test and run numbers (data frame)
        """

        df_log = self.df_log
        df_data = self.df_data
        test = self.test
        run_num = self.run_num
        # Unit test
        if len(test) != len(run_num):
            raise ValueError("Test entries and run numbers must be consistent!")

        df_data_sub = pd.DataFrame()
        df_log_sub = pd.DataFrame()
        # Unit test
        for i in range(len(test)):
            # Test Entries must be integer
            if not isinstance(test[i], int):
                raise ValueError("Test entries must be integer!")

            # Entries must be not empty
            if not np.any(test[i]) or not np.any(run_num[i]):
                raise ValueError("Test entries and run numbers must be not empty!")

            # Test entries must be valid
            if not any(np.unique(df_log[df_log.columns.tolist()[1]]) == test[i]):
                raise ValueError("Test entries are invalid!")

            # Run number must be a list
            if not isinstance(run_num[i], list):
                raise ValueError("Run numbers are invalid!")

            for j in range(len(run_num[i])):
                # Each run number must be valid
                if run_num[i][j] < 0 or run_num[i][j] > np.max(
                    df_log[df_log.columns.tolist()[0]][
                        df_log[df_log.columns.tolist()[1]] == test[i]]):
                    raise ValueError("Run numbers are invalid!")

                # Entries must be not weight tare
                if np.any( pd.isna( df_log[df_log.columns.tolist()[2]][
                    (df_log[df_log.columns.tolist()[1]] == test[i]) &
                    (df_log[df_log.columns.tolist()[0]] == run_num[i][j])
                ])):
                    raise ValueError("Test num and corresponding run num is weight tare")

                # Concatinate sub data log
                df_log_sub = pd.concat(
                    [df_log_sub,
                     df_log[
                        (df_log[df_log.columns.tolist()[1]] == test[i]) &
                        (df_log[df_log.columns.tolist()[0]] == run_num[i][j]
                        )]], ignore_index=True, axis=0)
                # Concatinate sub data set
                df_data_sub = pd.concat(
                    [df_data_sub,
                     df_data[(df_data[df_data.columns.tolist()[1]] == test[i]) &
                             (df_data[df_data.columns.tolist()[0]] == run_num[i][j])]
                    ],ignore_index=True, axis=0)

        # Run type must be consistant
        if len(pd.unique(df_log_sub[df_log.columns.tolist()[4]])) > 1:
            raise ValueError("Run type is inconsistant!")
        df_log_sub["RUN NO."] = df_log_sub["RUN NO."].astype(int)

        df_data_sub[["RUN","TEST"]] = df_data_sub[["RUN","TEST"]].astype(int)

        return df_log_sub, df_data_sub

    def data_interp_derivative(self):
        """
        function:
            Data inpterolation and derivatives

        parameters:
            arg df_log:  Data log from data_extract output (data frame)

            arg df_data: Data set from data_extract output (data frame)

            arg test:    Test entries (list)

            arg run_num: Run numbers correspondign to test entires (list)

        return:
            df_data_interp:      Interpoalted data sets extracted data sets (data frame)

            df_data_derivative:  Data derivatives from extracted data sets (data frame)
        """

        df_log = self.df_log
        df_data = self.df_data
        test = self.test
        run_num = self.run_num

        # Check run type
        if pd.unique(df_log[df_log.columns.tolist()[4]]) == 'P6':
            alphabeta = df_data.columns.tolist()[3]
        elif pd.unique(df_log[df_log.columns.tolist()[4]]) == 'Y6':
            alphabeta = df_data.columns.tolist()[4]
        else:
            raise ValueError("Run type error!")

        max_list =[]
        min_list =[]

        # find the max and min among all runs
        for i in range(len(test)):
            for j in range(len(run_num[i])):
                max_list.append(np.max(
                    df_data[alphabeta][
                        (df_data[df_data.columns.tolist()[1]] == test[i]) &
                        (df_data[df_data.columns.tolist()[0]] == run_num[i][j])
                    ]))
                min_list.append(
                    np.min(df_data[alphabeta][
                        (df_data[df_data.columns.tolist()[1]] == test[i]) &
                        (df_data[df_data.columns.tolist()[0]] == run_num[i][j])
                    ]))
        # interpolate alpha or beta from min to max
        alphabeta_interp= np.arange(np.ceil(np.max(min_list)), np.floor(np.min(max_list))+1, 1)

        # Check interp data not extrapolate
        if np.min(alphabeta_interp) != np.ceil(np.max(min_list)) or np.max(
            alphabeta_interp) != np.floor(np.min(max_list)):
            raise ValueError("Data range incorrect")

        df_data_interp = pd.DataFrame()
        df_data_derivative = pd.DataFrame()

        # Loop through tests and run numbers
        for i in range(len(test)):
            temp_interp = pd.DataFrame()
            temp_derivative = pd.DataFrame()
            for k in range(len(run_num[i])):
                for j in range(len(df_data.columns.tolist())):
                    # Variables do not need to be interpreted
                    if j <= 8:
                        if df_data.columns.tolist()[j] == alphabeta:
                            temp_interp[df_data.columns.tolist()[j]] = alphabeta_interp
                            temp_derivative[df_data.columns.tolist()[j]] = alphabeta_interp
                        else:
                            temp_fun = sp.interpolate.interp1d(
                                df_data[alphabeta][(
                                    df_data[df_data.columns.tolist()[1]] == test[i]
                                ) & (
                                    df_data[df_data.columns.tolist()[0]] == run_num[i][k]
                                )],
                                df_data[df_data.columns.tolist()[j]][
                                    (df_data[df_data.columns.tolist()[1]] == test[i]) &
                                    (df_data[df_data.columns.tolist()[0]] == run_num[i][k]
                                    )], kind = 'nearest')
                            temp_interp[df_data.columns.tolist()[j]] = temp_fun(
                                alphabeta_interp)
                            temp_derivative[df_data.columns.tolist()[j]] = temp_fun(
                                alphabeta_interp)
                    else:
                        temp_fun = sp.interpolate.interp1d(
                            df_data[alphabeta][(
                                df_data[df_data.columns.tolist()[1]] == test[i]
                            ) & (
                                df_data[df_data.columns.tolist()[0]] == run_num[i][k]
                            )], df_data[df_data.columns.tolist()[j]][(
                                df_data[df_data.columns.tolist()[1]] == test[i]
                            ) & (
                                df_data[df_data.columns.tolist()[0]] == run_num[i][k]
                            )], kind = 'linear')
                        temp_interp[df_data.columns.tolist()[j]] = temp_fun(
                            alphabeta_interp)
                        temp_derivative[df_data.columns.tolist()[j]] = np.gradient(
                            temp_fun(alphabeta_interp), alphabeta_interp)

                # Concatinate data interpolation
                df_data_interp = pd.concat(
                    [df_data_interp, temp_interp]
                    , ignore_index = True, axis = 0)

                # Concatinate data derivatives
                df_data_derivative = pd.concat(
                    [df_data_derivative, temp_derivative]
                    , ignore_index = True, axis = 0)

        return df_data_interp, df_data_derivative


In [6]:
"""
DataPlot: Data extract, interpolation and derivatives

parameters:
    arg plot_type: Plot type specificaiton (to be complete)

    arg plot_vars: Plot variables from data set variables

    arg df_log:    Data log from data_prep output (data frame)

    arg df_data:   Data set from data_prep output (data frame)

    arg test:      Test entries (list)

    arg run_num:   Run numbers correspondign to test entires (list)

return:
    Plots

"""
import pandas as pd
import matplotlib.pyplot as plt

class DataPlot:

    """
    function: plt_data

        Plot input data for visulization

    """
    def __init__(self, plot_vars, df_log, df_data, test, run_num):
        """
        function:
            initialization of variables

        parameters:
            arg plot_type: Plot type specificaiton (to be complete)

            arg plot_vars: Plot variables from data set variables

            arg df_log:    Data log from data_prep output (data frame)

            arg df_data:   Data set from data_prep output (data frame)

            arg test:      Test entries (list)

            arg run_num:   Run numbers correspondign to test entires (list)

        return:
            self
        """
#         self.plot_type = plot_type
        self.plot_vars = plot_vars
        self.df_log = df_log
        self.df_data = df_data
        self.test = test
        self.run_num = run_num

    def plt_data(self):
        """
        function:
            Plot data

        parameters:
            arg plot_type: Plot type specificaiton (to be complete)

            arg plot_vars: Plot variables from data set variables

            arg df_log:    Data log from data_prep output (data frame)

            arg df_data:   Data set from data_prep output (data frame)

            arg test:      Test entries (list)

            arg run_num:   Run numbers correspondign to test entires (list)

        return:
            plots
        """
#         plot_type = self.plot_type
        plot_vars = self.plot_vars
        df_log = self.df_log
        df_data = self.df_data
        test = self.test
        run_num = self.run_num

        # Run type must be consistant
        if pd.unique(df_log[df_log.columns.tolist()[4]]) == 'P6':
            alphabeta = df_data.columns.tolist()[3]
            x_label = "\\alpha"
        elif pd.unique(df_log[df_log.columns.tolist()[4]]) == 'Y6':
            alphabeta = df_data.columns.tolist()[4]
            x_label = "\\beta"
        else:
            raise ValueError("Run type error!")

        # Plot data
        for j in range(len(plot_vars)):
            plt.figure()
            for i in range(len(test)):
                for k in range(len(run_num[i])):
                    plt.scatter(
                        df_data[alphabeta][
                            (df_data[df_data.columns.tolist()[1]] == test[i]) &
                            (df_data[df_data.columns.tolist()[0]] == run_num[i][k])],
                        df_data[plot_vars[j]][
                            (df_data[df_data.columns.tolist()[1]] == test[i]) &
                            (df_data[df_data.columns.tolist()[0]] == run_num[i][k])],
                        label='UW'+str(test[i])+' Run'+str(run_num[i][k]))
            plt.xlabel(r"$"+x_label+r" (^\circ)$")
            if plot_vars[j] == "LOD":
                y_label = "C_L/C_D"
            else:
                y_label = plot_vars[j][0]+"_"+plot_vars[j][1]+"("+plot_vars[j][2:]+") "
            plt.ylabel(r"$"+y_label+ "$")
            # Obly display legend on first plot
            if j == 0:
                plt.legend()
            plt.title(r"$"+y_label + " vs. "+ x_label+"$")
            plt.grid(True)
            plt.show()


In [None]:
Dataprep.data_runlog

In [27]:
"""
DataPrep test script
"""
import unittest
# import numpy as np
# import pandas as pd
# from src import DataPrep

# Define a class in which the tests will run
class TestDataPrep(unittest.TestCase):
    """
    Unit test class

        smoke test    = function working properly 
        one shot test = function work as expected 
        edge tes      = incorrect input detection
    """
    def __init__(self):
        """
        initialization of data input
        """
       # import data log
        df_log2298 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2298.xlsx')
        df_log2320 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2320.xlsx')
        df_log2326 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2326.xlsx')
        df_log2331 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2331.xlsx')
        # import data sets
        df_data2298 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2298.csv')
        df_data2320 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2320.csv')
        df_data2326 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2326.csv')
        df_data2331 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2331.csv')
        
        return df_log2298, df_log2320, df_log2326, df_log2331, df_data2298, df_data2320, df_data2326, df_data2331
    
    def test_runlog_cleanup_smoke(self, df_log2298, df_log2320, df_log2326, df_log2331):
        """
        smoke test

        check if the test run through 
        """
        DataPrep.runlog_cleanup(self, df_log2298, df_log2320, df_log2326, df_log2331)

    def test_data_cleanup_smoke(self, df_data2298, df_data2320, df_data2326, df_data2331):
        """
        smoke test

        check if the test run through 
        """
        DataPrep.data_cleanup(self, df_data2298, df_data2320, df_data2326, df_data2331)
        
suite = unittest.TestLoader().loadTestsFromTestCase(TestDataPrep)
_ = unittest.TextTestRunner().run(suite)

TypeError: __init__() takes 1 positional argument but 2 were given

In [25]:
"""
DataProcess test script
"""
import unittest
# import numpy as np
# import pandas as pd
# from src import DataPrep
# from src import DataProcess

# Define a class in which the tests will run
class TestDataProcess(unittest.TestCase):
    """
    Unit test class

        smoke test    = function working properly 
        one shot test = function work as expected 
        edge tes      = incorrect input detection
    """
    def __init__(self):
        # import data log
        self1.df_log2298 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2298.xlsx')
        self1.df_log2320 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2320.xlsx')
        self1.df_log2326 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2326.xlsx')
        self1.df_log2331 = pd.read_excel(r'~/SCALOS/project/runlogs/Autosort Run Log 2331.xlsx')
        # import data sets
        self1.df_data2298 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2298.csv')
        self1.df_data2320 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2320.csv')
        self1.df_data2326 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2326.csv')
        self1.df_data2331 = pd.read_csv(r'~/SCALOS/project/data/finaldata_uw2331.csv')
        df_log, df_log2298, df_log2320, df_log2326, df_log2331 = DataPrep.data_cleanup(self1)
        df_data = DataPrep.runlog_cleanup
        self.df_log = df_log
        self.df_data = df_data
    
    def test_data_extract_smoke(self):
        """
        smoke test

        check if the test run through 
        """
        self.test = [2320]
        self.run_num = [[49, 51, 37, 56, 57, 26]]
        DataProcess.data_extract(self)

#     def test_data_interp_derivative_smoke(self):
#         """
#         smoke test

#         check if the test run through 
#         """
#         DataProcess.data_interp_derivative(self)
        
suite = unittest.TestLoader().loadTestsFromTestCase(TestDataProcess)
_ = unittest.TextTestRunner().run(suite)

TypeError: __init__() takes 1 positional argument but 2 were given