In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer
from scipy import stats
import dask.dataframe as dd
import logging
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging
# Route records at INFO level and above to the file 'data_processing.log'
# (a relative path). Every function below reports failures through the
# module-level logging.error/logging.warning calls instead of raising.
# NOTE(review): basicConfig is documented as a no-op when the root logger
# already has handlers — confirm the log file is actually written in the
# environment this runs in.
logging.basicConfig(filename='data_processing.log', level=logging.INFO)

# Load Data
def load_data(file_path, file_type='csv'):
    """Read a tabular file into a pandas DataFrame.

    Args:
        file_path: Location of the file, passed straight to the pandas reader.
        file_type: 'csv' (default) or 'excel'.

    Returns:
        The loaded DataFrame, or None when the file type is unsupported or
        the reader raises. The failure is logged rather than propagated.
    """
    try:
        # Pick the reader first, then call it once.
        if file_type == 'csv':
            reader = pd.read_csv
        elif file_type == 'excel':
            reader = pd.read_excel
        else:
            raise ValueError("Unsupported file type")
        return reader(file_path)
    except Exception as load_error:
        logging.error(f"Error loading data: {load_error}")
        return None

# Outlier Detection and Handling
def handle_outliers(df, method='z_score', threshold=3):
    """Drop rows that contain an outlier in any numeric column.

    Only numeric columns take part in the outlier test; every other column
    is carried through unchanged. Missing values and constant columns never
    mark a row as an outlier, so the function is safe to call before
    imputation.

    Args:
        df: Input DataFrame.
        method: 'z_score' drops rows where any numeric value has an absolute
            z-score (population standard deviation, ddof=0) greater than or
            equal to ``threshold``. 'iqr' drops rows where any numeric value
            lies outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR].
        threshold: Cut-off for the 'z_score' method. Not used by 'iqr',
            which keeps the fixed 1.5 multiplier.

    Returns:
        A filtered DataFrame that keeps the original index labels. If the
        method is unsupported or anything fails, the error is logged and the
        input ``df`` is returned unchanged.
    """
    try:
        numeric = df.select_dtypes(include=np.number)
        if method == 'z_score':
            # pandas skips NaN when computing mean/std, so a single missing
            # value no longer turns the whole column's z-scores into NaN
            # (which previously caused every row to be dropped).
            z_scores = (numeric - numeric.mean()) / numeric.std(ddof=0)
            is_outlier = z_scores.abs() >= threshold
        elif method == 'iqr':
            q1 = numeric.quantile(0.25)
            q3 = numeric.quantile(0.75)
            iqr = q3 - q1
            is_outlier = (numeric < (q1 - 1.5 * iqr)) | (numeric > (q3 + 1.5 * iqr))
        else:
            raise ValueError("Unsupported outlier detection method")
        # Comparisons against NaN are False, so missing values and
        # zero-variance columns (0 / 0 z-scores) count as "not an outlier".
        keep = ~is_outlier.any(axis=1).to_numpy(dtype=bool)
        return df[keep]
    except Exception as e:
        logging.error(f"Error handling outliers: {e}")
        return df

# Missing Value Imputation
def impute_missing_values(df, strategy='mean'):
    """Fill missing values using scikit-learn's SimpleImputer.

    The 'mean' and 'median' strategies are applied to numeric columns only;
    any other strategy is applied to every column. Columns the imputer is
    not applied to are returned as they were. The result keeps the input's
    index and column order.

    Args:
        df: Input DataFrame.
        strategy: Strategy name forwarded to ``SimpleImputer``.

    Returns:
        A new DataFrame with missing values filled where possible. If the
        imputer raises, the error is logged and the input ``df`` is returned
        unchanged.
    """
    try:
        # mean/median are undefined for text columns; running them on the
        # whole frame raised and left every column un-imputed.
        if strategy in ('mean', 'median'):
            candidates = df.select_dtypes(include=np.number).columns
        else:
            candidates = df.columns
        if strategy == 'constant':
            target_cols = list(candidates)
        else:
            # A column with no observed values has no statistic to fill with.
            # SimpleImputer drops such columns by default, which would break
            # the column mapping below, so leave them as they are.
            target_cols = [col for col in candidates if df[col].notna().any()]
        df_imputed = df.copy()
        if not target_cols or df.empty:
            return df_imputed
        imputer = SimpleImputer(strategy=strategy)
        # Rebuild with the original index so the assignment aligns row by
        # row even after earlier filtering left gaps in the index.
        df_imputed[target_cols] = pd.DataFrame(
            imputer.fit_transform(df[target_cols]),
            columns=target_cols,
            index=df.index,
        )
        return df_imputed
    except Exception as e:
        logging.error(f"Error imputing missing values: {e}")
        return df

# Normalization
def normalize_data(df, method='min_max'):
    """Scale the numeric columns of a DataFrame.

    Non-numeric columns are passed through unchanged, and the result keeps
    the input's index and column order.

    Args:
        df: Input DataFrame.
        method: 'min_max' uses ``MinMaxScaler``; 'z_score' uses
            ``StandardScaler``.

    Returns:
        A new DataFrame with the numeric columns scaled. If the method is
        unsupported or scaling fails, the error is logged and the input
        ``df`` is returned unchanged.
    """
    try:
        if method not in ('min_max', 'z_score'):
            raise ValueError("Unsupported normalization method")
        # Scalers only accept numeric input; feeding them the whole frame
        # raised on any text column and left the data unscaled.
        numeric_cols = df.select_dtypes(include=np.number).columns
        df_scaled = df.copy()
        if len(numeric_cols) == 0 or df.empty:
            # Nothing to scale; the scalers reject input with zero rows.
            return df_scaled
        scaler = MinMaxScaler() if method == 'min_max' else StandardScaler()
        # Rebuild with the original index so the assignment aligns row by
        # row even when the index is not 0..n-1.
        df_scaled[numeric_cols] = pd.DataFrame(
            scaler.fit_transform(df[numeric_cols]),
            columns=numeric_cols,
            index=df.index,
        )
        return df_scaled
    except Exception as e:
        logging.error(f"Error normalizing data: {e}")
        return df

# Data Processing Pipeline
def process_data(file_path, file_type='csv'):
    """Load one file and run it through the cleaning pipeline.

    The stages run in a fixed order: outlier removal, missing-value
    imputation, then normalization, each with its default settings.

    Args:
        file_path: Location of the input file.
        file_type: File format forwarded to ``load_data``.

    Returns:
        The processed DataFrame, or None when the file could not be loaded
        or an unexpected error escaped one of the stages.
    """
    try:
        frame = load_data(file_path, file_type)
        if frame is None:
            return None
        for stage in (handle_outliers, impute_missing_values, normalize_data):
            frame = stage(frame)
        return frame
    except Exception as pipeline_error:
        logging.error(f"Error processing data: {pipeline_error}")
        return None

# Batch Processing with Dask
def batch_process_data(file_paths, file_type='csv'):
    """Process several files and concatenate the results with dask.

    Each path is run through ``process_data``. A file that cannot be
    processed is skipped with a logged warning, so one bad file no longer
    discards the whole batch.

    Args:
        file_paths: Iterable of input file locations.
        file_type: File format forwarded to ``process_data``.

    Returns:
        A single pandas DataFrame with the rows of every successfully
        processed file, or None when no file could be processed or the
        concatenation itself fails.
    """
    try:
        dfs = []
        for fp in file_paths:
            processed = process_data(fp, file_type)
            if processed is None:
                # process_data signals failure with None; passing that to
                # dd.from_pandas would raise and lose every other file.
                logging.warning(f"Skipping file that could not be processed: {fp}")
                continue
            dfs.append(dd.from_pandas(processed, npartitions=1))
        if not dfs:
            logging.error("Error in batch processing: no files were processed successfully")
            return None
        return dd.concat(dfs).compute()
    except Exception as e:
        logging.error(f"Error in batch processing: {e}")
        return None

# Data Validation
def validate_data(df):
    """Run sanity checks on a DataFrame and log what they find.

    Currently one rule: a warning is logged when any numeric column holds
    a negative value. Nothing is returned and nothing is raised; a failure
    inside the checks is logged as an error.

    Args:
        df: DataFrame to inspect.
    """
    try:
        # Rule: numeric columns should not contain negative values.
        numeric_part = df.select_dtypes(include=np.number)
        has_negative = (numeric_part < 0).any().any()
        if has_negative:
            logging.warning("Data contains negative values.")
        # Further validation rules can be added here.
    except Exception as validation_error:
        logging.error(f"Error validating data: {validation_error}")

# Data Visualization
def visualize_data(df):
    """Draw a seaborn pair plot of the DataFrame and display it.

    Any failure while building or showing the figure is logged as an error
    instead of being raised.

    Args:
        df: DataFrame to plot.
    """
    try:
        sns.pairplot(df)
        plt.show()
    except Exception as plot_error:
        logging.error(f"Error visualizing data: {plot_error}")

# Example usage
# Runs the whole pipeline on two hard-coded CSV paths (relative paths), then
# validates and plots the combined result.
if __name__ == "__main__":
    file_paths = ['data1.csv', 'data2.csv']
    processed_data = batch_process_data(file_paths)

    # batch_process_data returns None on failure; skip the follow-up steps then.
    if processed_data is not None:
        validate_data(processed_data)
        visualize_data(processed_data)


ModuleNotFoundError: No module named 'sklearn'