# First Time to Python Multi-processing
I want to share something I learned about Python multi-processing on handling middle size data on a single machine.

By middle size, I mean GB level data. In my case, that is 13.2GB of data, which is small enough to load into memory but once load into memeory, you may not have enough memory to run any algorithms.

Let's dive into it.

# Problem Description
I have 13.2GB of data consisted of 7481 binary files. Each file is about 1.81M.

My task is simple:
    1. Read the file into a Python data structure
    2. Process the data to an image
    3. Write the image to disk
    
## File Format
The file stores a collection of vectors. Each vector has 4 elements: x, y, z, r

Each element is a float32, so the size of a vector is 16 bytes.

There is no delimiters between 2 vectors. Therefore, if the file size is 32 bytes, it has and only has 2 vectors. If the size of a file is not dividible by 16 bytes, it is not a valid file.
   


In [3]:
def read_velodyne_data(file_path):
	"""
	Read velodyne binary data and return a numpy array
	"""

	# First, check the size of this file to see if it's a valid velodyne binary file
	size = os.stat(file_path).st_size
	if size % 16 != 0:
		raise Exception('The size of '+file_path+' is not dividible by 16 bytes')

	with open(file_path, 'rb') as f:
		# Allocate memory for numpy array
		velodyne_data = np.empty(shape=(size//16, 4), dtype=np.float32)
		
		# Read the data, 16 bytes each time
		i = 0
		reader = BufferedReader(f)
		while reader.peek(16):
			read_bytes = reader.read(16)
			velodyne_data[i] = np.frombuffer(read_bytes, dtype=np.float32)
			i += 1
		
		# Check whether correct amount of bytes were read
		if i != size/16:
			error = ' '.join(['The file size is', str(size), ', but', str(i), 'bytes were read'])
			raise Exception(error)

		return velodyne_data

## Let's process the data
Each file is a velodyne point cloud scan

This is an sample processed image. This a a birdview of the lidar data
<img src="birdview.jpg" style="width: 600px; height: 600px"/>

The image is 1600 x 1600 pixel. Each pixel represent 10cm x 10cm of space. The range of the lidar in the x and y direction is 80m.

Where x is left and right, y is front and back.

We have 3 channels for this image.

In [4]:
def bird_view_map(velodyne_data):
	"""
	Implements the method in https://arxiv.org/pdf/1611.07759.pdf
	:param velodyne_data: a list of velodyne cloud points
	:return: 2D image with 3 channels: height, intensity and density
	"""
	bird_view = np.zeros(shape=(1600, 1600, 3), dtype=np.float32)
	for point in velodyne_data:
		x = point[0]
		y = point[1]
		z = point[2]
		r = point[3]
		# if (-40 <= x <= 40) and (-40 <= y <= 40):
		xi = 800 - np.int(np.ceil(x/0.1))
		yi = 800 - np.int(np.ceil(y/0.1))
		if -z > bird_view[yi][xi][0]:
			bird_view[yi][xi][0] = -z
			bird_view[yi][xi][1] = r
			bird_view[yi][xi][2] += 1

	# todo: normalize birdview with the real method
	bird_view[:,:,0] = np.interp(bird_view[:,:,0], xp=(np.min(bird_view[:,:,0]), np.max(bird_view[:,:,0])), fp=(0, 255))
	bird_view[:,:,1] = np.interp(bird_view[:,:,1], xp=(0, 1), fp=(0, 255))
	bird_view[:,:,2] = np.interp(bird_view[:,:,2], xp=(np.min(bird_view[:,:,2]), np.max(bird_view[:,:,2])), fp=(0, 255))
	return bird_view

In [16]:
from glob import glob
import cv2
import os
import numpy as np
from io import BufferedReader
import time

t = time.time()
for path in glob('data/raw/*.bin'):
    data = read_velodyne_data(path)
    view = bird_view_map(data)
    cv2.imwrite(''.join(['data/processed/', os.path.basename(path)[:-4], '.png']), view)
used = time.time() - t
print('Single processed version used', used, 'seconds')

Single processed version used 14.144984722137451 seconds


In [12]:
def read_velodyne_data_quick(file_path):
    """
    Read velodyne binary data and return a numpy array
    """

    # First, check the size of this file to see if it's a valid velodyne binary file
    size = os.stat(file_path).st_size
    if size % 16 != 0:
        raise Exception('The size of '+file_path+' is not dividible by 16 bytes')

    with open(file_path, 'rb') as f:
        # Allocate memory for numpy array
        velodyne_data = np.empty(shape=(size//16, 4), dtype=np.float32)

        # Read the data, 16 bytes each time
        i = 0
        reader = BufferedReader(f)
        read_bytes = reader.read(16)
        while read_bytes:
            velodyne_data[i] = np.frombuffer(read_bytes, dtype=np.float32)
            read_bytes = reader.read(16)
            i += 1

        # Check whether correct amount of bytes were read
        if i != size/16:
            error = ' '.join(['The file size is', str(size), ', but', str(i), 'bytes were read'])
            raise Exception(error)

        return velodyne_data

In [17]:
t = time.time()
for path in glob('data/raw/*.bin'):
    data = read_velodyne_data_quick(path)
    view = bird_view_map(data)
    cv2.imwrite(''.join(['data/processed/', os.path.basename(path)[:-4], '.png']), view)
used2 = time.time() - t
print('Better version used', used2, 'seconds')
print('It is ', used - used2, 'seconds faster')

Better version used 13.390703439712524 seconds
It is  -0.7542812824249268 seconds faster


In [None]:
def f(path):
	view = bird_view_map(read_velodyne_data_quick(path[0]))
	cv2.imwrite(''.join([path[1], '/', os.path.basename(path[0])[:-4], '.png']), view)

def generate_birdviews(data_paths, to_dir, workers):
	"""
	This function process velodyne data to birdview in parallel
	:param data_paths: a list of paths to velodyne xxx.bin files
	:param     to_dir: write birdview maps to this directory
	:param    workers: number of processes
	"""
	with Pool(workers) as p:
		to_dirs = [to_dir] * len(data_paths)
		p.map(f, list(zip(data_paths, to_dirs)))

from multiprocessing import Pool
        
t = time.time()
generate_birdviews(glob('data/raw/*.bin'), to_dir='data/try2', workers=8)
used3 = time.time() - t
print(used3)