# Create the dataset for the neural network

Setup

In [1]:
import pyspark
import pandas as pd
import os
import time
from pyspark.sql.types import StructType, StructField,StringType, FloatType, IntegerType
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql import Window
import operator
import numpy as np
import random
import math
import re
import csv
from joblib import Parallel, delayed
import multiprocessing
import bcolz

In [2]:
from pyspark import SparkConf, SparkContext

# Spark settings requested for this run.
# NOTE(review): maxResultSize (240g) is larger than the driver memory (180g)
# set two lines below; confirm that this is intentional.
conf = SparkConf()
conf.set("spark.driver.maxResultSize", "240g")
conf.set('spark.executor.memory', '32g')
conf.set('spark.driver.memory', '180g')

# Reuse a running SparkContext if there is one, otherwise create it from `conf`.
sc = SparkContext.getOrCreate(conf=conf)

from pyspark.sql import SQLContext
# SQL entry point; used below for createDataFrame() and clearCache().
sqlContext = SQLContext(sc)

In [3]:
SparkContext.getConf(sc).getAll()

[(u'spark.executor.memory', u'32g'),
 (u'spark.driver.memory', u'180g'),
 (u'spark.app.id', u'local-1528085779629'),
 (u'spark.driver.maxResultSize', u'240g'),
 (u'spark.driver.host', u'ip-172-31-16-121.eu-west-1.compute.internal'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.master', u'local[*]'),
 (u'spark.executor.id', u'driver'),
 (u'spark.driver.port', u'42904'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.ui.showConsoleProgress', u'true'),
 (u'spark.app.name', u'pyspark-shell')]

In [4]:
# When True, the cells below print intermediate results with .show().
debug = False
# Width (second dimension) of the per-row history arrays allocated in createArrays.
N = 60
# Not referenced in the visible part of the notebook.
MISSING_DEPARTMENT=21
# Fill value for aisle_id after order lines are left-joined with the products table.
MISSING_AISLE=135
# Location of the product catalogue CSV.
# NOTE: the name `products` is rebound to a Spark RDD / DataFrame in the
# "Read item information" cells, so this string is only valid until then.
products = "/cat/home/ubuntu/ikrt/mycode/products.csv"
%pwd

u'/cat/home/ubuntu/ikrt/mycode'

In [5]:
sqlContext.clearCache()

In [6]:
# Directory prefixes, prepended verbatim to file names (e.g. path + "orders.csv").
# Empty strings mean "the current working directory".
# The machine-specific absolute Windows paths previously assigned here were
# overwritten immediately by these values, so they were dead code.
path = ''
path_out = ''


In [7]:
start_time = 0


def tic():
    """Start the stopwatch by storing the current wall-clock time in `start_time`."""
    global start_time
    start_time = time.time()


def toc():
    """Print how many seconds (rounded to 3 decimals) have passed since tic()."""
    seconds = round(time.time() - start_time, 3)
    print("took me " + str(seconds)+" seconds to do this..")

In [8]:
def Tofloat(s):
    """Convert `s` to a float, or return None when it cannot be converted.

    Args:
        s: value to convert, typically one text field of a CSV line.

    Returns:
        float(s), or None if float() rejects the value (for example an
        empty string or None).
    """
    try:
        return float(s)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "missing". A bare `except:`
        # would also swallow KeyboardInterrupt / SystemExit.
        return None

In [9]:
def ToInt(s):
    """Convert `s` to an int, or return None when it cannot be converted.

    Args:
        s: value to convert, typically one text field of a CSV line.

    Returns:
        int(s), or None if int() rejects the value (for example an empty
        string, None, or a decimal string such as "15.0").
    """
    try:
        return int(s)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "missing". A bare `except:`
        # would also swallow KeyboardInterrupt / SystemExit.
        return None

In [10]:
def ClipNPad(bits, maxBits):
    """Return `bits` cut to at most `maxBits` items and right-padded with zeros.

    Args:
        bits: list of values.
        maxBits: length of the returned list.

    Returns:
        A new list: the first `maxBits` items of `bits`, followed by as many
        0 entries as are needed to reach `maxBits` items.
    """
    clipped = bits[:maxBits]
    missing = maxBits - len(clipped)
    return clipped + [0] * missing

# Read Data

 Read User Level Historical Information 

In [11]:
sample_file = pd.read_csv(path + "orders.csv",nrows=10)
sample_file

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
0,2539329,1,prior,1,2,8,
1,2398795,1,prior,2,3,7,15.0
2,473747,1,prior,3,3,12,21.0
3,2254736,1,prior,4,4,7,29.0
4,431534,1,prior,5,4,15,28.0
5,3367565,1,prior,6,2,7,19.0
6,550135,1,prior,7,1,9,20.0
7,3108588,1,prior,8,1,14,14.0
8,2295261,1,prior,9,1,16,0.0
9,2550362,1,prior,10,4,8,30.0


In [12]:
# Column layout for the orders table: [column name, Spark type, nullable].
data_columns =  [ 
                   ['order_id',IntegerType(), True],
                   ['user_id',IntegerType(), True],
                   ['eval_set',StringType(), True],
                   ['order_number',IntegerType(), True],
                   ['order_dow',IntegerType(), True],
                   ['order_hour_of_day',IntegerType(), True],
                   ['days_since_prior_order',FloatType(), True]
                 ]
fields = [StructField(field_name, field_type, null_type) for field_name, field_type, null_type in data_columns]
# NOTE: `data_columns`, `fields` and `schema` are reassigned by the later schema cells.
schema = StructType(fields)

In [13]:
tic()
orders = sc.textFile(path + "orders.csv")
header = orders.first() #extract header
# Drop the header by comparing whole lines. The earlier test compared only the
# first character of each line with the first character of the header, which
# would also discard any data line starting with that character.
orders = orders.filter(lambda line: line != header)
#convert data type
orders_csv = orders.map(lambda x: x.split(',')).map(lambda x: [ToInt(x[0]), ToInt(x[1]), x[2]\
                                                               ,  ToInt(x[3]), ToInt(x[4]),ToInt(x[5]),Tofloat(x[6])])
orders = sqlContext.createDataFrame(orders_csv, schema)
# A days_since_prior_order that could not be parsed (None) is stored as 0.
orders = orders.fillna(0, subset=['days_since_prior_order'])
toc()

took me 3.538 seconds to do this..


In [14]:
orders_csv.take(2)

[[2539329, 1, u'prior', 1, 2, 8, None], [2398795, 1, u'prior', 2, 3, 7, 15.0]]

In [15]:
orders.take(2)

[Row(order_id=2539329, user_id=1, eval_set=u'prior', order_number=1, order_dow=2, order_hour_of_day=8, days_since_prior_order=0.0),
 Row(order_id=2398795, user_id=1, eval_set=u'prior', order_number=2, order_dow=3, order_hour_of_day=7, days_since_prior_order=15.0)]

 Build Per-User Order History

In [16]:
# define udf
def sorter(l):
    """Order (key, value) pairs by their key and return only the values.

    Args:
        l: iterable of indexable pairs; element 0 is the sort key and
           element 1 is the value to keep.

    Returns:
        List of the values (element 1), arranged by ascending key.
    """
    ordered = list(l)
    ordered.sort(key=operator.itemgetter(0))
    return [pair[1] for pair in ordered]
# Registered without a returnType; the printSchema() output further down shows
# the columns produced by this UDF as string.
sort_udf = F.udf(sorter)

In [17]:
tic()

# Debug-only preview: per user, days_since_prior_order ordered by order_number.
if debug:
    user_id__days_since_prior_order_history = orders.groupby(['user_id'])\
            .agg(F.collect_list(F.struct("order_number","days_since_prior_order")).alias('list_col'))
    user_id__days_since_prior_order_history.select('user_id', sort_udf("list_col")\
                                                   .alias("days_since_prior_order_history")).show(n=3,truncate=False)

toc()     

took me 0.0 seconds to do this..


In [18]:
tic()

# Debug-only preview: per user, order_dow ordered by order_number.
if debug:
    user_id__order_dow_history = orders.groupby(['user_id'])\
            .agg(F.collect_list(F.struct("order_number","order_dow")).alias('list_col'))
    user_id__order_dow_history.select('user_id', sort_udf("list_col")\
                                                   .alias("order_dow_history")).show(n=3,truncate=False)

toc()     

took me 0.0 seconds to do this..


In [19]:
tic()

# Debug-only preview: per user, order_hour_of_day ordered by order_number.
if debug:
    user_id__order_hour_of_day_history = orders.groupby(['user_id'])\
            .agg(F.collect_list(F.struct("order_number","order_hour_of_day")).alias('list_col'))
    user_id__order_hour_of_day_history.select('user_id', sort_udf("list_col")\
                                                   .alias("order_hour_of_day_history")).show(n=3,truncate=False)

toc() 

took me 0.0 seconds to do this..


In [20]:
tic()

# Debug-only preview: per user, the sorted list of order numbers.
# order_number is used both as sort key and as value in the struct.
if debug:
    user_id__order_number_history = orders.groupby(['user_id'])\
            .agg(F.collect_list(F.struct("order_number","order_number")).alias('list_col'))
    user_id__order_number_history.select('user_id', sort_udf("list_col")\
                                                   .alias("order_number_history")).show(n=3,truncate=False)

toc() 

took me 0.0 seconds to do this..


In [21]:
tic()

# One row per user with four order-level histories. Each history is collected as
# (order_number, value) structs and then arranged by order_number with sort_udf:
# order_number, order_hour_of_day, order_dow and days_since_prior_order.
user_order_history = orders.groupby(['user_id'])\
        .agg(F.collect_list(F.struct("order_number","order_number")).alias('list_col'),\
            F.collect_list(F.struct("order_number","order_hour_of_day")).alias('list_col2'),\
             F.collect_list(F.struct("order_number","order_dow")).alias('list_col3'),\
             F.collect_list(F.struct("order_number","days_since_prior_order")).alias('list_col4'))\
        .select('user_id', sort_udf("list_col").alias("order_number_history"),\
                                    sort_udf("list_col2").alias("order_hour_of_day_history"),\
                                    sort_udf("list_col3").alias("order_dow_history"),\
                                    sort_udf("list_col4").alias("days_since_prior_order_history"))
if debug:
    user_order_history.show(n=3,truncate=False)

toc() 

took me 0.137 seconds to do this..


Read item information

In [22]:
sample_file = pd.read_csv(path + "products.csv",nrows=10)
sample_file

Unnamed: 0,product_id,product_name,aisle_id,department_id
0,1,Chocolate Sandwich Cookies,61,19
1,2,All-Seasons Salt,104,13
2,3,Robust Golden Unsweetened Oolong Tea,94,7
3,4,Smart Ones Classic Favorites Mini Rigatoni Wit...,38,1
4,5,Green Chile Anytime Sauce,5,13
5,6,Dry Nose Oil,11,11
6,7,Pure Coconut Water With Orange,98,7
7,8,Cut Russet Potatoes Steam N' Mash,116,1
8,9,Light Strawberry Blueberry Yogurt,120,16
9,10,Sparkling Orange Juice & Prickly Pear Beverage,115,7


In [23]:
# Column layout for the products table: [column name, Spark type, nullable].
data_columns =  [ 
                   ['product_id',IntegerType(), True],
                   ['product_name',StringType(), True],
                   ['aisle_id',IntegerType(), True],
                   ['department_id',IntegerType(), True]
                 ]
fields = [StructField(field_name, field_type, null_type) for field_name, field_type, null_type in data_columns]
# Replaces the `schema` built for the orders table.
schema = StructType(fields)

In [24]:
tic()
# NOTE: this rebinds `products`, which until now held the path of the CSV file.
products = sc.textFile(path + "products.csv")
header = products.first() #extract header
# Drop the header by comparing whole lines. The earlier test compared only the
# first character of each line with the first character of the header, which
# would also discard any data line starting with that character.
products = products.filter(lambda line: line != header)
#convert data type
# NOTE(review): split(',') ignores CSV quoting, so a product_name containing a
# comma shifts the remaining fields and aisle_id / department_id become None.
products_csv = products.map(lambda x: x.split(',')).map(lambda x: [ToInt(x[0]), x[1], ToInt(x[2]), ToInt(x[3])])
products = sqlContext.createDataFrame(products_csv, schema)
toc()

took me 0.089 seconds to do this..


In [25]:
products_csv.take(2)

[[1, u'Chocolate Sandwich Cookies', 61, 19], [2, u'All-Seasons Salt', 104, 13]]

In [26]:
products.take(2)

[Row(product_id=1, product_name=u'Chocolate Sandwich Cookies', aisle_id=61, department_id=19),
 Row(product_id=2, product_name=u'All-Seasons Salt', aisle_id=104, department_id=13)]

 Read Transaction Level Historical Information

In [27]:
sample_file = pd.read_csv(path + "order_products__prior.csv",nrows=10)
sample_file

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered
0,2,33120,1,1
1,2,28985,2,1
2,2,9327,3,0
3,2,45918,4,1
4,2,30035,5,0
5,2,17794,6,1
6,2,40141,7,1
7,2,1819,8,1
8,2,43668,9,0
9,3,33754,1,1


In [28]:
# Column layout for the order-line tables: [column name, Spark type, nullable].
# This `schema` is used for both order_products__prior.csv and order_products__train.csv.
data_columns =  [ 
                   ['order_id',IntegerType(), True],
                   ['product_id',IntegerType(), True],
                   ['add_to_cart_order',IntegerType(), True],
                   ['reordered',IntegerType(), True]
                 ]
fields = [StructField(field_name, field_type, null_type) for field_name, field_type, null_type in data_columns]
schema = StructType(fields)

In [29]:
tic()
items_prior = sc.textFile(path + "order_products__prior.csv")
header = items_prior.first() #extract header
# Drop the header by comparing whole lines. The earlier test compared only the
# first character of each line with the first character of the header, which
# would also discard any data line starting with that character.
items_prior = items_prior.filter(lambda line: line != header)
#convert data type
items_prior_csv = items_prior.map(lambda x: x.split(',')).map(lambda x: [ToInt(x[0]), ToInt(x[1]), ToInt(x[2]), ToInt(x[3])])
items_prior = sqlContext.createDataFrame(items_prior_csv, schema)
toc()

took me 0.083 seconds to do this..


In [30]:
items_prior_csv.take(2)

[[2, 33120, 1, 1], [2, 28985, 2, 1]]

In [31]:
items_prior.take(2)

[Row(order_id=2, product_id=33120, add_to_cart_order=1, reordered=1),
 Row(order_id=2, product_id=28985, add_to_cart_order=2, reordered=1)]

In [32]:
tic()
items = sc.textFile(path + "order_products__train.csv")
header = items.first() #extract header
# Drop the header by comparing whole lines. The earlier test compared only the
# first character of each line with the first character of the header, which
# would also discard any data line starting with that character.
items = items.filter(lambda line: line != header)
#convert data type
items_csv = items.map(lambda x: x.split(',')).map(lambda x: [ToInt(x[0]), ToInt(x[1]), ToInt(x[2]), ToInt(x[3])])
# `schema` is still the order-line schema (order_id, product_id, add_to_cart_order, reordered).
items = sqlContext.createDataFrame(items_csv, schema)
toc()

took me 0.088 seconds to do this..


In [33]:
items_csv.take(2)

[[1, 49302, 1, 1], [1, 11109, 2, 1]]

In [34]:
items.take(2)

[Row(order_id=1, product_id=49302, add_to_cart_order=1, reordered=1),
 Row(order_id=1, product_id=11109, add_to_cart_order=2, reordered=1)]

In [35]:
# Stack the prior and train order lines; both DataFrames were created with the same schema.
items_all = items_prior.union(items)

In [36]:
if debug:
    orders.show()

In [37]:
sample_file = pd.read_csv(path + "aisles.csv",nrows=10)
sample_file

Unnamed: 0,aisle_id,aisle
0,1,prepared soups salads
1,2,specialty cheeses
2,3,energy granola bars
3,4,instant foods
4,5,marinades meat preparation
5,6,other
6,7,packaged meat
7,8,bakery desserts
8,9,pasta sauce
9,10,kitchen supplies


In [38]:
# Column layout for the aisles table: [column name, Spark type, nullable].
data_columns =  [ 
                   ['aisle_id',IntegerType(), True],
                   ['aisle',StringType(), True],
                 ]
fields = [StructField(field_name, field_type, null_type) for field_name, field_type, null_type in data_columns]
# Replaces the order-line `schema`.
schema = StructType(fields)

In [39]:
tic()
aisles = sc.textFile(path + "aisles.csv")
header = aisles.first() #extract header
# Drop the header by comparing whole lines. The earlier test compared only the
# first character of each line with the first character of the header, which
# would also discard any data line starting with that character.
aisles = aisles.filter(lambda line: line != header)
#convert data type
aisles_csv = aisles.map(lambda x: x.split(',')).map(lambda x: [ToInt(x[0]), x[1]])
aisles = sqlContext.createDataFrame(aisles_csv, schema)
toc()

took me 0.063 seconds to do this..


In [40]:
aisles_csv.take(2)

[[1, u'prepared soups salads'], [2, u'specialty cheeses']]

In [41]:
aisles.take(2)

[Row(aisle_id=1, aisle=u'prepared soups salads'),
 Row(aisle_id=2, aisle=u'specialty cheeses')]

In [42]:
# Attach product attributes to every order line. Lines that end up without an
# aisle_id after the left join receive MISSING_AISLE.
items_all = items_all.join(products, on = 'product_id', how = 'left')
items_all = items_all.fillna(MISSING_AISLE, subset=['aisle_id'])

In [43]:
# Add the aisle name, then keep only rows whose aisle_id was found in `aisles`.
# NOTE(review): rows filled with MISSING_AISLE survive only if that id exists in aisles.csv. Confirm.
items_all = items_all.join(aisles, on='aisle_id' , how='left')\
    .where("aisle IS NOT NULL")

In [44]:
tic()
if debug:
    items_all.show()
toc()

took me 0.0 seconds to do this..


In [45]:
# Build one row per (user, master aisle, order, order line):
#   1. orders (eval_set train/test/prior) left-joined with items_all give the distinct
#      (user_id, aisle_id, department_id) combinations, renamed to master_aisle_id /
#      master_department_id; combinations with a null or 0 master aisle are dropped.
#   2. every combination is joined to all orders of that user (on user_id) and then to
#      the lines of each order (left join on order_id, so orders without lines are kept).
#   3. rows are ordered by user, master aisle, order number and add_to_cart_order, and
#      the columns needed by the aggregation that follows are selected.
matrix = orders.select(['user_id','order_number','order_id','eval_set'])\
        .where("eval_set=='train' or eval_set=='test' or eval_set=='prior'").join(items_all.select(['order_id','aisle_id','department_id']),on = 'order_id', how = 'left')\
        .select(['user_id','aisle_id','department_id']).withColumnRenamed("aisle_id", "master_aisle_id").withColumnRenamed("department_id","master_department_id").distinct()\
        .where("master_aisle_id !=0 and master_aisle_id IS NOT NULL")\
        .join(orders.select(['user_id','order_id', 'order_number']),on='user_id', how='left')\
        .join(items_all.select('order_id', 'add_to_cart_order' ,'reordered', 'aisle_id', 'department_id' , 'product_id')\
        ,on='order_id', how='left').orderBy(['user_id','master_aisle_id','order_number','add_to_cart_order'])\
        .select(['user_id','master_aisle_id','master_department_id', 'order_number' , 'add_to_cart_order' , 'reordered', 'aisle_id',\
                 'product_id'])

In [46]:
tic()
if debug:
    matrix.show()
toc()

took me 0.0 seconds to do this..


In [47]:
# define udf
def sorterAndIsInOrderFinder(l):
    """Tell whether the reference value of a group occurs among its entries.

    Args:
        l: non-empty iterable of indexable entries; element 0 is the sort key,
           element 1 the value to look for, element 2 the reference value.

    Returns:
        1 if element 2 of the first entry (after sorting by element 0) equals
        element 1 of any entry, otherwise 0.
    """
    ordered = sorted(l, key=operator.itemgetter(0))
    candidates = [entry[1] for entry in ordered]
    reference = ordered[0][2]
    return int(reference in candidates)

# Spark UDF wrapper; no explicit returnType is given.
sorterAndIsInOrderFinder_udf = F.udf(sorterAndIsInOrderFinder)

In [48]:
# define udf
def sorterAndNumProductsFinder(l):
    """Count how many entries of a group carry the group's reference value.

    Args:
        l: non-empty iterable of indexable entries; element 0 is the sort key,
           element 1 the value to compare, element 2 the reference value.

    Returns:
        Number of entries whose element 1 equals element 2 of the first entry
        (after sorting by element 0).
    """
    ordered = sorted(l, key=operator.itemgetter(0))
    reference = ordered[0][2]
    values = [entry[1] for entry in ordered]
    return values.count(reference)

# Spark UDF wrapper; no explicit returnType is given.
sorterAndNumProductsFinder_udf = F.udf(sorterAndNumProductsFinder)

In [49]:
# define udf
def sorterAndOrderSizeFinder(l):
    """Return the number of entries in a group.

    Args:
        l: iterable of indexable entries with at least two elements each;
           element 0 is used as the sort key.

    Returns:
        The number of entries in `l`.
    """
    ordered = sorted(l, key=operator.itemgetter(0))
    second_fields = [entry[1] for entry in ordered]
    return len(second_fields)

# Spark UDF wrapper; no explicit returnType is given.
sorterAndOrderSizeFinder_udf = F.udf(sorterAndOrderSizeFinder)

In [50]:
# define udf
def sorterAndIndexInOrderFinder(l):
    """Average the truthy sort keys of a group, rounded to one decimal.

    Args:
        l: iterable of indexable entries; element 0 is the key that is both
           sorted on and averaged.

    Returns:
        The mean of all element-0 values that are truthy (None and 0 are
        skipped), rounded to one decimal, or 999.0 when no such value exists.
    """
    ordered = sorted(l, key=operator.itemgetter(0))
    positions = [float(entry[0]) for entry in ordered if entry[0]]
    if not positions:
        return 999.0
    return round(sum(positions)/len(positions),1)

# Spark UDF wrapper; no explicit returnType is given.
sorterAndIndexInOrderFinder_udf = F.udf(sorterAndIndexInOrderFinder)

In [51]:
# define udf
def sorterAndProductFinder(l):
    """Return element 3 of every entry, ordered by element 0.

    Args:
        l: iterable of indexable entries with at least four elements each;
           element 0 is the sort key.

    Returns:
        List holding element 3 of each entry, arranged by ascending element 0.
    """
    ordered = list(l)
    ordered.sort(key=operator.itemgetter(0))
    return [entry[3] for entry in ordered]

# Spark UDF wrapper; no explicit returnType is given.
sorterAndProductFinder_udf = F.udf(sorterAndProductFinder)

In [52]:
tic()
# Stage 1: per (user, master department, master aisle, order) collect the order lines as
#          (add_to_cart_order, aisle_id, master_aisle_id, product_id) structs and derive
#          isInOrder, NumProductsFromDep, OrderSize, IndexInOrder and ProductIDs from them.
# Stage 2: per (user, master aisle, master department) collect those values across orders
#          as (order_number, value) structs and arrange each list by order_number (sort_udf).
user_aisle_history = matrix.groupby(['user_id','master_department_id','master_aisle_id','order_number'])\
        .agg(F.collect_list(F.struct("add_to_cart_order","aisle_id","master_aisle_id","product_id")).alias('list_col'))\
        .select('user_id', 'master_department_id', 'master_aisle_id','order_number', sorterAndIsInOrderFinder_udf("list_col").alias("isInOrder"),\
               sorterAndNumProductsFinder_udf("list_col").alias("NumProductsFromDep"),
               sorterAndOrderSizeFinder_udf("list_col").alias("OrderSize"),\
               sorterAndIndexInOrderFinder_udf("list_col").alias("IndexInOrder"),\
               sorterAndProductFinder_udf("list_col").alias("ProductIDs"))\
        .groupby(['user_id','master_aisle_id','master_department_id'])\
        .agg(F.collect_list(F.struct("order_number","isInOrder")).alias('list_col2'),\
             F.collect_list(F.struct("order_number","NumProductsFromDep")).alias('list_col3'),\
            F.collect_list(F.struct("order_number","OrderSize")).alias('list_col4'),\
            F.collect_list(F.struct("order_number","IndexInOrder")).alias('list_col5'),\
            F.collect_list(F.struct("order_number","ProductIDs")).alias('list_col6'))\
        .select('user_id', 'master_aisle_id','master_department_id' , sort_udf("list_col2").alias("IsInOrder_history")\
               ,sort_udf("list_col3").alias("NumProductsFromDep_history"),\
               sort_udf("list_col4").alias("OrderSize_history"),sort_udf("list_col5").alias("IndexInOrder_history"),\
               sort_udf("list_col6").alias("ProductIDs_history"))        
if debug:
    # Preview only. NOTE: `user_order_aisle_history` is NOT assigned on this branch,
    # so the cell that builds the pandas frame from it needs debug == False here.
    user_aisle_history.show(n=5,truncate=False)
else:
    # Append the per-user order histories and pull the whole result to the driver.
    user_order_aisle_history = user_aisle_history.join(user_order_history, on='user_id', how='left').collect()
toc() 

took me 3294.897 seconds to do this..


In [53]:
user_aisle_history.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- master_aisle_id: integer (nullable = true)
 |-- master_department_id: integer (nullable = true)
 |-- IsInOrder_history: string (nullable = true)
 |-- NumProductsFromDep_history: string (nullable = true)
 |-- OrderSize_history: string (nullable = true)
 |-- IndexInOrder_history: string (nullable = true)
 |-- ProductIDs_history: string (nullable = true)



In [54]:
user_order_history.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- order_number_history: string (nullable = true)
 |-- order_hour_of_day_history: string (nullable = true)
 |-- order_dow_history: string (nullable = true)
 |-- days_since_prior_order_history: string (nullable = true)



In [55]:
# Turn the collected rows into a pandas frame. Column names are listed in the
# order of the joined result: user_id, the user_aisle_history columns, then the
# user_order_history columns.
user_aisle_df_pd = pd.DataFrame(user_order_aisle_history,columns = ['user_id', 'master_aisle_id','master_department_id','IsInOrder_history'\
                        ,'NumProductsFromDep_history', 'OrderSize_history','IndexInOrder_history','ProductIDs_history','order_number_history'\
                        ,'order_hour_of_day_history','order_dow_history','days_since_prior_order_history'])

In [56]:
sc.stop()

In [57]:
# NOTE: `debug` is switched to True here, so the CSV below is always written and
# the flag stays True for every cell that runs afterwards.
debug = True
if debug:
    user_aisle_df_pd.to_csv(path_out+'user_aisle_df_pd.csv',index=False)

In [58]:
path_out=""
# Reload the table that was just written, replacing the in-memory frame.
# NOTE(review): the history columns presumably come back from the CSV as text
# (the preview shows them quoted) and need parsing before numeric use. Confirm.
user_aisle_df_pd = pd.read_csv(path_out+'user_aisle_df_pd.csv')

In [59]:
user_aisle_df_pd.head(5)

Unnamed: 0,user_id,master_aisle_id,master_department_id,IsInOrder_history,NumProductsFromDep_history,OrderSize_history,IndexInOrder_history,ProductIDs_history,order_number_history,order_hour_of_day_history,order_dow_history,days_since_prior_order_history
0,148,9,9.0,"[0, 0, 0, 0, 0, 0, 1, 0]","[0, 0, 0, 0, 0, 0, 1, 0]","[39, 14, 11, 2, 14, 15, 7, 7]","[20.0, 7.5, 6.0, 1.5, 7.7, 8.0, 4.0, 4.0]","[[10983, 21806, 6891, 44557, 18629, 11212, 244...","[1, 2, 3, 4, 5, 6, 7, 8]","[16, 7, 5, 13, 13, 12, 15, 23]","[5, 1, 4, 6, 2, 3, 6, 5]","[0.0, 3.0, 3.0, 2.0, 3.0, 1.0, 30.0, 27.0]"
1,148,61,19.0,"[0, 1, 0, 1, 0, 0, 0, 0]","[0, 1, 0, 1, 0, 0, 0, 0]","[39, 14, 11, 2, 14, 15, 7, 7]","[20.0, 7.5, 6.0, 1.5, 7.7, 8.0, 4.0, 4.0]","[[10983, 21806, 6891, 44557, 18629, 11212, 244...","[1, 2, 3, 4, 5, 6, 7, 8]","[16, 7, 5, 13, 13, 12, 15, 23]","[5, 1, 4, 6, 2, 3, 6, 5]","[0.0, 3.0, 3.0, 2.0, 3.0, 1.0, 30.0, 27.0]"
2,148,24,4.0,"[1, 0, 0, 0, 0, 0, 0, 1]","[1, 0, 0, 0, 0, 0, 0, 1]","[39, 14, 11, 2, 14, 15, 7, 7]","[20.0, 7.5, 6.0, 1.5, 7.7, 8.0, 4.0, 4.0]","[[10983, 21806, 6891, 44557, 18629, 11212, 244...","[1, 2, 3, 4, 5, 6, 7, 8]","[16, 7, 5, 13, 13, 12, 15, 23]","[5, 1, 4, 6, 2, 3, 6, 5]","[0.0, 3.0, 3.0, 2.0, 3.0, 1.0, 30.0, 27.0]"
3,148,59,15.0,"[0, 1, 0, 0, 0, 0, 0, 0]","[0, 2, 0, 0, 0, 0, 0, 0]","[39, 14, 11, 2, 14, 15, 7, 7]","[20.0, 7.5, 6.0, 1.5, 7.7, 8.0, 4.0, 4.0]","[[10983, 21806, 6891, 44557, 18629, 11212, 244...","[1, 2, 3, 4, 5, 6, 7, 8]","[16, 7, 5, 13, 13, 12, 15, 23]","[5, 1, 4, 6, 2, 3, 6, 5]","[0.0, 3.0, 3.0, 2.0, 3.0, 1.0, 30.0, 27.0]"
4,148,84,16.0,"[1, 0, 1, 0, 0, 0, 0, 0]","[3, 0, 1, 0, 0, 0, 0, 0]","[39, 14, 11, 2, 14, 15, 7, 7]","[20.0, 7.5, 6.0, 1.5, 7.7, 8.0, 4.0, 4.0]","[[10983, 21806, 6891, 44557, 18629, 11212, 244...","[1, 2, 3, 4, 5, 6, 7, 8]","[16, 7, 5, 13, 13, 12, 15, 23]","[5, 1, 4, 6, 2, 3, 6, 5]","[0.0, 3.0, 3.0, 2.0, 3.0, 1.0, 30.0, 27.0]"


In [60]:
# The function "text_to_wordlist" is from
# https://www.kaggle.com/currie32/quora-question-pairs/the-importance-of-cleaning-text
def text_to_wordlist(text, remove_stopwords=False, stem_words=False):
    """Lower-case and clean a piece of text, returning it as one string.

    Args:
        text: the string to clean.
        remove_stopwords: when True, drop English stop words before cleaning.
        stem_words: when True, stem every word after cleaning.

    Returns:
        The cleaned text as a single string (callers split it into words).

    NOTE(review): `stopwords` and `SnowballStemmer` are not imported in the
    visible part of the notebook, so both options presumably raise NameError
    unless they are imported elsewhere.
    """
    tokens = text.lower().split()

    if remove_stopwords:
        stops = set(stopwords.words("english"))
        tokens = [w for w in tokens if w not in stops]

    text = " ".join(tokens)

    # (pattern, replacement) pairs, applied strictly in this order.
    substitutions = (
        (r"[^A-Za-z0-9^,!.\/'+-=]", " "),
        (r"what's", "what is "),
        (r"\'s", " "),
        (r"\'ve", " have "),
        (r"can't", "cannot "),
        (r"n't", " not "),
        (r"i'm", "i am "),
        (r"\'re", " are "),
        (r"\'d", " would "),
        (r"\'ll", " will "),
        (r",", " "),
        (r"\.", " "),
        (r"!", " ! "),
        (r"\/", " "),
        (r"\^", " ^ "),
        (r"\+", " + "),
        (r"\-", " - "),
        (r"\=", " = "),
        (r"'", " "),
        (r"(\d+)(k)", r"\g<1>000"),
        (r":", " : "),
        (r" e g ", " eg "),
        (r" b g ", " bg "),
        (r" u s ", " american "),
        (r"\0s", "0"),
        (r" 9 11 ", "911"),
        (r"e - mail", "email"),
        (r"j k", "jk"),
        (r"\s{2,}", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    if stem_words:
        pieces = text.split()
        stemmer = SnowballStemmer('english')
        text = " ".join([stemmer.stem(word) for word in pieces])

    return(text)

In [61]:
# Load the complete product catalogue with pandas. The file is addressed through
# `path` (as in the earlier products.csv preview cell) because the name `products`,
# which first held this CSV's location, is rebound to a Spark DataFrame when the
# catalogue is read through Spark, and pd.read_csv cannot take that as input.
products_file = pd.read_csv(path + "products.csv")
# Show only the first rows instead of the whole catalogue.
products_file.head(10)

Unnamed: 0,product_id,product_name,aisle_id,department_id
0,1,Chocolate Sandwich Cookies,61,19
1,2,All-Seasons Salt,104,13
2,3,Robust Golden Unsweetened Oolong Tea,94,7
3,4,Smart Ones Classic Favorites Mini Rigatoni Wit...,38,1
4,5,Green Chile Anytime Sauce,5,13
5,6,Dry Nose Oil,11,11
6,7,Pure Coconut Water With Orange,98,7
7,8,Cut Russet Potatoes Steam N' Mash,116,1
8,9,Light Strawberry Blueberry Yogurt,120,16
9,10,Sparkling Orange Juice & Prickly Pear Beverage,115,7


In [62]:
# Lookup tables keyed by product_id:
#   product_name_1 .. product_name_5 : 1st .. 5th cleaned word of the product name ("" if absent)
#   product_aisle_id                 : aisle_id of the product
#   product_names                    : raw product name
product_name_1 = {}
product_name_2 = {}
product_name_3 = {}
product_name_4 = {}
product_name_5 = {}
product_aisle_id = {}
product_names = {}

# Word slots in positional order: the first cleaned word goes to product_name_1, and so on.
_name_slots = (product_name_1, product_name_2, product_name_3, product_name_4, product_name_5)

for i, row in products_file.iterrows():
    product_id = row['product_id']
    product_names[product_id] = row['product_name']
    # Keep one word per slot. The slice used to stop after four words, so the
    # fifth slot (product_name_5) could never receive a word.
    words = text_to_wordlist(row['product_name']).split()[:len(_name_slots)]
    for position, slot in enumerate(_name_slots):
        slot[product_id] = words[position] if position < len(words) else ""
    product_aisle_id[product_id] = row['aisle_id']

In [63]:
def product_to_aisle(product_id):
    """Map a product id to its aisle id; the id 0 maps to 0.

    Any other id is looked up in the module-level `product_aisle_id` dict, so
    an id missing from that dict raises KeyError.
    """
    return 0 if product_id == 0 else product_aisle_id[product_id]
# Rebind the name to an element-wise version that accepts arrays of product ids.
product_to_aisle = np.vectorize(product_to_aisle)

In [64]:
file = "glove.txt"
import io

def loadGloveModel(gloveFile):
    """Load word vectors from a whitespace-separated text file.

    Every non-blank line is split on whitespace: the first token is the word,
    the remaining tokens are parsed as floats and form its vector.

    Args:
        gloveFile: path of the UTF-8 encoded vector file.

    Returns:
        dict mapping each word to a 1-D numpy array holding its vector.

    Raises:
        ValueError: if a vector component cannot be parsed as a float.
    """
    print ("Loading Glove Model")
    model = {}
    with io.open(gloveFile, encoding="utf8" ) as f:
        # Stream the file line by line instead of holding a second full copy
        # of it in memory via readlines().
        for line in f:
            splitLine = line.split()
            if not splitLine:
                # Blank lines (e.g. a trailing newline) carry no word.
                continue
            model[splitLine[0]] = np.array([float(val) for val in splitLine[1:]])
    # One formatted string, so the message reads the same under Python 2 and 3.
    print ("Done. %d words loaded!" % len(model))
    return model
     
model = loadGloveModel(file)

Loading Glove Model
('Done.', 400000, ' words loaded!')


In [65]:
def gloveEmbedding(word):
    """Return the mean GloVe vector of the words in a product's name.

    Args:
        word: a product id; its name is looked up in the module-level
              `product_names` dict.

    Returns:
        A length-50 array: the sum of the float16 vectors of all name words
        found in the module-level `model`, divided by the number of words
        found. All zeros when no word is found.
    """
    name = product_names[word]
    total = np.zeros(50, np.float16)
    found = 0
    for token in text_to_wordlist(name).split():
        try:
            vector = model[token].astype(np.float16)
        except KeyError:
            # Words without an embedding are skipped.
            continue
        total = total + vector
        found += 1
    if found ==0:
        # Avoid dividing by zero; the sum is all zeros in this case.
        found = 1
    return total/found

In [66]:
def averageGloveEmbeddings(p):
    """Average the GloVe name embeddings of the non-zero product ids in `p`.

    Args:
        p: array of product ids; entries equal to 0 are skipped.

    Returns:
        A length-50 array: the mean of gloveEmbedding(id) over all non-zero
        ids, or all zeros when `p` contains no non-zero id.
    """
    total = np.zeros(shape=[50],dtype=np.float16)
    count=0
    for i in range(p.shape[0]):
        if (p[i] != 0):
            total = total+gloveEmbedding(p[i])
            count+=1
    if count==0:
        # No products to average. The original test was inverted: it divided by
        # zero here and discarded the accumulated sum whenever count > 0.
        return np.zeros(shape=[50],dtype=np.float16)
    return total/count

In [67]:
num_cores_total = multiprocessing.cpu_count()
# Leave one core free, but never go below one worker: with zero workers the random
# worker assignment (np.random.randint(0, num_cores, ...)) has no value to draw.
num_cores = max(1, num_cores_total - 1)
worker_id = range(num_cores)
# Formatted string instead of the Python-2-only print statement; prints e.g. "32 31".
print("%d %d" % (num_cores_total, num_cores))

32 31


In [68]:
# Shuffle all rows, then give every row a random worker id in [0, num_cores).
# NOTE(review): neither call is seeded, so the shuffle and the assignment differ between runs.
user_aisle_df_pd = user_aisle_df_pd.sample(frac=1)
user_aisle_df_pd['worker_id'] = np.random.randint(0, num_cores, user_aisle_df_pd.shape[0])

In [69]:
def save_array(fname, arr):
    """Store `arr` as a bcolz carray rooted at `fname` (opened with mode 'w') and flush it."""
    bcolz.carray(arr, rootdir=fname, mode='w').flush()

In [70]:
def _parse_history(raw, cast=int):
    """Parse a stringified list such as "[1, 2, 3]" into a Python list of `cast` values."""
    return [cast(token) for token in raw.replace("[","").replace("]","").split(",")]


def createArrays(wid):
    """Build and save the feature arrays for the rows assigned to worker `wid`.

    Selects the rows of the global `user_aisle_df_pd` whose `worker_id` equals
    `wid`, converts each row's stringified history columns into fixed-width
    numpy arrays (history width is the global `N`, via `ClipNPad`), and writes
    every array with `save_array` to 'np/<name>_<wid>.npy'.

    For each row, the step at index `history_length` supplies the label and is
    then zeroed in the per-step feature arrays so it cannot leak into training.

    Up to 20 product ids are stored per order; slots beyond the order's size
    are zero.

    Args:
        wid: worker id to process (compared against the `worker_id` column).

    Returns:
        None. The results are the files written under 'np/'.
    """
    max_products = 20    # product-id slots stored per order
    embedding_dim = 50   # width of the vectors returned by averageGloveEmbeddings

    user_aisle_df_pd_worker = user_aisle_df_pd[(user_aisle_df_pd.worker_id == wid)]
    length = len(user_aisle_df_pd_worker)

    # One value per row
    user_id = np.zeros(shape=[length,],dtype=np.int32)
    label = np.zeros(shape=[length,],dtype=np.int32)
    master_aisle_id = np.zeros(shape=[length,],dtype=np.int32)
    master_department_id = np.zeros(shape=[length,],dtype=np.int32)
    history_length = np.zeros(shape=[length,],dtype=np.int32)

    # One value per row and history step
    IsInOrder_history = np.zeros(shape=[length,N],dtype=np.int32)
    NextInOrder_history = np.zeros(shape=[length,N],dtype=np.int32)
    NumProductsFromDep_history = np.zeros(shape=[length,N],dtype=np.int32)
    OrderSize_history = np.zeros(shape=[length,N],dtype=np.int32)
    IndexInOrder_history = np.zeros(shape=[length,N],dtype=np.int32)
    order_number_history = np.zeros(shape=[length,N],dtype=np.int32)
    order_hour_of_day_history = np.zeros(shape=[length,N],dtype=np.int32)
    order_dow_history = np.zeros(shape=[length,N],dtype=np.int32)
    days_since_prior_order_history = np.zeros(shape=[length,N],dtype=np.int32)

    # product_ids[k] is the (length, N) array saved as 'ProductID<k+1>_history'
    product_ids = np.zeros(shape=[max_products,length,N],dtype=np.int32)

    ProductNameEmbedding_history = np.zeros(shape=[length,N,embedding_dim],dtype=np.float16)

    for i, (_, row) in enumerate(user_aisle_df_pd_worker.iterrows()):
        user_id[i] = row['user_id']
        master_aisle_id[i] = row['master_aisle_id']
        if math.isnan(row['master_department_id']):
            master_department_id[i] = MISSING_DEPARTMENT
        else:
            master_department_id[i] = row['master_department_id']

        is_in_order = ClipNPad(_parse_history(row['IsInOrder_history']),N)
        IsInOrder_history[i,:] = is_in_order
        # "Next" is the same sequence shifted one step to the left
        NextInOrder_history[i,:] = np.roll(is_in_order,-1,axis=0)
        NumProductsFromDep_history[i,:] = ClipNPad(_parse_history(row['NumProductsFromDep_history']),N)
        OrderSize_history[i,:] = ClipNPad(_parse_history(row['OrderSize_history']),N)

        # Flat list of product ids for all orders, consumed order by order below
        ProductIDs = _parse_history(row['ProductIDs_history'].replace("null","0"))

        start=0
        for z in range(N):
            size = min(OrderSize_history[i,z],max_products)
            if size==0:
                break
            end = start + size

            # Fresh zeroed buffer for every order. Reusing one buffer across
            # orders left product ids of an earlier, larger order in the slots
            # beyond `size` of later orders.
            productsInorder = np.zeros(shape=[max_products],dtype=np.int32)
            productsInorder[0:size] = ProductIDs[start:end]

            ProductNameEmbedding_history[i,z] = averageGloveEmbeddings(productsInorder[0:size])
            product_ids[:,i,z] = productsInorder

            start += size

        IndexInOrder_history[i,:] = ClipNPad(_parse_history(row['IndexInOrder_history'],float),N)
        order_number_history[i,:] = np.roll(ClipNPad(_parse_history(row['order_number_history']),N),-1,axis=0)
        order_hour_of_day_history[i,:] = np.roll(ClipNPad(_parse_history(row['order_hour_of_day_history']),N),-1,axis=0)
        order_dow_history[i,:] = np.roll(ClipNPad(_parse_history(row['order_dow_history']),N),-1,axis=0)
        days_since_prior_order_history[i,:] = np.roll(ClipNPad(_parse_history(row['days_since_prior_order_history'],float),N),-1,axis=0)
        history_length[i] = np.max(order_number_history[i,:]) - 1

        last = history_length[i]

        #Create labels for final testing using the final step
        label[i] = NextInOrder_history[i,last]

        #Remove the last step's information to ensure no leakage while validation
        NextInOrder_history[i,last] = 0
        IsInOrder_history[i,last] = 0
        NumProductsFromDep_history[i,last] = 0
        OrderSize_history[i,last] = 0
        IndexInOrder_history[i,last] = 0
        product_ids[:,i,last] = 0
        ProductNameEmbedding_history[i,last] = np.zeros(shape=[embedding_dim],dtype=np.float16)

    path_out = 'np/'
    arrays = {
        'user_id' : user_id,
        'master_aisle_id' : master_aisle_id,
        'master_department_id' : master_department_id,
        'IsInOrder_history' : IsInOrder_history,
        'NextInOrder_history' : NextInOrder_history,
        'NumProductsFromDep_history' : NumProductsFromDep_history,
        'OrderSize_history' : OrderSize_history,
        'IndexInOrder_history' : IndexInOrder_history,
        'order_number_history' : order_number_history,
        'order_hour_of_day_history' : order_hour_of_day_history,
        'order_dow_history' : order_dow_history,
        'days_since_prior_order_history' : days_since_prior_order_history,
        'history_length' : history_length,
        'ProductNameEmbedding_history' : ProductNameEmbedding_history,
        'label' : label
    }
    for k in range(max_products):
        name = 'ProductID{}_history'.format(k + 1)
        arrays[name] = product_ids[k]
        # Same layout with every product id mapped through product_to_aisle
        arrays[name + '_a'] = product_to_aisle(product_ids[k]).astype(np.int32)

    # Several workers reach this point concurrently, so a check-then-create
    # sequence can fail when another worker creates the directory in between.
    try:
        os.mkdir(path_out)
    except OSError:
        if not os.path.isdir(path_out):
            raise

    # Save as bcolz arrays (each '.npy' path is a bcolz rootdir, see save_array)
    for key, value in arrays.items():
        save_array(path_out + '{}_{}.npy'.format(key,wid), value)


In [71]:
# Run createArrays once per worker id across num_cores parallel jobs, timing the whole run.
tic()
results = Parallel(n_jobs=num_cores)(
    delayed(createArrays)(wid) for wid in worker_id
)
toc()

took me 13045.454 seconds to do this..


# Scrapbook

In [None]:
# Scratch: write the trailing (1 - train_percentage) share of every array as '<name>_val.npy'.
# NOTE(review): in the visible part of the notebook `length` and `arrays` are only
# defined as locals of createArrays; confirm they exist at notebook scope before running.
train_percentage = 0.9
val_percentage = 1.0 - train_percentage
train_length = int(length * train_percentage)
val_length = length - train_length

# Row indices of the validation tail; identical for every array
val_rows = np.arange(train_length, length)
for key, value in arrays.items():
    np.save(path_out + '{}_val.npy'.format(key), value[val_rows])


In [None]:
# Scratch: derive aisle-id versions ('<name>_a') of the saved product-id arrays.
arrayList = [
    'ProductID1_history', 'ProductID2_history', 'ProductID3_history', 'ProductID4_history',
    'ProductID5_history', 'ProductID6_history', 'ProductID7_history', 'ProductID8_history',
    'ProductID9_history', 'ProductID10_history', 'ProductID11_history', 'ProductID12_history',
    'ProductID13_history', 'ProductID14_history', 'ProductID15_history', 'ProductID16_history',
    'ProductID17_history', 'ProductID18_history', 'ProductID19_history', 'ProductID20_history',
]

# First pass: the main arrays
for i in range(len(arrayList)):
    source_ids = np.load(path_out + '{}.npy'.format(arrayList[i]))
    np.save(path_out + '{}_a.npy'.format(arrayList[i]),
            product_to_aisle(source_ids).astype(np.int32))

# Second pass: the '_val' arrays
for i in range(len(arrayList)):
    source_ids = np.load(path_out + '{}_val.npy'.format(arrayList[i]))
    np.save(path_out + '{}_a_val.npy'.format(arrayList[i]),
            product_to_aisle(source_ids).astype(np.int32))