In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


In [85]:
# Toy corpus: three short documents, each tagged with an integer document id.
sentenceData = spark.createDataFrame(
    data=[
        (0, "Python python Spark Spark"),
        (1, "Python SQL Java"),
        (2, "SQL Java"),
    ],
    schema=["document", "sentence"],
)

In [86]:
# Print the raw corpus; truncate=False keeps long sentence strings from being cut off.
sentenceData.show(truncate=False)

+--------+-------------------------+
|document|sentence                 |
+--------+-------------------------+
|0       |Python python Spark Spark|
|1       |Python SQL Java          |
|2       |SQL Java                 |
+--------+-------------------------+



Tokenization is the process of taking text (such as a sentence) and breaking it into individual 
terms (usually words). A simple Tokenizer class provides this functionality. 
The example below shows how to split sentences into sequences of words.

In [87]:
# Splits each sentence into a list of terms. The transformed output shown below
# has every token lower-cased ("Python" and "python" become the same term).
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)

In [88]:
# Apply the tokenizer: the result keeps the original columns and adds a `words` column.
wordsData = tokenizer.transform(sentenceData)

In [89]:
# Inspect the tokenized corpus without truncating the token lists.
wordsData.show(truncate=False)

+--------+-------------------------+------------------------------+
|document|sentence                 |words                         |
+--------+-------------------------+------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|
|1       |Python SQL Java          |[python, sql, java]           |
|2       |SQL Java                 |[sql, java]                   |
+--------+-------------------------+------------------------------+



CountVectorizer and CountVectorizerModel convert a collection of text documents into vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary and generate a CountVectorizerModel. The model produces sparse representations of the documents over the vocabulary, which can then be passed to other algorithms such as LDA.

In [90]:
# CountVectorizer with no vocabSize/minDF overrides; this is the instance the
# Pipeline cell further down plugs in as its second stage.
vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="rawFeatures",
)

In [91]:
# Stand-alone CountVectorizer: vocabulary capped at 4 terms, and a term must
# appear in at least 1 document (minDF=1.0) to be included.
cv = CountVectorizer(
    inputCol="words",
    outputCol="rawFeatures",
    vocabSize=4,
    minDF=1.0,
)

# Learn the vocabulary from the tokenized corpus.
model = cv.fit(wordsData)

# Encode each document as a sparse vector of term counts and display it in full.
result = model.transform(wordsData)
result.show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+
|document|sentence                 |words                         |rawFeatures              |
+--------+-------------------------+------------------------------+-------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(4,[0,2],[2.0,2.0])      |
|1       |Python SQL Java          |[python, sql, java]           |(4,[0,1,3],[1.0,1.0,1.0])|
|2       |SQL Java                 |[sql, java]                   |(4,[1,3],[1.0,1.0])      |
+--------+-------------------------+------------------------------+-------------------------+



In [124]:
# Same configuration as the previous CountVectorizer cell (4-term vocabulary,
# minDF=1.0), fitted again on the tokenized corpus.
cv = CountVectorizer(
    inputCol="words",
    outputCol="rawFeatures",
    vocabSize=4,
    minDF=1.0,
)

model = cv.fit(wordsData)



In [125]:
# Turn each document's tokens into a sparse term-count vector (`rawFeatures`);
# `result` is the input to the IDF step that follows.
result = model.transform(wordsData)
result.show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+
|document|sentence                 |words                         |rawFeatures              |
+--------+-------------------------+------------------------------+-------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(4,[0,2],[2.0,2.0])      |
|1       |Python SQL Java          |[python, sql, java]           |(4,[0,1,3],[1.0,1.0,1.0])|
|2       |SQL Java                 |[sql, java]                   |(4,[1,3],[1.0,1.0])      |
+--------+-------------------------+------------------------------+-------------------------+



IDF: an Estimator that is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each feature. Intuitively, it down-weights features that appear frequently in a corpus.

In [126]:
# IDF estimator: reads the raw term counts and writes the rescaled vectors to `features`.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

In [127]:
# Fit the IDF weights on the raw term counts, then rescale that same data with them.
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)

# To view only the document ids and TF-IDF vectors (this frame has no "label" column):
# rescaledData.select("document", "features").show()

In [128]:
# Show raw counts next to their IDF-rescaled values, untruncated.
rescaledData.show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|document|sentence                 |words                         |rawFeatures              |features                                                                 |
+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(4,[0,2],[2.0,2.0])      |(4,[0,2],[0.5753641449035617,1.3862943611198906])                        |
|1       |Python SQL Java          |[python, sql, java]           |(4,[0,1,3],[1.0,1.0,1.0])|(4,[0,1,3],[0.28768207245178085,0.28768207245178085,0.28768207245178085])|
|2       |SQL Java                 |[sql, java]                   |(4,[1,3],[1.0,1.0])      |(4,[1,3],[0.28768207245178085,0.28768207245178085])                

In [129]:
# Chain the three steps so they can be fitted and applied as a single unit:
# tokenize -> count terms -> rescale by IDF.
pipeline = Pipeline(
    stages=[
        tokenizer,
        vectorizer,
        idf,
    ]
)

In [130]:
# Fit every pipeline stage on the raw corpus. Note this rebinds `model`, which
# earlier cells used for the stand-alone CountVectorizer model.
model = pipeline.fit(sentenceData)

In [98]:
# List the fitted stages of the pipeline model, in execution order.
model.stages

[Tokenizer_879c014ad63b, CountVectorizer_e7e0055b945c, IDF_04fbe42238c4]

In [99]:
# explainParams() returns one long string with embedded newlines. Displaying it
# as a bare expression shows the repr with literal "\n" escapes; print() renders
# one parameter per line instead.
print(model.stages[1].explainParams())

"binary: Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False (default: False)\ninputCol: input column name. (current: words)\nmaxDF: Specifies the maximum number of different documents a term could appear in to be included in the vocabulary. A term that appears more than the threshold will be ignored. If this is an integer >= 1, this specifies the maximum number of documents the term could appear in; if this is a double in [0,1), then this specifies the maximum fraction of documents the term could appear in. Default (2^63) - 1 (default: 9.223372036854776e+18)\nminDF: Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this 

In [131]:
# Run the fitted pipeline end to end on the corpus and print every column untruncated.
# NOTE(review): the recorded output for this cell shows 5-dimensional vectors, while the
# other CountVectorizer pipeline outputs in this notebook are 4-dimensional. The output
# looks stale (possibly produced while `vectorizer` was bound to HashingTF(numFeatures=5));
# re-run from a fresh kernel to confirm.
model.transform(sentenceData).show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|document|sentence                 |words                         |rawFeatures              |features                                                                 |
+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(5,[0,4],[2.0,2.0])      |(5,[0,4],[1.3862943611198906,0.5753641449035617])                        |
|1       |Python SQL Java          |[python, sql, java]           |(5,[1,2,4],[1.0,1.0,1.0])|(5,[1,2,4],[0.28768207245178085,0.28768207245178085,0.28768207245178085])|
|2       |SQL Java                 |[sql, java]                   |(5,[1,2],[1.0,1.0])      |(5,[1,2],[0.28768207245178085,0.28768207245178085])                

In [102]:
# Bring just the raw term-count vectors back to the driver as a Python list.
(
    model.transform(sentenceData)
    .select('rawFeatures')
    .rdd
    .map(lambda row: row['rawFeatures'])
    .collect()
)

[SparseVector(4, {0: 2.0, 3: 2.0}),
 SparseVector(4, {0: 1.0, 1: 1.0, 2: 1.0}),
 SparseVector(4, {1: 1.0, 2: 1.0})]

In [103]:
import numpy as np

# Corpus-wide term counts: densify each document's sparse count vector and add
# the vectors together. toArray() yields NumPy arrays, so `+` is already the
# element-wise sum and no manual index loop is needed. The trailing .tolist()
# makes the result a plain list of floats regardless of how many rows were
# reduced (a one-row reduce would otherwise hand back the raw array).
total_counts = (
    model.transform(sentenceData)
    .select('rawFeatures')
    .rdd
    .map(lambda row: row['rawFeatures'].toArray())
    .reduce(lambda left, right: left + right)
    .tolist()
)

# Vocabulary of the fitted CountVectorizer stage (stage 1 of the pipeline);
# position i in this list is the term counted at index i of the vectors.
vocabList = model.stages[1].vocabulary

In [105]:
# Display the learned vocabulary.
vocabList

['python', 'java', 'sql', 'spark']

In [106]:

d = {'vocabList': vocabList, 'counts': total_counts}

# Pair each term with its count as a (str, float) tuple. Routing the two lists
# through a single NumPy array would coerce the mixed string/number data to
# strings, leaving `counts` as a string column instead of a numeric one.
term_count_rows = [
    (term, float(count))
    for term, count in zip(d['vocabList'], d['counts'])
]

spark.createDataFrame(term_count_rows, list(d.keys())).show()

+---------+------+
|vocabList|counts|
+---------+------+
|   python|   3.0|
|     java|   2.0|
|      sql|   2.0|
|    spark|   2.0|
+---------+------+



In [107]:
# Collect the raw term-count column to the driver; each element is a Row
# wrapping one document's sparse vector.
counts = model.transform(sentenceData).select('rawFeatures').collect()
counts

[Row(rawFeatures=SparseVector(4, {0: 2.0, 3: 2.0})),
 Row(rawFeatures=SparseVector(4, {0: 1.0, 1: 1.0, 2: 1.0})),
 Row(rawFeatures=SparseVector(4, {1: 1.0, 2: 1.0}))]

In [108]:
# Full pipeline output: tokens, raw term counts and IDF-rescaled features per document.
model.transform(sentenceData).show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|document|sentence                 |words                         |rawFeatures              |features                                                                 |
+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(4,[0,3],[2.0,2.0])      |(4,[0,3],[0.5753641449035617,1.3862943611198906])                        |
|1       |Python SQL Java          |[python, sql, java]           |(4,[0,1,2],[1.0,1.0,1.0])|(4,[0,1,2],[0.28768207245178085,0.28768207245178085,0.28768207245178085])|
|2       |SQL Java                 |[sql, java]                   |(4,[1,2],[1.0,1.0])      |(4,[1,2],[0.28768207245178085,0.28768207245178085])                

HashingTF
HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a “set of terms” might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing.

In [112]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# NOTE(review): this cell rebinds sentenceData, tokenizer, vectorizer, idf,
# pipeline and model, all of which the CountVectorizer cells above also use.
# Running cells out of order will therefore mix the two pipelines.

# Same three-document toy corpus as at the top of the notebook.
sentenceData = spark.createDataFrame(
    data=[
        (0, "Python python Spark Spark"),
        (1, "Python SQL Java"),
        (2, "SQL Java"),
    ],
    schema=["document", "sentence"],
)

tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)

# Hash each term into one of 5 buckets instead of learning a vocabulary.
vectorizer = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=5,
)

idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

# tokenize -> hashed term frequencies -> IDF rescaling
pipeline = Pipeline(
    stages=[
        tokenizer,
        vectorizer,
        idf,
    ]
)

# Fit on the corpus and show the transformed result in full.
model = pipeline.fit(sentenceData)
model.transform(sentenceData).show(truncate=False)

+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|document|sentence                 |words                         |rawFeatures              |features                                                                 |
+--------+-------------------------+------------------------------+-------------------------+-------------------------------------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(5,[0,4],[2.0,2.0])      |(5,[0,4],[1.3862943611198906,0.5753641449035617])                        |
|1       |Python SQL Java          |[python, sql, java]           |(5,[1,2,4],[1.0,1.0,1.0])|(5,[1,2,4],[0.28768207245178085,0.28768207245178085,0.28768207245178085])|
|2       |SQL Java                 |[sql, java]                   |(5,[1,2],[1.0,1.0])      |(5,[1,2],[0.28768207245178085,0.28768207245178085])                