In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [None]:
# Reuse the SparkContext that is already running when this cell is executed
# again; constructing a second context in the same process raises ValueError.
# The application name only takes effect when a new context is created.
sc = SparkContext.getOrCreate(SparkConf().setAppName("test-app-trofim"))

In [None]:
# Print the value of the PYSPARK_SUBMIT_ARGS environment variable
# (presumably the extra arguments PySpark hands to spark-submit - confirm).
! echo $PYSPARK_SUBMIT_ARGS

In [None]:
# Entry point for the DataFrame API: attach to the active session or start one.
spark = SparkSession.builder.getOrCreate()

## Text pre-processing

In [None]:
# Toy corpus: three short documents, each tagged with an integer id.
inputDF = spark.createDataFrame(
    [
        (0, "This is an apple. An apple is a fruit, not a vegetable"),
        (1, "Fruits are tasty"),
        (2, "Vegetables are nasty"),
    ],
    ["id", "document"],
)

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [None]:
# Split every document into tokens, stored in a new "words" column.
tokenizer = Tokenizer(inputCol="document", outputCol="words")
tokenizedDF = tokenizer.transform(inputDF)

# Show the raw text and then its tokens as two untruncated tables.
for column in ("document", "words"):
    tokenizedDF.select("id", column).show(truncate=False)

In [None]:
# Same tokenization step, but splitting on whitespace runs, commas and full
# stops. This overwrites tokenizedDF from the plain Tokenizer cell.
regexTokenizer = RegexTokenizer(
    inputCol="document",
    outputCol="words",
    pattern="\\s+|,|\\.",
)
tokenizedDF = regexTokenizer.transform(inputDF)

# Show the raw text and then its tokens as two untruncated tables.
for column in ("document", "words"):
    tokenizedDF.select("id", column).show(truncate=False)

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Reads tokens from "words" and writes the filtered tokens to "words_filtered".
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="words_filtered",
)

In [None]:
# List the built-in English stop words. print() with a single argument
# behaves the same on Python 2 and Python 3; the print statement is a
# SyntaxError on Python 3.
print(stopwordsRemover.loadDefaultStopWords('english'))

In [None]:
# Apply the stop-word filter to the tokenized documents and preview every
# column (long cells are truncated in this view).
removedDF = stopwordsRemover.transform(tokenizedDF)
removedDF.show(truncate=True)

In [None]:
# Full-width view of each document next to its filtered tokens.
filtered_view = removedDF.select("document", "words_filtered")
filtered_view.show(truncate=False)

In [None]:
from pyspark.ml.feature import NGram

In [None]:
# Bigrams (n = 2), built from the unfiltered "words" column rather than
# from "words_filtered".
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDF = ngram.transform(removedDF)

In [None]:
# Bring the rows back to the driver and show the first document next to its
# n-grams. print() replaces the Python 2 print statement so the cell also
# runs on Python 3.
row = ngramDF.select("document", "ngrams").collect()[0]

print(row['document'])
print(row['ngrams'])

In [None]:
# Same inspection for the second document. print() replaces the Python 2
# print statement so the cell also runs on Python 3.
row = ngramDF.select("document", "ngrams").collect()[1]

print(row['document'])
print(row['ngrams'])

In [None]:
# Trigrams (n = 3) over the same unfiltered "words" column; this replaces
# the bigram ngram/ngramDF from the earlier cell.
ngram = NGram(n=3, inputCol="words", outputCol="ngrams")
ngramDF = ngram.transform(removedDF)

In [None]:
# Show the first document next to its n-grams. print() replaces the
# Python 2 print statement so the cell also runs on Python 3.
row = ngramDF.select("document", "ngrams").collect()[0]

print(row['document'])
print(row['ngrams'])

In [None]:
# Show the second document next to its n-grams. print() replaces the
# Python 2 print statement so the cell also runs on Python 3.
row = ngramDF.select("document", "ngrams").collect()[1]

print(row['document'])
print(row['ngrams'])

## TF * IDF

### Term frequency

In [None]:
from pyspark.ml.feature import CountVectorizer    # implemented as an estimator

In [None]:
# Learn a vocabulary from the stop-word-filtered tokens. CountVectorizer is an
# estimator, so fit() returns the model that produces the count vectors.
# Variant to try: add vocabSize = 2 to the constructor to limit the vocabulary.
countVectorizer = CountVectorizer(
    inputCol="words_filtered",
    outputCol="features_tf",
)
model = countVectorizer.fit(removedDF)

In [None]:
# Show the learned vocabulary and its size. print() replaces the Python 2
# print statement so the cell also runs on Python 3.
print(model.vocabulary)
print(len(model.vocabulary))

In [None]:
# Add the "features_tf" count vectors and keep the first row for inspection.
countDF = model.transform(removedDF)
counted_rows = countDF.collect()
row = counted_rows[0]

In [None]:
# Show the document, its filtered tokens and the resulting count vector.
# print() replaces the Python 2 print statement so the cell also runs on
# Python 3.
print(row['document'])
print(row['words_filtered'])
print(row['features_tf'])

In [None]:
# Which vector class does the count vectorizer produce for this row?
type(row["features_tf"])

In [None]:
# Keep a short handle on the term-frequency vector of the inspected row.
v = row["features_tf"]

In [None]:
# Display the term-frequency vector as an array.
v.toArray()

### Inverse document frequency

In [None]:
from pyspark.ml.feature import HashingTF, IDF  # implemented as an estimator

In [None]:
# IDF is an estimator: fit it on the term-frequency vectors to obtain the
# model that rescales "features_tf" into "features_tf_idf".
idf = IDF(inputCol="features_tf", outputCol="features_tf_idf")
idfModel = idf.fit(countDF)

In [None]:
# Add the TF-IDF column, then show the documents and their weighted vectors
# as two untruncated tables.
featuresDF = idfModel.transform(countDF)
for column in ("document", "features_tf_idf"):
    featuresDF.select("id", column).show(truncate=False)

In [None]:
# Collect the TF-IDF rows to the driver and keep the first one for inspection.
feature_rows = featuresDF.collect()
row = feature_rows[0]

In [None]:
# Show the document, its filtered tokens, raw counts and TF-IDF weights.
# print() replaces the Python 2 print statement so the cell also runs on
# Python 3.
print(row['document'])
print(row['words_filtered'])
print(row['features_tf'])
print(row['features_tf_idf'])

### Categorical features

In [45]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [46]:
# Eight rows with a single categorical column ("city") to demonstrate
# indexing and one-hot encoding.
catDF = spark.createDataFrame(
    [
        (0, "New York"),
        (1, "Moscow"),
        (2, "Beijing"),
        (3, "New York"),
        (4, "Paris"),
        (5, "Paris"),
        (6, "New York"),
        (7, "Beijing"),
    ],
    ["row_id", "city"],
)

In [47]:
# Fit a StringIndexer on the city names and add the numeric "cityIndex"
# column, then display the indexed frame.
stringIndexer = StringIndexer(inputCol="city", outputCol="cityIndex")
model = stringIndexer.fit(catDF)
indexedDF = model.transform(catDF)
indexedDF.show()

+------+--------+---------+
|row_id|    city|cityIndex|
+------+--------+---------+
|     0|New York|      0.0|
|     1|  Moscow|      3.0|
|     2| Beijing|      1.0|
|     3|New York|      0.0|
|     4|   Paris|      2.0|
|     5|   Paris|      2.0|
|     6|New York|      0.0|
|     7| Beijing|      1.0|
+------+--------+---------+



In [52]:
# One-hot encode the city index. dropLast=False keeps a slot for the last
# category as well instead of dropping it.
encoder = OneHotEncoder(inputCol="cityIndex", outputCol="cityVec", dropLast=False)
encodedDF = encoder.transform(indexedDF)
encodedDF.show()

+------+--------+---------+-------------+
|row_id|    city|cityIndex|      cityVec|
+------+--------+---------+-------------+
|     0|New York|      0.0|(4,[0],[1.0])|
|     1|  Moscow|      3.0|(4,[3],[1.0])|
|     2| Beijing|      1.0|(4,[1],[1.0])|
|     3|New York|      0.0|(4,[0],[1.0])|
|     4|   Paris|      2.0|(4,[2],[1.0])|
|     5|   Paris|      2.0|(4,[2],[1.0])|
|     6|New York|      0.0|(4,[0],[1.0])|
|     7| Beijing|      1.0|(4,[1],[1.0])|
+------+--------+---------+-------------+



In [53]:
# Dense view of the one-hot vector for the first row.
row = encodedDF.collect()[0]
row["cityVec"].toArray()

array([ 1.,  0.,  0.,  0.])

In [54]:
# Dense view of the one-hot vector for the second row.
row = encodedDF.collect()[1]
row["cityVec"].toArray()

array([ 0.,  0.,  0.,  1.])

In [55]:
# Same idea with two categorical columns: "city" and "category".
# NOTE(review): "Ney York" in row 3 looks like a misspelling of "New York";
# as written it is a distinct value of the "city" column - confirm intent.
catDF = spark.createDataFrame(
    [
        (0, "New York", "books"),
        (1, "Moscow", "moovies"),
        (2, "Beijing", "clothes"),
        (3, "Ney York", "clothes"),
        (4, "Paris", "books"),
        (5, "Paris", "electronics"),
        (6, "New York", "electronics"),
        (7, "Beijing", "moovies"),
    ],
    ["row_id", "city", "category"],
)

In [56]:
# Index both categorical columns one after the other; the second indexer is
# fitted on (and applied to) the output of the first.

# city -> cityIndex
stringIndexer = StringIndexer(inputCol="city", outputCol="cityIndex")
model = stringIndexer.fit(catDF)
indexedDF = model.transform(catDF)

# category -> categoryIndex
stringIndexer2 = StringIndexer(inputCol="category", outputCol="categoryIndex")
model2 = stringIndexer2.fit(indexedDF)
indexedDF2 = model2.transform(indexedDF)

In [57]:
# One-hot encode both index columns, keeping the last category in each
# (dropLast=False), then show the inputs, indices and vectors side by side.
encoder = OneHotEncoder(inputCol="cityIndex", outputCol="cityVec", dropLast=False)
encodedDF = encoder.transform(indexedDF2)

encoder2 = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec", dropLast=False)
encodedDF2 = encoder2.transform(encodedDF)

encodedDF2.select(
    "city",
    "category",
    "cityIndex",
    "categoryIndex",
    "cityVec",
    "categoryVec",
).show(truncate=False)

+--------+-----------+---------+-------------+-------------+-------------+
|city    |category   |cityIndex|categoryIndex|cityVec      |categoryVec  |
+--------+-----------+---------+-------------+-------------+-------------+
|New York|books      |2.0      |0.0          |(5,[2],[1.0])|(4,[0],[1.0])|
|Moscow  |moovies    |3.0      |2.0          |(5,[3],[1.0])|(4,[2],[1.0])|
|Beijing |clothes    |0.0      |1.0          |(5,[0],[1.0])|(4,[1],[1.0])|
|Ney York|clothes    |4.0      |1.0          |(5,[4],[1.0])|(4,[1],[1.0])|
|Paris   |books      |1.0      |0.0          |(5,[1],[1.0])|(4,[0],[1.0])|
|Paris   |electronics|1.0      |3.0          |(5,[1],[1.0])|(4,[3],[1.0])|
|New York|electronics|2.0      |3.0          |(5,[2],[1.0])|(4,[3],[1.0])|
|Beijing |moovies    |0.0      |2.0          |(5,[0],[1.0])|(4,[2],[1.0])|
+--------+-----------+---------+-------------+-------------+-------------+



In [58]:
from pyspark.ml.feature import VectorAssembler

In [59]:
# Concatenate the two one-hot vectors into a single "totalVec" feature column.
assembler = VectorAssembler(inputCols=["cityVec", "categoryVec"], outputCol="totalVec")
finalDF = assembler.transform(encodedDF2)

In [60]:
# Compare the individual one-hot vectors with the assembled vector.
finalDF.select(
    "city",
    "category",
    "cityVec",
    "categoryVec",
    "totalVec",
).show(truncate=True)

+--------+-----------+-------------+-------------+-------------------+
|    city|   category|      cityVec|  categoryVec|           totalVec|
+--------+-----------+-------------+-------------+-------------------+
|New York|      books|(5,[2],[1.0])|(4,[0],[1.0])|(9,[2,5],[1.0,1.0])|
|  Moscow|    moovies|(5,[3],[1.0])|(4,[2],[1.0])|(9,[3,7],[1.0,1.0])|
| Beijing|    clothes|(5,[0],[1.0])|(4,[1],[1.0])|(9,[0,6],[1.0,1.0])|
|Ney York|    clothes|(5,[4],[1.0])|(4,[1],[1.0])|(9,[4,6],[1.0,1.0])|
|   Paris|      books|(5,[1],[1.0])|(4,[0],[1.0])|(9,[1,5],[1.0,1.0])|
|   Paris|electronics|(5,[1],[1.0])|(4,[3],[1.0])|(9,[1,8],[1.0,1.0])|
|New York|electronics|(5,[2],[1.0])|(4,[3],[1.0])|(9,[2,8],[1.0,1.0])|
| Beijing|    moovies|(5,[0],[1.0])|(4,[2],[1.0])|(9,[0,7],[1.0,1.0])|
+--------+-----------+-------------+-------------+-------------------+



### Feature interactions - not supported in the Python interface

In [None]:
# Nothing to run in this cell. The original content was a row of emoticons,
# which is not valid Python and raised a SyntaxError under "Run All"; it is
# kept below as a comment.
# :(((((( ;(( ^_^