In [None]:
# Use the Azure Machine Learning data collector to log various metrics
from azureml.logging import get_azureml_logger
import numpy as np
from pyspark.sql.types import Row
logger = get_azureml_logger()

In [3]:
# Use Azure Machine Learning history magic to control history collection
# History is off by default, options are "on", "off", or "show"
%azureml history on

History logging enabled


## First Read in Data

In [4]:
df = spark.read.option("header", "false").csv("wasb://data-files@kpmgstorage1.blob.core.windows.net/traindata/*.csv")
df.show(n=3)

NameError: name 'spark' is not defined

## Second, we need to transform data into one hot vectors
These steps are:
    1. Find unique values
    2. Create a list which contains an index and value, which can be used for one hot encoding.
    3. One hot encode the data

In [None]:
c0_unique = df.select('_c0').distinct().rdd.map(lambda r: r[0]).collect()
c1_unique = df.select('_c1').distinct().rdd.map(lambda r: r[0]).collect()
c5_unique = df.select('_c5').distinct().rdd.map(lambda r: r[0]).collect()
c6_unique = df.select('_c6').distinct().rdd.map(lambda r: r[0]).collect()

print(c0_unique)
print(len(c0_unique))
print(c1_unique)
print(len(c1_unique))
print(c5_unique)
print(len(c5_unique))
print(c6_unique)
print(len(c6_unique))

### One Hot Encoding

In [None]:
#Example creating one hot encoding for something with 7 classes for class 3
print(np.eye(7)[3])

#Example applying this to our data
print(np.eye(len(c1_unique))[c1_unique.index('Sat')])

In [None]:
#Now to do this to a spark data frame

#Define Function to make this easier to deal with
def One_Hot_String(val_list, val, sep=' '):
    array = np.eye(len(val_list))[val_list.index(val)].astype(int)
    return sep.join(str(i) for i in array.tolist())

#Apply Function to each column
mapped = df.rdd.map(lambda r: ( Row(One_Hot_String(c0_unique, r['_c0'])), 
                                r['_c1'], 
                                r['_c2'], 
                                r['_c3'],
                                r['_c4'],
                                r['_c5'],
                                r['_c6'])).toDF()

mapped = mapped.rdd.map(lambda r: ( r['_1'], 
                                Row(One_Hot_String(c1_unique, r['_2'])), 
                                r['_3'], 
                                r['_4'],
                                r['_5'],
                                r['_6'],
                                r['_7'])).toDF()

mapped = mapped.rdd.map(lambda r: ( r['_1'], 
                                r['_2'], 
                                r['_3'], 
                                r['_4'],
                                r['_5'],
                                Row(One_Hot_String(c5_unique, r['_6'])),
                                r['_7'])).toDF()

mapped = mapped.rdd.map(lambda r: ( r['_1'], 
                                r['_2'], 
                                r['_3'], 
                                r['_4'],
                                r['_5'],
                                r['_6'],
                                Row(One_Hot_String(c6_unique, r['_7'])) )).toDF()

mapped.show(n=10)

## Generate a Label File
Deep learning frameworks typically operate on label files.  These are completely processed files in a format specific to that framework which the framework can derive optimal performance from.  We are going to create a label file for CNTK.

The type of label file we will create is a CTF File or CNTK Text File.  These follow the format:

|somedescriptor value1 value2 value3 |somedescriptor2 value1 value2 value3

In our case, we will build a file of the form:

|features _1 _2 _3 _4 _5 _6 |label _7

|features _1 _2 _3 _4 _5 _6 |label _7

|features _1 _2 _3 _4 _5 _6 |label _7

In [None]:
def row_to_ctf_string(r):
    s = '|label ' + str(r['_7'][0]) + ' '
    s = s + '|features ' + str(r['_1'][0]) + ' '
    s = s + str(r['_2'][0]) + ' '
    s = s + str(r['_3'][0]) + ' '
    s = s + str(r['_4'][0]) + ' '
    s = s + str(r['_5'][0]) + ' '
    s = s + str(r['_6'][0]) + ' '
    return s
pfw = mapped.rdd.flatMap(lambda r: Row(row_to_ctf_string(r)) )
pfw.saveAsTextFile("wasb://data-files@bikesharestorage.blob.core.windows.net/train_ctf")