In [0]:
import os
import cv2

import numpy as np
import matplotlib.pyplot as plt

from fastai.vision import *
from fastai import *

In [12]:
class Video:
    '''
    Helper for annotating a video with CNN class predictions:
    splits an mp4 into jpg frames, scores every frame with the learners held
    in self.learners, and re-assembles the frames into an avi video with the
    predicted class written on each frame.
    '''

    # NOTE: these two statements run once, when the class body is executed
    # (i.e. when this cell runs), not on every Video() instantiation.
    from google.colab import drive
    drive.mount('/content/gdrive', force_remount=True)

    def __init__(self):
        self.data = None
        self.learn = None
        self.learners = {} # learner name (key) -> loaded learner (value)


    @staticmethod
    def _sorted_file_names(folder):
        '''
        list the files directly inside folder in natural (numeric-aware) order,
        so that frame2.jpg comes before frame10.jpg
        folder  : directory to list, str
        returns : file names (not full paths), list of str
        '''
        import re

        def natural_key(fname):
            # re.split with a capturing group alternates text / digit runs:
            # odd positions are always the digit runs, compare those as ints
            parts = re.split(r'(\d+)', fname)
            return [int(p) if i % 2 else p for i, p in enumerate(parts)]

        with os.scandir(folder) as entries:
            names = [entry.name for entry in entries if not entry.is_dir()]
        return sorted(names, key=natural_key)


    def video_to_frames(self,filepath,vid_fname,dest,verbose=False):
        '''
        convert mp4 video file to jpg frames named frame0.jpg, frame1.jpg, ...
        filepath    : path cwd, str
        vid_fname   : filename of mp4 to process, str
        dest        : destination folder to create inside filepath, str
        verbose     : whether to print processing status, bool
        returns     : None
        '''
        video_path = os.path.join(filepath, vid_fname)
        out_dir = os.path.join(filepath, dest)
        print(video_path)
        # Playing video from file:
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print('Error: could not open video {}'.format(video_path))
                return

            try:
                if not os.path.exists(out_dir):
                    print('adding:\n{}'.format(out_dir))
                    os.makedirs(out_dir)
            except OSError:
                # without the folder no frame can be written, so stop here
                # instead of reporting a successful conversion
                print ('Error: Creating directory of data')
                return

            # get total frames in cv2 object
            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print('total frames:',n_frames)
            n_written = 0
            for _ in range(n_frames):
                # Capture frame-by-frame
                ret, frame = cap.read()
                if not ret:
                    # the reported frame count can exceed the number of
                    # decodable frames; a failed read has no image to save
                    break

                # Saves image of the current frame in jpg file
                name = os.path.join(out_dir, 'frame' + str(n_written) + '.jpg')
                if verbose:
                    print ('Creating...' + name)
                cv2.imwrite(name, frame)
                n_written += 1
        finally:
            # release the capture on every exit path
            cap.release()

        cv2.destroyAllWindows()
        print('\n{} converted to .jpg frames in:\n{}'.format(vid_fname,out_dir))


    def pil_image(self,path,folder,fname):
        '''
        opens jpg from path/folder/fname via Image.open
        path    : dir path, str
        folder  : name of folder with jpgs, str
        fname   : name of jpg file, str
        returns : whatever Image.open returns
        '''
        # NOTE(review): `Image` is whatever the notebook's star imports bind;
        # with `from fastai.vision import *` this may be fastai's Image class
        # rather than the PIL.Image module -- confirm Image.open resolves.
        return Image.open(os.path.join(path, folder, fname))


    def image_array(self,path,folder,fname):
        '''
        opens jpg from path/folder/fname as np.array
        path    : dir path, str
        folder  : name of folder with jpgs, str
        fname   : name of jpg file, str
        returns : jpg image as np.array
        '''
        # NOTE(review): same caveat about which `Image` is in scope as in
        # pil_image -- confirm.
        return np.asarray(Image.open(os.path.join(path, folder, fname)))



    def frames_to_video(self,path,frames_folder,fname_out,
                        pred_dict,
                        fontsz=1,coord=(40,75),color=(255,255,255),thickness=2,
                        threshold=.9,fps=10):
        '''
        converts jpg frames to avi video, writing the predicted class on each frame
        # https://medium.com/@iKhushPatel/convert-video-to-images-images-to-video-using-opencv-python-db27a128a481
        path          : full path to cwd, str
        frames_folder : name of folder where jpg frames are stored, str
        fname_out     : file name of output .avi video, str
        pred_dict     : frame filename -> [learner name, prediction, classes]
                        as built by store_predictions, dict
        fontsz        : font size of text overlay, float
        coord         : loc of text overlay, tuple of int
        color         : color of text overlay, tuple of int
        thickness     : font boldness, int
        threshold     : prediction proba threshold to overlay text, float
        fps           : frames per second of the output video, int
        returns       : None
        '''
        frames_dir = os.path.join(path, frames_folder)
        # natural sort so frames are written in their numeric order
        frame_names = self._sorted_file_names(frames_dir)
        print('Reading .jpg frames in:\n{}'.format(frames_dir))
        if not frame_names:
            # nothing to encode; the frame size needed by the writer is unknown
            print('No frames found in:\n{}'.format(frames_dir))
            return

        path_out = os.path.join(path, fname_out)
        writer = None
        try:
            for fname in frame_names:
                img = cv2.imread(os.path.join(frames_dir, fname)) # img .jpg
                im_arr = np.asarray(img) # convert to np array for overlay_text

                # add prediction text annotation
                entry = pred_dict[fname]
                cnn_name, pred = entry[0], entry[1]
                if len(entry) > 2:
                    # classes were stored alongside the prediction
                    classes = entry[2]
                else:
                    classes = self.learners[cnn_name].data.classes
                class_pred, pred_proba = self.get_prediction(pred, classes)

                # text string to overlay
                if pred_proba >= threshold:
                    s = '{} {:.3f}'.format(class_pred,pred_proba).upper()
                else:
                    s = '.'

                img = self.overlay_text(im_arr,s,coord=coord,
                         font=cv2.FONT_HERSHEY_SIMPLEX,
                         fontsz=fontsz,color=color,thickness=thickness)

                if writer is None:
                    # the writer needs the frame size, known once the first
                    # frame is loaded; frames are then streamed one by one
                    # instead of being held in memory all at once
                    height, width = img.shape[:2]
                    writer = cv2.VideoWriter(path_out,
                                             cv2.VideoWriter_fourcc(*'DIVX'),
                                             fps, (width,height))
                writer.write(img)
        finally:
            if writer is not None:
                writer.release()
        print('\n.jpg frames to .avi video conversion complete')


    def store_predictions(self,file_path):
        '''
        make predictions on the jpg frames in file_path with every learner in
        self.learners and keep, per frame, the prediction of the learner whose
        top class probability is highest
        file_path    : folder containing the jpg frames, str
        returns      : OrderedDict in natural frame order,
                       fname str as key: list [learner name, prediction, classes] as value
        '''
        from collections import OrderedDict

        print('\ncomputing predictions in:\n{}'.format(file_path))
        d = OrderedDict()
        for fname in self._sorted_file_names(file_path):
            # fastai open_image for compatible image object in predict method
            img = open_image(os.path.join(file_path, fname))
            pred_max = -1
            best_torch_pred = None
            best_classes = []
            best_learn_key = None
            # obtain predictions for each learner
            for learn_key, learn in self.learners.items():
                classes = learn.data.classes
                curr_torch_pred = learn.predict(img)

                # determine which learner has strongest prediction proba
                curr_proba = self.get_prediction(curr_torch_pred,classes)[1]
                if curr_proba > pred_max:
                    # assign prediction and corresponding attributes
                    pred_max = curr_proba
                    best_torch_pred = curr_torch_pred
                    best_classes = classes
                    best_learn_key = learn_key
            d[fname] = [best_learn_key,best_torch_pred,best_classes]
        return d

    def get_prediction(self,torch_pred,classes):
        '''
        obtains the most probable class from a learner prediction
        torch_pred   : prediction as returned by learn.predict; element [2]
                       holds the per-class probabilities
        classes      : class labels in the same order as the probabilities, list
        returns      : tuple of (class,proba)
        '''
        predlist = (torch_pred[2]).tolist()
        pred_ix = np.argmax(predlist)
        return classes[pred_ix],predlist[pred_ix]

    def overlay_text(self,img_arr,text_str,coord=(50,75),
                     font=cv2.FONT_HERSHEY_SIMPLEX,
                     fontsz=2,color=(240,240,0),thickness=2):
        '''
        overlay text to an image array via cv2.putText
        img_arr    : image, np.array
        text_str   : value to write, str
        coord      : loc of text, tuple of int
        font       : cv2 specified font
        fontsz     : fontScale of text, float
        color      : tuple of 3 ints, in the channel order of img_arr
                     (NOTE(review): arrays from cv2.imread are BGR -- confirm)
        thickness  : thickness of font, int
        returns    : image with the text drawn on it, np.array
        '''
        img = cv2.putText(img_arr,text_str,coord,font,fontsz,color,thickness)
        return img


# Google Drive mount point, and the project folder inside it.
# (`dir` shadows the builtin of the same name; the name is kept as-is.)
root_dir = "/content/gdrive/My Drive/"
dir = 'fellowshipai-data/jeffrey/video_challenge/'
base_dir = ''.join((root_dir, dir))

# single Video helper instance used by the rest of the notebook
v = Video()


Mounted at /content/gdrive


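### Load Pretrained `cnn_learner`s
- Two pretrained models are loaded: a doneness model and a raw-foods model.
- The raw-foods group's model requires `OverSamplingCallback`, so the class is defined in the cell below before the models are loaded.
- The learners are stored in a dict with the `cnn_learner` `.pkl` filename as key and the pretrained `cnn_learner` object as value.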

In [0]:
from torch.utils.data.sampler import WeightedRandomSampler

__all__ = ['OverSamplingCallback']

class OverSamplingCallback(LearnerCallback):
    "Swap the training batch sampler for a `WeightedRandomSampler` when training begins."

    def __init__(self,learn:Learner,weights:torch.Tensor=None):
        super().__init__(learn)
        # optional per-sample weights; computed in on_train_begin when None
        self.weights = weights

    def on_train_begin(self, **kwargs):
        "Compute sampling weights (if not supplied) and install the weighted batch sampler."
        train_dl = self.learn.data.train_dl
        train_ds = train_dl.dataset

        self.labels = train_ds.y.items
        _, counts = np.unique(self.labels,return_counts=True)
        if self.weights is None:
            # each sample is weighted by the inverse count of its label
            self.weights = torch.DoubleTensor((1/counts)[self.labels])

        self.label_counts = np.bincount([train_ds.y[i].data for i in range(len(train_ds))])
        # number of draws: number of classes times the largest label count
        self.total_len_oversample = int(self.learn.data.c*np.max(self.label_counts))

        weighted_sampler = WeightedRandomSampler(self.weights,self.total_len_oversample)
        train_dl.dl.batch_sampler = BatchSampler(weighted_sampler, train_dl.batch_size, False)



# Alternative model locations tried earlier:
# path = 'fellowshipai-data/final_3_class_data_train_test_split/'
# path = 'fellowshipai-data/data_backup/'

# folder (relative to root_dir) holding the exported learner .pkl files
path = 'fellowshipai-data/jeffrey/video_challenge/models'
model_path = ''.join((root_dir, path))

saved_models = ['rn34_base_jeff.pkl','nyc_raw_food.pkl']

# .pkl filename -> loaded learner
v.learners = dict(
    (model_name, load_learner(model_path, model_name))
    for model_name in saved_models
)


### Transform .mp4 video to .jpg 
The `video_to_frames` method reads an `.mp4` from the Google Drive working directory and creates a new folder in which it stores one `.jpg` file per frame.

In [7]:
# Other clip processed the same way:
# v.video_to_frames(base_dir,'Chicken - 11679.mp4','chicken_11679',verbose=False)

# split SteakAvo.mp4 into jpg frames under base_dir + 'steak_avo'
v.video_to_frames(base_dir, vid_fname='SteakAvo.mp4', dest='steak_avo', verbose=False)


/content/gdrive/My Drive/fellowshipai-data/jeffrey/video_challenge/SteakAvo.mp4
total frames: 1209

SteakAvo.mp4 converted to .jpg frames in:
/content/gdrive/My Drive/fellowshipai-data/jeffrey/video_challenge/steak_avo


### Predict Class on `jpg` frames Data and Store Pytorch prediction tensors and images in OrderedDict

- OrderedDict key is filename of `jpg` frame 
- OrderedDict value is a list of: the pretrained `cnn_learner` `.pkl` filename, the prediction returned by that learner (category, class index tensor, Pytorch probability tensor), and the list of corresponding class labels for the learner.

In [8]:
# folder of jpg frames to score (chicken clip alternative kept for reference)
# frames_file_path = base_dir+'chicken_11679'
frames_file_path = ''.join((base_dir, 'steak_avo'))

# chicken_pred_dict = v.store_predictions(frames_file_path)
# chicken_pred_dict

# frame filename -> [learner .pkl name, prediction, class labels]
steak_avo_pred_dict = v.store_predictions(file_path=frames_file_path)
steak_avo_pred_dict


computing predictions in:
/content/gdrive/My Drive/fellowshipai-data/jeffrey/video_challenge/steak_avo


OrderedDict([('frame0.jpg',
              ['rn34_base_jeff.pkl',
               (Category cooked, tensor(0), tensor([0.5480, 0.0212, 0.4307])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame1.jpg',
              ['rn34_base_jeff.pkl',
               (Category cooked, tensor(0), tensor([0.5514, 0.0212, 0.4274])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame2.jpg',
              ['rn34_base_jeff.pkl',
               (Category raw,
                tensor(2),
                tensor([7.6469e-04, 1.0830e-04, 9.9913e-01])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame3.jpg',
              ['rn34_base_jeff.pkl',
               (Category raw,
                tensor(2),
                tensor([7.2589e-04, 1.3524e-04, 9.9914e-01])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame4.jpg',
              ['rn34_base_jeff.pkl',
               (Category raw,
                tensor(2),
                tens

### Overlay `cnn_learner` Class Prediction Annotations and Convert `.jpg` Frames back to `.avi` Video

- For each `jpg` frame, the method `frames_to_video` takes the prediction stored in the previous step (from whichever pretrained model was most confident on that frame) and selects the class with the maximum probability among all probabilities in the Pytorch prediction tensor.

- The method writes the predicted class and the corresponding class probability onto the `jpg` frame when that probability reaches `threshold`; otherwise it writes a `.` placeholder.

- Finally, the method converts the annotated frames to an `.avi` video and saves it.

In [14]:

# one entry per video: (frames folder, output .avi name, prediction dict)
video_names = [('steak_avo','steak_avo.avi',steak_avo_pred_dict)]

# overlay settings shared by every video
overlay_kwargs = dict(coord=(70,50), fontsz=.6, color=(250,250,250), thickness=2,
                      threshold=.925, fps=5)

for tup in video_names:
    # tup unpacks positionally into frames_folder, fname_out, pred_dict
    v.frames_to_video(base_dir, *tup, **overlay_kwargs)



Reading .jpg frames in:
/content/gdrive/My Drive/fellowshipai-data/jeffrey/video_challenge/steak_avo

.jpg frames to .avi video conversion complete


In [82]:
# NOTE(review): in the visible notebook `chicken_pred_dict` is only assigned by
# a commented-out `v.store_predictions(...)` call, so on a fresh kernel this
# cell presumably raises NameError -- confirm, and re-enable that call (or drop
# this cell) before relying on Restart & Run All.
chicken_pred_dict

OrderedDict([('frame0.jpg',
              ['rn34_base_jeff.pkl',
               (Category cooked, tensor(0), tensor([0.5170, 0.0078, 0.4752])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame1.jpg',
              ['rn34_base_jeff.pkl',
               (Category raw, tensor(2), tensor([0.4521, 0.0068, 0.5411])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame2.jpg',
              ['rn34_base_jeff.pkl',
               (Category cooked, tensor(0), tensor([0.5101, 0.0077, 0.4822])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame3.jpg',
              ['rn34_base_jeff.pkl',
               (Category raw, tensor(2), tensor([0.4770, 0.0066, 0.5163])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame4.jpg',
              ['rn34_base_jeff.pkl',
               (Category cooked, tensor(0), tensor([0.5012, 0.0075, 0.4913])),
               ['cooked', 'overcooked', 'raw']]),
             ('frame5.jpg',
        