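# Slit-scan video effect: MPI parallelization

This notebook writes `slitscan_mpi.py` and runs it with `mpirun`. The script reads `hand_crop_100.mp4` and assembles each slit-scan output frame *j* column by column, taking column *c* from input frame *j + c*. The in-progress output frames are split across MPI processes, which share the current input frame through an MPI shared-memory window on a single node. The result is encoded to `vid_out.mp4`.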

In [None]:
# mpi4py provides the MPI bindings imported by slitscan_mpi.py (`from mpi4py import MPI`).
!pip install mpi4py

Collecting mpi4py
[?25l  Downloading https://files.pythonhosted.org/packages/ec/8f/bbd8de5ba566dd77e408d8136e2bab7fdf2b97ce06cab830ba8b50a2f588/mpi4py-3.0.3.tar.gz (1.4MB)
[K     |████████████████████████████████| 1.4MB 6.4MB/s 
[?25hBuilding wheels for collected packages: mpi4py
  Building wheel for mpi4py (setup.py) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.0.3-cp36-cp36m-linux_x86_64.whl size=2074430 sha256=4d562de881c5e9e1d12554eea3ce0240bb55df1239044483df4c4fd8fce00af7
  Stored in directory: /root/.cache/pip/wheels/18/e0/86/2b713dd512199096012ceca61429e12b960888de59818871d6
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.0.3


In [None]:
# vidgear provides WriteGear, which slitscan_mpi.py uses to encode the output video.
!pip install vidgear

Collecting vidgear
[?25l  Downloading https://files.pythonhosted.org/packages/7c/b1/a227292534fb9c633bd58a1b02e892e2d64befc6cc42b1c868fcefb172c4/vidgear-0.1.9-py3-none-any.whl (83kB)
[K     |████████████████████████████████| 92kB 4.5MB/s 
Collecting colorlog
  Downloading https://files.pythonhosted.org/packages/4e/c8/c16d30bbed11a1722060014c246d124582d1f781b26f5859d8dacc3e08e1/colorlog-4.6.2-py2.py3-none-any.whl
Collecting youtube-dl
[?25l  Downloading https://files.pythonhosted.org/packages/46/9c/69f5ede4f4b3e01390a9e9b355cb3bbe4e7550439bd0c33daa0faf87c1ba/youtube_dl-2020.12.14-py2.py3-none-any.whl (1.8MB)
[K     |████████████████████████████████| 1.9MB 26.1MB/s 
Collecting mss
[?25l  Downloading https://files.pythonhosted.org/packages/d7/5f/77dece686b8d08a17430e169e936722693712b8cf1ee638caa8b1cb6452b/mss-6.1.0-py3-none-any.whl (76kB)
[K     |████████████████████████████████| 81kB 8.5MB/s 
[?25hCollecting pafy
  Downloading https://files.pythonhosted.org/packages/74/69/829919ee

In [None]:
%%writefile slitscan_mpi.py
import numpy as np
from mpi4py import MPI
import matplotlib 
from matplotlib import pyplot as plt
from matplotlib import image as mpimg
import cv2
from vidgear.gears import WriteGear

def _allocate_shared_uint8(shrcomm, shape):
    """Return ``(win, array)``: a uint8 array of ``shape`` in MPI shared memory.

    Rank 0 of ``shrcomm`` allocates the whole segment and every other rank
    allocates zero bytes. All ranks then map rank 0's segment, so ``array``
    refers to the same memory on every rank of ``shrcomm``. The caller must
    free ``win`` collectively with ``win.Free()``.
    """
    itemsize = np.dtype(np.uint8).itemsize
    if shrcomm.Get_rank() == 0:
        nbytes = itemsize * int(np.prod(shape))
    else:
        nbytes = 0
    win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=shrcomm)
    buf, _ = win.Shared_query(0)
    return win, np.ndarray(buffer=buf, dtype=np.uint8, shape=shape)


def main(video_path='hand_crop_100.mp4', output_path='vid_out.mp4',
         shape=(100, 100, 3), total_frames=200):
    """Render a slit-scan video in parallel over the MPI ranks of one node.

    Column ``c`` of output frame ``j`` is column ``c`` of input frame
    ``j + c``. Each output frame therefore needs ``frame_buf_len = shape[1]``
    consecutive input frames, and ``total_frames - frame_buf_len + 1`` output
    frames are produced.

    Up to ``frame_buf_len`` output frames are in progress at once. They are
    held in a ring of ``frame_buf_len`` slots, and output frame ``j`` lives in
    slot ``j % frame_buf_len``. Each rank owns a contiguous range of slots and
    copies the matching column of every input frame into them.

    Rank 0 also reads the input video and is the only rank that encodes
    output. A WriteGear encoder cannot be handed to other processes, so it
    stays on rank 0. The current input frame and the slot ring both live in
    MPI shared-memory windows, so all ranks must be on a single node.

    Args:
        video_path: input video, read by rank 0 with ``cv2.VideoCapture``.
        output_path: output video, written by rank 0 with ``WriteGear``.
        shape: (height, width, channels) that every input frame must match.
        total_frames: maximum number of input frames to consume.

    Raises:
        RuntimeError: on every rank, if the ranks span more than one node.
    """
    comm = MPI.COMM_WORLD
    # Split_type groups ranks that can share memory. Plain Split() would treat
    # COMM_TYPE_SHARED as an ordinary colour and ignore node boundaries.
    shrcomm = comm.Split_type(MPI.COMM_TYPE_SHARED)
    if shrcomm.Get_size() != comm.Get_size():
        shrcomm.Free()
        raise RuntimeError('slitscan_mpi needs all MPI ranks on a single node')
    rank = shrcomm.Get_rank()
    num_processes = shrcomm.Get_size()

    frame_buf_len = shape[1]
    # Output frame j exists only if its whole window [j, j + frame_buf_len) fits.
    num_outputs = total_frames - frame_buf_len + 1

    frame_win, frame_array = _allocate_shared_uint8(shrcomm, tuple(shape))
    out_win, out_frames = _allocate_shared_uint8(
        shrcomm, (frame_buf_len,) + tuple(shape))

    # This rank owns ring slots [frames_start, frames_end). Integer arithmetic
    # spreads the remainder, so every slot has exactly one owner.
    frames_start = rank * frame_buf_len // num_processes
    frames_end = (rank + 1) * frame_buf_len // num_processes

    cap = writer = None
    if rank == 0:
        cap = cv2.VideoCapture(video_path)
        output_params = {"-vcodec": "libx264"}
        writer = WriteGear(output_filename=output_path, logging=True, **output_params)

    # NOTE(review): ordering of shared-window stores/loads relies on the bcast
    # and Barrier below. Strict MPI semantics would also call win.Sync() inside
    # a Lock_all epoch. Confirm if targeting non-cache-coherent hardware.
    for f in range(total_frames):
        if rank == 0:
            ok, frame = cap.read()
            if ok and frame.shape != frame_array.shape:
                print('Frame {} has shape {}, expected {}'.format(
                    f, frame.shape, frame_array.shape))
                ok = False
            if ok:
                frame_array[...] = frame
        else:
            ok = None
        # Every rank must leave the loop together, or the Barrier below would
        # deadlock. The bcast also keeps the other ranks waiting until rank 0
        # has stored frame f in shared memory.
        ok = shrcomm.bcast(ok, root=0)
        if not ok:
            if rank == 0:
                print('BREAKING')
            break

        # Every output frame j whose window contains f takes column f - j from it.
        for j in range(max(f - frame_buf_len + 1, 0), min(f + 1, num_outputs)):
            slot = j % frame_buf_len
            if frames_start <= slot < frames_end:
                col = f - j
                out_frames[slot, :, col, :] = frame_array[:, col, :]

        # All columns from frame f are in place. Rank 0 may now read the slot
        # completed this iteration and later overwrite frame_array.
        shrcomm.Barrier()

        completed = f - frame_buf_len + 1
        if rank == 0 and completed >= 0:
            # Copy out of shared memory: the slot is refilled with output frame
            # completed + frame_buf_len starting next iteration.
            writer.write(out_frames[completed % frame_buf_len].copy())

    if rank == 0:
        cap.release()
        writer.close()
    out_win.Free()
    frame_win.Free()
    shrcomm.Free()

# Run only when executed as a script (e.g. under mpirun), so importing
# slitscan_mpi does not start the MPI job.
if __name__ == '__main__':
    main()

Overwriting slitscan_mpi.py


In [None]:
# Launch slitscan_mpi.py on 4 MPI processes. The notebook runs as root (the pip
# output above caches under /root), hence --allow-run-as-root.
!mpirun --allow-run-as-root -np 4 python slitscan_mpi.py

Traceback (most recent call last):
  File "slitscan_mpi.py", line 73, in <module>
    main()
  File "slitscan_mpi.py", line 30, in main
    if rank == num_procceses - 1:
NameError: name 'num_procceses' is not defined
Traceback (most recent call last):
  File "slitscan_mpi.py", line 73, in <module>
    main()
  File "slitscan_mpi.py", line 30, in main
    if rank == num_procceses - 1:
NameError: name 'num_procceses' is not defined
Traceback (most recent call last):
  File "slitscan_mpi.py", line 73, in <module>
    main()
  File "slitscan_mpi.py", line 30, in main
    if rank == num_procceses - 1:
NameError: name 'num_procceses' is not defined
Traceback (most recent call last):
  File "slitscan_mpi.py", line 73, in <module>
    main()
  File "slitscan_mpi.py", line 30, in main
    if rank == num_procceses - 1:
NameError: name 'num_procceses' is not defined
-------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit cod