In [None]:
# Standard library imports
import os

# Scientific computing libraries
import pandas as pd
import numpy as np

# Data visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning libraries
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# Time series analysis libraries
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.outliers_influence import variance_inflation_factor

# Deep learning libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import models  # `torchvision.models` has no `models` member to import

# Utility libraries
import optuna # Hyperparameter optimization
import tqdm  # Progress bar visualization
from tqdm.auto import tqdm  # rebinds `tqdm` to the callable progress bar used by the training loops

# Custom libraries
from effKAN import KAN

In [None]:
# Load the raw car-sales table from the project's data directory
data = pd.read_csv(filepath_or_buffer='./data/car_data.csv')

In [None]:
# Preview the first five rows (the default of DataFrame.head)
data.head(n=5)

In [None]:
# Print the DataFrame.info() summary of the loaded frame
data.info()

In [None]:
# Number of missing entries in each column
data.isna().sum()

In [None]:
# Fill missing values with a placeholder that matches each column's dtype.
# Non-numeric columns keep the original '0' string placeholder; numeric columns
# get a numeric 0 so that a single NaN does not turn the whole column into `object`.
fill_values = {
    column: 0 if pd.api.types.is_numeric_dtype(data[column]) else '0'
    for column in data.columns
}
data = data.fillna(fill_values)

# Graph

## Univariate Analysis

In [None]:
# Work on a reproducible 1% sample of the rows for plotting
sampled_data = data.sample(frac=0.01, random_state=42)

# Object-dtype columns to plot; identifier-like columns and the date are excluded
object_columns = sampled_data.select_dtypes(include=['object']).columns
object_columns = object_columns.drop(['Car_id','Customer Name','Date'])

# exist_ok avoids the separate exists-check and is safe on re-runs
os.makedirs('./graph/univariate', exist_ok=True)

# For Date Count
# `Date` is still an object (string) column at this point, so the rows are ordered
# by the parsed date instead of lexicographically (which would interleave months).
sales_counts = sampled_data['Date'].value_counts().rename_axis('Date').reset_index(name='Sales')
sales_counts = sales_counts.sort_values('Date', key=pd.to_datetime).reset_index(drop=True)
plt.figure(figsize=(15, 5))
sns.barplot(x='Date', y='Sales', data=sales_counts, hue='Date')
plt.title('Sales Count by Date')
plt.xticks([])  # one tick per date is unreadable, so tick marks are hidden
plt.savefig('./graph/univariate/sales_count_by_date.png')
plt.show()

# One count plot per remaining object column, each saved under its column name
for column in object_columns:
    plt.figure(figsize=(15, 5))
    sns.countplot(x=column, data=sampled_data, hue=column)
    plt.title(f'Count Plot for {column}')
    plt.xlabel(column)
    plt.ylabel('Count')
    plt.xticks(rotation=90)
    plt.savefig(f'./graph/univariate/{column}_countplot.png')

plt.show()

In [None]:
# Kernel density estimate of the sampled annual incomes
fig, ax = plt.subplots(figsize=(15, 5))
sns.kdeplot(data=sampled_data, x=sampled_data['Annual Income'], ax=ax)
ax.set_title('KDE Plot for Annual Income')
ax.set_xlabel('Annual Income')
ax.set_ylabel('KDE')
ax.tick_params(axis='x', labelrotation=90)
fig.savefig('./graph/univariate/Annual_Income_KDEplot.png')
plt.show()

## Bivariate Analysis

In [None]:
# Make sure the output directory exists before any figure is written
os.makedirs('./graph/bivariate', exist_ok=True)

# One violin plot of price per object column selected in the univariate cell
for column_name in object_columns:
    plt.figure(figsize=(15, 5))
    sns.violinplot(data=sampled_data, x=column_name, y='Price ($)', hue=column_name)
    plt.title(f'Violin Plot for {column_name} vs Price')
    plt.xlabel(column_name)
    plt.ylabel('Price')
    plt.xticks(rotation=90)
    plt.savefig(f'./graph/bivariate/{column_name}_vs_price_violinplot.png')

plt.show()


In [None]:
# Box plot of the sampled annual incomes.
# The title previously read "KDE Plot" and the y axis was labelled "KDE", although
# this cell draws a box plot; the y axis carries no quantity here, so it is unlabelled.
plt.figure(figsize=(15, 5))
sns.boxplot(x=sampled_data['Annual Income'], data=sampled_data)
plt.title('Box Plot for Annual Income')
plt.xlabel('Annual Income')
plt.xticks(rotation=90)
# Output path left unchanged so existing references to the file keep working
plt.savefig('./graph/bivariate/Annual_Income_KDE_plot.png')

plt.show()

## Multivariate Analysis

In [None]:
# Create the output directory if needed, then draw and save the pairwise plot grid
multivariate_dir = './graph/multivariate'
os.makedirs(multivariate_dir, exist_ok=True)

sns.pairplot(data=sampled_data)
plt.savefig('./graph/multivariate/pairplot.png')
plt.show()

# Convert, Encode, Normalization

## Convert `Date` into independent variables

In [None]:
# Parse the date once, expose its year/month/day parts as separate columns,
# and drop the original `Date` column.
parsed_dates = pd.to_datetime(data['Date'])
data = data.assign(
    Year=parsed_dates.dt.year,
    Month=parsed_dates.dt.month,
    Day=parsed_dates.dt.day,
).drop(columns=['Date'])

## Encoding

In [None]:
# Remove the two columns that are excluded from the modelling frame
data = data.drop(columns=['Car_id', 'Customer Name'])

In [None]:
object_columns = data.select_dtypes(include=['object']).columns

# Integer-encode every object column. A fresh encoder is fitted per column and kept
# in `label_encoders`, so the codes can later be mapped back to the original labels
# (a single shared encoder only remembers the last column it was fitted on).
label_encoders = {}
for column in object_columns:
    le = LabelEncoder()
    data[column] = le.fit_transform(data[column])
    label_encoders[column] = le

## Normalizing

In [None]:
# Every numeric column except the target is rescaled
num_columns = data.select_dtypes(include=['int64', 'float64', 'int32']).columns
num_columns = num_columns.drop(['Price ($)'])

# Fit one scaler on all feature columns at once. MinMaxScaler scales each feature
# independently, so the values match a per-column fit, but the fitted `scaler`
# now retains the parameters of every column instead of only the last one.
# NOTE(review): the scaler is fitted on the full frame before the train/test split
# performed later in the notebook — confirm whether that leakage is acceptable.
scaler = MinMaxScaler()
if len(num_columns) > 0:
    data[num_columns] = scaler.fit_transform(data[num_columns])

# Feature Analysis

In [None]:
# Preview the first five rows of the encoded and scaled frame
data.head(n=5)

In [None]:
# Descriptive statistics of every column, shown as the cell output
print("Summary of Statistics:")
summary_stats = data.describe()
summary_stats

In [None]:
# Per-column skewness and kurtosis, printed one after the other
skewness, kurtosis = data.skew(), data.kurtosis()
for heading, values in (("\nSkewness:", skewness), ("\nKurtosis:", kurtosis)):
    print(heading)
    print(values)

In [None]:
# Pairwise correlations between all columns
correlation_matrix = data.corr()

# Annotated heatmap of the correlation matrix
fig, ax = plt.subplots(figsize=(20, 10))
sns.heatmap(correlation_matrix, annot=True, fmt=".3f", cmap="coolwarm", linewidths=0.5, ax=ax)
ax.set_title("Correlation Heatmap")
plt.show()

In [None]:
# Multicollinearity check.
# Note: `y` holds the predictor columns here (everything except the price);
# `X` is the same frame passed through sm.add_constant.
y = data.drop(columns=["Price ($)"])
X = sm.add_constant(y)

# Variance inflation factor of every column of X
design_matrix = X.values
vif = pd.DataFrame({
    "variable": X.columns,
    "VIF": [
        variance_inflation_factor(design_matrix, column_idx)
        for column_idx in range(design_matrix.shape[1])
    ],
})

print(vif)

In [None]:
# Drop every column whose VIF is 5 or higher and inspect what is left.
# NOTE(review): `X` was built with sm.add_constant, so a `const` column may be kept
# as a model feature whenever its VIF is below 5 — confirm this is intended.
exceeds_threshold = vif["VIF"] >= 5
high_vif_variables = vif.loc[exceeds_threshold, "variable"]
regression_data = X.drop(columns=high_vif_variables)

regression_data.info()

# Regression

## KAN

In [None]:
# Prefer the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Features are the VIF-filtered columns; the target is the price
X = regression_data
y = data['Price ($)']

# Fixed 80/20 split
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert the splits to float32 tensors on the selected device;
# targets are reshaped to a single column
train_X, test_X = (
    torch.tensor(frame.values, dtype=torch.float32).to(device)
    for frame in (train_X, test_X)
)
train_y, test_y = (
    torch.tensor(series.values, dtype=torch.float32).view(-1, 1).to(device)
    for series in (train_y, test_y)
)

# Shuffled mini-batches of 512 samples for training
trainset = TensorDataset(train_X, train_y)
trainloader = DataLoader(dataset=trainset, batch_size=512, shuffle=True)

def objective(trial):
    """Optuna objective for the KAN regressor.

    Samples a learning rate, weight decay, number of hidden layers and hidden
    width from `trial`, builds a KAN with those settings, trains it for one
    pass over `trainloader`, and returns the accumulated loss.

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        Sum of the per-batch MSE losses over the single training pass.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-2, log=True)
    hidden_layers = trial.suggest_int("n_hidden_layers", 2, 5)
    hidden_units = trial.suggest_int("hidden_units", 2, 64)

    # Size the input layer from the training tensor instead of a hard-coded 12,
    # so the model keeps working when the VIF filter keeps a different column count.
    n_features = train_X.shape[1]
    model = KAN([n_features] + [hidden_units] * hidden_layers + [1])
    model.to(torch.float32).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay, foreach=False)
    criterion = nn.MSELoss()

    # NOTE(review): the score is the training loss of one pass, not a validation
    # metric — confirm that this is the intended selection criterion.
    train_loss = 0
    model.train()
    with tqdm(trainloader, desc="Training") as pbar:
        for X, y in pbar:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

    return train_loss


# Search the KAN hyperparameters over 100 trials, minimising the objective
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100)

# Report the parameters of the best trial
best_trial = study.best_trial
for label, param_name in (
    ("Best Learning Rate:", "learning_rate"),
    ("Best Weight Decay:", "weight_decay"),
    ("Best Number of Hidden Layers:", "n_hidden_layers"),
    ("Best Hidden Units:", "hidden_units"),
):
    print(label, best_trial.params[param_name])

In [None]:
# Hyperparameters selected by the Optuna study
best_learning_rate = best_trial.params["learning_rate"]
best_weight_decay = best_trial.params["weight_decay"]
best_hidden_layers = best_trial.params["n_hidden_layers"]
best_hidden_units = best_trial.params["hidden_units"]

# Input width is taken from the training tensor rather than hard-coded
model = KAN([train_X.shape[1]] + [best_hidden_units] * best_hidden_layers + [1])
model.to(torch.float32).to(device)

optimizer = optim.AdamW(model.parameters(), lr=best_learning_rate, weight_decay=best_weight_decay, foreach=False)
criterion = nn.MSELoss()

epochs = 100
train_losses = []  # mean batch loss of each epoch

for epoch in range(epochs):
    model.train()
    train_loss = 0
    with tqdm(trainloader, desc=f"Epoch {epoch+1}/{epochs}") as pbar:
        # Batch variables get their own names so the notebook-level X / y
        # (feature frame and target) are not overwritten by the loop.
        for batch_X, batch_y in pbar:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
    # Record the epoch average instead of the loss of whichever batch came last
    train_losses.append(train_loss / len(trainloader))

In [None]:
# Loss curve collected by the preceding training cell
plt.figure(figsize=(15, 5))
plt.plot(train_losses)
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

model.eval()

# Predict on the held-out split without tracking gradients
with torch.no_grad():
    y_pred = model(test_X).cpu().numpy()

y_true = test_y.cpu().numpy()
mse = mean_squared_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

# mean_squared_error already averages over the samples; dividing by the
# sample count again would under-report the error.
print(f"Mean Squared Error: {mse}")
print(f"R2 Score: {r2}")

## ResNet

In [None]:
class ResNet(nn.Module):
    """ResNet-18 wrapper that maps a flat feature vector to a single output.

    Despite its name, `num_classes` is used as the number of input channels:
    it sizes the first convolution, and `forward` reshapes each input row to
    `(num_classes, 1, 1)`. The final fully connected layer emits one value.
    """

    def __init__(self, num_classes):
        """Build the backbone with a replaced first convolution and output layer.

        Args:
            num_classes: Number of values per input row (used as channel count).
        """
        super().__init__()
        self.num_classes = num_classes

        backbone = models.resnet18()
        # Accept `num_classes` input channels instead of the stock first layer
        backbone.conv1 = nn.Conv2d(num_classes, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # Single-value head
        backbone.fc = nn.Linear(512, 1)
        self.resnet = backbone

        # Defined but not applied in forward()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Reshape `x` to (-1, num_classes, 1, 1) and run it through the backbone."""
        as_image = x.view(-1, self.num_classes, 1, 1)
        return self.resnet(as_image)

In [None]:
def objective(trial):
    """Optuna objective for the ResNet regressor.

    Samples a learning rate and weight decay from `trial`, trains a fresh
    `ResNet` for one pass over `trainloader`, and returns the accumulated loss.

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        Sum of the per-batch MSE losses over the single training pass.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-2, log=True)

    # Channel count follows the training tensor instead of a hard-coded 12,
    # so the model keeps working when the feature selection changes.
    model = ResNet(train_X.shape[1])
    model.to(torch.float32).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay, foreach=False)
    criterion = nn.MSELoss()

    # NOTE(review): the score is the training loss of one pass, not a validation
    # metric — confirm that this is the intended selection criterion.
    train_loss = 0
    model.train()
    with tqdm(trainloader, desc="Training") as pbar:
        for X, y in pbar:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

    return train_loss


# Search the ResNet hyperparameters over 100 trials, minimising the objective
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100)

# Report the parameters of the best trial
best_trial = study.best_trial
for label, param_name in (
    ("Best Learning Rate:", "learning_rate"),
    ("Best Weight Decay:", "weight_decay"),
):
    print(label, best_trial.params[param_name])

In [None]:
# Hyperparameters selected by the Optuna study
best_learning_rate = best_trial.params["learning_rate"]
best_weight_decay = best_trial.params["weight_decay"]

# Channel count is taken from the training tensor rather than hard-coded
model = ResNet(train_X.shape[1])
model.to(device)
model.float()

optimizer = optim.AdamW(model.parameters(), lr=best_learning_rate, weight_decay=best_weight_decay, foreach=False)
criterion = nn.MSELoss()

epochs = 100
train_losses = []  # mean batch loss of each epoch

for epoch in range(epochs):
    model.train()
    train_loss = 0
    with tqdm(trainloader, desc=f"Epoch {epoch+1}/{epochs}") as pbar:
        # Batch variables get their own names so the notebook-level X / y
        # (feature frame and target) are not overwritten by the loop.
        for batch_X, batch_y in pbar:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
    # Record the epoch average instead of the loss of whichever batch came last
    train_losses.append(train_loss / len(trainloader))

In [None]:
# Loss curve collected by the preceding training cell
plt.figure(figsize=(15, 5))
plt.plot(train_losses)
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

model.eval()

# Predict on the held-out split without tracking gradients
with torch.no_grad():
    y_pred = model(test_X).cpu().numpy()

y_true = test_y.cpu().numpy()
mse = mean_squared_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

# mean_squared_error already averages over the samples; dividing by the
# sample count again would under-report the error.
print(f"Mean Squared Error: {mse}")
print(f"R2 Score: {r2}")

# Linear Sequence Analysis

In [None]:
# Reload the raw file for the time-series part and parse `Date` as month/day/year
df = pd.read_csv('./data/car_data.csv').assign(
    Date=lambda frame: pd.to_datetime(frame['Date'], format='%m/%d/%Y')
)
df.head(n=5)

In [None]:
# Keep only the date and the price for the time-series analysis
new_df = df[['Date', 'Price ($)']].copy()
new_df.head(n=5)

In [None]:
# Index by date, then average the price per month.
# NOTE(review): recent pandas releases deprecate the 'M' alias in favour of 'ME' —
# confirm against the pandas version this notebook is pinned to.
new_df = new_df.set_index('Date')
data_monthly_mean = new_df.resample('M').mean()

In [None]:
# Moving averages of the monthly mean price. Each one is computed on the monthly
# series and then forward-filled onto the per-sale index of `new_df`.
monthly_price = data_monthly_mean['Price ($)']

# Simple Moving Average (SMA)
sma_period = 10
monthly_sma = monthly_price.rolling(window=sma_period).mean()
new_df['SMA'] = monthly_sma.reindex(new_df.index, method='ffill')

# Exponential Moving Average (EMA); also stored on the monthly frame
ema_period = 10
data_monthly_mean['EMA'] = monthly_price.ewm(span=ema_period, adjust=False).mean()
new_df['EMA'] = data_monthly_mean['EMA'].reindex(new_df.index, method='ffill')

# Cumulative Moving Average (CMA)
monthly_cma = monthly_price.expanding(min_periods=1).mean()
new_df['CMA'] = monthly_cma.reindex(new_df.index, method='ffill')

# Weighted Moving Average (WMA): weights 1..wma_period
wma_period = 10
weights = pd.Series(range(1, wma_period + 1))
def weighted_moving_average(prices):
    """Return the weighted mean of `prices` using the notebook-level `weights`.

    Args:
        prices: One rolling window of values; it must have as many entries
            as `weights` for the dot product to be defined.

    Returns:
        Dot product of `prices` and `weights`, divided by the sum of `weights`.
    """
    weighted_total = np.dot(prices, weights)
    return weighted_total / weights.sum()

# Apply the weighted average to every full monthly window, then forward-fill onto new_df
monthly_wma = data_monthly_mean['Price ($)'].rolling(window=wma_period).apply(weighted_moving_average, raw=True)
new_df['WMA'] = monthly_wma.reindex(new_df.index, method='ffill')

In [None]:
# Monthly mean price together with the four moving averages
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(data_monthly_mean['Price ($)'], label='Price')
for average_name in ('SMA', 'EMA', 'CMA', 'WMA'):
    ax.plot(new_df[average_name], label=average_name)
ax.set_title('Moving Averages for Monthly Mean Total Price ($)')
ax.set_xlabel('Year')
ax.set_ylabel('Price ($)')
ax.legend()
plt.show()

In [None]:
def adf_test(series):
    """Run `adfuller` on `series` with AIC lag selection and print the results.

    Prints the first, second and fifth entries of the returned tuple as the
    test statistic, the p-value and the critical values.

    Args:
        series: Values passed straight to `adfuller`.
    """
    statistic, p_value, _used_lag, _n_obs, critical_values, *_ = adfuller(series, autolag='AIC')
    print(f'ADF Statistics: {statistic}')
    print(f'p-value: {p_value}')
    print(f'Critical values: {critical_values}')

In [None]:
# Stationarity check on the monthly mean price
monthly_price = data_monthly_mean['Price ($)']
print("Original Data ADF Test:")
adf_test(monthly_price)

In [None]:
# ACF (top) and PACF (bottom) of the monthly mean price, 10 lags each
fig, (acf_ax, pacf_ax) = plt.subplots(2, 1, figsize=(14, 8))

# ACF plot
plot_acf(data_monthly_mean['Price ($)'], lags=10, ax=acf_ax)
acf_ax.set_title('Autocorrelated Function (ACF)')

# PACF plot
plot_pacf(data_monthly_mean['Price ($)'], lags=10, ax=pacf_ax)
pacf_ax.set_title('Partial Autocorrelated Function (PACF)')

fig.tight_layout()
plt.show()

In [None]:
from statsmodels.tsa.stattools import acf, pacf

# Numeric ACF / PACF values for the first 10 lags of the monthly mean price
monthly_price = data_monthly_mean['Price ($)']
acf_values, pacf_values = acf(monthly_price, nlags=10), pacf(monthly_price, nlags=10)

In [None]:
# 95% confidence threshold: 1.96 divided by the square root of the number of months
n = len(data_monthly_mean['Price ($)'])
threshold = 1.96 / np.sqrt(n)

# Count the lags (lag 0 excluded) whose absolute value exceeds the threshold
significant_p_values = (np.abs(pacf_values[1:]) > threshold).sum()
significant_q_values = (np.abs(acf_values[1:]) > threshold).sum()

print(f"Number significant p values: {significant_p_values}")
print(f"Number significant q values: {significant_q_values}")

In [None]:
# The (p, d, q) values passed as the ARIMA order in the next cell
p, d, q = 2, 0, 1

In [None]:
# Fit ARIMA(p, d, q) on the monthly mean price and print the fit summary.
# Note: this rebinds `model`, which earlier cells used for the neural networks.
arima_order = (p, d, q)
model = ARIMA(data_monthly_mean['Price ($)'], order=arima_order)
results = model.fit()
print(results.summary())