# Real Estate Data Processing Pipeline

This notebook demonstrates the implementation and usage of the RealEstateDataset class for processing real estate data.

## Objectives:
1. Implement the RealEstateDataset class
2. Load and explore the housing dataset
3. Clean and preprocess the data
4. Generate insights and visualizations
5. Save the cleaned dataset

## 1. Install Required Packages

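First, let's define a helper that installs the Python packages needed for our real estate data analysis. The call to it is commented out by default; uncomment it if the packages are not already installed in your environment.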

In [None]:
# Install required packages
import subprocess
import sys

def install_packages(packages=None):
    """
    Install packages into the running interpreter's environment with pip.

    Each requirement is installed with its own ``pip install`` call, so a
    failure for one package is reported and the remaining ones are still
    attempted.

    Args:
        packages: Optional iterable of pip requirement strings (for example
            ``'pandas>=1.5.0'``). A single string is treated as one
            requirement. When omitted, the notebook's default requirements
            are installed.
    """
    if packages is None:
        packages = [
            'pandas>=1.5.0',
            'numpy>=1.21.0',
            'matplotlib>=3.5.0',
            'seaborn>=0.11.0',
        ]
    elif isinstance(packages, str):
        # Iterating a bare string would try to install one character at a time
        packages = [packages]

    for package in packages:
        try:
            # sys.executable makes pip target the interpreter running this code
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install {package}: {e}")
        else:
            print(f"✅ Successfully installed {package}")

# Uncomment the line below to install packages
# install_packages()

## 2. Import Required Libraries

Import all necessary libraries for data processing and analysis.

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Dict, Any
import warnings
import os

# Configure display settings
# Silence every warning category from here on; note that this also hides
# deprecation notices emitted by pandas, matplotlib and seaborn.
warnings.filterwarnings('ignore')
# Default size (width, height in inches) for figures that do not set their own
plt.rcParams['figure.figsize'] = (12, 8)
# Apply seaborn's "whitegrid" style to subsequent plots
sns.set_style("whitegrid")

# Confirm the imports succeeded and record the library versions in the output
print("📚 Libraries imported successfully!")
print(f"🐼 Pandas version: {pd.__version__}")
print(f"🔢 NumPy version: {np.__version__}")

## 3. RealEstateDataset Class Implementation

Here we implement the RealEstateDataset class with the required methods.

In [None]:
class RealEstateDataset:
    """
    Load, clean, summarize, and save a tabular real estate dataset.

    Typical usage is ``load_data()`` -> ``clean_data()`` ->
    ``describe_data()`` -> ``save_cleaned_data()``.

    Attributes:
        data: Working DataFrame; ``None`` until ``load_data()`` succeeds.
            ``clean_data()`` replaces it with the cleaned frame.
        original_data: Copy of the frame exactly as read from disk, kept for
            before/after comparison.
        filepath: Path most recently passed to ``load_data()``.
    """

    # Columns that clean_data() tries to parse as datetimes when present.
    DATE_COLUMNS = ('Date_Added',)

    def __init__(self):
        """Initialize the RealEstateDataset instance with no data loaded."""
        self.data: Optional[pd.DataFrame] = None
        self.original_data: Optional[pd.DataFrame] = None
        self.filepath: Optional[str] = None

    @staticmethod
    def _show(obj: Any) -> None:
        """Render *obj* with the notebook's ``display`` if it exists, else ``print`` it."""
        try:
            # ``display`` is injected by IPython/Jupyter; plain Python lacks it
            renderer = display
        except NameError:
            renderer = print
        renderer(obj)

    def load_data(self, filepath: str) -> pd.DataFrame:
        """
        Read the dataset and initialize a pandas DataFrame.

        Prints the shape, first five rows, column names and dtypes of the
        loaded frame.

        Args:
            filepath (str): Path to the CSV file containing real estate data

        Returns:
            pd.DataFrame: Loaded dataset

        Raises:
            FileNotFoundError: If ``filepath`` does not exist.
            Exception: Any other error raised by ``pandas.read_csv`` is
                reported and re-raised unchanged.
        """
        self.filepath = filepath
        try:
            self.data = pd.read_csv(filepath)
        except FileNotFoundError:
            print(f"❌ Error: File '{filepath}' not found.")
            raise
        except Exception as e:
            print(f"❌ Error loading data: {str(e)}")
            raise

        # Keep a copy of original data for comparison
        self.original_data = self.data.copy()

        print(f"✅ Data loaded successfully from {filepath}")
        print(f"📊 Dataset shape: {self.data.shape}")
        print("\n🔍 First 5 rows:")
        self._show(self.data.head())
        print("\n📋 Column names:")
        print(self.data.columns.tolist())
        print("\n🏷️  Data types:")
        print(self.data.dtypes)

        return self.data

    def clean_data(self) -> pd.DataFrame:
        """
        Handle missing and invalid data.

        Steps, in order: fill or drop missing values, remove rows with
        invalid prices / room counts / sizes, parse date columns, drop
        duplicate rows, and reset the index. Progress is printed as it goes.

        Returns:
            pd.DataFrame: Cleaned dataset (also stored on ``self.data``)

        Raises:
            ValueError: If no data has been loaded.
        """
        if self.data is None:
            raise ValueError("❌ No data loaded. Please call load_data() first.")

        print("🧹 Starting data cleaning process...")

        # Store initial state
        initial_rows = len(self.data)

        # 1. Handle missing values
        print("\n📊 Missing values before cleaning:")
        missing_before = self.data.isnull().sum()
        print(missing_before[missing_before > 0])
        self._fill_missing_values()

        # 2. Handle invalid entries
        print("\n🔍 Checking for invalid entries...")
        self._drop_invalid_rows()

        # 3. Fix data types
        print("\n🔧 Fixing data types...")
        self._convert_date_columns()

        # 4. Remove duplicates
        duplicates = self.data.duplicated().sum()
        if duplicates > 0:
            self.data = self.data.drop_duplicates()
            print(f"   ✓ Removed {duplicates} duplicate rows")

        # 5. Reset index so row labels are contiguous after the removals
        self.data = self.data.reset_index(drop=True)

        # Summary
        final_rows = len(self.data)
        rows_removed = initial_rows - final_rows

        print(f"\n✅ Data cleaning completed!")
        print(f"📊 Initial rows: {initial_rows}")
        print(f"📊 Final rows: {final_rows}")
        print(f"📊 Rows removed: {rows_removed}")

        return self.data

    def _fill_missing_values(self) -> None:
        """Fill numeric gaps with the median and text gaps with the mode (or drop the column)."""
        # Numerical columns - fill with median
        numerical_cols = self.data.select_dtypes(include=[np.number]).columns
        for col in numerical_cols:
            if self.data[col].isnull().any():
                median_val = self.data[col].median()
                self.data[col] = self.data[col].fillna(median_val)
                print(f"   ✓ Filled {col} missing values with median: {median_val:.2f}")

        # Categorical columns - fill with mode or drop
        categorical_cols = self.data.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            missing_count = self.data[col].isnull().sum()
            if missing_count == 0:
                continue
            if missing_count / len(self.data) > 0.5:
                # If more than 50% missing, drop the column
                self.data = self.data.drop(columns=[col])
                print(f"   ✓ Dropped column '{col}' (>50% missing)")
            else:
                # Fill with mode; mode() is empty only when every value is null
                modes = self.data[col].mode()
                mode_val = modes.iloc[0] if not modes.empty else 'Unknown'
                self.data[col] = self.data[col].fillna(mode_val)
                print(f"   ✓ Filled {col} missing values with mode: '{mode_val}'")

    def _drop_rows(self, mask: pd.Series, description: str) -> None:
        """Remove the rows selected by boolean *mask* and report how many were dropped."""
        removed = mask.sum()
        if removed > 0:
            self.data = self.data[~mask]
            print(f"   ✓ Removed {removed} rows with {description}")

    def _drop_invalid_rows(self) -> None:
        """Drop rows with negative prices, negative room counts, or non-positive sizes."""
        # Remove negative prices
        if 'Price' in self.data.columns:
            self._drop_rows(self.data['Price'] < 0, "negative prices")

        # Remove invalid room counts; each mask is built from the current
        # frame so it stays aligned after earlier removals
        for col in ['Bedrooms', 'Bathrooms']:
            if col in self.data.columns:
                self._drop_rows(self.data[col] < 0, f"negative {col}")

        # Remove invalid sizes (zero is treated as invalid too)
        if 'Size_sqft' in self.data.columns:
            self._drop_rows(self.data['Size_sqft'] <= 0, "invalid size")

    def _convert_date_columns(self) -> None:
        """Parse the known date columns to datetimes, coercing bad values to NaT."""
        for col in self.DATE_COLUMNS:
            if col not in self.data.columns:
                continue
            try:
                already_missing = self.data[col].isnull().sum()
                converted = pd.to_datetime(self.data[col], errors='coerce')
            except Exception as e:
                # Best effort: leave the column untouched and keep cleaning
                print(f"   ⚠️  Could not convert {col} to datetime: {e}")
                continue

            self.data[col] = converted
            print(f"   ✓ Converted {col} to datetime")

            # errors='coerce' turns unparseable values into NaT after the
            # missing-value pass above has already run, so surface them
            unparseable = converted.isnull().sum() - already_missing
            if unparseable > 0:
                print(f"   ⚠️  {unparseable} value(s) in {col} could not be parsed and were set to NaT")

    def describe_data(self) -> Dict[str, Any]:
        """
        Print basic statistics and exploratory insights.

        Returns:
            Dict: Summary values for whichever analyses could run. Possible
            keys are ``'property_types'`` (count per ``Type``),
            ``'avg_prices_by_type'`` (mean ``Price`` per ``Type``) and
            ``'avg_size_by_location'`` (mean ``Size_sqft`` for the ten
            locations with the largest mean). A key is absent when the
            columns it needs are missing.

        Raises:
            ValueError: If no data has been loaded.
        """
        if self.data is None:
            raise ValueError("❌ No data loaded. Please call load_data() first.")

        print("📈 REAL ESTATE DATA ANALYSIS REPORT")
        print("=" * 50)

        # Basic info
        print(f"\n📊 DATASET OVERVIEW")
        print(f"   • Total properties: {len(self.data):,}")
        print(f"   • Number of features: {len(self.data.columns)}")

        # Descriptive statistics
        numerical_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numerical_cols) > 0:
            print(f"\n📊 NUMERICAL STATISTICS")
            self._show(self.data[numerical_cols].describe())

        insights = {}

        # Property type analysis
        if 'Type' in self.data.columns:
            print(f"\n🏠 PROPERTY TYPE DISTRIBUTION")
            type_counts = self.data['Type'].value_counts()
            type_percentages = self.data['Type'].value_counts(normalize=True) * 100

            for prop_type, count in type_counts.items():
                percentage = type_percentages[prop_type]
                print(f"   • {prop_type}: {count:,} ({percentage:.1f}%)")

            insights['property_types'] = type_counts.to_dict()

        # Price analysis by property type
        if 'Price' in self.data.columns and 'Type' in self.data.columns:
            print(f"\n💰 AVERAGE PRICES BY PROPERTY TYPE")
            avg_prices = self.data.groupby('Type')['Price'].agg(['mean', 'median', 'count'])
            self._show(avg_prices)

            insights['avg_prices_by_type'] = avg_prices['mean'].to_dict()

        # Size analysis by location
        if 'Size_sqft' in self.data.columns and 'Location' in self.data.columns:
            print(f"\n📏 AVERAGE SIZE BY LOCATION (Top 10)")
            avg_size = self.data.groupby('Location')['Size_sqft'].agg(['mean', 'count']).sort_values('mean', ascending=False)
            self._show(avg_size.head(10))

            insights['avg_size_by_location'] = avg_size['mean'].head(10).to_dict()

        return insights

    def save_cleaned_data(self, output_path: str) -> None:
        """
        Save the cleaned dataset to a CSV file.

        Missing parent directories are created. The index is not written.

        Args:
            output_path (str): Path where to save the cleaned data

        Raises:
            ValueError: If there is no data to save.
            Exception: Any error raised while creating the directory or
                writing the file is reported and re-raised unchanged.
        """
        if self.data is None:
            raise ValueError("❌ No data to save. Please load and clean data first.")

        try:
            # Ensure directory exists. A bare filename has an empty dirname,
            # and os.makedirs('') raises, so only create when there is one.
            directory = os.path.dirname(output_path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            self.data.to_csv(output_path, index=False)
            print(f"✅ Cleaned data saved to: {output_path}")
            print(f"📊 Saved {len(self.data)} rows and {len(self.data.columns)} columns")
        except Exception as e:
            print(f"❌ Error saving data: {str(e)}")
            raise

print("🏗️  RealEstateDataset class defined successfully!")

## 4. Data Loading and Initial Exploration

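Now let's create an instance of our RealEstateDataset class and load the housing data from `../data/raw/housing_data.csv`. The path is relative, so it is resolved against the notebook's working directory.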

In [None]:
# Location of the raw housing CSV, relative to the working directory
data_path = '../data/raw/housing_data.csv'

# Build the dataset wrapper and read the file into it
dataset = RealEstateDataset()
dataset.load_data(data_path)

## 5. Data Cleaning

Clean the dataset by handling missing values and invalid entries.

In [None]:
# Run the cleaning pipeline; the returned frame is the one stored on dataset.data
cleaned_data = dataset.clean_data()

# Report any columns that still contain nulls after cleaning
print("\n📊 Missing values after cleaning:")
missing_after = dataset.data.isnull().sum()
remaining_missing = missing_after[missing_after > 0]
if not remaining_missing.empty:
    print(remaining_missing)
else:
    print("   ✅ No missing values remaining!")

## 6. Data Analysis and Insights

Generate descriptive statistics and insights from the cleaned data.

In [None]:
# Generate insights
# describe_data() prints the analysis report and returns a summary dict,
# which is kept here for later use
insights = dataset.describe_data()

## 7. Data Visualizations

Create visualizations to better understand the real estate market data.

In [None]:
# Create comprehensive visualizations
def create_real_estate_visualizations(data):
    """
    Draw and show a 2x2 dashboard of real estate charts.

    Each panel is drawn only when the columns it needs are present in
    ``data``; otherwise its axes are left empty:

    * top-left: histogram of ``Price``
    * top-right: pie chart of ``Type`` counts
    * bottom-left: ``Size_sqft`` vs ``Price`` scatter, annotated with their
      correlation
    * bottom-right: horizontal bars for the ten locations with the highest
      mean ``Price``

    Args:
        data: DataFrame holding the listings to plot.
    """
    plt.style.use('default')
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle('Real Estate Market Analysis Dashboard', fontsize=16, fontweight='bold')

    # Name the four panels instead of indexing the axes grid everywhere
    (price_ax, type_ax), (scatter_ax, location_ax) = axes

    available = data.columns
    has_price = 'Price' in available

    # Panel 1: price histogram
    if has_price:
        price_ax.hist(data['Price'], bins=30, alpha=0.7, color='skyblue', edgecolor='black')
        price_ax.set_title('Property Price Distribution')
        price_ax.set_xlabel('Price ($)')
        price_ax.set_ylabel('Frequency')
        # Show full price values on the x axis rather than scientific notation
        price_ax.ticklabel_format(style='plain', axis='x')

    # Panel 2: share of each property type
    if 'Type' in available:
        type_counts = data['Type'].value_counts()
        # One evenly spaced Set3 colour per property type
        palette = plt.cm.Set3(np.linspace(0, 1, len(type_counts)))
        type_ax.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%', colors=palette)
        type_ax.set_title('Property Type Distribution')

    # Panel 3: price against size, with the correlation in a text box
    if has_price and 'Size_sqft' in available:
        scatter_ax.scatter(data['Size_sqft'], data['Price'], alpha=0.6, color='purple')
        scatter_ax.set_title('Price vs Property Size')
        scatter_ax.set_xlabel('Size (sqft)')
        scatter_ax.set_ylabel('Price ($)')

        correlation = data['Price'].corr(data['Size_sqft'])
        scatter_ax.text(
            0.05, 0.95, f'Correlation: {correlation:.3f}',
            transform=scatter_ax.transAxes, fontsize=10,
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
        )

    # Panel 4: ten most expensive locations by mean price
    if has_price and 'Location' in available:
        mean_price = data.groupby('Location')['Price'].mean()
        # Ascending sort + tail keeps the ten largest, biggest drawn last (top bar)
        top_locations = mean_price.sort_values(ascending=True).tail(10)
        bar_positions = range(len(top_locations))
        location_ax.barh(bar_positions, top_locations.values, color='orange')
        location_ax.set_title('Average Price by Location (Top 10)')
        location_ax.set_xlabel('Average Price ($)')
        location_ax.set_ylabel('Location')
        location_ax.set_yticks(bar_positions)
        location_ax.set_yticklabels(top_locations.index)

    plt.tight_layout()
    plt.show()

    print("📊 Visualizations created successfully!")

# Create visualizations
# Render the dashboard from the dataset's current working DataFrame
create_real_estate_visualizations(dataset.data)

## 8. Additional Analysis

Let's perform some additional analysis to gain deeper insights.

In [None]:
# Additional analysis
print("🔍 ADDITIONAL INSIGHTS\n")

# Heatmap of pairwise correlations; only meaningful with two or more numeric columns
numerical_cols = dataset.data.select_dtypes(include=[np.number]).columns
if len(numerical_cols) >= 2:
    print("📊 Correlation Matrix:")
    correlation_matrix = dataset.data[numerical_cols].corr()

    plt.figure(figsize=(12, 10))
    sns.heatmap(
        correlation_matrix,
        annot=True,
        cmap='coolwarm',
        center=0,
        square=True,
        linewidths=0.5,
        cbar_kws={"shrink": 0.8},
    )
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.show()

# Mean price and listing count for each bedroom count
if {'Price', 'Bedrooms'}.issubset(dataset.data.columns):
    print("\n🛏️  Average Price by Number of Bedrooms:")
    price_by_bedrooms = dataset.data.groupby('Bedrooms')['Price'].agg(['mean', 'count']).sort_index()
    display(price_by_bedrooms)

# Listings per year, when a date column is available.
# NOTE: Year_Added is written into dataset.data itself, so the column stays
# on the dataset for anything that uses or saves it afterwards.
if 'Date_Added' in dataset.data.columns:
    dataset.data['Year_Added'] = dataset.data['Date_Added'].dt.year
    if dataset.data['Year_Added'].notnull().any():
        print("\n📅 Properties Listed by Year:")
        yearly_counts = dataset.data['Year_Added'].value_counts().sort_index()
        display(yearly_counts)

print("\n✅ Additional analysis completed!")

## 9. Save Cleaned Dataset

Save the cleaned dataset for future use.

In [None]:
# Save the cleaned dataset
output_path = '../data/cleaned/housing_data_cleaned.csv'
dataset.save_cleaned_data(output_path)

# Row counts before and after cleaning, used by the closing summary
original_rows = len(dataset.original_data)
cleaned_rows = len(dataset.data)

print("\n🎉 Real Estate Data Processing Pipeline completed successfully!")
print("\n📋 Summary:")
print(f"   • Original dataset: {original_rows} rows")
print(f"   • Cleaned dataset: {cleaned_rows} rows")
if original_rows:
    print(f"   • Data quality improved: {((cleaned_rows/original_rows) * 100):.1f}% data retained")
else:
    # A source file with no rows would otherwise raise ZeroDivisionError here
    print("   • Data quality improved: n/a (original dataset has no rows)")
print(f"   • Cleaned data saved to: {output_path}")

## 10. Test Environment Setup

Let's verify that our environment is properly configured and all components are working.

In [None]:
# Test environment setup
def test_environment():
    """
    Test that all required packages are installed and working correctly.

    Prints one line per check and returns nothing. Three things are checked:

    1. Each required package can be imported; its installed version is shown.
    2. ``RealEstateDataset`` can be instantiated.
    3. The expected project files exist at their relative paths.

    A failing check is reported with a ❌ line; it does not raise.
    """
    # Local imports keep this helper independent of other cells' imports
    import importlib
    from importlib import metadata

    print("🧪 Testing Environment Setup\n")

    # Test package imports by actually importing each one, so a missing
    # package is reported instead of aborting the whole check
    for package_name in ('pandas', 'numpy', 'matplotlib', 'seaborn'):
        try:
            module = importlib.import_module(package_name)
        except Exception as e:
            print(f"   ❌ {package_name}: Error - {e}")
            continue

        try:
            # Ask the installed distribution for its version; reading
            # __version__ off an alias such as matplotlib.pyplot yields nothing
            version = metadata.version(package_name)
        except metadata.PackageNotFoundError:
            version = getattr(module, '__version__', 'Unknown')
        print(f"   ✅ {package_name}: {version}")

    # Test RealEstateDataset class
    try:
        RealEstateDataset()
        print(f"   ✅ RealEstateDataset class: Working")
    except Exception as e:
        print(f"   ❌ RealEstateDataset class: Error - {e}")

    # Test file paths (relative to the current working directory)
    files_to_check = [
        '../data/raw/housing_data.csv',
        '../src/real_estate_dataset.py',
        '../requirements.txt'
    ]

    print("\n📁 File Structure Check:")
    for file_path in files_to_check:
        if os.path.exists(file_path):
            print(f"   ✅ {file_path}: Found")
        else:
            print(f"   ❌ {file_path}: Not found")

    print("\n🎯 Environment test completed!")

# Run environment test
# The results are printed; the function returns nothing
test_environment()