In [1]:
import os
import imageio
import numpy as np
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

In [2]:
path = '../ColabNotebooks/dataset'

In [3]:
def extractFrames(file_name, frame_step):
  video = imageio.get_reader(file_name, 'ffmpeg')
  return np.array([frame for i, frame in enumerate(video) if i % frame_step == 0])

In [4]:
def preProcessing(frames):
  video_r_channel = np.zeros(frames.shape[0:3])
  video_g_channel = np.zeros(frames.shape[0:3])
  video_b_channel = np.zeros(frames.shape[0:3])

  for i, frame in enumerate(frames):
    for j, row in enumerate(frame):
      for k, pixel in enumerate(row):
        video_r_channel[i][j][k] = pixel[0]
        video_g_channel[i][j][k] = pixel[1]
        video_b_channel[i][j][k] = pixel[2]
        
  return video_r_channel, video_g_channel, video_b_channel

In [5]:
# Returns correlation value
def superimpose(frame: np.ndarray, kernel: np.ndarray):
  n = kernel.shape[0]
  mid = n // 2

  superimposed_matrix = frame * kernel
  return sum(superimposed_matrix.flatten()) - superimposed_matrix[mid][mid]

In [6]:
def isValid(index, length, skip):
  return index >= skip and index < length - skip

In [7]:
# Channel is a 3D matrix: (number of frames, height, width)
# each element represents the pixel value of the given channel

def convolve(channel, kernel):
  n = len(kernel) # Kernel is of dimension n x n
  skip = n // 2 # No. of rows and cols to be skipped

  num_frames, height, width = channel.shape

  # res_frames = np.zeros((len(frames), height - 2 * skip, width - 2 * skip))
  res_channel = np.zeros(channel.shape)

  for i, frame in enumerate(channel):
    print(f'Frame: {i}')

    for j, row in enumerate(frame):
      if not isValid(j, height, skip):
        continue

      for k, pixel in enumerate(row):
        if not isValid(k, width, skip):
          continue

        subframe = frame[j - skip: j + skip + 1, k - skip: k + skip + 1]

        res_channel[i][j][k] = superimpose(subframe, kernel)

  return res_channel[:, skip:-skip, skip:-skip]

In [8]:
def saveAsVideo(frames, file_name, fps):
  frames = frames.astype('uint8')
  writer = imageio.get_writer(file_name, fps=fps, macro_block_size=None)
  for frame in frames:
    writer.append_data(frame)

In [None]:
def get_training_data(frame, kernel_size):
  skip = kernel_size // 2 # No. of rows and cols to be skipped

  height, width = frame.shape

  X = []
  Y = []

  for i, row in enumerate(frame):
    if not isValid(i, height, skip):
      continue

    for j, pixel in enumerate(row):
      if not isValid(j, width, skip):
        continue

      subframe = frame[i - skip: i + skip + 1, j - skip: j + skip + 1]

      X.append(np.delete(subframe.flatten(), skip * kernel_size + skip))
      Y.append(subframe[skip, skip])

  return X, Y

In [63]:
#driver code
files = os.listdir(path)

frames = np.array(extractFrames(os.path.join(path, files[5]), 30))
video_r_channel, video_g_channel, video_b_channel = preProcessing(frames)

In [64]:
actual = video_r_channel[0][1:-1, 1:-1]
actual

array([[163., 163., 163., ..., 158., 158., 158.],
       [163., 163., 163., ..., 158., 158., 158.],
       [163., 163., 163., ..., 159., 159., 159.],
       ...,
       [ 52.,  51.,  51., ..., 253., 253., 253.],
       [ 52.,  52.,  52., ..., 253., 253., 253.],
       [ 52.,  52.,  52., ..., 253., 253., 253.]])

In [65]:
X, Y = get_training_data(video_b_channel[0], 3)

In [66]:
l_model = make_pipeline(StandardScaler(), SGDRegressor())
l_model.fit(X, Y)

Pipeline(steps=[('standardscaler', StandardScaler()),
                ('sgdregressor', SGDRegressor())])

In [55]:
l_model.score(X, Y)

0.9983248651893777

In [62]:
print(l_model['sgdregressor'].coef_)
print(l_model['sgdregressor'].intercept_)

[-17.03503111  32.33820182 -15.58306979  33.81088394  33.71901943
 -15.63631079  32.35098649 -17.04748517]
[79.22787353]


In [56]:
pred_frame = l_model.predict(X).reshape(actual.shape)

In [57]:
err_frame = abs(pred_frame - actual).astype('uint8')

In [58]:
imageio.imwrite('org_gradient_descent_error_b.jpg', err_frame)

In [63]:
sum(abs(actual.flatten() - l_model.predict(X))) / len(actual.flatten())

0.24688925665398725

## Extras

In [None]:
saveAsVideo(video_r_channel, 'video_r.mp4', 5)
saveAsVideo(video_g_channel, 'video_g.mp4', 5)
saveAsVideo(video_b_channel, 'video_b.mp4', 5)

In [None]:
predicted = convolve(np.array([video_r_channel[0]]), kernel).astype('uint8')
predicted

Frame: 0


array([[[144, 144, 144, ..., 140, 140, 140],
        [144, 144, 144, ..., 140, 140, 140],
        [144, 144, 144, ..., 140, 140, 140],
        ...,
        [ 45,  45,  45, ..., 224, 224, 224],
        [ 46,  46,  45, ..., 224, 224, 224],
        [ 47,  46,  46, ..., 224, 224, 224]]], dtype=uint8)

In [None]:
imageio.imwrite('predicted_image_r.jpg', predicted[0])

In [None]:
diff = abs(actual - predicted[0]).astype('uint8')
diff

array([[19, 19, 19, ..., 18, 18, 18],
       [19, 19, 19, ..., 18, 18, 18],
       [19, 19, 19, ..., 19, 19, 19],
       ...,
       [ 7,  6,  6, ..., 29, 29, 29],
       [ 6,  6,  7, ..., 29, 29, 29],
       [ 5,  6,  6, ..., 29, 29, 29]], dtype=uint8)

In [None]:
(1 / len(diff.flatten())) * sum(diff.flatten())

13.366895691824933

In [None]:
imageio.imwrite('error_image_r.jpg', diff)