In [1]:
'''
This script is part of a network Performance Profile Recommender system built on
top of Intel DAAL Kmeans and (implicit) ALS algorithms.

It converts the distributed *CSV* files output by kmeans_dense_distributed_mpi2.cpp 
(DAAL KMean w/ MPI) to input *CSR* files of implicit_als_csr_distributed_mpi2.cpp.

See kmeans_dense_distributed_mpi2.cpp     for C++ code of the DAAL Kmeans app.
See implicit_als_csr_distributed_mpi2.cpp for C++ code of the DAAL ALS app.
'''
import csv
import numpy as np
from scipy import sparse

class readCSV:
    '''
    Context manager that yields the rows of a CSV file, one list of column
    strings per line:

        with readCSV('x.csv') as rows:
            for cols in rows:
                ...

    The file is opened when the first row is requested and closed either when
    the rows are exhausted or when the `with` block exits, whichever comes first.
    '''
    def __init__(self, fileName):
        self.fileName = fileName
        self.generator = None

    # NOTE(review): dunder-style names are reserved for Python itself; the name
    # is kept only so existing callers keep working.
    def __generator__(self):
        with open(self.fileName) as f:
            for cols in csv.reader(f):
                yield cols

    def __enter__(self):
        self.generator = self.__generator__()
        return self.generator

    def __exit__(self, type, value, traceback):
        # Closing the generator raises GeneratorExit at its paused `yield`,
        # which runs the inner `with open(...)` exit and releases the file even
        # when the caller stopped iterating early (e.g. via `break`).
        if self.generator is not None:
            self.generator.close()
        return False  # never swallow exceptions raised in the with-body

# get a list of input CSV files
import glob
import threading

CSV_PATTERN = '../data/sta_phyr_x*.csv'
csv_files = glob.glob(CSV_PATTERN)
if not csv_files:
    # Fail loudly and name the pattern; nothing downstream can run without input.
    raise IOError('no input CSV files match %s' % CSV_PATTERN)

# The order of file names (sta_phyr_xaa, sta_phyr_xab, ...) is significant for
# merging and slicing the matrices: output slice i is named after csv_files[i].
csv_files = sorted(csv_files)
print('\n'.join(csv_files))

# Prepare to merge the CSV files in parallel: one max-id slot per file/thread.
max_stas = [1] * len(csv_files)
max_prfs = [1] * len(csv_files)
# Placeholder only; re-created with the real shape after pass 1.
sta_prfs = sparse.lil_matrix((9, 9))

../data/sta_phyr_xaa.csv
../data/sta_phyr_xab.csv
../data/sta_phyr_xac.csv
../data/sta_phyr_xad.csv


In [None]:
def worker_csv(i, fname, load):
    '''
    Thread function to read one CSV file output by kmeans_dense_distributed_mpi2.cpp.

    Only 3-column rows are used: (network endpoint id, rating, profile id).

    :param i:     index of this file in csv_files; selects the slot of
                  max_stas[] / max_prfs[] this thread updates.
    :param fname: path of the CSV file to read.
    :param load:  False -> pass 1: record the largest endpoint id (column 0)
                           and profile id (column 2) seen in this file.
                  True  -> pass 2: store column 1 at sta_prfs[col 0, col 2].
    '''
    # `rows`, not `csv`: avoid shadowing the csv module.
    with readCSV(fname) as rows:
        for cols in rows:
            if len(cols) != 3:
                continue
            # Ids arrive as strings. Compare/index them as ints: comparing the
            # raw strings is lexicographic ('9' > '10'), and comparing them with
            # the initial int 1 relies on Python 2 cross-type ordering.
            sta, prf = int(cols[0]), int(cols[2])
            if not load:
                # pass 1: learn shapes only
                if sta > max_stas[i]: max_stas[i] = sta
                if prf > max_prfs[i]: max_prfs[i] = prf
            else:
                # pass 2: load the rating; it is passed through as-is and
                # presumably converted by sta_prfs' dtype on assignment.
                sta_prfs[sta, prf] = cols[1]

In [2]:
%%time

'''
To convert the CSV files output by kmeans_dense_distributed_mpi2.cpp,
it needs 2 passes:
    1. learn maximum network endpoint id and network profile id (aka. learn matrix shape)
       (these will be the maximum 'user' id and 'item' id for ALS)
    2. load network endpoint id's, profile id's and their ratings

This cell is for pass 1; next cell is for pass 2.
'''
threads = []
for i, csv_file in enumerate(csv_files):
    t = threading.Thread(target=worker_csv, args=(i, csv_file, False))
    threads.append(t)
    t.start()

# Plain loop: join() is called for its side effect only.
for t in threads:
    t.join()

# Coerce each per-file maximum to int before comparing, so the global maximum
# is numeric even if the workers stored ids as strings.
max_sta = 1 + max(int(v) for v in max_stas)
max_prf = 1 + max(int(v) for v in max_prfs)

# To construct a matrix efficiently, use either dok_matrix or lil_matrix...
sta_prfs = sparse.lil_matrix((max_sta, max_prf), dtype=int)

print(sta_prfs.shape)

(5438, 99998)
CPU times: user 3.96 s, sys: 1.72 s, total: 5.67 s
Wall time: 2.42 s


In [3]:
%%time

# Pass 2: fork one thread per CSV file to populate entries of the user-item
# matrix sta_prfs created in the previous cell.
# NOTE(review): the parsing is pure Python, so the GIL limits real parallelism
# (the recorded CPU time is close to the wall time).
threads = []
for i, csv_file in enumerate(csv_files):
    t = threading.Thread(target=worker_csv, args=(i, csv_file, True))
    threads.append(t)
    t.start()

# Plain loop rather than a list comprehension, which would build a throwaway
# list of None (and may be echoed as the cell's output).
for t in threads:
    t.join()

CPU times: user 4min 8s, sys: 1min 3s, total: 5min 12s
Wall time: 3min 27s


In [None]:
# play dummy traversal of the matrix. (for debug only)
# Walks sta_prfs (a lil_matrix) row by row through its parallel per-row lists
# `rows` (column indices) and `data` (values) and prints each stored entry as
# '[row, col] = value'.
# NOTE(review): the inner `break` (nc > 100) only ends the current row, while
# the outer check stops at nc > 1000. So after 100 entries it still prints the
# first entry of every following non-empty row until ~1000. Probably a single
# cap was intended; confirm before enabling.
if False:
    nc = 0
    lil = sta_prfs
    for i, (row, data) in enumerate(zip(lil.rows, lil.data)):
        if nc > 1000: break
        for j, val in zip(row, data):
            print '[%d, %d] = %d' % (i, j, val)
            nc += 1
            if nc > 100: break

In [None]:
if False:
    # @deprecated! Use mazzage() below instead.
    # Drops every row and every column that has no stored entry; otherwise
    # DAAL ALS aborts.
    # !!! TODO: a mapping table for the removed STAs/profiles is still needed !!!
    # Both masks are computed from the matrix before any row is dropped.
    keep_rows = sta_prfs.getnnz(1) > 0
    keep_cols = sta_prfs.getnnz(0) > 0
    sta_prfs = sta_prfs[keep_rows][:, keep_cols]
    print(sta_prfs.shape)

In [34]:
def is_csr(matrix):
    '''
    Check whether `matrix` is a scipy sparse matrix stored in CSR format.

    Uses scipy's own format tag instead of searching the text of type(matrix),
    so it does not depend on class or module naming. Any non-sparse object
    (ndarray, None, ...) yields False.

    :param matrix: object to test.
    :returns: True for CSR sparse matrices, False otherwise.
    '''
    return sparse.issparse(matrix) and matrix.format == 'csr'

def mazzage(matrix, filler=1, debug=False):
    '''
    Massage a sparse matrix in place so that none of its rows or columns is
    left without a stored entry.

    The rows and columns with no stored entries are located, and `filler` is
    written at every (empty row, empty column) intersection. When only rows
    (or only columns) are empty, column 0 (or row 0) is used as the partner
    index, so every previously empty line receives at least one entry.

    Deprecated!! It is confirmed through experiments that Intel DAAL ALS
    accepts an input matrix (or its transpose) with all-zero rows/columns.

    :param matrix: scipy sparse matrix supporting getnnz(axis) and item
                   assignment (e.g. CSR, CSC, LIL); modified in place.
    :param filler: value written into the previously empty rows/columns.
    :param debug:  print the matrix and the empty indices before/after.
    :returns: None (the matrix is modified in place).
    '''
    if debug:
        print(type(matrix))
        print('matrix: \n' + str(matrix.toarray()))

    # getnnz(axis) counts stored entries per row (axis=1) / column (axis=0)
    # for all the formats above, so no CSR<->CSC round trip is needed and the
    # result does not depend on which of the two formats was passed in.
    empty_rows = np.flatnonzero(matrix.getnnz(axis=1) == 0)
    empty_cols = np.flatnonzero(matrix.getnnz(axis=0) == 0)
    if debug:
        print('diff1:  %s' % (empty_rows,))
        print('diff2:  %s' % (empty_cols,))

    if len(empty_rows) or len(empty_cols):
        rows = empty_rows if len(empty_rows) else [0]
        cols = empty_cols if len(empty_cols) else [0]
        # One assignment per row: assigning a large 2-D block of fancy indices
        # at once was observed to fail (blocks larger than ~32).
        for ri in rows:
            matrix[ri, cols] = filler

    if debug:
        print('matrix: \n' + str(matrix.toarray()))
        print('=' * 80)


In [31]:
%%time

import random

def slicer(matrix, filext, files=None):
    '''
    Slice `matrix` row-wise into one CSR file per input CSV file and write them.

    The matrix is cut into len(files) contiguous row blocks of equal size (the
    last one may be shorter). Block i is written next to files[i], with the
    file's extension replaced by `filext`: 'csr' for the original matrix, 'tsr'
    for its transpose.

    Each output file holds three comma-separated rows, all 1-based:
        1. row pointers   (indptr + 1)
        2. column indices (indices + 1)
        3. values / 100   (DAAL ALS expects a preference in [0, 1])

    :param matrix: scipy sparse matrix supporting row slicing.
    :param filext: extension (without the dot) of the output files.
    :param files:  input CSV paths; defaults to the notebook-level csv_files.
    '''
    import os

    if files is None:
        files = csv_files
    nslice = len(files)
    if nslice == 0:
        return
    nrows = matrix.shape[0]
    zslice = (nrows + nslice - 1) // nslice  # ceil(nrows / nslice)

    for i, src in enumerate(files):
        part = matrix[zslice * i : min(zslice * (i + 1), nrows), :]

        # Output the slice ALWAYS in CSR format, with each row's column
        # indices in ascending order whatever format `part` came from.
        # NOTE(review): presumably this is the "disorder" DAAL ALS rejected
        # earlier; confirm against the DAAL CSR input requirements.
        part = sparse.csr_matrix(part).sorted_indices()

        # Swap only the extension; str.replace('csv', ...) would also rewrite
        # any 'csv' occurring in directory names.
        fname = os.path.splitext(src)[0] + '.' + filext

        with open(fname, 'w') as out:
            writer = csv.writer(out, delimiter=',')
            writer.writerow(part.indptr + 1)
            writer.writerow(part.indices + 1)
            writer.writerow(part.data / 100.)

        print('slices[%s] =  %s' % (fname, str(part.shape)))

# @ only for debug - 1
# Fill the top-left 10x10 corner with 0..99 so the verification cells below
# can check by eye that the first slice is correctly transposed.
if False:
    for k in range(100):
        # Store the raw k: sta_prfs is an int lil_matrix, so assigning k / 100.
        # would truncate every value to 0. slicer() divides by 100 when writing.
        sta_prfs[k // 10, k % 10] = k

'''
Don't convert sta_prfs from lil to csr BEFORE slicing it!
Otherwise, Intel DAAL ALS rejects the files and crashes!!
'''
slicer(sta_prfs,                     'csr')
# transpose() of a lil_matrix may not come back as CSR (scipy-version
# dependent), so convert explicitly before slicing.
slicer(sta_prfs.transpose().tocsr(), 'tsr')

slices[../data/sta_phyr_xaa.csr] =  (1360, 99998)
slices[../data/sta_phyr_xab.csr] =  (1360, 99998)
slices[../data/sta_phyr_xac.csr] =  (1360, 99998)
slices[../data/sta_phyr_xad.csr] =  (1358, 99998)
slices[../data/sta_phyr_xaa.tsr] =  (25000, 5438)
slices[../data/sta_phyr_xab.tsr] =  (25000, 5438)
slices[../data/sta_phyr_xac.tsr] =  (25000, 5438)
slices[../data/sta_phyr_xad.tsr] =  (24998, 5438)
CPU times: user 3.46 s, sys: 104 ms, total: 3.57 s
Wall time: 4.33 s


In [32]:
def readCSR(fileName, shape=None, verbose=True):
    '''
    Read a CSR file written by slicer() back into a scipy csr_matrix.

    Used to visually verify the output CSR files before feeding them into the
    C++ implicit ALS app (see ./implicit_als_csr_distributed_mpi2.cpp).
    Before running this verification, change the condition "@ only for
    debug - 1" in the slicing cell to True so as to verify the first slice is
    correctly transposed.

    The file has 3 rows, all 1-based: indptr, indices, data.

    :param fileName: path of the .csr / .tsr file.
    :param shape:    optional (rows, cols). The file does not store the column
                     count, so without it the width is inferred from the
                     largest column index and may be smaller than the original.
    :param verbose:  print the raw rows and a 10x10 dense preview.
    :returns: the matrix as scipy.sparse.csr_matrix (0-based).
    :raises ValueError: if the file has fewer than 3 rows.
    '''
    if verbose:
        print(fileName)
    rows = []
    with open(fileName) as f:
        for i, row in enumerate(csv.reader(f)):
            rows.append(row)
            if verbose:
                print('%d =  %s' % (i, row[0:10]))

    if len(rows) < 3:
        raise ValueError('%s: expected 3 rows (indptr, indices, data), got %d'
                         % (fileName, len(rows)))

    # The file is 1-based (for DAAL); scipy wants 0-based.
    indptr = np.array(rows[0]).astype(int) - 1
    indices = np.array(rows[1]).astype(int) - 1
    data = np.array(rows[2]).astype(float)
    csr = sparse.csr_matrix((data, indices, indptr), shape=shape)

    if verbose:
        print(indptr[0:10])
        print(indices[0:10])
        print('dense =  %s' % (csr.shape,))
        # Densify only the 10x10 corner being shown; densifying the whole
        # matrix can take gigabytes.
        print(csr[0:10, 0:10].toarray())
        print('=' * 80)

    return csr

In [33]:
# Verify the written files: all original slices (.csr) first, then all
# transposed slices (.tsr). Swap only the extension so a 'csv' elsewhere in
# the path is left intact.
import os

for ext in ('csr', 'tsr'):
    for csv_file in csv_files:
        readCSR(os.path.splitext(csv_file)[0] + '.' + ext)

../data/sta_phyr_xaa.csr
0 =  ['1', '10', '20', '30', '40', '50', '60', '70', '80', '90']
1 =  ['2', '3', '4', '5', '6', '7', '8', '9', '10', '1']
2 =  ['0.01', '0.02', '0.029999999999999999', '0.040000000000000001', '0.050000000000000003', '0.059999999999999998', '0.070000000000000007', '0.080000000000000002', '0.089999999999999997', '0.10000000000000001']
[ 0  9 19 29 39 49 59 69 79 89]
[1 2 3 4 5 6 7 8 9 0]
dense =  (1360, 99982)
[[ 0.    0.01  0.02  0.03  0.04  0.05  0.06  0.07  0.08  0.09]
 [ 0.1   0.11  0.12  0.13  0.14  0.15  0.16  0.17  0.18  0.19]
 [ 0.2   0.21  0.22  0.23  0.24  0.25  0.26  0.27  0.28  0.29]
 [ 0.3   0.31  0.32  0.33  0.34  0.35  0.36  0.37  0.38  0.39]
 [ 0.4   0.41  0.42  0.43  0.44  0.45  0.46  0.47  0.48  0.49]
 [ 0.5   0.51  0.52  0.53  0.54  0.55  0.56  0.57  0.58  0.59]
 [ 0.6   0.61  0.62  0.63  0.64  0.65  0.66  0.67  0.68  0.69]
 [ 0.7   0.71  0.72  0.73  0.74  0.75  0.76  0.77  0.78  0.79]
 [ 0.8   0.81  0.82  0.83  0.84  0.85  0.86  0.87  0.88  0.