# Data Cleaning Pipeline for Sensor Datasets
This notebook cleans the synthetic sensor datasets stored in S3 for anomaly detection model training.

In [None]:
# Install required packages
!pip install pandas numpy boto3 -q

In [None]:
import pandas as pd
import numpy as np
import boto3
import json
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

# Configuration
INPUT_BUCKET = "synthetic-data-4"
OUTPUT_BUCKET = "processedd-synthetic-data"
REGION = "us-east-1"

# Initialize S3 client
s3_client = boto3.client('s3', region_name=REGION)

print(f"Input bucket: s3://{INPUT_BUCKET}")
print(f"Output bucket: s3://{OUTPUT_BUCKET}")

In [None]:
# Define datasets to process
datasets = [
    "datasets/arc_data.csv",
    "datasets/asd_data.csv", 
    "datasets/basement_data.csv",
    "datasets/laundry_data.csv",
    "datasets/voc_data.csv"
]

print("Datasets to process:")
for dataset in datasets:
    print(f"  - {dataset}")

In [None]:
def clean_dataset(df, dataset_name):
    """Clean a single dataset"""
    print(f"\n=== Cleaning {dataset_name} ===")
    print(f"Original shape: {df.shape}")
    
    # 1. Remove duplicates
    df_clean = df.drop_duplicates()
    print(f"After removing duplicates: {df_clean.shape}")
    
    # 2. Handle missing values
    print(f"Missing values per column:")
    print(df_clean.isnull().sum())
    df_clean = df_clean.dropna()
    print(f"After removing null values: {df_clean.shape}")
    
    # 3. Convert timestamp to datetime
    if 'timestamp' in df_clean.columns:
        df_clean['timestamp'] = pd.to_datetime(df_clean['timestamp'])
        df_clean = df_clean.sort_values('timestamp')
        print("Converted timestamp to datetime and sorted")
    
    # 4. Convert boolean strings to integers
    if 'is_anomaly' in df_clean.columns:
        df_clean['is_anomaly'] = df_clean['is_anomaly'].map({
            'True': 1, 'False': 0, True: 1, False: 0
        })
        print("Converted is_anomaly to integers")
    
    # 5. Handle numeric columns and remove outliers
    if 'value' in df_clean.columns:
        df_clean['value'] = pd.to_numeric(df_clean['value'], errors='coerce')
        df_clean = df_clean.dropna(subset=['value'])
        
        # Remove outliers (values beyond 3 standard deviations)
        mean_val = df_clean['value'].mean()
        std_val = df_clean['value'].std()
        
        if std_val > 0:
            outlier_mask = abs(df_clean['value'] - mean_val) <= 3 * std_val
            outliers_removed = len(df_clean) - outlier_mask.sum()
            df_clean = df_clean[outlier_mask]
            print(f"Removed {outliers_removed} outliers")
    
    print(f"Final shape: {df_clean.shape}")
    reduction = (1 - len(df_clean) / len(df)) * 100
    print(f"Data reduction: {reduction:.2f}%")
    
    return df_clean

In [None]:
# Process each dataset
results = {}

for dataset_path in datasets:
    dataset_name = dataset_path.split('/')[-1].replace('.csv', '')
    
    try:
        print(f"\n{'='*60}")
        print(f"PROCESSING: {dataset_name}")
        print(f"{'='*60}")
        
        # Read dataset from S3
        print(f"Reading s3://{INPUT_BUCKET}/{dataset_path}...")
        df = pd.read_csv(f"s3://{INPUT_BUCKET}/{dataset_path}")
        
        # Clean the dataset
        df_clean = clean_dataset(df, dataset_name)
        
        # Save cleaned dataset to S3
        output_path = f"s3://{OUTPUT_BUCKET}/cleaned-data/{dataset_name}_cleaned.csv"
        print(f"\nSaving to {output_path}...")
        df_clean.to_csv(output_path, index=False)
        
        # Store results
        results[dataset_name] = {
            'status': 'success',
            'original_rows': len(df),
            'cleaned_rows': len(df_clean),
            'reduction_percent': round((1 - len(df_clean) / len(df)) * 100, 2),
            'output_path': output_path
        }
        
        print(f"✅ Successfully processed {dataset_name}")
        
    except Exception as e:
        print(f"❌ Error processing {dataset_name}: {str(e)}")
        results[dataset_name] = {
            'status': 'error',
            'message': str(e)
        }

In [None]:
# Create and display summary
summary = {
    'cleaning_timestamp': datetime.now().isoformat(),
    'input_bucket': INPUT_BUCKET,
    'output_bucket': OUTPUT_BUCKET,
    'total_datasets': len(results),
    'successful_cleanings': sum(1 for r in results.values() if r['status'] == 'success'),
    'failed_cleanings': sum(1 for r in results.values() if r['status'] == 'error'),
    'details': results
}

print("\n" + "="*80)
print("CLEANING SUMMARY")
print("="*80)
print(f"Total datasets processed: {summary['total_datasets']}")
print(f"Successful: {summary['successful_cleanings']}")
print(f"Failed: {summary['failed_cleanings']}")
print("\nDataset Details:")

for dataset, result in results.items():
    if result['status'] == 'success':
        print(f"  ✅ {dataset}: {result['reduction_percent']}% reduction ({result['original_rows']:,} → {result['cleaned_rows']:,} rows)")
    else:
        print(f"  ❌ {dataset}: FAILED - {result.get('message', 'Unknown error')}")

In [None]:
# Save summary report to S3
summary_json = json.dumps(summary, indent=2)
summary_path = f"s3://{OUTPUT_BUCKET}/cleaning-reports/summary.json"

with open('/tmp/summary.json', 'w') as f:
    f.write(summary_json)

s3_client.upload_file('/tmp/summary.json', OUTPUT_BUCKET, 'cleaning-reports/summary.json')
print(f"\n📊 Summary report saved to {summary_path}")

In [None]:
# Visualize cleaning results
if any(r['status'] == 'success' for r in results.values()):
    successful_results = {k: v for k, v in results.items() if v['status'] == 'success'}
    
    datasets_names = list(successful_results.keys())
    original_rows = [r['original_rows'] for r in successful_results.values()]
    cleaned_rows = [r['cleaned_rows'] for r in successful_results.values()]
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Bar chart of row counts
    x = np.arange(len(datasets_names))
    width = 0.35
    
    ax1.bar(x - width/2, original_rows, width, label='Original', alpha=0.8)
    ax1.bar(x + width/2, cleaned_rows, width, label='Cleaned', alpha=0.8)
    ax1.set_xlabel('Datasets')
    ax1.set_ylabel('Number of Rows')
    ax1.set_title('Dataset Sizes: Before vs After Cleaning')
    ax1.set_xticks(x)
    ax1.set_xticklabels(datasets_names, rotation=45)
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Pie chart of data reduction
    reductions = [r['reduction_percent'] for r in successful_results.values()]
    ax2.pie(reductions, labels=datasets_names, autopct='%1.1f%%', startangle=90)
    ax2.set_title('Data Reduction Percentage by Dataset')
    
    plt.tight_layout()
    plt.show()
    
    print("\n🎉 Data cleaning completed! Your datasets are ready for model training.")
else:
    print("\n⚠️ No datasets were successfully processed. Please check the errors above.")