In [1]:
import pandas as pd
import os
# from logger import LoggerSetup
from typing import Any

class DataHandler:
    """Load tabular data files into pandas and save results back to CSV."""

    # File extensions handled by each pandas reader (compared in lower case).
    _CSV_EXTENSIONS   = ('.csv',)
    _EXCEL_EXTENSIONS = ('.xls', '.xlsx')

    def __init__(self):
        # self.log = LoggerSetup(logger_file = 'data_handler.log',
        #                        logger_name = 'data_handler').get_logger()
        pass

    def load_data(self, input_data : str = None):
        """Read a CSV or Excel file into a DataFrame.

        Args:
            input_data: Path to the file. The reader is chosen from the file
                extension ('.csv', '.xls' or '.xlsx', case-insensitive).

        Returns:
            The loaded DataFrame, or None when no path is given, the extension
            is not supported, or the file does not exist (a message is printed
            in each of those cases).
        """
        if input_data is None:
            print("No input file given. Please load file with extension: 'csv', 'xls', 'xlsx'.")
            return None

        _, extension = os.path.splitext(input_data)
        extension = extension.lower()

        try:
            if extension in self._CSV_EXTENSIONS:
                return pd.read_csv(input_data)

            # Membership test: comparing the string to a list with `==` is
            # always False and would make Excel files unloadable.
            if extension in self._EXCEL_EXTENSIONS:
                return pd.read_excel(input_data)

        except FileNotFoundError as e:
            print(f"Unable to find the document you're trying to load. {e}")
            return None

        print(f"Unsupported file extension '{extension}'. "
              "Please load file with extension: 'csv', 'xls', 'xlsx'.")
        return None

    def save_data(self, result : Any = None, output_file : str = None):
        """Write `result` to `output_file` as UTF-8 CSV without the index.

        Args:
            result: Object exposing `to_csv` (e.g. a pandas DataFrame).
            output_file: Destination path passed to `to_csv`.

        Any exception raised while saving is reported instead of propagated;
        the method always returns None.
        """
        try:
            print('Saving file...')
            result.to_csv(output_file, encoding = 'utf-8', index = False)
            print(f'Saved results to {output_file}.')

        except Exception as e:
            message = f'Unable to save CSV. {e}'
            # The logger is only present when the setup in __init__ is
            # enabled; fall back to print so the real error is not replaced
            # by an AttributeError on `self.log`.
            log = getattr(self, 'log', None)
            if log is not None:
                log.error(message)
            else:
                print(message)

In [5]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats.mstats import winsorize
# from src import DataHandler
# from logger import LoggerSetup

class Preprocessor:
    """Stateful cleaning helpers around a DataFrame loaded via DataHandler.

    Methods that modify the data store the result on `self.input_data` and
    return it, so successive calls build on each other.
    """

    def __init__(self, input_data : str = None):
        self.handler    = DataHandler()
        self.input_data = self.handler.load_data(input_data)
        # self.log        = LoggerSetup(logger_file = 'preprocessor.log',
        #                               logger_name = 'preprocessor').get_logger()

    def view_data(self):
        """Return the first 10 rows."""
        return self.input_data.head(10)

    def data_size(self):
        """Return the (rows, columns) shape of the data."""
        return self.input_data.shape

    def check_null(self):
        """Return the number of null values per column."""
        return self.input_data.isnull().sum()

    def check_unique_values(self):
        """Return the number of distinct values per column."""
        return self.input_data.nunique()

    def drop_columns(self, column):
        """Drop `column` (a label or list of labels) and keep the result."""
        self.input_data = self.input_data.drop(column, axis = 1)
        return self.input_data

    def check_duplicates(self):
        """Return counts of duplicated (True) vs. non-duplicated (False) rows."""
        return self.input_data.duplicated().value_counts()

    def remove_duplicate_rows(self):
        """Drop duplicated rows, keep the result and return it.

        The de-duplicated frame is stored on `self.input_data` like the other
        modifying methods; otherwise the duplicates would still be present in
        every later call.
        """
        self.input_data = self.input_data.drop_duplicates()
        return self.input_data

    def check_column_types(self):
        """Return the dtype of every column."""
        return self.input_data.dtypes

    def correct_spelling(self, original, column, replace):
        """Replace the value `original` with `replace` in `column`."""
        self.input_data[column] = self.input_data[column].replace({original : replace})
        return self.input_data

    def checking_outliers(self, column_name : str = None):
        """Show a box plot of `column_name` to inspect outliers."""
        # Intended for the numeric columns (quantity, price and sales).
        sns.boxplot(self.input_data[column_name])
        plt.title(f"Outliers in {column_name}")
        plt.show()

    # def winsorize_outliers(self, column_name : str = None):
    #     winsorized_data = {}
    #     winsorized_data[column_name] = winsorize(self.input_data[column_name], limits = [0.00, 0.05])
    #     print(f"Mean of original data {column_name}:", np.mean(self.input_data[column_name]))
    #     print(f"Mean of winsorized data {column_name}:", np.mean(winsorized_data[column_name]))
    #     self.input_data[column_name] = winsorized_data[column_name]

    #     return self.input_data

    def transform_values(self, column_name : str = None):
        """Replace `column_name` with its absolute values."""
        self.input_data[column_name] = self.input_data[column_name].abs()
        return self.input_data

In [27]:
# Load '../data/pharma-data.csv' into a Preprocessor and display the number
# of distinct values in each column (the last expression is the cell output).
preprocessor = Preprocessor(input_data = '../data/pharma-data.csv')
preprocessor.check_unique_values()

Distributor             29
Customer Name          751
City                   749
Country                  2
Latitude               655
Longitude              709
Channel                  2
Sub-channel              4
Product Name           240
Product Class            6
Quantity              1735
Price                  210
Sales                20546
Month                   12
Year                     4
Name of Sales Rep       13
Manager                  4
Sales Team               4
dtype: int64

In [7]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.exceptions as pe
import plotly.graph_objects as go
from scipy.stats import chisquare
from typing import List, Dict, Any
from scipy.stats import skew, kurtosis


class DataAnalysis:
    """Summary statistics and simple statistical tests on a loaded dataset."""

    def __init__(self, input_data : str = None):
        self.handler = DataHandler()
        self.input_data = self.handler.load_data(input_data)

    def data_description(self):
        """Return `describe()` for the loaded data."""
        return self.input_data.describe()

    def calculate_kpi(self):
        """Compute headline KPIs from the sales, price and quantity columns.

        The sales column is read from 'Sales' when present, falling back to
        'Sale'. Totals use pandas `sum()`, which skips missing values.

        Returns:
            Tuple of (total_sale, total_cost, total_quantity, total_profit,
            average_sale, average_cost, profit_margin). The averages are per
            row; averages are NaN for an empty frame and profit_margin (in
            percent of total cost) is NaN when total cost is zero.
        """
        data = self.input_data
        sale_column = 'Sales' if 'Sales' in data.columns else 'Sale'

        total_sale     = data[sale_column].sum()
        total_cost     = data['Price'].sum()
        total_quantity = data['Quantity'].sum()
        total_profit   = total_sale - total_cost

        # Divide by the number of rows; `value_counts()` would return a
        # Series of per-unique-row counts rather than a single count.
        row_count    = len(data)
        average_sale = total_sale / row_count if row_count else float('nan')
        average_cost = total_cost / row_count if row_count else float('nan')

        profit_margin = total_profit / total_cost * 100 if total_cost else float('nan')

        return (total_sale, total_cost, total_quantity,
                total_profit, average_sale, average_cost,
                profit_margin)

    def express_skewness(self, column_name : str = None):
        """Return the scipy `skew` of `column_name`."""
        return skew(self.input_data[column_name])

    def express_kurtosis(self, column_name : str = None):
        """Return the scipy `kurtosis` of `column_name`."""
        return kurtosis(self.input_data[column_name])

    def check_distribution(self, column = None):
        """Print a chi-square test of `column`'s value counts against equal counts."""
        observation = self.input_data[column].value_counts().sort_index()
        total_observations = sum(observation)
        # Expected frequencies: the same count for every distinct value.
        expected = [total_observations/len(observation)]*len(observation)

        chi2stat , p_value = chisquare(observation, expected)

        print(f'Chi-Square Statistic: {chi2stat}\n P-Value: {p_value}')

    def correlation_test(self):
        """Return a plotly heatmap of the Pearson correlation of numeric columns."""
        data = self.input_data.select_dtypes(include=['number'])
        correlation_matrix = data.corr(method='pearson')

        # Create interactive heatmap
        fig = px.imshow(correlation_matrix,
                        text_auto=True,
                        color_continuous_scale='RdYlGn',
                        title='Pearson Correlation Matrix')
        return fig

    def covariance_calculation(self, column1, column2):
        """Return the `np.cov` covariance matrix of two columns."""
        covariance_matrix = np.cov(self.input_data[column1], self.input_data[column2])
        return covariance_matrix

# Build the analysis helper on '../data/preprocessed_data.csv'.
da = DataAnalysis(input_data = '../data/preprocessed_data.csv')

In [25]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.exceptions as pe
import plotly.graph_objects as go
from scipy.stats import chisquare
from typing import List, Dict, Any

class ExploratoryDataAnalysis:
    """Plotly Express figures for exploring a dataset loaded via DataHandler."""

    def __init__(self, input_data):
        self.handler = DataHandler()
        self.input_data = self.handler.load_data(input_data = input_data)

    # DISTRIBUTION PLOTS
    def distribution(self, column_name):
        """Return a 20-bin histogram figure of `column_name`."""
        fig = px.histogram(self.input_data,
                           x        = column_name,
                           title    = f'Distribution of {column_name} using Histogram',
                           nbins    = 20,)
        return fig

    def violin_plot(self, column_name):
        """Return a violin plot figure of `column_name`."""
        # The title names the violin plot; it previously repeated the
        # histogram title.
        fig = px.violin(self.input_data,
                        x        = column_name,
                        title    = f'Distribution of {column_name} using Violin Plot',)
        return fig

    def bar_plot_categories(self, column_name):
        """Return a bar figure with `column_name` on x, coloured by the same column."""
        fig = px.bar(self.input_data,
                     x     = column_name,
                     title = f'Count of {column_name} using Bar Graph',
                     color = column_name)
        return fig

    def pie_chart(self, column_name):
        """Return a pie figure of summed 'Sales' per value of `column_name`."""
        column_sale = self.input_data.groupby(column_name, as_index = False)['Sales'].sum()
        fig = px.pie(column_sale,
                     values = 'Sales',
                     names  = column_name,
                     title  = f'Sale percentage based on {column_name}',
                     color = column_name,)
        return fig

# Build the plotting helper on '../data/preprocessed_data.csv'.
eda = ExploratoryDataAnalysis(input_data = '../data/preprocessed_data.csv')