# Market Value Prediction Model
---

This model uses the usa-real-estate dataset, which was scraped from `realtor.com`.

## Imports
---

In [None]:
import pandas as pd;
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import utils;
from sklearn.preprocessing import LabelEncoder;
from sklearn.model_selection import train_test_split;
import torch;
from torch import nn;
from torch.utils.data import DataLoader, Dataset;
from tqdm.auto import tqdm;
from typing import Tuple
import torch.onnx;

# Default to the CPU and switch to CUDA only when PyTorch reports a usable GPU.
DEVICE: str = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"
DEVICE

## Loading Data
---

In [None]:
# Read the listings CSV into a dataframe.
df = pd.read_csv(filepath_or_buffer="realtor-data.zip.csv")

# Cell output: the first five rows (head's default) and the column index.
df.head(), df.columns

## Data Pre-Processing
---

This dataset contains a lot of bad data. In this section the dataframe is cleaned in one of three ways, selected by `dataset_outlier_mode`: missing values replaced by the median, missing values replaced by the mean, or rows with missing values removed entirely. The result is placed in `whole_df`.

In [None]:
# How missing numeric values are handled before the final dropna():
#   "mean"     - fill with the column mean
#   "median"   - fill with the column median
#   "stripped" - no filling; every row with a missing value is removed
dataset_outlier_mode: str = "median"  # median | stripped | mean

# Only operating on listings for sale
whole_df = df[df['status'] != "sold"]

# Street encoded value cannot be reproduced when given new data.
# prev_sold_date useless, not a time series problem.
whole_df = whole_df.drop(
    columns=['street', 'status', 'prev_sold_date', 'city', 'brokered_by'],
)

# Before Cleaning
utils.dataFrameStatus(whole_df)

if dataset_outlier_mode in ("mean", "median"):
    # Compute one fill value per imputed column, then fill only those columns;
    # fillna with a Series matches its index against the column names.
    fill_values = whole_df[['bed', 'bath', 'acre_lot', 'house_size']].agg(dataset_outlier_mode)
    whole_df = whole_df.fillna(fill_values)
elif dataset_outlier_mode != "stripped":
    # A typo in the mode would otherwise silently behave like "stripped".
    raise ValueError(
        f"Unknown dataset_outlier_mode {dataset_outlier_mode!r}; "
        "expected 'median', 'stripped' or 'mean'"
    )

# Remove whatever rows still contain missing values (all of them in "stripped"
# mode, only the non-imputed columns' gaps otherwise).
whole_df = whole_df.dropna()

# Lookup table from the full state / territory name used in the dataset to its
# two-letter postal abbreviation. Entries are kept in alphabetical order.
state_map = {
    'Alabama': 'AL',
    'Alaska': 'AK',
    'Arizona': 'AZ',
    'Arkansas': 'AR',
    'California': 'CA',
    'Colorado': 'CO',
    'Connecticut': 'CT',
    'Delaware': 'DE',
    'District of Columbia': 'DC',
    'Florida': 'FL',
    'Georgia': 'GA',
    'Guam': 'GU',
    'Hawaii': 'HI',
    'Idaho': 'ID',
    'Illinois': 'IL',
    'Indiana': 'IN',
    'Iowa': 'IA',
    'Kansas': 'KS',
    'Kentucky': 'KY',
    'Louisiana': 'LA',
    'Maine': 'ME',
    'Maryland': 'MD',
    'Massachusetts': 'MA',
    'Michigan': 'MI',
    'Minnesota': 'MN',
    'Mississippi': 'MS',
    'Missouri': 'MO',
    'Montana': 'MT',
    'Nebraska': 'NE',
    'Nevada': 'NV',
    'New Hampshire': 'NH',
    'New Jersey': 'NJ',
    'New Mexico': 'NM',
    'New York': 'NY',
    'North Carolina': 'NC',
    'North Dakota': 'ND',
    'Ohio': 'OH',
    'Oklahoma': 'OK',
    'Oregon': 'OR',
    'Pennsylvania': 'PA',
    'Puerto Rico': 'PR',
    'Rhode Island': 'RI',
    'South Carolina': 'SC',
    'South Dakota': 'SD',
    'Tennessee': 'TN',
    'Texas': 'TX',
    'Utah': 'UT',
    'Vermont': 'VT',
    'Virgin Islands': 'VI',
    'Virginia': 'VA',
    'Washington': 'WA',
    'West Virginia': 'WV',
    'Wisconsin': 'WI',
    'Wyoming': 'WY',
}

# Replace full state names with their postal codes. Series.map yields NaN for
# any name that is not a key of state_map.
whole_df['state'] = whole_df['state'].map(state_map)

# This mapping runs after the earlier dropna(), so unmapped states would
# otherwise reach the encoder as NaN. Drop those rows explicitly.
whole_df = whole_df.dropna(subset=['state'])

# Encode the postal codes as integer class ids; `le` is kept so the same
# encoding can be re-applied or inverted later.
le = LabelEncoder()
whole_df['state'] = le.fit_transform(whole_df['state'])

# After Cleaning
print("\nAfter Cleaning\n")
utils.dataFrameStatus(whole_df)
len(whole_df), whole_df.head(5)


## Splitting Data
---

In [None]:
# Number of samples per mini-batch.
BATCH_SIZE: int = 32


class CustomDataset(Dataset):
    """Map-style dataset over a dataframe with a ``price`` target column.

    Every column other than ``price`` is treated as a feature. The dataframe
    is converted to float32 tensors once at construction time, so fetching a
    sample is a plain tensor index instead of a per-row pandas lookup.

    Note: later changes to ``df`` are not reflected in the samples, and the
    returned tensors are views into the dataset's internal storage.

    Args:
        df: Dataframe containing a ``price`` column plus the feature columns.
    """

    def __init__(self, df: pd.DataFrame) -> None:
        super().__init__()
        self.df = df

        # Feature columns keep the dataframe's column order.
        feature_cols = [col for col in df.columns if col != "price"]
        self._features = torch.tensor(df[feature_cols].to_numpy(dtype="float32"))
        self._labels = torch.tensor(df["price"].to_numpy(dtype="float32"))

    def __len__(self) -> int:
        """Return the number of rows in the dataframe."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(features, label)`` for the row at position ``index``.

        ``features`` is a 1-D float32 tensor and ``label`` a 0-D float32
        tensor holding the row's price.
        """
        return self._features[index], self._labels[index]


# Hold out 20% of the cleaned rows for evaluation; the remaining 80% is the
# complement and becomes the training set.
train_df, test_df = train_test_split(whole_df, test_size=0.2)

train_dataset = CustomDataset(train_df)
test_dataset = CustomDataset(test_df)

# Training batches are reshuffled on every pass; evaluation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Cell output: one batch from each loader as a quick sanity check.
next(iter(train_loader)), next(iter(test_loader))

## Model Architecture
---

In [None]:
class LinearBaseline(nn.Module):
    """Fully connected regression network with a configurable number of hidden blocks.

    Layout: an input projection, then ``num_blocks`` hidden blocks
    (Linear -> ReLU -> BatchNorm1d -> Dropout), then a single-output head.

    Each hidden block owns its own parameters. Reusing one block instance
    ``num_blocks`` times would tie the Linear weights and the BatchNorm
    running statistics across every depth.

    Args:
        input_features: Number of input features per sample.
        num_blocks: Number of hidden blocks to stack (0 skips straight from
            the input projection to the output head).
        hidden_units: Width of the input projection and of every hidden block.
        dropout: Dropout probability used inside each hidden block.
    """

    def __init__(self, input_features, num_blocks, hidden_units=64, dropout=0.3) -> None:
        super().__init__()

        self.num_blocks = num_blocks

        self.input = nn.Linear(in_features=input_features, out_features=hidden_units)

        # One independent block per requested layer.
        self.blocks = nn.Sequential(
            *(self._make_block(hidden_units, dropout) for _ in range(num_blocks))
        )

        # Single-output head (the name is kept for compatibility).
        self.classifier = nn.Linear(in_features=hidden_units, out_features=1)

    @staticmethod
    def _make_block(hidden_units, dropout) -> nn.Sequential:
        """Build one hidden block: Linear -> ReLU -> BatchNorm1d -> Dropout."""
        return nn.Sequential(
            nn.Linear(in_features=hidden_units, out_features=hidden_units),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_units),
            nn.Dropout(dropout),
        )

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the network and return the head's output.

        The last dimension of ``x`` must equal ``input_features``; the last
        dimension of the result has size 1.
        """
        x = self.input(x)
        x = self.blocks(x)
        return self.classifier(x)

# Build the network on the selected device. The input width is read from one
# training batch: its features tensor is (batch, features), so shape[1] is the
# number of features per sample.
model = LinearBaseline(
    input_features=next(iter(train_loader))[0].shape[1],
    num_blocks=3,
    hidden_units=256,
    dropout=0.3,
).to(DEVICE)

# Cell output: the freshly initialised parameters and buffers.
model.state_dict()

## Train Step
---

In [None]:
def train_step(
    model: LinearBaseline,
    loader: DataLoader,
    loss_fn: nn.Module,
    optim: torch.optim.Optimizer) -> Tuple[float, float]:
    """Run one optimisation pass over ``loader``.

    The caller is responsible for putting ``model`` in training mode.

    Args:
        model: Network to optimise; its output is squeezed on the last axis
            before being compared with the targets.
        loader: Yields ``(features, target)`` batches.
        loss_fn: Loss applied to ``(outputs, targets)``. The per-sample
            average below assumes it returns the batch mean (the default for
            ``nn.MSELoss``).
        optim: Optimiser stepped once per processed batch.

    Returns:
        ``(loss, accuracy)``: the mean loss per sample, and the accumulated
        ``utils.batchAccuracy`` values divided by the number of samples.
        Both are 0.0 when no batch was processed.
    """
    running_loss = 0.0
    running_acc = 0.0
    total_samples = 0

    for X, y in tqdm(loader):
        X = X.to(DEVICE)
        y = y.to(DEVICE)

        # Drop the trailing singleton dimension so outputs line up with y.
        outputs: torch.Tensor = model(X).squeeze(-1)

        # Batches whose prediction shape does not match the targets are skipped.
        if outputs.shape != y.shape:
            continue

        loss = loss_fn(outputs, y)

        # Clear gradients left over from the previous batch; without this
        # they accumulate and every step uses the sum of all past gradients.
        optim.zero_grad()
        loss.backward()
        optim.step()

        batch_size = len(y)
        # Weight the batch-mean loss by the batch size so the final division
        # by total_samples gives a true per-sample average.
        running_loss += loss.item() * batch_size
        # NOTE(review): assumes batchAccuracy returns a per-batch total that
        # is meaningful when divided by the sample count - confirm in utils.
        running_acc += utils.batchAccuracy(outputs, y)
        total_samples += batch_size

    if total_samples == 0:
        return 0.0, 0.0

    return running_loss / total_samples, running_acc / total_samples


## Test Step
---

In [None]:
def test_step(
    model: LinearBaseline,
    loader: DataLoader,
    loss_fn: nn.Module) -> Tuple[float, float]:
    """Evaluate ``model`` over ``loader`` without updating any parameters.

    This function neither switches the model to eval mode nor disables
    gradient tracking; the caller is expected to do both.

    Args:
        model: Network to evaluate; its output is squeezed on the last axis
            before being compared with the targets.
        loader: Yields ``(features, target)`` batches.
        loss_fn: Loss applied to ``(outputs, targets)``. The per-sample
            average below assumes it returns the batch mean (the default for
            ``nn.MSELoss``).

    Returns:
        ``(loss, accuracy)``: the mean loss per sample, and the accumulated
        ``utils.batchAccuracy`` values divided by the number of samples.
        Both are 0.0 when no batch was processed.
    """
    running_loss = 0.0
    running_acc = 0.0
    total_samples = 0

    for X, y in tqdm(loader):
        X = X.to(DEVICE)
        y = y.to(DEVICE)

        # Drop the trailing singleton dimension so outputs line up with y.
        outputs: torch.Tensor = model(X).squeeze(-1)

        # Batches whose prediction shape does not match the targets are skipped.
        if outputs.shape != y.shape:
            continue

        batch_size = len(y)
        # Weight the batch-mean loss by the batch size so the final division
        # by total_samples gives a true per-sample average.
        running_loss += loss_fn(outputs, y).item() * batch_size
        # NOTE(review): assumes batchAccuracy returns a per-batch total that
        # is meaningful when divided by the sample count - confirm in utils.
        running_acc += utils.batchAccuracy(outputs, y)
        total_samples += batch_size

    if total_samples == 0:
        return 0.0, 0.0

    return running_loss / total_samples, running_acc / total_samples

## Training Model
---

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
loss_fn = torch.nn.MSELoss()
EPOCHS = 25

# Each epoch is one full training pass followed by one evaluation pass.
for epoch in tqdm(range(EPOCHS)):

    # Training mode enables dropout and batch-statistics updates in BatchNorm.
    model.train()
    train_loss, train_acc = train_step(
        model=model,
        loader=train_loader,
        loss_fn=loss_fn,
        optim=optimizer,
    )

    # Evaluation mode plus inference_mode: no dropout, no autograd tracking.
    model.eval()
    with torch.inference_mode():
        test_loss, test_acc = test_step(
            model=model,
            loader=test_loader,
            loss_fn=loss_fn,
        )

    # Print the epoch header first, then the metrics. The loss is an MSE
    # value, not a percentage, so it is printed without a '%' suffix.
    print(f"Epoch {epoch+1}/{EPOCHS}")
    print(f"Training Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
    print(f"Training Accuracy: {train_acc:.2f}%, Test Accuracy: {test_acc:.2f}%")

print('Finished Training')

## Saving the Model
---

In [None]:
# Export in evaluation mode so dropout is disabled and BatchNorm uses its
# running statistics rather than the statistics of the single dummy sample.
model.eval()

# The dummy input must be a batch: BatchNorm1d rejects 1-D input, so use shape
# (1, features). The width comes from the model's input layer instead of a
# hard-coded 6, and the tensor is created on the same device as the model.
dummy_input = torch.randn(1, model.input.in_features, device=DEVICE)
torch.onnx.export(model,
                  dummy_input,
                  "model.onnx",
                  opset_version=11)