In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark duo fuxi this is cool mike", 1.0),
    (1, "b d f i hate mike hate data ", 0.0),
    (2, "spark i love spark ddd example hadoop", 1.0),
    (3, "i love coding and ml", 0.0),
   (4, "i want to move fast", 1.0), 
   (5, "Mike like student to ask question", 1.0),
   (6, "Mike hate people to ask hwo to import spark URL", 0.0)
], ["id", "text", "label"])

In [2]:
display(training)

id,text,label
0,a b c d e spark duo fuxi this is cool mike,1.0
1,b d f i hate mike hate data,0.0
2,spark i love spark ddd example hadoop,1.0
3,i love coding and ml,0.0
4,i want to move fast,1.0
5,Mike like student to ask question,1.0
6,Mike hate people to ask hwo to import spark URL,0.0


In [3]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# now build the estimator 面试时常会问的问题，就是 你调优LR 的时候，你用了什么参数？
lr = LogisticRegression(maxIter=10, regParam=0.001)

# build a pipeline here
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
#piepline.saveAs("location")
#pipeline.load("location")

In [4]:
preprocess_model = pipeline.fit(training)

In [5]:
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# model.saveAs(directory)

In [6]:
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [7]:
display(test)

id,text
4,spark i j k
5,l m n
6,spark hadoop spark
7,apache hadoop


In [8]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")

In [9]:
hash_results = preprocess_model.transform(test)

In [10]:
display(hash_results)

id,text,words,features,rawPrediction,probability,prediction
4,spark i j k,"List(spark, i, j, k)","List(0, 262144, List(20197, 24417, 227520, 234657), List(1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-1.021302773370314, 1.021302773370314))","List(1, 2, List(), List(0.2647737137672841, 0.7352262862327159))",1.0
5,l m n,"List(l, m, n)","List(0, 262144, List(18910, 100743, 213302), List(1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.5051253893484876, 0.5051253893484876))","List(1, 2, List(), List(0.37633694143148283, 0.6236630585685172))",1.0
6,spark hadoop spark,"List(spark, hadoop, spark)","List(0, 262144, List(155117, 234657), List(1.0, 2.0))","List(1, 2, List(), List(-3.3943146193514866, 3.3943146193514866))","List(1, 2, List(), List(0.032473620122634224, 0.9675263798773658))",1.0
7,apache hadoop,"List(apache, hadoop)","List(0, 262144, List(66695, 155117), List(1.0, 1.0))","List(1, 2, List(), List(-2.187390745427375, 2.187390745427375))","List(1, 2, List(), List(0.1008885327842177, 0.8991114672157823))",1.0


In [11]:
display(selected)

id,text,probability,prediction
4,spark i j k,"List(1, 2, List(), List(0.2647737137672841, 0.7352262862327159))",1.0
5,l m n,"List(1, 2, List(), List(0.37633694143148283, 0.6236630585685172))",1.0
6,spark hadoop spark,"List(1, 2, List(), List(0.032473620122634224, 0.9675263798773658))",1.0
7,apache hadoop,"List(1, 2, List(), List(0.1008885327842177, 0.8991114672157823))",1.0


In [12]:
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

In [13]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
 
df = spark.createDataFrame([
	(0, "a"),
	(1, "b"),
	(2, "c"),
	(3, "a"),
	(4, "a"),
	(5, "c")
], ["id", "category"])
 
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
# indexed.show()
 
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()