In [1]:
import os
src_path = os.getcwd()

In [2]:
#pyspark 세션 생성
from pyspark.context import SparkContext
from pyspark.sql import SQLContext

#로컬 단일머신에서 spark를 실행하기 때문에 local로 지정
#spark는 sql과 같은 연산을 이용해 거대한 데이터 테이블을 다루는 dataframe 추상화 이용
sc = SparkContext('local', 'test')
sql = SQLContext(sc)

In [3]:
#원시 테이블을 위한 dataframe 정의
from pyspark.sql.functions import lit
dem_df = sql.read.text("file://"+src_path+"/dem.txt")
gop_df = sql.read.text("file://"+src_path+"/gop.txt")

In [6]:
#두 소스들을 모두 포함하는 corpus_df를 만들고 dem이 1, gop이 0을 가지는 레이블열 추가
corpus_df = dem_df.select("value", lit(1).alias("label")).union(gop_df.select("value", lit(0).alias("label")))

In [8]:
corpus_df.select("*").limit(5).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|A very merry Chri...|    1|
|Stay safe and pro...|    1|
|RT @ossoff: We ca...|    1|
|.@JoeBiden and @K...|    1|
|RT @KamalaHarris:...|    1|
+--------------------+-----+



In [9]:
#데이터를 훈련용과 테스트용으로 나눔
train_df, test_df = corpus_df.randomSplit([0.75, 0.25])

In [11]:
#파이프라인을 이용한 훈련
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="value", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")

#데이터를 정리하는데 필요한 파이프라인 정의
cleaning_pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer])
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)

cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)

In [13]:
cleaned_training_df.show(n=5)

+--------------------+-----+--------------------+--------------------+--------------------+
|               value|label|               words|       words_cleaned|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|.@DenisMcDonough ...|    1|[.@denismcdonough...|[.@denismcdonough...|(2616,[11,21,50,8...|
|.@JanetYellen is ...|    1|[.@janetyellen, i...|[.@janetyellen, o...|(2616,[22,33,61,9...|
|.@JoeBiden and @K...|    1|[.@joebiden, and,...|[.@joebiden, @kam...|(2616,[7,15,30,31...|
|.@JoeBiden and @K...|    1|[.@joebiden, and,...|[.@joebiden, @kam...|(2616,[3,7,14,30,...|
|.@JoeBiden and @K...|    1|[.@joebiden, and,...|[.@joebiden, @kam...|(2616,[4,7,8,29,3...|
+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [14]:
#위 열들을 naive bayes 분류기에 지정하여 모델의 학습
from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label")
naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)

In [15]:
predictions_df.select("features", "label", "prediction").limit(3).show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(2616,[22,64,94,2...|    1|       1.0|
|(2616,[4,7,10,15,...|    1|       1.0|
|(2616,[4,5,7,15,2...|    1|       1.0|
+--------------------+-----+----------+



In [16]:
#정확도 평가
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                             predictionCol="prediction",
                                             metricName="accuracy")
evaluator.evaluate(predictions_df)

0.9396551724137931