In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func


# Create (or reuse) the active SparkSession.
spark = SparkSession.builder.getOrCreate()

# Load the raw CSV. There is no header row, so Spark names the columns _c0, _c1, ...
raw_news = spark.read.csv('../Datasets/news/news_sentiment.csv', header=False)

# Use column _c0 as the label and split the text in column _c1 on single spaces
# into an array of words.
df = raw_news.select(
    raw_news['_c0'].alias('label'),
    func.split(raw_news['_c1'], ' ').alias('words'),
)

# Hold out 20% of the rows as a test set; the fixed seed keeps the split repeatable.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=2021)

21/10/22 11:08:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/22 11:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/10/22 11:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
21/10/22 11:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
21/10/22 11:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
21/10/22 11:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
                                                                                

In [None]:
from pyspark.ml.feature import CountVectorizer, StringIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Encode the string label column as numeric indices for the classifier.
labelIndexer = StringIndexer(inputCol="label", outputCol="idx_label")

# Turn each word array into a term-count vector, keeping at most 500 vocabulary terms.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=500)

# Standardize the count features (StandardScaler is left at its default settings).
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

# Logistic regression classifier trained on the scaled features.
classifier = LogisticRegression(labelCol="idx_label", featuresCol="scaled_features")

# Chain label encoding, feature extraction, feature scaling and classification in one
# Pipeline. CrossValidator then refits every stage on each fold's training portion.
pipeline = Pipeline(stages=[labelIndexer, cv, scaler, classifier])


# Hyperparameter grid: 3 regParam x 2 fitIntercept x 5 elasticNetParam = 30 candidates.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.regParam, [1.0, 0.1, 0.01])
    .addGrid(classifier.fitIntercept, [False, True])
    .addGrid(classifier.elasticNetParam, [0.0, 0.2, 0.5, 0.8, 1.0])
    .build()
)

# Model selection metric: accuracy of "prediction" against the indexed label.
evaluator = MulticlassClassificationEvaluator(labelCol="idx_label", predictionCol="prediction", metricName="accuracy")

# Select hyperparameters with 5-fold cross-validation.
# The seed is set explicitly: PySpark's default CrossValidator seed is derived from
# Python's per-process randomized str hash, so without it the folds (and the chosen
# hyperparameters) change on every kernel restart. 2021 matches the train/test split seed.
cross_val = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                           evaluator=evaluator, numFolds=5, seed=2021)

# Fit on the training split; the returned model uses the best parameter map,
# refit on all of train_df.
model = cross_val.fit(train_df)

# Score the held-out test split with the selected model.
predictions = model.transform(test_df)

21/10/22 11:08:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/22 11:08:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [None]:
# Evaluate the classifier's accuracy on the held-out test predictions.
accuracy = evaluator.evaluate(predictions)

# Report the accuracy as a percentage with two decimals.
accuracy_pct = accuracy * 100
print('Spark-ML的逻辑斯蒂回归分类器经过交叉验证优化后，在news_sentiment测试集上的准确率为：%.2f%%。' % accuracy_pct)