In [2]:
from collections import defaultdict, OrderedDict, deque
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

In [3]:
#Finding outliers by Tukey method with adjustment option for iqr boundaries.

def outliers_iqr(data, feature, left_iqr=1.5, right_iqr=1.5, log_scale=False):
    """Split a DataFrame into outlier and non-outlier rows using Tukey's IQR fences.

    The fences are ``Q1 - left_iqr * IQR`` and ``Q3 + right_iqr * IQR``, computed on
    ``data[feature]`` (or on its natural logarithm when ``log_scale`` is True).
    Rows strictly outside the fences are outliers; rows on or inside the fences
    are returned as cleaned data.

    Args:
        data (DataFrame): DataFrame which will be used to find outliers.
        feature (string): Name of a column in DF which will be inspected for outliers.
        left_iqr (float, optional): lower boundary multiplier. Defaults to 1.5.
        right_iqr (float, optional): upper boundary multiplier. Defaults to 1.5.
        log_scale (bool, optional): apply ``np.log`` to the column before computing the
            fences, for lognormally distributed data. No offset is added, so the
            column is expected to hold strictly positive values. Defaults to False.

    Returns:
        tuple: ``(outliers, cleaned)`` - two DataFrames selected from ``data``.
        Rows whose inspected value is NaN fail every comparison and therefore
        appear in neither frame.
    """
    x = np.log(data[feature]) if log_scale else data[feature]

    quartile1, quartile3 = x.quantile(0.25), x.quantile(0.75)
    iqr = quartile3 - quartile1

    lower_bound = quartile1 - (iqr * left_iqr)
    upper_bound = quartile3 + (iqr * right_iqr)

    outliers = data[(x < lower_bound) | (x > upper_bound)]

    # Inclusive comparisons so that `cleaned` is the exact complement of `outliers`
    # for non-NaN values. With strict comparisons a value sitting exactly on a fence
    # was lost from both frames, and a constant column (IQR == 0) lost every row.
    cleaned = data[(x >= lower_bound) & (x <= upper_bound)]

    print(f'Number of outliers by Tukey\'s method: {outliers.shape[0]}')
    print(f'Resulting number of lines cleared of outliers: {cleaned.shape[0]}')

    return outliers, cleaned

In [4]:
#Finding outliers by z-method with adjustment option for boundaries.

def outliers_z_score(data, feature, left_mod=3, right_mod=3, log_scale=False):
    """Split a DataFrame into outlier and non-outlier rows using the z-score (sigma) rule.

    The bounds are ``mean - left_mod * std`` and ``mean + right_mod * std``, computed
    on ``data[feature]`` (or on ``log(data[feature] + 1)`` when ``log_scale`` is True).
    Rows strictly outside the bounds are outliers; rows on or inside the bounds
    are returned as cleaned data.

    Args:
        data (DataFrame): DataFrame which will be used to find outliers.
        feature (string): Name of a column in DF which will be inspected for outliers.
        left_mod (int, optional): lower boundary multiplier. Defaults to 3.
        right_mod (int, optional): upper boundary multiplier. Defaults to 3.
        log_scale (bool, optional): apply ``np.log(value + 1)`` to the column before
            computing the bounds, for lognormally distributed data. Defaults to False.

    Returns:
        tuple: ``(outliers, cleaned)`` - two DataFrames selected from ``data``.
        Rows whose inspected value is NaN fail every comparison and therefore
        appear in neither frame.
    """
    x = np.log(data[feature] + 1) if log_scale else data[feature]

    mu = x.mean()
    sigma = x.std()

    lower_bound = mu - left_mod * sigma
    upper_bound = mu + right_mod * sigma

    outliers = data[(x < lower_bound) | (x > upper_bound)]

    # Inclusive comparisons so that `cleaned` is the exact complement of `outliers`
    # for non-NaN values. With strict comparisons a value exactly on a bound was
    # lost from both frames, and a constant column (sigma == 0) lost every row.
    cleaned = data[(x >= lower_bound) & (x <= upper_bound)]

    print(f'Number of outliers by z-method: {outliers.shape[0]}')
    print(f'Resulting number of lines cleared of outliers: {cleaned.shape[0]}')

    return outliers, cleaned

In [8]:
#Function for removing fully duplicated rows

def dupl_data_remove(data, immune_col=None):
    """Remove rows that are full duplicates, optionally ignoring some columns.

    Two rows count as duplicates when they match in every column except the
    ones listed in ``immune_col``. The first occurrence of each row is kept.

    Args:
        data (DataFrame): DataFrame which will be used to find duplicates.
        immune_col (str or iterable of str, optional): name of the column/s that are
            ignored when comparing rows (e.g. an id column). Names that are not
            columns of ``data`` are skipped silently. Defaults to None.

    Returns:
        DataFrame: a copy of the original DataFrame cleaned from duplicated rows.
    """
    if immune_col is None:
        immune = ()
    elif isinstance(immune_col, str):
        # A bare string is a single column name, not an iterable of characters.
        immune = (immune_col,)
    else:
        immune = tuple(immune_col)

    # Compare rows on every column that is not immune.
    dupl_columns = [col for col in data.columns if col not in immune]

    mask = data.duplicated(subset=dupl_columns)
    print(f'Number of duplicates: {mask.sum()}')

    data_dedupped = data.drop_duplicates(subset=dupl_columns)
    print(f'Resulting number of lines cleared of duplicates: {data_dedupped.shape[0]}')

    return data_dedupped

In [None]:
#Duplicate finding function in lines (old version)

def dupl_data_remove(data, immune_col=None):
    """Remove rows that are full duplicates, optionally ignoring some columns.

    Two rows count as duplicates when they match in every column except the
    ones listed in ``immune_col``. The first occurrence of each row is kept.

    Args:
        data (DataFrame): DataFrame which will be used to find duplicates.
        immune_col (str or iterable of str, optional): name of the column/s that are
            ignored when comparing rows (e.g. an id column). Names that are not
            columns of ``data`` are skipped silently. Defaults to None.

    Returns:
        DataFrame: a copy of the original DataFrame cleaned from duplicated rows.
    """
    # NOTE(review): this cell redefines dupl_data_remove, which is already defined in
    # an earlier cell of this notebook. On a top-to-bottom run this later definition
    # is the one that stays bound, so keep only one of the two cells.
    if immune_col is None:
        immune = ()
    elif isinstance(immune_col, str):
        # A bare string is a single column name, not an iterable of characters.
        immune = (immune_col,)
    else:
        immune = tuple(immune_col)

    # Compare rows on every column that is not immune.
    dupl_columns = [col for col in data.columns if col not in immune]

    mask = data.duplicated(subset=dupl_columns)
    print(f'Number of duplicates: {mask.sum()}')

    data_dedupped = data.drop_duplicates(subset=dupl_columns)
    print(f'Resulting number of lines cleared of duplicates: {data_dedupped.shape[0]}')

    return data_dedupped

In [6]:
#Function for finding low-information features

def low_info_col_drop(data, top_freq_thresh=0.95, nuniq_thresh=0.95):
    """Find low-information columns: nearly constant or nearly all-unique ones.

    A column is flagged when either
      * its most frequent value accounts for more than ``top_freq_thresh`` of the
        non-null values, or
      * the ratio of distinct values to non-null values exceeds ``nuniq_thresh``.
    A line is printed for every check that fires. The function only reports the
    columns; it does not modify or drop anything from ``data``.

    Args:
        data (DataFrame): DataFrame whose columns will be inspected.
        top_freq_thresh (float, optional): maximum allowed share of the most frequent
            value. Defaults to 0.95.
        nuniq_thresh (float, optional): maximum allowed share of unique values.
            Defaults to 0.95.

    Returns:
        list: names of the flagged columns, in column order, each listed once.
    """
    low_info_col_list = []

    for col in data.columns:
        non_null_count = data[col].count()
        if non_null_count == 0:
            # No non-null values: both ratios are undefined (0 / 0), so skip the column.
            continue

        top_freq = data[col].value_counts(normalize=True).max()   # share of the most frequent value
        nunique_ratio = data[col].nunique() / non_null_count      # share of distinct values

        # Both checks must run inside the loop so that every column is evaluated.
        too_uniform = top_freq > top_freq_thresh
        too_unique = nunique_ratio > nuniq_thresh

        if too_uniform:
            print(f'{col}: {round(top_freq*100, 2)}% одинаковых значений')

        if too_unique:
            print(f'{col}: {round(nunique_ratio*100, 2)}% уникальных значений')

        # Append once even when both checks fire (e.g. a single-row frame).
        if too_uniform or too_unique:
            low_info_col_list.append(col)

    return low_info_col_list

In [7]:
#Function for cleaning data frame from null data features by conditional level of null data.

def null_data_col_drop(data, null_thresh=0.4, immune_col=None):
    """Drop columns whose share of empty values exceeds a threshold.

    Both NaN and the value 0 are treated as empty. A line is printed for every
    column above the threshold, including immune ones, which are reported but
    kept.

    Args:
        data (DataFrame): DataFrame which will be used for cleaning.
        null_thresh (float, optional): a column is dropped when its share of empty
            values is strictly greater than this. Defaults to 0.4.
        immune_col (str or iterable of str, optional): name of the column/s that are
            never dropped. Names that are not candidates for dropping are skipped
            silently. Defaults to None.

    Returns:
        DataFrame: a copy of the original DataFrame without the dropped columns.
    """
    if immune_col is None:
        immune = ()
    elif isinstance(immune_col, str):
        # A bare string is a single column name, not an iterable of characters.
        immune = (immune_col,)
    else:
        immune = tuple(immune_col)

    temp_data = data.replace({0: np.nan})      # zeros count as empty values
    null_data_col_list = []                    # columns that will be dropped

    for col in temp_data.columns:
        null_ratio = temp_data[col].isnull().mean()    # share of empty values in the column

        # Compare the exact ratio with the threshold; rounding is only for display,
        # otherwise e.g. 40.4% would round down to 0.4 and escape a 0.4 threshold.
        if null_ratio > null_thresh:
            col_null = round(null_ratio, 2)
            print(f'{col}: {col_null*100}% zero values')
            if col not in immune:
                null_data_col_list.append(col)

    drop_data = data.drop(null_data_col_list, axis=1)
    print(f'{drop_data.shape[1]} features with less than {null_thresh*100}% null data')

    return drop_data