## Importing

In [1]:
import os
import numpy as np
import cv2
import multiprocessing as mp
import time
import re
import functools
min_confidence = 0.2
# size = 6000

## for renaming the file (optional)

In [None]:
%%time
path = './images/'
i = 0
for filename in os.listdir(path):
    os.rename(os.path.join(path,filename), os.path.join(path,'image_'+str(i)+'.jpg'))
    i = i +1

## Video Input

### Splitting

In [2]:
def video_splitting(path,queue):
    vidcap = cv2.VideoCapture(path) # big_buck_bunny_720p_5mb.mp4

    success,image = vidcap.read()
    count = 0

    #clear directory of frames
#     dir = './frames'
#     for f in os.listdir(dir):
#         os.remove(os.path.join(dir, f))

    # success = False
    while success:
#     for i in range(100):
        queue.put([image,count])
#         cv2.imwrite("./frames/frame%d.jpg" % count, image)     # save frame as JPEG file    
        success,image = vidcap.read()
#         print('Read a new frame: ', success)
        count += 1
    
    pid = os.getpid()
    print(f"Splitting Done and Frames obtained: {count} with {pid}")

### Joining

In [3]:
def video_stitch(video_name,queue,fps,frame_count):
    
#     images = ["./image_output/{}_output.jpg".format(i) for i in range(len(os.listdir('./image_output')))]
#     images = ["./frames/frame{}.jpg".format(i) for i in range(len(os.listdir('./frames')))]

    # print(len(images))
    # print(type(images[0]))
    # print(images[0])

    image_temp = queue.get()

    height, width = image_temp.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v') 
    video = cv2.VideoWriter(video_name, fourcc, fps, (width, height))
    
    video.write(image_temp)
    
#     for image in images:
#     while True:
    for i in range(frame_count-1):
            
        video.write(queue.get())
#         if(i%100==0):
#             print(i," stitch done")

    pid = os.getpid()
    print(f'stitch {pid} done')

    cv2.destroyAllWindows()
    video.release()
    
#     print("destroyed")

## class define and model load

In [4]:
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))

net = cv2.dnn.readNetFromCaffe("../MobileNetSSD_deploy.prototxt.txt", "../MobileNetSSD_deploy.caffemodel")

## function for detection

In [5]:
def prep(image, count, q):
    #image load
#     image = cv2.imread(img)
    (h, w) = image.shape[:2]
    blob = cv2.dnn.blobFromImage(cv2.resize(image, (300, 300)), 0.007843, (300, 300), 127.5)
#     index = re.sub("\D", "", img)
    q.put([blob,image,count])
    return None

def detect(queueIn, queueOut):

    arr, image, num = queueIn.get()
    (h, w) = image.shape[:2]
    #load image in model
    net.setInput(arr)
    detections = net.forward()

    #look for match in image
    for i in np.arange(0, detections.shape[2]):

        #extract confidence from detected object
        confidence = detections[0, 0, i, 2]

        #filter out low confidence objects
        if confidence > min_confidence:

            idx = int(detections[0, 0, i, 1])
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])

            (startX, startY, endX, endY) = box.astype("int")

            # display the prediction
            label = "{}: {:.2f}%".format(CLASSES[idx], confidence * 100)
#             print("[INFO] {}".format(label))
            cv2.rectangle(image, (startX, startY), (endX, endY), COLORS[idx], 2)

            y = startY - 15 if startY - 15 > 15 else startY + 15
            cv2.putText(image, label, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLORS[idx], 2)

    # show the output image
#     cv2.imwrite("./image_output/{}_output.jpg".format(num), image)
    queueOut.put(image)
    return None

# Parallel

In [6]:
# qw = ["./frames/frame{}.jpg".format(i) for i in range(739)]
m = mp.Manager()
q_size = 100
detect_q = m.Queue(q_size)
split_q = m.Queue(q_size)
stitch_q = m.Queue(q_size)

In [51]:
stitch_q.qsize()

0

In [None]:
for i in range(100):
    print(stitch_q.get())

## Parallel with 3 processors

In [7]:
def fun(queueIn,queueOut,frame_count,num):
    
    for i in range(frame_count):
#         print(num,": ",i,"prep")
        
#         if(i>10 and queueIn.empty()):
#             print("empty prep")
#             break
            
        image,count = queueIn.get()
        prep(image, count, queueOut)
        
#         if(i%250 == 0):
#             print(num, i,"prep done!")
            
    pid = os.getpid()
    print(f'prep {pid} done')

In [8]:
def fun1(queueIn,queueOut,frame_count,num):

    for i in range(frame_count):
#         print(num,": ",i,"detect")
        
#         if(i>10 and queueIn.empty()):
#             print("empty detect")
#             break
            
        detect(queueIn,queueOut)
#         if(i%250 == 0):
#             print(num,i,"detect done!")
    pid = os.getpid()
    print(f'detect {pid} done')

In [9]:
def fps_calc(path):
    vidcap = cv2.VideoCapture(path) # big_buck_bunny_720p_5mb.mp4

    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    return fps,frame_count
fps,frame_count = fps_calc("big_buck_bunny_720p_5mb.mp4")

In [10]:
%%time

split_p = mp.Process(target=video_splitting, args=("big_buck_bunny_720p_5mb.mp4",split_q)) #splitting
split_p.daemon = True
split_p.start()

detect_p = mp.Process(target=fun1, args=(detect_q,stitch_q,frame_count,1)) #detection
detect_p.daemon = True
detect_p.start()

prep_p = mp.Process(target=fun, args=(split_q,detect_q,frame_count,1)) #preprocessing
prep_p.daemon = True
prep_p.start()

stitch_p = mp.Process(target=video_stitch, args=('test_output/vid7.mp4',stitch_q,fps,frame_count)) #stitching
stitch_p.daemon = True
stitch_p.start()

split_p.join()
detect_p.join()
prep_p.join()
stitch_p.join()

Splitting Done and Frames obtained: 739 with 17284
prep 17292 done
detect 17287 done
stitch 17297 done
CPU times: user 5.37 ms, sys: 23.7 ms, total: 29.1 ms
Wall time: 36.3 s


In [82]:
split_q.qsize()

0

In [83]:
detect_q.qsize()

0

## Parallel with 5 processors

In [35]:
def fps_calc(path):
    vidcap = cv2.VideoCapture(path) # big_buck_bunny_720p_5mb.mp4

    fps = vidcap.get(cv2.CAP_PROP_FPS)
    return fps

In [36]:
def smap(f):
    return f()

def parallel(InPath,OutPath):
    
    fps = fps_calc(InPath)
    
    split1 = functools.partial(video_splitting, InPath, split_q)
    prep1 = functools.partial(fun, split_q, detect_q,1)
    prep2 = functools.partial(fun, split_q, detect_q,2)
    dete1 = functools.partial(fun1, detect_q,stitch_q,1)
    dete2 = functools.partial(fun1, detect_q,stitch_q,2)
    dete3 = functools.partial(fun1, detect_q,stitch_q,3)
    pool = mp.Pool(processes=6)
    
    res = pool.map_async(smap, [split1, prep1, prep2, dete1, dete2, dete3])
    pool.close()
    pool.join()
    
#     video_stitch(OutPath,fps)

In [37]:
%%time
parallel("big_buck_bunny_720p_5mb.mp4",'test_output/vid3.mp4')

12  : 3:   0:    detect00 
detect 
detect
1 0 prep done!
2 0 prep done!
2 0 detect done!1
 20 : 3   detect done!10 
detect1
 detect done! 
:  31  detect: 
 1 detect
3 :  2 2detect
 :  2 1 detect: 
 2 detect
3 :  3 detect
2 :  33  : detect 
4 detect
1 :  3 detect
3 :  5 detect2 :  
1 : 4 4  detectdetect

213 :  :    55  detectdetect: 

 6 detect
2 :  6Splitting Done and Frames obtained:  100
3 detect
 :  7 detect
1 :  6 detect
32  : :   7 8 detect
detect
1 empty prep:  
prep 8987 doneempty prep
7
 prep 8994 donedetect

2 :  8 3detect
 :  9 1 :  8 detect
detect
2 :  9 detect
3 1 :  : 9 10 detect
 detect
3 :  11 2 :  10 detectdetect

1 :  10 detect
2 :  11 detect1 
:  11 detect
3 :  12 detect
3 :  13 detect
1 2:   : 12  detect
12 detect
3 :  14 detect
1 :  13 3 :  detect15
 detect
2 :  13 detect
1 :  14 detect3
 :  16 detect2
 :  114 :  detect 
15 detect
3 :  17 detect
2 :  15 detect1
 :  316 :  18 detect 
detect
2 :  16 detect
3 : 1  19:   detect17
 detect
2 :  17 detect
1 3:   : 18  20d

In [71]:
def smap(f):
    return f()

split1 = functools.partial(video_splitting, "big_buck_bunny_720p_5mb.mp4", split_q)
prep1 = functools.partial(fun, split_q, detect_q,1)
prep2 = functools.partial(fun, split_q, detect_q,2)
dete1 = functools.partial(fun1, detect_q,1)
dete2 = functools.partial(fun1, detect_q,2)
dete3 = functools.partial(fun1, detect_q,3)
pool = mp.Pool(processes=6)

Fps: 25.0
1 0 prep done!
2 0 prep done!
1 0 detect done!
2 0 detect done!
3 0 detect done!
1 250 prep done!
2 250 prep done!
Splitting Done and Frames obtained:  739
empty prep
prep 132384 done
empty prep
prep 132388 done
1 250 detect done!
empty detect
detect 132396 done
empty detect
detect 132402 done
empty detect
detect 132392 done


In [72]:
%%time
# res = mp.pool.map_async(smap, [prep1, prep2, dete1, dete2, dete3])
res = pool.map_async(smap, [split1, prep1, prep2, dete1, dete2, dete3])
pool.close()
pool.join()

CPU times: user 114 ms, sys: 15.4 ms, total: 129 ms
Wall time: 46.9 s


In [28]:
while(q.empty() != True):
    q.get()

In [11]:
q.qsize()

0