In [1]:
from collections import Counter
from pyspark.sql import Row
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Upload log pasted verbatim: one "File uploaded to <path>" line per file.
# Kept under its own name so `dataPath` is only ever bound to the list of paths
# (previously the same name held the raw string first, then the list).
UPLOAD_LOG = """File uploaded to /FileStore/tables/C000008.txt
File uploaded to /FileStore/tables/C000010.txt
File uploaded to /FileStore/tables/C000013.txt
File uploaded to /FileStore/tables/C000014.txt
File uploaded to /FileStore/tables/C000007.txt
File uploaded to /FileStore/tables/C000016.txt
File uploaded to /FileStore/tables/C000020.txt
File uploaded to /FileStore/tables/C000022.txt"""

# The path is the last whitespace-separated token on each line.
# splitlines() + split() tolerate CRLF endings, trailing blanks and empty lines,
# which split('\n') / split(' ') would turn into bogus or empty paths.
dataPath = [line.split()[-1] for line in UPLOAD_LOG.splitlines() if line.strip()]

In [2]:
# Build the (category, text) DataFrame from the raw text file.
# NOTE(review): only the first uploaded file (dataPath[0]) is read here;
# confirm whether the remaining paths were meant to be loaded as well.
columns = ["category", "text"]
df = (
    sc.textFile(dataPath[0])
    # Split on the FIRST comma only, so commas inside the text body are kept
    # instead of silently truncating the text at its first comma.
    .map(lambda line: line.split(',', 1))
    # Lines without a separator (e.g. blank lines) cannot yield both fields;
    # skip them rather than failing the whole job with an IndexError.
    .filter(lambda fields: len(fields) == 2)
    .map(lambda fields: (fields[0], fields[1]))
    .toDF(columns)
)

In [3]:
# Bring the first two parsed rows back to the driver for a quick look.
df.select("category", "text").limit(2).collect()

In [4]:
# Tokenize the "text" column into a new "word" column.
tokenizer = Tokenizer(inputCol="text", outputCol="word")
wordData = tokenizer.transform(df)

In [5]:
# Inspect two rows with the original columns next to the tokenizer output.
wordData.select("category", "text", "word").limit(2).collect()

In [6]:
# Hash the tokens in "word" into term-frequency vectors with 100 features,
# written to the "feature" column.
hashingTF = HashingTF(inputCol="word", outputCol="feature", numFeatures=100)
featureData = hashingTF.transform(wordData)

In [7]:
# Print the first two rows, including the hashed "feature" column.
featureData.show(n=2)

In [8]:
# Fit IDF weights on the "feature" vectors, then write the rescaled vectors
# to "IDFfeature".
idf = IDF(inputCol="feature", outputCol="IDFfeature")
idfModel = idf.fit(featureData)
resData = idfModel.transform(featureData)

In [9]:
# Print the first two rows, now including the "IDFfeature" column.
resData.show(n=2)

In [10]:
# Four hand-written labelled documents used to fit the pipeline below:
# each row is (id, text, label).
training = spark.createDataFrame(
    data=[
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)

training.show()

In [11]:
# Three-stage pipeline: tokenize -> hashed term frequencies -> logistic regression.
# Note: `tokenizer` and `hashingTF` are rebound here with different output
# columns ("words" / "features") than the instances created in earlier cells.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),  # chain onto whatever the tokenizer emits
    outputCol="features",
)
lr = LogisticRegression(
    maxIter=10,
    regParam=0.001,
)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit every stage in order on the labelled training rows.
model = pipeline.fit(training)

In [12]:
# Unlabelled documents to score with the fitted pipeline: each row is (id, text).
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Run all pipeline stages on the test rows.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")

# Use a distinct loop variable for the per-row label: unpacking into
# `prediction` would overwrite the DataFrame above with a float, so the name
# would hold a different type once the cell finished.
for rid, text, prob, predicted_label in selected.collect():
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), predicted_label))

In [13]:
from pyspark.ml.feature import Word2Vec
# Each row holds one document, already split into a list of words.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn 3-dimensional word vectors; minCount=0 keeps every word in this tiny corpus.
# Word2Vec training is randomized, so fix the seed to make the vectors printed
# below reproducible across kernel restarts.
word2Vec = Word2Vec(vectorSize=3, minCount=0, seed=42,
                    inputCol="text", outputCol="result")

# Note: this rebinds `model`, which previously held the fitted pipeline.
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)

# Print each document next to the vector stored in its "result" column.
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

In [14]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# Small example frame: (id, dense feature vector, clicked label).
# Note: this rebinds `df`, which earlier held the category/text data.
df = spark.createDataFrame(
    data=[
        (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
        (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
        (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0),
    ],
    schema=["id", "features", "clicked"],
)

# Keep only the single top feature chosen by the chi-squared selector,
# using "clicked" as the label column.
selector = ChiSqSelector(
    numTopFeatures=1,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="clicked",
)
result = selector.fit(df).transform(df)
result.show()