In [1]:
from pyspark import SparkContext, SparkConf, SQLContext, HiveContext, SparkFiles
from pyspark.sql import *
import math
from collections import defaultdict
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType, ArrayType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, NGram, OneHotEncoder, StringIndexer, CountVectorizer
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.mllib.util import MLUtils
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

from pyspark.sql.functions import *
import os
import re
import sys

### Read our train and test data in as Pyspark Dataframes, using the inferSchema option

In [2]:
PATH = "./"
train_df = (spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("inferSchema", "true")
    .csv('{}train.tsv'.format(PATH), sep="\t"))

test_df = (spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("inferSchema", "true")
    .csv('{}test.tsv'.format(PATH), sep="\t"))

### Basic NLP Feature Engineering

In [3]:
tokenizer = RegexTokenizer(inputCol="item_description", outputCol="tokens", pattern="\\W")
train_df_temp = tokenizer.transform(train_df)
remover = StopWordsRemover(inputCol="tokens", outputCol="unigrams")
train_df_temp = remover.transform(train_df_temp)
ngram = NGram(n=2, inputCol="unigrams", outputCol="bigrams")
train_df_temp = ngram.transform(train_df_temp)
def replace_blank(x):
    return when((col(x) == "") | col(x).isNull(), "None").otherwise(col(x))
train_df_temp = train_df_temp.withColumn('brand_name', replace_blank('brand_name'))

tokenizer = RegexTokenizer(inputCol="item_description", outputCol="tokens", pattern="\\W")
test_df_temp = tokenizer.transform(test_df)
remover = StopWordsRemover(inputCol="tokens", outputCol="unigrams")
test_df_temp = remover.transform(test_df_temp)
ngram = NGram(n=2, inputCol="unigrams", outputCol="bigrams")
test_df_temp = ngram.transform(test_df_temp)
def replace_blank(x):
    return when((col(x) == "") | col(x).isNull(), "None").otherwise(col(x))
test_df_temp = train_df_temp.withColumn('brand_name', replace_blank('brand_name'))

### One hot encoding of unigrams and bigrams
Let's one hot encode the ungigrams an bigrams to make it easier to select and experiment with features when we move to sklearn models

In [4]:
stringIndexer = StringIndexer(inputCol="brand_name", outputCol="brand_category_index")
model = stringIndexer.fit(train_df_temp)
train_df_temp = model.transform(train_df_temp)
encoder = OneHotEncoder(inputCol="brand_category_index", outputCol="categoryVec")
train_df_temp = encoder.transform(train_df_temp)

stringIndexer = StringIndexer(inputCol="brand_name", outputCol="brand_category_index")
model = stringIndexer.fit(test_df_temp)
test_df_temp = model.transform(test_df_temp)
encoder = OneHotEncoder(inputCol="brand_category_index", outputCol="categoryVec")
test_df_temp = encoder.transform(test_df_temp)

In [6]:
cv = CountVectorizer(inputCol="unigrams", outputCol="vectors")
model = cv.fit(train_df_temp)
train_df_temp = model.transform(train_df_temp)

model = cv.fit(test_df_temp)
test_df_temp = model.transform(test_df_temp)

Py4JJavaError: An error occurred while calling o260.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 10.0 failed 1 times, most recent failure: Lost task 3.0 in stage 10.0 (TID 33, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:142)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:140)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:176)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:123)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:142)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:140)
	... 16 more


### Join the unigrams and bigrams fields to write out to csv format

In [None]:
train_df_temp = train_df_temp.withColumn('unigrams_string', concat_ws(',', 'unigrams'))
train_df_temp = train_df_temp.withColumn('bigrams_string', concat_ws(',', 'bigrams'))

In [None]:
ignore = ['train_id', 'test_id', 'price']
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore],
    outputCol='features')
assembler.transform(train_df_temp)

In [None]:
# train_features_df = train_df_temp.select('train_id', 'name', 'item_condition_id', 'category_name', 'brand_name', 'price', 'shipping', 'item_description', 'unigrams_string', 'bigrams_string')
# test_features_df = train_df_temp.select('test_id', 'name', 'item_condition_id', 'category_name', 'brand_name', 'shipping', 'item_description', 'unigrams_string', 'bigrams_string')
# train_features_df.toPandas().to_csv('/tmp/train_features.csv')
# test_features_df.toPandas().to_csv('/tmp/test_features.csv')


TODO: combine these steps in a pipeline