In [1]:
from pyspark.sql import SparkSession 
from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.feature import HashingTF, Tokenizer 
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Small labelled corpus: (id, text, label). In this data the rows whose text
# mentions "spark" carry label 1.0 and all other rows carry 0.0.
training = spark.createDataFrame(
    [
        (0, "This is a testing for spark", 1.0),
        (1, "kudu ozone", 0.0),
        (2, "spark is in-memory engine", 1.0),
        (3, "hive is data warehouse", 0.0),
        (4, "hadoop is mapreduce for batch", 0.0),
    ],
    schema=["id", "text", "label"],
)

In [2]:
# ML pipeline with three stages, run in order:
#   1. tokenizer - splits the "text" column into a "words" column
#   2. hashingTF - hashes the words into a "features" vector
#   3. lr        - logistic regression trained on "features"
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),  # consume whatever column the tokenizer emits
    outputCol="features",
)
lr = LogisticRegression(regParam=0.001, maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


In [3]:
# Fit every pipeline stage on the training frame; the result is a fitted model
# that can later be applied to new documents with .transform().
model = pipeline.fit(dataset=training)


2024-09-24 11:12:37,941 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2024-09-24 11:12:38,006 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [4]:
# Preview the first five training rows without truncating the text column.
training.show(n=5, truncate=False)


+---+-----------------------------+-----+
|id |text                         |label|
+---+-----------------------------+-----+
|0  |This is a testing for spark  |1.0  |
|1  |kudu ozone                   |0.0  |
|2  |spark is in-memory engine    |1.0  |
|3  |hive is data warehouse       |0.0  |
|4  |hadoop is mapreduce for batch|0.0  |
+---+-----------------------------+-----+



In [6]:
# Unlabelled documents to score with the fitted model.
# Built through the `spark` session created in the first cell. The previous
# version used `sc`, a SparkContext that this notebook never defines, so the
# cell failed on a fresh kernel.
from pyspark.sql import Row

# Row factory with fields (id, text); calling it positionally builds one row.
Document = Row("id", "text")
test = spark.createDataFrame([
    Document(5, "K O 1"),
    Document(6, "spark hadoop spark impala"),
    Document(7, "apache spark open-source"),
    Document(8, "spark is a platform"),
    Document(9, "Hadoop is for Big Data"),
])

In [7]:
# Make predictions on test documents and print the columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")

# The loop variables must not reuse the name `prediction`. Otherwise the scored
# DataFrame bound above is overwritten by the last row's float.
for row_id, row_text, row_prob, row_pred in selected.collect():
    # row_prob is the per-class probability vector; row_pred is the predicted label.
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            row_id, row_text, str(row_prob), row_pred
        )
    )

(5, K O 1) --> prob=[0.760510559377351,0.23948944062264899], prediction=0.000000
(6, spark hadoop spark impala) --> prob=[0.09933562532409927,0.9006643746759008], prediction=1.000000
(7, apache spark open-source) --> prob=[0.1938094405325343,0.8061905594674657], prediction=1.000000
(8, spark is a platform) --> prob=[0.03587540471389243,0.9641245952861076], prediction=1.000000
(9, Hadoop is for Big Data) --> prob=[0.9907289093358328,0.009271090664167203], prediction=0.000000
