# In [None]:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import unittest

# Load and inspect the data
df = pd.read_csv('houses.csv')
df.info()  # info() prints its own report and returns None, so it is not wrapped in print()
print(df.shape)   # Print the shape of the DataFrame
pd.set_option('display.max_columns', None)  # Show every column when a frame is printed
print(df.head())  # Display the first few rows of the DataFrame

# Drop unwanted columns
columns_drop = [
    'description', 'Nearby School', 'Nearby Mall', 'Ad List', 'Category', 'Developer',
    'Address', 'Completion Year', 'Floor Range', 'Firm Type', 'Firm Number', 'REN Number',
    'Bus Stop', 'Mall', 'Park', 'School', 'Hospital', 'Highway', 'Railway Station',
    'Nearby Railway Station', '# of Floors', 'Total Units'
]
df = df.drop(columns=columns_drop)

# Check for missing values
print(df.isna().sum())  # Print the count of missing values in each column

# Extract numerical values from "Property Size" column
# (the last 6 characters of each entry are sliced off before the integer conversion)
df['Property Size_in_sq_ft'] = df['Property Size'].str[:-6].astype(int)
df = df.drop(columns='Property Size')

# Modify the "Facilities" column to display the number of facilities.
# A '-' placeholder and a missing (NaN) cell both count as zero facilities;
# without the isna() guard a missing cell would raise AttributeError on .strip().
df['Amount of Facilities'] = df['Facilities'].apply(
    lambda x: 0 if pd.isna(x) or x.strip() == '-' else len(x.split(','))
)
df = df.drop(columns='Facilities')

# Extract numerical values from "price" column
# (skip the first 2 characters, then remove the spaces inside the number)
df['Price_in_RM'] = df['price'].str[2:].replace(' ', '', regex=True).astype(int)
df = df.drop(columns='price')

# Replace '-' value with 0 in "Parking Lot" column
df['Parking Lot'] = df['Parking Lot'].replace('-', '0', regex=True).astype(int)

# Remove rows where "Bedroom" or "Bathroom" columns contain '-'.
# The explicit .copy() makes the filtered frame independent of the original, so the
# column assignment below does not trigger pandas' SettingWithCopyWarning.
has_placeholder = (
    df['Bathroom'].str.contains('-', na=False)
    | df['Bedroom'].str.contains('-', na=False)
)
df = df[~has_placeholder].copy()

# Convert "Bedroom" and "Bathroom" columns to integer type
df[['Bedroom', 'Bathroom']] = df[['Bedroom', 'Bathroom']].astype(int)

# Rearrange the columns in the DataFrame
rearrange_columns = [
    'Building Name', 'Property Type', 'Property Size_in_sq_ft', 'Bedroom', 'Bathroom',
    'Amount of Facilities', 'Parking Lot', 'Land Title', 'Tenure Type', 'Price_in_RM'
]
df = df[rearrange_columns]

# Export the cleaned data to a new CSV file
df.to_csv('house_data_cleaned.csv', index=False)

# Reload cleaned data
df = pd.read_csv('house_data_cleaned.csv')
df = df.drop(columns=['Building Name'])  # Drop unnecessary column for modeling
df.info()  # info() prints its own report and returns None, so it is not wrapped in print()

# Define features and target variable
X = df.drop(columns='Price_in_RM')
y = df['Price_in_RM']

# Split the data into training and testing sets (80/20, fixed seed for repeatable splits)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=45)

# Preprocessing for categorical and numerical columns
categorical_columns = ['Property Type', 'Land Title', 'Tenure Type']
numerical_columns = ['Property Size_in_sq_ft', 'Bedroom', 'Bathroom', 'Amount of Facilities', 'Parking Lot']

# handle_unknown='ignore' lets the encoder accept categories at predict time
# that were not present in the training split.
categorical_transformer = OneHotEncoder(handle_unknown='ignore')
numerical_transformer = StandardScaler()

preprocessor = ColumnTransformer(
    transformers=[
        ('cat', categorical_transformer, categorical_columns),
        ('num', numerical_transformer, numerical_columns)
    ]
)

# Select the machine learning model
model = LinearRegression()

# Create a pipeline that includes preprocessing and model training
my_pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('model', model)])

# Train the model
my_pipeline.fit(X_train, y_train)

# Predict prices for the test set
predicted_value = my_pipeline.predict(X_test)
actual_value = y_test

# Compare predicted prices with actual prices
# (the resulting frame keeps the row labels of y_test)
price_comparison = pd.DataFrame({'Predicted Price': predicted_value, 'Actual Price': actual_value})
print(price_comparison)

# Evaluate the model's performance
mae = mean_absolute_error(actual_value, predicted_value)
r2 = r2_score(actual_value, predicted_value)

print('Mean Absolute Error: ', round(mae, 2))
print('R2 Score: ', round(r2, 5))

# Plot the actual vs predicted prices
plt.scatter(actual_value, predicted_value, alpha=0.7, edgecolors='k')
plt.xlabel('Actual')
plt.ylabel('Predicted')
plt.title('Actual vs Predicted Price')
plt.show()

# Define a function to clean the data
def _count_facilities(value):
    """Return how many comma-separated facilities *value* lists (0 for '-' or a missing cell)."""
    if pd.isna(value):
        return 0
    if value.strip() == '-':
        return 0
    return len(value.split(','))


def clean_data(df):
    """
    Return a cleaned copy of the raw house-listing DataFrame.

    The input frame is not modified. The following steps are applied in order:
    1. Drop the columns that are not used afterwards.
    2. Parse 'Property Size' into the integer column 'Property Size_in_sq_ft'.
    3. Replace 'Facilities' with 'Amount of Facilities', the number of
       comma-separated entries ('-' and missing cells count as 0).
    4. Parse 'price' into the integer column 'Price_in_RM'.
    5. Convert 'Parking Lot' to int, treating '-' as 0.
    6. Remove rows whose 'Bedroom' or 'Bathroom' contains '-', then convert
       both columns to int.
    7. Select and order the output columns.

    Args:
    df (pd.DataFrame): The input DataFrame to be cleaned. It must contain every
        column dropped in step 1 as well as the columns parsed in steps 2-7.

    Returns:
    pd.DataFrame: The cleaned DataFrame. Rows keep their original index labels.

    Raises:
    KeyError: If an expected column is absent from ``df``.
    ValueError: If a value that should be numeric cannot be converted to int.
    """
    columns_drop = [
        'description', 'Nearby School', 'Nearby Mall', 'Ad List', 'Category', 'Developer',
        'Address', 'Completion Year', 'Floor Range', 'Firm Type', 'Firm Number', 'REN Number',
        'Bus Stop', 'Mall', 'Park', 'School', 'Hospital', 'Highway', 'Railway Station',
        'Nearby Railway Station', '# of Floors', 'Total Units'
    ]
    # drop() returns a new frame, so the caller's DataFrame is left untouched.
    df = df.drop(columns=columns_drop)

    # The last 6 characters of each size entry are sliced off before conversion.
    df['Property Size_in_sq_ft'] = df['Property Size'].str[:-6].astype(int)
    df = df.drop(columns='Property Size')

    df['Amount of Facilities'] = df['Facilities'].apply(_count_facilities)
    df = df.drop(columns='Facilities')

    # Skip the first 2 characters of the price, then strip the spaces inside the number.
    df['Price_in_RM'] = df['price'].str[2:].replace(' ', '', regex=True).astype(int)
    df = df.drop(columns='price')

    df['Parking Lot'] = df['Parking Lot'].replace('-', '0', regex=True).astype(int)

    # Rows with a '-' placeholder in either column are discarded. The explicit
    # .copy() makes the filtered frame independent, so the assignment below does
    # not trigger pandas' SettingWithCopyWarning.
    has_placeholder = (
        df['Bathroom'].str.contains('-', na=False)
        | df['Bedroom'].str.contains('-', na=False)
    )
    df = df[~has_placeholder].copy()
    df[['Bedroom', 'Bathroom']] = df[['Bedroom', 'Bathroom']].astype(int)

    rearrange_columns = [
        'Building Name', 'Property Type', 'Property Size_in_sq_ft', 'Bedroom', 'Bathroom',
        'Amount of Facilities', 'Parking Lot', 'Land Title', 'Tenure Type', 'Price_in_RM'
    ]
    return df[rearrange_columns]

# Unit Tests
class TestDataCleaning(unittest.TestCase):
    """Checks the individual cleaning steps against the raw listings in houses.csv."""

    def setUp(self):
        # Each test starts from a freshly loaded raw frame and its cleaned counterpart.
        raw = pd.read_csv('houses.csv')
        self.df_raw = raw
        self.df_cleaned = clean_data(raw)

    def test_drop_unwanted_columns(self):
        # Dropping the unwanted columns must shrink the width by exactly their count.
        remaining = self.df_raw.drop(columns=columns_drop)
        expected_width = self.df_raw.shape[1] - len(columns_drop)
        self.assertEqual(remaining.shape[1], expected_width)

    def test_extract_numerical_property_size(self):
        # The parsed property size must be stored with an integer dtype.
        frame = self.df_raw.copy()
        parsed_sizes = frame['Property Size'].str[:-6].astype(int)
        frame['Property Size_in_sq_ft'] = parsed_sizes
        is_integer = pd.api.types.is_integer_dtype(frame['Property Size_in_sq_ft'])
        self.assertTrue(is_integer)

    def test_count_facilities(self):
        # The facility count must be stored with an integer dtype.
        def count(entry):
            if entry.strip() == '-':
                return 0
            return len(entry.split(','))

        frame = self.df_raw.copy()
        frame['Amount of Facilities'] = frame['Facilities'].apply(count)
        is_integer = pd.api.types.is_integer_dtype(frame['Amount of Facilities'])
        self.assertTrue(is_integer)

    def test_extract_numerical_price(self):
        # The parsed price must be stored with an integer dtype.
        frame = self.df_raw.copy()
        without_prefix = frame['price'].str[2:]
        frame['Price_in_RM'] = without_prefix.replace(' ', '', regex=True).astype(int)
        is_integer = pd.api.types.is_integer_dtype(frame['Price_in_RM'])
        self.assertTrue(is_integer)

    def test_replace_parking_lot(self):
        # After replacing '-' with '0' the parking-lot column must convert to int.
        frame = self.df_raw.copy()
        substituted = frame['Parking Lot'].replace('-', '0', regex=True)
        frame['Parking Lot'] = substituted.astype(int)
        is_integer = pd.api.types.is_integer_dtype(frame['Parking Lot'])
        self.assertTrue(is_integer)

    def test_remove_invalid_bed_bath(self):
        # Once rows containing '-' are filtered out, no bare '-' value may remain.
        checked_columns = ('Bathroom', 'Bedroom')
        frame = self.df_raw.copy()
        for column in checked_columns:
            frame = frame[~frame[column].str.contains('-', na=False)]
        for column in checked_columns:
            self.assertFalse((frame[column] == '-').any())

    def test_rearrange_columns(self):
        # clean_data must return exactly these columns in exactly this order.
        expected_columns = [
            'Building Name', 'Property Type', 'Property Size_in_sq_ft', 'Bedroom', 'Bathroom',
            'Amount of Facilities', 'Parking Lot', 'Land Title', 'Tenure Type', 'Price_in_RM'
        ]
        cleaned = clean_data(self.df_raw.copy())
        self.assertEqual(list(cleaned.columns), expected_columns)

class TestPipeline(unittest.TestCase):
    """Integration tests for the machine learning pipeline."""

    def setUp(self):
        # No module-level ``df_cleaned`` exists, so the cleaned frame is built here
        # from the raw CSV, the same way TestDataCleaning.setUp does.
        self.df_cleaned = clean_data(pd.read_csv('houses.csv'))

    def test_pipeline_predict(self):
        """Fit the full pipeline on a train split and sanity-check its test predictions."""
        X = self.df_cleaned.drop(columns='Price_in_RM')
        y = self.df_cleaned['Price_in_RM']

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=45)

        # Only the columns listed here are handed to the transformers.
        categorical_columns = ['Property Type', 'Land Title', 'Tenure Type']
        numerical_columns = ['Property Size_in_sq_ft', 'Bedroom', 'Bathroom', 'Amount of Facilities', 'Parking Lot']

        categorical_transformer = OneHotEncoder(handle_unknown='ignore')
        numerical_transformer = StandardScaler()

        preprocessor = ColumnTransformer(
            transformers=[
                ('cat', categorical_transformer, categorical_columns),
                ('num', numerical_transformer, numerical_columns)
            ]
        )

        model = LinearRegression()

        my_pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('model', model)])

        my_pipeline.fit(X_train, y_train)

        predicted_value = my_pipeline.predict(X_test)

        # One prediction per test row, returned as a NumPy array.
        self.assertEqual(predicted_value.shape, y_test.shape)
        self.assertIsInstance(predicted_value, np.ndarray)
        # NOTE(review): a linear model does not guarantee positive outputs, so this
        # check depends on the data in houses.csv - confirm it is intended.
        self.assertTrue((predicted_value > 0).all())

# Run the tests
# argv=[''] hands unittest a placeholder argument list instead of the host process's
# command line, and exit=False stops unittest.main() from calling sys.exit() when the
# run finishes, so execution continues after the test report (verbosity=2 lists each test).
unittest.main(argv=[''], verbosity=2, exit=False)
