In [1]:
import numpy as np
from itertools import combinations, count
from numpy.linalg import det, inv, norm
from dsci_project_assignment import Gradient_Descent
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import QuantileTransformer, MinMaxScaler
import scipy.stats as ss
from scipy.stats import multivariate_normal as multi_n
from scipy.stats import uniform as uni
import pandas as pd

In [2]:
rng = np.random.default_rng(seed=0)
pd.set_option('display.max_columns', 20)
# Fetching Data:
data  = fetch_california_housing(as_frame=True)['frame'].copy()
data = np.c_[np.ones((data.shape[0],1)),data.to_numpy()]
minmax = MinMaxScaler()
transformer = QuantileTransformer(n_quantiles=5000, output_distribution='uniform', random_state=0)
data[:,:-1] =transformer.fit_transform(data[:,:-1],data[:,-1])

# Train/Val/Test Split:
val = train_test_split(data[:,:-1],data[:,-1], train_size=0.8, random_state=0)


# Kernel:
def kernel(data: np.ndarray):
    rows,columns= data.shape
    K = np.zeros((columns,columns))
    for i in range(columns):
        for j in range(columns):
            K[i,j] = np.exp((-1/2)*norm(data[:,i]-data[:, j])**2)
    return K
def gauss_pdf(x, mu, sig):
    exp = np.exp(-pow(x-mu, 2)/(2*pow(sig,2)))
    return 1/(np.sqrt(2*np.pi)*sig)*exp
def gaussian_kernel(u):
    '''
    Kernel Density Function
    '''
    return 1/(np.sqrt(2*np.pi))*np.exp(-0.5*pow(u,2))
def kde(x, data, bandwidth):
    n = len(data)
    estimate = 0
    for xi in data:
        estimate += gaussian_kernel((x-xi)/bandwidth)
    return estimate/ (n*bandwidth)
def weight_prior(weights: np.ndarray, dims: int, mean: np.ndarray[float], kernel: np.ndarray[float]):
    return multi_n.logpdf(weights, mean=mean, cov=kernel)
# Multivariate Posterior 
def observe_variance(y_pred: np.ndarray, y_true: np.ndarray, dim: int):
    n = len(y_pred)
    residuals = np.square(y_true-y_pred).mean()
    return residuals
def simp_three_eighth(f: callable, bounds: list, n: int):
    if n%3 != 0:
        raise ValueError('n must be divisible by 3')
    a, b = bounds
    step_size = (b-a)/n
    fa = f(a)
    fb = f(b)
    sum = 0
    for i in range(1, n):
        multiplier = 2 if i%3 else 3
        sum += multiplier*f(a+i*step_size)
    sum *= (3/8)*step_size
    return sum

# def maximum_likelihood(data: np.ndarray, y_true: np.ndarray,weights:np.ndarray) -> tuple[float]:
#     rows, columns = data.shape
#     y_pred = data.dot(weights)
#     obs_var = np.square(y_pred-y_true).mean()
#     lhood = np.sum(np.log((1/np.sqrt(2*np.pi*obs_var))*np.exp(-np.square(y_true-y_pred)/(2*obs_var))))
#     return lhood, obs_var
def multi_likelihood(y_pred: np.ndarray, y_true: list, obs_var: float):
    '''
    Returns p(y|X,w)
    '''
    post  = np.sum(ss.norm.logpdf(y_true, loc=y_pred, scale=obs_var))
    return post
def marginal_likelihood(data: np.ndarray, y_true: np.ndarray, obs_var: float, dims: int, w_samp: int, w_mean: np.ndarray, w_var: np.ndarray):
    '''
    Novice attempt at approximating the marginal likelihood.
    Returns p(y|X): summing over the weights
    **This approach requires mcmc for better accuracy; thus, don't use this callable in practice.**
    '''
    n = len(data)
    weight_samp = multi_n.rvs(mean=w_mean, cov=w_var, size=w_samp)
    marg = -np.inf
    for w in weight_samp:
        # print(w)
        marg = np.logaddexp(marg, multi_likelihood(data.dot(w), y_true, obs_var) + weight_prior(w, dims, w_mean, w_var))
    return marg - np.log(w_samp)

def alt_weight_post(data: np.ndarray, y: np.ndarray, w: np.ndarray, obs_var: float, w_var: float):
    sigma_n = np.linalg.inv(data.T.dot(data)/obs_var + np.linalg.inv(w_var))
    wn = np.dot(sigma_n, np.dot(data.T, y)/obs_var +np.dot(np.linalg.inv(w_var), w))
    return wn, sigma_n


def mcmc(y_true, y_pred, obs_var, dims):
    weight_i1 = multi_n.rvs(mean=np.zeros((dims, )), cov=np.eye(dims))
    weight = []
    for i in count():
        weight_i2 = uni.rvs(loc = np.zeros((dims,)), scale=1)
        prior_i1 = multi_n.pdf()
        lhood_i = multi_likelihood(y_pred, y_true,obs_var)
    pass 


transformer = QuantileTransformer(n_quantiles=5000, output_distribution='normal', random_state=0)

# Scaled Training Data:
transformer.fit(val[0])
XS = transformer.transform(val[0])

# # normali = lambda x: (x-x.min())/(x.max()-x.min())
# # XS = val[0].copy()
# # Performing Gradient Descent:
# gd = Gradient_Descent(XS, val[2], 1e-10)
# newtheta,*_ = gd.fit(1e-3)

# w, obs_var = maximum_likelihood(XS, val[2], np.zeros((9,)), np.eye(9))
# wn, sigma_n = alt_weight_post(XS, val[2], w, obs_var, np.eye(9))

# w_var = np.eye(9)
# w_mean = np.zeros((9,))
# weight_set = list()
# train_var = list()
# for i in range(1000):
#     w, obs_var = maximum_likelihood(XS, val[2],w_mean,w_var)
#     train_var.append(obs_var)
#     wn, sigma_n = alt_weight_post(XS, val[2], w, obs_var, w_var)
#     w_var = sigma_n
#     w_mean = wn
#     weight_set.append(w)  

# print(f'Initial Weights:{weight_set[0]} with model variance: {np.square(XS.dot(weight_set[0])-val[2]).mean()}')
# best_var = min(train_var)
# best_param = weight_set[train_var.index(best_var)]
# print(f' Last Weight:{best_param} with model variance:{np.square(XS.dot(best_param)-val[2]).mean()}')
# prior_w = multi_n.logpdf(newtheta, mean=np.zeros((9,)), cov=np.eye(9))
# XBIAS = np.c_[np.ones((XS.shape[0],1)), XS]
# print(gd.loglikelihood(8))
# sigma2 = observe_variance(XBIAS.dot(newtheta), val[2].to_numpy(), 9)
# w_mean, w_vari = alt_weight_post(XBIAS, val[2].to_numpy(), newtheta, sigma2, np.eye(9))
# print(multi_n.pdf(XBIAS,mean=w_mean, cov=w_vari))
# print(f'Observed Variance:{sigma2}')
# print(f'Likelihood:{multi_likelihood(XBIAS.dot(newtheta), val[2].to_numpy(), sigma2)}')
# print(f'Marginal Likelihood:{marginal_likelihood(XBIAS, val[2].to_numpy(), sigma2, 9,w_samp=5000, w_mean=np.zeros((9,)),w_var=np.eye(9))}')
# print(f'Exponential Probability of Parameters:{(multi_likelihood(XBIAS.dot(newtheta), val[2].to_numpy(), sigma2)+prior_w)-(marginal_likelihood(XBIAS, val[2].to_numpy(), sigma2, 9,w_samp=5000, w_mean=np.zeros((9,)),w_var=np.eye(9)))}')
# print(kernel(XBIAS.dot(newtheta)))
# post_y = multi_likelihood(XBIAS.dot(newtheta), val[2].to_numpy(), sigma2, 8)

In [None]:
weights = np.random.randn(1, val[0].shape[1])
weights

In [6]:
def dependent_likelihood(data, y_true, ideal_error):
    '''
    This function attempts to find all such weights (w) that
    likely explains output distribution. I wish to compute the marginal 
    likelihood.
    p(w|X,y)
    '''

    # w = multi_n.rvs(mean=np.zeros((data.shape[1],)), cov=np.eye(data.shape[1]),size=1, random_state=0)
    w = ss.uniform.rvs(loc=np.zeros((data.shape[1],)), scale=2*np.eye(data.shape[1]), random_state=0)[:,0]
    wl = [w]
    tot,accepted = 0,0
    scale = np.eye(data.shape[1])
    # pot_prob = []
    y_pred = data.dot(wl[-1])
    pred_error = np.mean(np.square(y_true-y_pred))
    while np.isclose(pred_error,ideal_error, atol=1e-5) != True:
        candidate = multi_n.rvs(mean=wl[-1], cov=scale,size=1)
        # print(candidate)
        y_pred = data.dot(wl[-1])
        error = np.mean(np.square(y_true-y_pred))
        llhood_w = multi_likelihood(y_true,y_pred, error)

        y_pred_u = data.dot(candidate)
        error_u = np.mean(np.square(y_true-y_pred_u))
        llhood_u = multi_likelihood(y_true, y_pred, error_u)
        log_prob = llhood_u/llhood_w
        # print(log_prob)
        # if np.random.random()<log_prob:
        if log_prob<=1 and log_prob>0:
            accepted+=1
            wl.append(candidate)
            print(candidate)
            # wl.pop(0) if tot>6 else None
            # scale = kernel(np.vstack((wl[-5:-1], wl[-1]))) if tot>3 else np.eye(data.shape[1])
            # print(scale)
            y_pred = data.dot(wl[-1])
            pred_error = np.mean(np.square(y_true-y_pred))
            print(f'The Prediction error of accepted sample:\t{pred_error} and acceptance log prob.:\t{log_prob}')
            
            print(f'Percent Accepted:\t{accepted/(tot+1)}')
        else:
            # wl.append(wl[-1])
            # scale = kernel(np.vstack((wl[-2:-1], wl[-1]))) if tot>3 else np.eye(data.shape[1])
            # print(f'Percent Accepted:\t{accepted/tot}') if tot%20_000 else None
            pass
            # wl.pop()
            # wl.append(wl[-1])
            # tvec = np.vstack((wl[-2], wl[-1])) if len(wl) == 2 else np.vstack((wl[-3:-1], wl[-1]))
            # scale.append(kernel(tvec))
            # y_pred = data.dot(wl[-1])
            # pred_error = np.mean(np.square(y_true-y_pred))
            # print(f'Percent Accepted:\t{accepted/len(wl)}') if tot%10_000 else None
            # print(f'The Prediction error of unaccepted sample:\t{pred_error} and acceptance prob.:\t{log_prob}')  
        tot+=1
        
    return wl

In [7]:
wl = dependent_likelihood(val[0], val[2], 1)

[ 1.15242834 -0.09459476 -0.23005919 -0.26354219  0.29830636 -0.12961158
  1.40022662  0.10563053  0.84633351]
The Prediction error of accepted sample:	3.2690319900333296 and acceptance log prob.:	0.864997254393169
Percent Accepted:	0.5
[ 0.99023014  0.4889536  -0.57220481 -2.26176059  1.61954746  0.55449242
  1.99575642  1.02393797  0.68065436]
The Prediction error of accepted sample:	3.150645950880551 and acceptance log prob.:	0.9888422352962656
Percent Accepted:	0.4
[ 0.41651116  0.85969267 -0.84296088 -1.92362344  2.59573082 -0.47378799
  0.99601799  1.9290039   1.09717308]
The Prediction error of accepted sample:	2.680744967440082 and acceptance log prob.:	0.9546111421057849
Percent Accepted:	0.1875
[-0.10310646  1.85555749 -0.46828733 -1.99276808  2.43067528 -0.2951857
  1.05531214  0.73367109  0.59487767]
The Prediction error of accepted sample:	1.8901770672183504 and acceptance log prob.:	0.9231306007045597
Percent Accepted:	0.07692307692307693
[-0.34506033  4.29335156 -1.16304

KeyboardInterrupt: 

In [None]:
print(np.square(val[0].dot(wl[-1])-val[2]).mean())
g = multi_n.rvs(mean=np.mean(np.array(wl), axis=0), cov=kernel(np.array(wl)))
print(np.square(val[0].dot(g)-val[2]).mean())

In [None]:
print(np.square(val[1].dot(g)-val[3]).mean())

In [None]:
X = np.c_[np.random.normal(loc=0, scale=1, size=(1000, 3)), np.ones(shape=(1000,))]
weight = np.random.randn(4,1)
print(f'Model Weights:\t{weight}')
y = X.dot(weight)

In [None]:
weights = dependent_likelihood(X, y)
[np.mean(np.square(y - X.dot(weights[i]))) for i in range(len(weights))]

In [None]:
weights

In [None]:
weight