In [1]:
import pyspark as ps
# Uses all 4 cores
sc = ps.SparkContext('local[4]')

In [104]:
from settings import *
import numpy as np
from src.ETL_Pipeline import clean_data

# Summary
We're loading the data from a file but we're going to assume that in fact the data lives on a hadoop cluster.  This means that the data is too big to clean using bash or python - we have to clean it using spark.  It would be a lot easier to clean on one machine, since we're assuming it's big data we'll have to use some spark magic

In [106]:
clean_data(sc, 'train.txt').take(3)

[(0, array([[  1.63553300e+00,   2.48480000e-02,   4.32087000e-01,
           -3.61914000e-01,  -7.47760000e-02,  -6.93481000e-01,
           -2.29621000e-01,   2.61503000e-01,  -8.94210000e-02,
           -2.04310000e-02,  -8.61200000e-03,   1.39754000e-01],
         [  6.01596000e-01,  -1.41051000e-01,   1.47954000e-01,
           -6.88680000e-01,   1.00712000e-01,  -4.30234000e-01,
           -2.39402000e-01,   2.57168000e-01,  -5.51070000e-02,
           -3.32708000e-01,  -9.31730000e-02,   4.83610000e-02],
         [  1.54769400e+00,   8.75400000e-03,   3.19101000e-01,
           -2.97440000e-01,  -7.61700000e-03,  -6.36042000e-01,
           -2.96480000e-01,   2.62700000e-01,  -1.18514000e-01,
           -8.51280000e-02,   3.04080000e-02,   1.18123000e-01],
         [  4.90573000e-01,  -1.47165000e-01,   1.04378000e-01,
           -6.18192000e-01,   1.87163000e-01,  -4.24637000e-01,
           -2.17779000e-01,   1.88133000e-01,  -7.79710000e-02,
           -3.76427000e-01,  -1.02

## Train data

In [44]:
# Loading from text file but we're assuming it actually exists on hadoop
train_data = sc.textFile('{0}/{1}/train.txt'.format(HOME, DATA_DIR)).cache()

In [6]:
train_data.take(2)

[u'1.635533 0.024848 0.432087 -0.361914 -0.074776 -0.693481 -0.229621 0.261503 -0.089421 -0.020431 -0.008612 0.139754 ',
 u'1.547694 0.008754 0.319101 -0.297440 -0.007617 -0.636042 -0.296480 0.262700 -0.118514 -0.085128 0.030408 0.118123 ']

In [108]:
import os
os.environ["TRUMOTION_TEST"] = "1"


In [7]:
# Give each line a unique id, without having to traverse the entire dataset.
# I'm assuming we have massive data in this demented format already and I need to be able to clean it up efficiently.
zipped = train_data.zipWithUniqueId()

In [8]:
# Collect the indices of the breaks.  This is massively smaller than the full dataset and we can 
# use it as a broadcast variable.

In [9]:
breaks = zipped.filter(lambda x: x[0] == '').collect()
breaks_list = sorted([v for (k, v) in breaks])
breaks_list_broadcast = sc.broadcast(np.array(breaks_list))

In [10]:
def f(x):
    '''
    Find the smallest value greater than x[1] in breaks_list_broadcast.values()
    Output is: block_id, (row_id, row)
    '''
    
    # Index of the first double line break after the given line
    i = np.argmax(breaks_list_broadcast.value > x[1])
    br = breaks_list_broadcast.value[i]
    
    return br, (x[1], x[0])

In [11]:
# Now we remove the whitespace lines and merge the values into a single list

combined = zipped.filter(lambda x: len(x[0]) > 0).map(f).combineByKey(lambda value: [value],
                           lambda x, value: x + [value],
                           lambda x, y: x + y
                          )



In [12]:
# The data format is now
# key - index of the double line break indicating the end of the block
# value - (index of the line, line)

# We want this to eventually be: 
# key - index of the *block*
# value - numpy array representing the block

tst_value = combined.take(1)

In [66]:
def mapper(x):
    # Construct the numpy array of data
    key = x[0]
    # This sorts the list by line id, ensuring that the data are in order
    value = sorted(x[1])
    
    cols = len(value[0][1].strip().split(' '))
    rows = len(value)
    
    arr = np.empty((rows, cols))
    
    for i in range(len(value)):
        arr[i] = np.array(map(float, value[i][1].strip().split(' ')))
        
    # find the index of the block by looking it up in the broadcast array
    block_index = np.where(breaks_list_broadcast.value == key)[0][0]
    return block_index, arr

In [95]:
# Train data 
cleanTrainData = combined.map(mapper).sortByKey()

In [97]:
cleanTrainData.take(3)

[(0, array([[  1.63553300e+00,   6.01596000e-01,   1.54769400e+00,
            4.90573000e-01,   1.60259300e+00,   4.61532000e-01,
            1.67266500e+00,   4.67321000e-01,   1.74849700e+00,
            4.40218000e-01,   1.87026400e+00,   3.22108000e-01,
            1.68218700e+00,   2.81544000e-01,   1.74593700e+00,
            2.88430000e-01,   1.45357200e+00,   2.19517000e-01,
            1.41636700e+00,   1.48845000e-01,   1.46625100e+00,
            9.04780000e-02,   1.51558900e+00,   4.44170000e-02,
            1.32171800e+00,   5.04760000e-02,   1.29791100e+00,
            1.29152000e-01,   1.48730200e+00,   1.28647000e-01,
            1.54793800e+00,   1.17524000e-01,   1.54653100e+00,
            1.12678000e-01,   1.50885800e+00],
         [  2.48480000e-02,  -1.41051000e-01,   8.75400000e-03,
           -1.47165000e-01,  -2.10520000e-01,  -2.13263000e-01,
           -3.43458000e-01,  -2.84738000e-01,  -4.04425000e-01,
           -3.40476000e-01,  -5.47399000e-01,  -3.8274

## Train Labels

In [53]:
# Loading from text file but we're assuming it actually exists on hadoop
train_labels = sc.textFile('{0}/{1}/train_block_labels.txt'.format(HOME, DATA_DIR)).cache()

In [58]:
def flatMapper1(x):
    for k in x.split(' '):
        yield int(k)

In [61]:
train_label_rows = train_labels.flatMap(flatMapper1)

In [64]:
# Slower than zipWithUnique id but this is not a big dataset yet.
zipped_labels = train_label_rows.zipWithIndex()

In [65]:
zipped_labels.take(2)

[(31, 0), (35, 1)]

In [85]:
def flatMapper2(x):
    key = x[0]
    value = x[1]
    for i in range(key):
        yield value, 1

In [93]:
# Key is the index of the observation, value is the label of the observation
# with that index.  Labels are now clean
# The zipWithIndex here is unfortunate, there may be a faster way to do it, but let's cross that 
# bridge when we come to it.

cleanLabels = zipped_labels.flatMap(flatMapper2) \
                            .sortByKey() \
                            .map(lambda x: x[0]) \
                            .zipWithIndex() \
                            .map(lambda x: (x[1], x[0]))

## Join Data and Labels

In [101]:
# Format is 
# key: index
# Value: (train_data, train_label)

trainDataWithLabels = cleanTrainData.join(cleanLabels)

In [102]:
trainDataWithLabels.persist()

PythonRDD[198] at RDD at PythonRDD.scala:43

In [None]:
t