In [1]:
import numpy as np
import pandas as pd
import io
import bson
import matplotlib.pyplot as plt
from skimage.data import imread
import multiprocessing as mp
import pickle

In [2]:
NCORE = 2
#all_categories = mp.Manager().list()

all_categories = list()
all_categories_array = np.array([])

#categories to int dictionary
categ_to_int = {}
int_to_categ = {}

#total number of items in the list
n_train = 7069896 #from kaggle page
n_test = 1768182 #from kaggle page
n_example = 100 #from kaggle page

all_categories_filename_format = 'allcategoriesdata_{0}.p'
train_data_batch_file_format = 'training_batches/{0}/train_{0}_{1}_{2}.jpeg'
test_data_batch_file_format = 'testing_batches/{0}/test_{0}_{1}_{2}.jpeg'

train_category_folder_path_format = 'training_batches/{0}'
test_category_folder_path_format = 'testing_batches/{0}'
test_category_folder_name_format = 'folder_{0}'

show_every = 10000

In [3]:
import time

In [4]:
import os.path

def process_record_multicore_category(queue, iolock):
    while True:
        record = queue.get()
        if record is None:
            break
        global all_categories
        all_categories.append(record['category_id'])

def process_all_categories(filepath):
    """
    processes all categories and forms the list
    : filepath: file path
    """
    process_filename = filepath[filepath.rfind('/')+1:]
    filename_suffix = process_filename.replace('.bson','')
    categories_filename = all_categories_filename_format.format(filename_suffix)
    if os.path.isfile(categories_filename):
        print('File already exists. Seems already it is processed.')
        return
    
    global all_categories
    
    #queue = mp.Queue(maxsize=NCORE)
    #iolock = mp.Lock()
    #pool = mp.Pool(NCORE, initializer=process_record_multicore_category, initargs=(queue, iolock))
    
    #loading data from file
    data = bson.decode_file_iter(open(filepath, 'rb'))
    
    print('Starting to go through the file. Time: {0}'.format(time.ctime()))
    for c, record in enumerate(data):
        #queue.put(record)
        all_categories.append(record['category_id'])
        if c % 100000 ==0:
            print ('records processed: {0}, time: {1}'.format(c, time.ctime()))
    
    # tell workers we're done and join the stuff
    #for _ in range(NCORE):
    #    queue.put(None)
    #pool.close()
    #pool.join()
    print('File is processed. Time: {0}'.format(time.ctime()))
    
    all_categories_array = np.array(list(set(all_categories)))

    #process the categories and save them
    process_all_categories_array(all_categories_array, categories_filename)
    print('all categories processed.')

In [5]:
#data record preprocess sub-function 
def process_record_train(record):
    """
    processes each record from the training / test file during preprocessing function execution for training dataset
    : record: record to be processed
    : return: void
    """ 
    product_id = record['_id']
    category_id = record['category_id']
    for e, pic in enumerate(record['imgs']):
        picture = pic['picture']
        filepath = train_data_batch_file_format.format(category_id,product_id, e)
        if os.path.isfile(filepath):
            continue
        with open(filepath, 'wb') as w:
            w.write(picture)

In [6]:
#data record preprocess sub-function for test data set
def process_record_test(record, folder_id):
    """
    processes each record from the training / test file during preprocessing function execution for test data set
    : record: record to be processed
    : return: void
    """
    product_id = record['_id']  
    for e, pic in enumerate(record['imgs']):
        picture = pic['picture']
        filepath = test_data_batch_file_format.format(folder_id,product_id, e)
        if os.path.isfile(filepath):
            continue
        with open(filepath, 'wb') as w:
            w.write(picture)

In [7]:
#data preprocess function 
def process_training_file(data, enum_start=None, limit = None, file_suffix=''):
    """
    processes the training file and saves them to batch files for loading them later
    : filepath: path of the training file
    : return: void
    """
    #create all folders for categories
    for categ, i in categ_to_int.items():
        directory = train_category_folder_path_format.format(categ)
        if not os.path.exists(directory):
            os.makedirs(directory)
    
    #loading data from file
    #print('Starting to go through the Set. Time: {0}'.format(time.ctime()))
    
    init =  0 if enum_start == None else enum_start
    for c, record in enumerate(data, start=init):
        if(c % show_every ==0):
            print('processed records: {0}, Time: {1}'.format(c, time.ctime()))
        if(c > limit):
            break
        process_record_train(record)
        
    #print('File is processed. Time: {0}'.format(time.ctime()))
    #print('Preprocessing is done and saved. Time: {0}'.format(time.ctime()))

In [8]:
#test data preprocess function 
def process_test_file(data, enum_start=None, limit=None, file_suffix=''):
    """
    processes test file and saves the output to batch files for loading them later
    : filepath: path of the test file on disk
    : return: void
    """
    #create folder for each 10000 images
    folder_name = test_category_folder_name_format.format(file_suffix)
    directory = test_category_folder_path_format.format(folder_name)
    if not os.path.exists(directory):
        os.makedirs(directory)
    
    #loading data from file
    #print('TestFile: Starting to go through the Set. Time: {0}'.format(time.ctime()))
    
    init =  0 if enum_start == None else enum_start
    
    for c, record in enumerate(data, start=init):
        if(c % show_every==0):
            print('processed records: {0}, Time: {1}'.format(c, time.ctime()))
        if(c > limit):
            break
        process_record_test(record, folder_name)
    
    #print('TestFile: File is processed. Time: {0}'.format(time.ctime()))
    #print('Preprocessing is done and saved. Time: {0}'.format(time.ctime()))

In [9]:
def process_all_categories_array(all_categories_array, processed_filename):
    """
    processes all categories found in training data and creates dictionaries for faster reference
    : all_categories_array: array that contains all categories to form one hot encoding
    : return: void
    """
    global categ_to_int, int_to_categ
    categories_length = len(all_categories_array)
    categ_to_int = { categ:idx for idx, categ in enumerate(all_categories_array) }
    int_to_categ = { idx:categ for idx, categ in enumerate(all_categories_array) }
    
    pickle.dump((categ_to_int, int_to_categ), open(processed_filename, 'wb'))

In [10]:
def load_categ_to_int_dicts(data_file_path):
    """
    restores categ_to_int and int_to_categ object dictionaries from saved state files if exist
    : data_file_path: actual data file path - to represent the mode (train or train example)
    """
    process_filename = data_file_path[data_file_path.rfind('/')+1:]
    filename_suffix = process_filename.replace('.bson','')
    categories_filename = all_categories_filename_format.format(filename_suffix)
    
    with open(categories_filename, 'rb') as f:
        
        global categ_to_int, int_to_categ
        
        categ_to_int, int_to_categ = pickle.load(f)

In [11]:
def create_one_hot_label(original_label, label_length, one_hot_labels):
    """
    creates one hot label for a given original label value. A sub function for multi core processing of one hot encode function
    : label_length: length of label to initialize the array
    : one_hot_labels: the array that contains all one hot label
    : return: void
    """
    one_hot_label = np.zeros(label_length, dtype='int16')
    
    #commenting below line since now conversion to index happens while preparing the matrix.
    #one_hot_label[categ_to_int[original_label]] = 1
    #so changing it to
    one_hot_label[original_label] = 1
    
    one_hot_labels.append(one_hot_label)

def one_hot_encode(data_batch):
    """
    creates one hot encoded label for the given data batch using multi-core processing
    : data_batch: the sub-section of original final training data
    : return: array of one hot encoded label
    """
    one_hot_labels = list()
    label_length = len(categ_to_int)
    
    #print(data_batch)
    
    for i in range(len(data_batch)):
        original_label = data_batch[i][1] # category column
        create_one_hot_label(original_label, label_length, one_hot_labels)

    one_hot_labels = np.array(list(one_hot_labels))
    
    return one_hot_labels

In [14]:
#process_all_categories('data/train_example.bson')

File already exists. Seems already it is processed.


In [37]:
#test with training example file

#process_training_file('data/train_example.bson')

Multicore processing Queue, Lock, and Pool have been initialized and set up.
The data file has been loaded.
Starting to go through the file. Time: Tue Oct 10 19:52:25 2017
File is processed. Time: Tue Oct 10 19:52:25 2017
Preprocessing is done and saved. Time: Tue Oct 10 19:52:25 2017


In [None]:
#final_data_array[:,1]

In [None]:
#process_training_file('data/train.bson')

In [85]:
#load_categ_to_int_dicts('data/train_example.bson')

In [14]:
process_all_categories('data/train.bson')
#process_all_categories('train_example.bson')

Starting to go through the file. Time: Fri Oct 20 14:01:56 2017
records processed: 0, time: Fri Oct 20 14:01:56 2017
records processed: 100000, time: Fri Oct 20 14:02:06 2017
records processed: 200000, time: Fri Oct 20 14:02:18 2017
records processed: 300000, time: Fri Oct 20 14:02:38 2017
records processed: 400000, time: Fri Oct 20 14:02:50 2017
records processed: 500000, time: Fri Oct 20 14:03:01 2017
records processed: 600000, time: Fri Oct 20 14:03:21 2017
records processed: 700000, time: Fri Oct 20 14:04:09 2017
records processed: 800000, time: Fri Oct 20 14:04:19 2017
records processed: 900000, time: Fri Oct 20 14:04:30 2017
records processed: 1000000, time: Fri Oct 20 14:04:40 2017
records processed: 1100000, time: Fri Oct 20 14:04:50 2017
records processed: 1200000, time: Fri Oct 20 14:05:00 2017
records processed: 1300000, time: Fri Oct 20 14:05:13 2017
records processed: 1400000, time: Fri Oct 20 14:05:23 2017
records processed: 1500000, time: Fri Oct 20 14:05:33 2017
records

In [12]:
#Load dictionaries - categ_to_int and int_to_categ from files to objects
load_categ_to_int_dicts('data/train.bson')

In [13]:
len(categ_to_int)

5270

In [14]:
def preprocess_test_batches(filepath, override_batch=None):
    """
    preprocesses batches and saves them in batches to end up losing data due to long running processes
    : filepath: path of file to be processed
    """
    input_data = bson.decode_file_iter(open(filepath, 'rb'))
    
    limit = 10000
    batches_count = int(n_test / limit)
    batch_range = batches_count if override_batch is None else override_batch
    for batch_idx in range(batch_range):
        print('starting with batch: {0}'.format(batch_idx))
        process_test_file(input_data, enum_start=batch_idx*limit, limit=(batch_idx+1)*limit, file_suffix=batch_idx)
        
    print('all test files are preprocessed. cool!')

In [15]:
def preprocess_training_batches(filepath, override_batch=None):
    """
    preprocesses batches and saves them in batches to end up losing data due to long running processes
    : filepath: path of file to be processed
    """
    input_data = bson.decode_file_iter(open(filepath, 'rb'))
    
    limit = 10000
    batches_count = int(n_train / limit)
    batch_range = batches_count if override_batch is None else override_batch
    for batch_idx in range(batch_range):
        print('starting with batch: {0}'.format(batch_idx))
        process_training_file(input_data, enum_start=batch_idx*limit, limit=(batch_idx+1)*limit, file_suffix=batch_idx)

    print('all training files are preprocessed. cool!')

In [26]:
preprocess_training_batches('data/train.bson')

starting with batch: 0
processed records: 0, Time: Fri Oct 20 15:46:32 2017
processed records: 10000, Time: Fri Oct 20 15:46:51 2017
starting with batch: 1
processed records: 10000, Time: Fri Oct 20 15:46:52 2017
processed records: 20000, Time: Fri Oct 20 15:47:05 2017
starting with batch: 2
processed records: 20000, Time: Fri Oct 20 15:47:05 2017
processed records: 30000, Time: Fri Oct 20 15:47:13 2017
starting with batch: 3
processed records: 30000, Time: Fri Oct 20 15:47:14 2017
processed records: 40000, Time: Fri Oct 20 15:47:22 2017
starting with batch: 4
processed records: 40000, Time: Fri Oct 20 15:47:22 2017
processed records: 50000, Time: Fri Oct 20 15:47:31 2017
starting with batch: 5
processed records: 50000, Time: Fri Oct 20 15:47:31 2017
processed records: 60000, Time: Fri Oct 20 15:47:39 2017
starting with batch: 6
processed records: 60000, Time: Fri Oct 20 15:47:40 2017
processed records: 70000, Time: Fri Oct 20 15:47:49 2017
starting with batch: 7
processed records: 700

KeyboardInterrupt: 

In [16]:
preprocess_test_batches('data/test.bson')

starting with batch: 0
processed records: 0, Time: Mon Oct 23 01:03:48 2017
processed records: 10000, Time: Mon Oct 23 01:04:03 2017
starting with batch: 1
processed records: 10000, Time: Mon Oct 23 01:04:03 2017
processed records: 20000, Time: Mon Oct 23 01:04:24 2017
starting with batch: 2
processed records: 20000, Time: Mon Oct 23 01:04:24 2017
processed records: 30000, Time: Mon Oct 23 01:04:42 2017
starting with batch: 3
processed records: 30000, Time: Mon Oct 23 01:04:42 2017
processed records: 40000, Time: Mon Oct 23 01:05:03 2017
starting with batch: 4
processed records: 40000, Time: Mon Oct 23 01:05:03 2017
processed records: 50000, Time: Mon Oct 23 01:05:20 2017
starting with batch: 5
processed records: 50000, Time: Mon Oct 23 01:05:20 2017
processed records: 60000, Time: Mon Oct 23 01:05:36 2017
starting with batch: 6
processed records: 60000, Time: Mon Oct 23 01:05:36 2017
processed records: 70000, Time: Mon Oct 23 01:05:53 2017
starting with batch: 7
processed records: 700

KeyboardInterrupt: 