In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
%matplotlib notebook
import warnings
warnings.filterwarnings('ignore')
from tqdm import tqdm_notebook
import cv2

# Import image

In [13]:
import glob

start = time.time()

LR = [cv2.imread(file).astype(np.float32) for file in glob.glob("./train_set/LR/*.jpg")]
HR = [cv2.imread(file).astype(np.float32) for file in glob.glob("./train_set/HR/*.jpg")]
end = time.time()
print('Time:',end - start)

Time: 10.034084558486938


###### Metric

In [14]:
import math

def MSE(y_true,y_predict):
    temp = (y_true - y_predict)**2
    return sum(temp.reshape(-1))/(temp.shape[0]*temp.shape[1]*temp.shape[2])

def psnr(y_true,y_predict):
    temp = (y_true - y_predict)**2
    mse = sum(temp.reshape(-1))/(temp.shape[0]*temp.shape[1]*temp.shape[2])
    MAXI = 255
    return 20*math.log10(MAXI)-10*math.log10(mse)

# Use traditional method

In [15]:
LR_original = [cv2.imread(file) for file in glob.glob("./train_set/LR/*.jpg")]

LR_to_HR = [cv2.resize(lr, (lr.shape[1]*2, lr.shape[0]*2), interpolation = cv2.INTER_NEAREST) for lr in LR_original] 

In [16]:
start = time.time()

m = []
for i in np.arange(1500):
    m.append(psnr(HR[i],LR_to_HR[i]))

end = time.time()
print('psnr:',np.mean(m))
print('Time:',end - start)


psnr: 25.783482291479018
Time: 187.99971079826355


# Get Features X and respond y using baseline

### Define basic parameters

In [17]:
seed = 1000
channels = np.arange(3)
sample_size = 100

In [18]:
# Get X and y for single pair of LR and HR
def get_X_and_y(LR,HR,n_sample = sample_size,seed = seed):
    # determine seed
    np.random.seed(seed)
    
    # Find neighbor
    def get_neighbor_X(a,i,j):
        return([a[i-1,j-1],a[i-1,j],a[i-1,j+1],a[i,j-1],a[i,j+1],a[i+1,j-1],a[i+1,j],a[i+1,j+1]],a[i,j])

    def get_neighbor_y(a,i,j):
        return([a[i,j],a[i+1,j],a[i,j+1],a[i+1,j+1]])
    
    # padding LR image
    BLACK = [0, 0, 0]
    image_padding = cv2.copyMakeBorder(LR, 1 , 1, 1, 1, cv2.BORDER_CONSTANT, value=BLACK)
    
    y1 = np.zeros((1*n_sample,4))
    y2 = np.zeros((1*n_sample,4))
    y3 = np.zeros((1*n_sample,4))
    
    X1 = np.zeros((1*n_sample,8))
    X2 = np.zeros((1*n_sample,8))
    X3 = np.zeros((1*n_sample,8))
    
    result = [X1,X2,X3]
    Y = [y1,y2,y3]
    
#     height = LR.shape[1]
#     width = LR.shape[0]
    width = LR.shape[1]
    height = LR.shape[0]
    
    # Random pick n_sample point per image
    pts_row = np.random.randint(1, height + 1,size = n_sample)
    pts_col = np.random.randint(1, width + 1,size = n_sample)
    
    for X,y,channel in zip(result,Y,channels):
        index = 0
        for i,j in zip(pts_row,pts_col):
            X_neighbor,central = get_neighbor_X(image_padding[:,:,channel],i,j)
            y_neigbor = get_neighbor_y(HR[:,:,channel],2*(i-1),2*(j-1))
            # Get X
            X[index] = X_neighbor - central
            # Get y
            y[index] = y_neigbor - central
            index +=1
    
    
    # Stack X&Y to 3d
    return np.dstack(result),np.dstack(Y)

In [19]:
def get_Whole_X_and_y(LR,HR,n_sample = sample_size,seed = seed):
    flag = 0
    for lr,hr in tqdm_notebook(zip(LR,HR)):
        if (flag == 0):
            X,y = get_X_and_y(lr,hr,n_sample,seed)
            flag = 1
        else:
            X_temp,y_temp = get_X_and_y(lr,hr,n_sample,seed)
            X = np.vstack([X,X_temp])
            y = np.vstack([y,y_temp])
    return X,y

In [20]:
# n_sample should be 1000
start = time.time()
X,y = get_Whole_X_and_y(LR,HR,n_sample = sample_size)
end = time.time()
print('Time:',end - start)


Time: 18.56043004989624


In [21]:
print(X.shape,y.shape)

(150000, 8, 3) (150000, 4, 3)


# Train & Test split

In [22]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,shuffle = True ,random_state=seed)

In [23]:
print(X_train.shape,y_train.shape)

(120000, 8, 3) (120000, 4, 3)


In [24]:
print(X_test.shape,y_test.shape)

(30000, 8, 3) (30000, 4, 3)


# Prepare functions

###### Prediction (12 models)

In [25]:
# test paramter == True means you want to predict X_test, False mean you want to predict X_train
# model_list = [model1,model2,model3,model4,model5,model6,model7,model8,model9,model10,model2,model3]
def Prediction(X_tr,y_tr,X_ts,model_list,test = True):
    
    if (test == False):
        X_test = X_tr
    else:
        X_test = X_ts
    
    prediction1 = np.zeros((X_test.shape[0],1,3))
    prediction2 = np.zeros((X_test.shape[0],1,3))
    prediction3 = np.zeros((X_test.shape[0],1,3))
    prediction4 = np.zeros((X_test.shape[0],1,3))
    
    predictions = [prediction1,prediction2,prediction3,prediction4]
    
    index = 0
    for channel in channels:
        for i,prediction in enumerate(predictions):
            model_list[index].fit(X_tr[:,:,channel],y_tr[:,i,channel].reshape(-1,1))
            prediction[:,:,channel] = model_list[index].predict(X_test[:,:,channel]).reshape(-1,1)
            index += 1
    return np.concatenate(predictions,axis = 1)


In [37]:
def fit_models(X_train,y_train,model_list):
    index = 0
    for channel in channels:
        for i in np.arange(4):
            model_list[index].fit(X_train[:,:,channel],y_train[:,i,channel].reshape(-1,1))
            index += 1

def Predict_test(X_test,model_list):
    prediction1 = np.zeros((X_test.shape[0],1,3))
    prediction2 = np.zeros((X_test.shape[0],1,3))
    prediction3 = np.zeros((X_test.shape[0],1,3))
    prediction4 = np.zeros((X_test.shape[0],1,3))
    
    predictions = [prediction1,prediction2,prediction3,prediction4]
    
    index = 0
    for channel in channels:
        for i,prediction in enumerate(predictions):
            prediction[:,:,channel] = model_list[index].predict(X_test[:,:,channel]).reshape(-1,1)
            index += 1
    return np.concatenate(predictions,axis = 1)

###### CV

In [27]:
from sklearn.model_selection import KFold

# Output testing error and training error
def cross_validation(X_train,y_train,model_list,metric,folds= 3):
    kf = KFold(n_splits=folds)
    val_error = []
    tr_error = []
    for train_index, val_index in tqdm_notebook(kf.split(X_train)):
        X_tr,y_tr = X_train[train_index],y_train[train_index]
        X_val,y_val = X_train[val_index],y_train[val_index]
        
        y_predict_val = Prediction(X_tr,y_tr,X_val,model_list,test = True)
        y_predict_tr = Prediction(X_tr,y_tr,X_val,model_list,test = False)
        # Metric 
        val_error.append(round(metric(y_val,y_predict_val),2)) # change metric here
        tr_error.append(round(metric(y_tr,y_predict_tr),2)) # change metric here
        
    return val_error,tr_error
        

###### Grid Search

In [28]:
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error

def my_grid_search(X_train,y_train,model,parameters,cv = 3,n_jobs = 6):
    
    
    reg = GridSearchCV(model,
                       cv=cv,
                      param_grid = parameters,
                       scoring = make_scorer(mean_squared_error,
                                        greater_is_better=False),
                      n_jobs = n_jobs)
    reg.fit(X_train, y_train)
    
    print('Best params',reg.best_params_)
    print('Best MSE:',-reg.score(X_train, y_train))
    
    return reg.best_params_

def grid_search_all_model(X_train,y_train,model,parameters,cv = 3):
    best_params = []
    index = 0
    for channel in channels:
        for i in np.arange(4):
            X_tr = X_train[:,:,channel]
            y_tr = y_train[:,i,channel]
            
            print('Model',index)
            model = model.fit(X_tr,y_tr)
            best_params.append(my_grid_search(X_tr,y_tr,model,parameters,cv = cv))
            index += 1
            
    return best_params

###### Super resolution

In [288]:
def get_X_and_y_super_resolution(LR,HR):
    
    def get_neighbor_X(a,i,j):
        return([a[i-1,j-1],a[i-1,j],a[i-1,j+1],a[i,j-1],a[i,j+1],a[i+1,j-1],a[i+1,j],a[i+1,j+1]],a[i,j])

    def get_neighbor_y(a,i,j):
        return([a[i,j],a[i+1,j],a[i,j+1],a[i+1,j+1]])
    
    BLACK = [0, 0, 0]
    image_padding = cv2.copyMakeBorder(LR, 1 , 1, 1, 1, cv2.BORDER_CONSTANT, value=BLACK)
    
    height = LR.shape[1]
    width = LR.shape[0]
    height_HR = HR.shape[1]
    width_HR = HR.shape[0]
    
    y1 = np.zeros((height*width,4))
    y2 = np.zeros((height*width,4))
    y3 = np.zeros((height*width,4))
    
    X1 = np.zeros((height*width,8))
    X2 = np.zeros((height*width,8))
    X3 = np.zeros((height*width,8))
    
    result = [X1,X2,X3]
    Y = [y1,y2,y3]
    
    for X,y,channel in zip(result,Y,channels):
        index = 0
        for i,iy in zip(np.arange(1,width + 1),np.arange(0,width_HR,2)):
            for j,jy in zip(np.arange(1,height + 1),np.arange(0,height_HR,2)):
                # Get X neighbor
                neighbor,central = get_neighbor_X(image_padding[:,:,channel],i,j)
                # save central
                # Get y neighbor
                y_neigbor = get_neighbor_y(HR[:,:,channel],iy,jy)
                
                # Get X
                X[index] = neighbor - central
                # Get y
                y[index] = y_neigbor - central
                index +=1
    
    
    # Stack X&Y to 3d
    return np.dstack(result),np.dstack(Y)

def super_resolution(LR,HR,model_list):
    X,y = get_X_and_y_super_resolution(LR,HR)
    predict_y = Predict_test(X,model_list)
    
    c = LR.reshape(-1,3)
    # dim_y = (height*width) X 4
    height_HR = HR.shape[1]
    width_HR = HR.shape[0]
    
    picture = np.zeros(HR.shape)
    
    for channel in channels:
        index = 0 
        for i in np.arange(0,width_HR,2):
            for j in np.arange(0,height_HR,2):
                picture[i,j,channel] = predict_y[index,0,channel] + c[index,channel]
                picture[i+1,j,channel] = predict_y[index,1,channel] + c[index,channel]
                picture[i,j+1,channel] = predict_y[index,2,channel]+ c[index,channel]
                picture[i+1,j+1,channel] = predict_y[index,3,channel]+ c[index,channel]
                
                index += 1
    PSNR = psnr(y,predict_y)
    return picture.astype(np.uint8),PSNR

# Train using base model (gradient boosting)

In [29]:
from sklearn.ensemble import GradientBoostingRegressor

model = GradientBoostingRegressor()

###### Grid Search

In [27]:
parameters_tunning = {
    'learning_rate':[0.01, 0.1, 0.2, 0.3], # tuning (started with higher learning rate), hight value --> overfit
    'min_samples_split' : [2, 5, 10], # tuning,high values --> under-fitting
    'min_samples_leaf' : [2, 4,8,10], # tuning, similar to min_samples_split
}

In [39]:
start = time.time()
best = grid_search_all_model(X_train,y_train,model,parameters_tunning,cv = 3)

end = time.time()
print('Time:',end - start)

Model 0
Best params {'min_samples_split': 10, 'min_samples_leaf': 10, 'learning_rate': 0.2}
Best MSE: 153.43463651232437
Model 1
Best params {'min_samples_split': 2, 'min_samples_leaf': 4, 'learning_rate': 0.1}
Best MSE: 155.41662506564035
Model 2
Best params {'min_samples_split': 2, 'min_samples_leaf': 10, 'learning_rate': 0.1}
Best MSE: 158.14722412775924
Model 3
Best params {'min_samples_split': 2, 'min_samples_leaf': 10, 'learning_rate': 0.1}
Best MSE: 154.6713901026176
Model 4
Best params {'min_samples_split': 5, 'min_samples_leaf': 10, 'learning_rate': 0.2}
Best MSE: 142.50915268779144
Model 5
Best params {'min_samples_split': 10, 'min_samples_leaf': 10, 'learning_rate': 0.1}
Best MSE: 143.65584948963198
Model 6
Best params {'min_samples_split': 2, 'min_samples_leaf': 10, 'learning_rate': 0.1}
Best MSE: 147.99993570980564
Model 7
Best params {'min_samples_split': 2, 'min_samples_leaf': 8, 'learning_rate': 0.1}
Best MSE: 143.9337088722936
Model 8
Best params {'min_samples_split': 

In [30]:
best = [{'learning_rate': 0.2, 'min_samples_leaf': 10, 'min_samples_split': 10},
 {'learning_rate': 0.1, 'min_samples_leaf': 4, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 2},
 {'learning_rate': 0.2, 'min_samples_leaf': 10, 'min_samples_split': 5},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 10},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 8, 'min_samples_split': 2},
 {'learning_rate': 0.2, 'min_samples_leaf': 8, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 8, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 2},
 {'learning_rate': 0.1, 'min_samples_leaf': 10, 'min_samples_split': 10}]

In [31]:
start = time.time()

model_list = []

for i in np.arange(12):
    model_list.append(GradientBoostingRegressor(**best[i],random_state = seed))

end = time.time()
print('Time:',end - start)

Time: 0.0003039836883544922


### Do CV

###### n_sample = 100

In [42]:
start = time.time()

val_error,tr_error = cross_validation(X_train,y_train,model_list,psnr,folds= 3)
print(' Validation error:',val_error,'\n','Train error:',tr_error,'\n\n')

end = time.time()
print('Time:',end - start)

 Validation error: [26.04, 26.2, 26.16] 
 Train error: [26.47, 26.39, 26.41] 


Time: 309.83047127723694


###### n_sample = 1000

In [94]:
start = time.time()

val_error,tr_error = cross_validation(X_train,y_train,model_list,psnr,folds= 3)
print(' Validation error:',val_error,'\n','Train error:',tr_error,'\n\n')

end = time.time()
print('Time:',end - start)


 Validation error: [24.93, 24.89, 24.9] 
 Train error: [24.93, 24.95, 24.95] 


Time: 4603.56507897377


### Predict

###### n_sample = 100

In [35]:
X_test.shape

(30000, 8, 3)

###### Fit models

In [39]:
fit_models(X_train,y_train,model_list)

###### Predict Test set

In [165]:
start = time.time()

prediction = Predict_test(X_test,model_list)
print('PSNR:',psnr(prediction, y_test),'\n\n')

end = time.time()
print('Time:',end - start)

PSNR: 26.138381481749647 


Time: 0.517711877822876


###### Super resolution test image

In [320]:
index = 1000

picture,PSNR = super_resolution(LR[index],HR[index],model_list)
print('PSNR is {:.3}'.format(PSNR))

PSNR is 26.4


In [322]:
plt.figure()
plt.imshow(HR[index].astype(np.uint8))

<IPython.core.display.Javascript object>

<matplotlib.image.AxesImage at 0x7fdab86aceb8>

In [323]:
plt.figure()
plt.imshow(picture)

<IPython.core.display.Javascript object>

<matplotlib.image.AxesImage at 0x7fdab8619b38>

###### n_sample = 1000

In [95]:
start = time.time()

prediction = Prediction(X_train,y_train,X_test,model_list,test = True)
print('PSNR:',psnr(prediction, y_test),'\n\n')

end = time.time()
print('Time:',end - start)


PSNR: 24.90888108941163 


Time: 1338.4358065128326


### Xgboost

In [127]:
# from xgboost import XGBRegressor    

# start = time.time()

# model_list_xgb = []
# model = XGBRegressor(n_jobs = 6,
#                      eval_metric = 'rmse',
#                      random_state = seed)

# params_xgb = {'n_estimators' : [100, 200,300],
#               'learning_rate':[0.01,0.1,0.2,0.3]
#              }

# best_param = grid_search_all_model(X_train,y_train,model,params_xgb)

# # for i in np.arange(12):
# #     model_list_xgb.append(XGBRegressor(learning_rate = 0.1,
# #                      n_jobs = 6,
# #                      n_estimators = 200,
# #                      eval_metric = 'rmse',
# #                      random_state = seed))

# end = time.time()
# print('Time:',end - start)


In [None]:
start = time.time()

model_list_xgb = []
for i in np.arange(12):
    model_list_xgb.append(XGBRegressor(learning_rate = 0.1,
                                        n_jobs = 6,
                                        n_estimators = 200,
                                        random_state = seed))



val_error,tr_error = cross_validation(X_train,y_train,model_list_xgb,psnr,folds= 3)
print(' Validation error:',val_error,'\n','Train error:',tr_error,'\n\n')

end = time.time()
print('Time:',end - start)

In [101]:
start = time.time()

val_error,tr_error = cross_validation(X_train,y_train,model_list_xgb,psnr,folds= 3)
print(' Validation error:',val_error,'\n','Train error:',tr_error,'\n\n')

end = time.time()
print('Time:',end - start)

 Validation error: [24.95, 25.12, 25.07] 
 Train error: [25.55, 25.46, 25.48] 


Time: 166.49743270874023


In [105]:
start = time.time()

prediction = Prediction(X_train,y_train,X_test,model_list_xgb,test = True)
print('PSNR:',psnr(prediction, y_test),'\n\n')

end = time.time()
print('Time:',end - start)

PSNR: 25.053560815915965 


Time: 46.624295711517334


In [None]:
# def get_X_and_y(LR,HR):
    
#     def get_neighbor_X(a,i,j):
#         return([a[i-1,j-1],a[i-1,j],a[i-1,j+1],a[i,j-1],a[i,j+1],a[i+1,j-1],a[i+1,j],a[i+1,j+1]],a[i,j])

#     def get_neighbor_y(a,i,j):
#         return([a[i,j],a[i+1,j],a[i,j+1],a[i+1,j+1]],a[i,j])
    
#     BLACK = [0, 0, 0]
#     image_padding = cv2.copyMakeBorder(LR, 1 , 1, 1, 1, cv2.BORDER_CONSTANT, value=BLACK)
    
#     height = LR.shape[1]
#     width = LR.shape[0]
    
#     y1 = np.zeros((height*width,4))
#     y2 = np.zeros((height*width,4))
#     y3 = np.zeros((height*width,4))
    
#     X1 = np.zeros((height*width,8))
#     X2 = np.zeros((height*width,8))
#     X3 = np.zeros((height*width,8))
    
#     result = [X1,X2,X3]
#     Y = [y1,y2,y3]
    
#     for X,y,channel in zip(result,Y,channels):
#         index = 0
#         for i,iy in zip(np.arange(1,width + 1),np.arange(0,width_HR,2)):
#             for j,jy in zip(np.arange(1,height + 1),np.arange(0,height_HR,2)):
#                 neighbor,central = get_neighbor_X(image_padding[:,:,channel],i,j)
#                 y_neigbor,y_central = get_neighbor_y(HR[:,:,channel],iy,jy)
#                 # Get X
#                 X[index] = neighbor - central
#                 # Get y
#                 y[index] = y_neigbor - y_central
#                 index +=1
    
    
#     # Stack X&Y to 3d
#     return np.dstack(result),np.dstack(Y)