Amaç: true ve fake haberleden oluşan dataseti ile model oluşturup tahminler yapacağız.

In [1]:
import pyspark
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

import os
import sys
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
sc= SparkContext(master= 'local', appName= 'Fake and real news')
spark= SparkSession(sc)

In [None]:
"sc.stop()"

In [4]:
from pyspark.sql.types import StringType, StructField, StructType
def read_data(path):
  schema= StructType(
      [StructField('title',StringType(),True),
      StructField('text',StringType(),True),
      StructField('subject',StringType(),True),
      StructField('date',StringType(),True)])
  pd_df= pd.read_csv(path)
  spark_df= spark.createDataFrame(pd_df, schema= schema)
  return spark_df


In [5]:
path_true= 'True.csv'
true_df= read_data(path_true)
true_df.show(5)

+--------------------+--------------------+------------+------------------+
|               title|                text|     subject|              date|
+--------------------+--------------------+------------+------------------+
|As U.S. budget fi...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |
|U.S. military to ...|WASHINGTON (Reute...|politicsNews|December 29, 2017 |
|Senior U.S. Repub...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |
|FBI Russia probe ...|WASHINGTON (Reute...|politicsNews|December 30, 2017 |
|Trump wants Posta...|SEATTLE/WASHINGTO...|politicsNews|December 29, 2017 |
+--------------------+--------------------+------------+------------------+
only showing top 5 rows



In [6]:
path_fake= 'Fake.csv'
fake_df= read_data(path_fake)
fake_df.show(5)

+--------------------+--------------------+-------+-----------------+
|               title|                text|subject|             date|
+--------------------+--------------------+-------+-----------------+
| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|
| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|
| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|
| Trump Is So Obse...|On Christmas day,...|   News|December 29, 2017|
| Pope Francis Jus...|Pope Francis used...|   News|December 25, 2017|
+--------------------+--------------------+-------+-----------------+
only showing top 5 rows



In [7]:
# satır ve sutun sayısı

true_df.count(), len(true_df.columns)

(21417, 4)

In [8]:
# satır ve sutun sayısı

fake_df.count(), len(fake_df.columns)

(23481, 4)

In [9]:
# true verime label kolonu ekledim ve değerini 1 verdim

from pyspark.sql.functions import lit, rand
true_df = true_df.withColumn("label", lit(1))
true_df.show(3)

+--------------------+--------------------+------------+------------------+-----+
|               title|                text|     subject|              date|label|
+--------------------+--------------------+------------+------------------+-----+
|As U.S. budget fi...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |    1|
|U.S. military to ...|WASHINGTON (Reute...|politicsNews|December 29, 2017 |    1|
|Senior U.S. Repub...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |    1|
+--------------------+--------------------+------------+------------------+-----+
only showing top 3 rows



In [10]:
# fake verime label kolonu ekledim ve değerini 0 verdim

from pyspark.sql.functions import lit, rand
fake_df = fake_df.withColumn("label", lit(0))
fake_df.show(3)

+--------------------+--------------------+-------+-----------------+-----+
|               title|                text|subject|             date|label|
+--------------------+--------------------+-------+-----------------+-----+
| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|    0|
| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|    0|
| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|    0|
+--------------------+--------------------+-------+-----------------+-----+
only showing top 3 rows



In [11]:
# iki veriyi birleştirdim

df = true_df.union(fake_df).orderBy(rand())
df.show(5)

+--------------------+--------------------+---------------+------------------+-----+
|               title|                text|        subject|              date|label|
+--------------------+--------------------+---------------+------------------+-----+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews| December 1, 2017 |    1|
|Greek prime minis...|WASHINGTON (Reute...|   politicsNews| October 17, 2017 |    1|
|Myanmar to grant ...|YANGON (Reuters) ...|      worldnews|December 23, 2017 |    1|
|Mongolian parliam...|ULAANBAATAR (Reut...|      worldnews|September 7, 2017 |    1|
|GUN GRABBING JUDG...|In a 10-4 ruling,...|Government News|      Feb 23, 2017|    0|
+--------------------+--------------------+---------------+------------------+-----+
only showing top 5 rows



In [12]:
# toplam satır ve sutun sayısı

df.count(),len(df.columns)

(44898, 5)

In [13]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)
 |-- label: integer (nullable = false)



In [58]:
# eksik değerler kontrolü

from pyspark.sql import functions as F

# df içindeki eksik değerleri kontrol et
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show(5)

+-----+----+-------+-----+
|title|text|subject|label|
+-----+----+-------+-----+
|    0|   0|      0|    0|
+-----+----+-------+-----+



In [14]:
# target sutunumun dağılımını kontrol ettim

df.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|21417|
|    0|23481|
+-----+-----+



In [15]:
# subject sutunu da işlemlerime ekleyeceğim onu da kontrol ettim

df.groupBy("subject").count().show(5)

+------------+-----+
|     subject|count|
+------------+-----+
|   worldnews|10145|
|politicsNews|11272|
|     US_News|  783|
|   left-news| 4459|
|    politics| 6841|
+------------+-----+
only showing top 5 rows



In [16]:
df.show(3)

+--------------------+--------------------+------------+------------------+-----+
|               title|                text|     subject|              date|label|
+--------------------+--------------------+------------+------------------+-----+
|Flynn pleads guil...|WASHINGTON (Reute...|politicsNews| December 1, 2017 |    1|
|Greek prime minis...|WASHINGTON (Reute...|politicsNews| October 17, 2017 |    1|
|Myanmar to grant ...|YANGON (Reuters) ...|   worldnews|December 23, 2017 |    1|
+--------------------+--------------------+------------+------------------+-----+
only showing top 3 rows



## title sutunu için

In [17]:
# pattern = '\\W' sadece harf ve rakamları al,noktalama işaretlerini ve boşlukları çıkar
# toLowercase= True = tüm kelimeleri küçük harfe dönüştür
from pyspark.ml.feature import SQLTransformer, RegexTokenizer
title_tokenizer= RegexTokenizer(inputCol= 'title', outputCol= 'title_words',
                                pattern= '\\W', toLowercase= True)
title_tokenized_df = title_tokenizer.transform(df)


# stopword leri çıkar
from pyspark.ml.feature import SQLTransformer, StopWordsRemover
title_sw_remover= StopWordsRemover(inputCol= 'title_words',
                                   outputCol= 'title_sw_removed')
title_sw_removed_df = title_sw_remover.transform(title_tokenized_df)


# terim frekans vektörünü oluştur
# belgeler arasındaki kelime sıklığını dikkate alır
from pyspark.ml.feature import SQLTransformer,CountVectorizer
title_count_vectorizer= CountVectorizer(inputCol= 'title_sw_removed',
                                        outputCol= 'tf_title')
title_count_vectorized_df = title_count_vectorizer.fit(title_sw_removed_df).transform(title_sw_removed_df)


# belgelerdeki terimlerin önemini ölçer
# Ters belge frekans vektörüne oluşturulur
from pyspark.ml.feature import SQLTransformer, IDF
title_tfidf= IDF(inputCol= 'tf_title', outputCol= 'tf_idf_title')
df = title_tfidf.fit(title_count_vectorized_df).transform(title_count_vectorized_df)

df.show(5)

+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+
|               title|                text|        subject|              date|label|         title_words|    title_sw_removed|            tf_title|        tf_idf_title|
+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews| December 1, 2017 |    1|[flynn, pleads, g...|[flynn, pleads, g...|(22081,[4,16,111,...|(22081,[4,16,111,...|
|Greek prime minis...|WASHINGTON (Reute...|   politicsNews| October 17, 2017 |    1|[greek, prime, mi...|[greek, prime, mi...|(22081,[5,92,130,...|(22081,[5,92,130,...|
|Myanmar to grant ...|YANGON (Reuters) ...|      worldnews|December 23, 2017 |    1|[myanmar, to, gra...|[myanmar, grant, ...|(22081,[41,106,21...|(22081,[

## text sutunu için

In [18]:
text_tokenizer= RegexTokenizer(inputCol= 'text', outputCol= 'text_words',
                                pattern= '\\W', toLowercase= True)
text_tokenized_df = text_tokenizer.transform(df)


text_sw_remover= StopWordsRemover(inputCol= 'text_words',
                                  outputCol= 'text_sw_removed')
text_sw_removed_df = text_sw_remover.transform(text_tokenized_df)


text_count_vectorizer= CountVectorizer(inputCol= 'text_sw_removed',
                                       outputCol= 'tf_text')
text_count_vectorized_df = text_count_vectorizer.fit(text_sw_removed_df).transform(text_sw_removed_df)


text_tfidf= IDF(inputCol= 'tf_text', outputCol= 'tf_idf_text')
df = text_tfidf.fit(text_count_vectorized_df).transform(text_count_vectorized_df)

df.show(5)

+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               title|                text|        subject|              date|label|         title_words|    title_sw_removed|            tf_title|        tf_idf_title|          text_words|     text_sw_removed|             tf_text|         tf_idf_text|
+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews| December 1, 2017 |    1|[flynn, pleads, g...|[flynn, pleads, g...|(22081,[4,16,111,...|(22081,[4,16,111,...|[wash, ngton, reu...|[wash, ngton, reu...|(123121,[0,1,2,3,...|(123121,[0

## subject sutunu

In [19]:
from pyspark.ml.feature import StringIndexer

subject_str_indexer= StringIndexer(inputCol= 'subject', outputCol= 'subject_idx')
subject_str_indexer_model = subject_str_indexer.fit(df)
df = subject_str_indexer_model.transform(df)

df.show(5)

+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+
|               title|                text|        subject|              date|label|         title_words|    title_sw_removed|            tf_title|        tf_idf_title|          text_words|     text_sw_removed|             tf_text|         tf_idf_text|subject_idx|
+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews| December 1, 2017 |    1|[flynn, pleads, g...|[flynn, pleads, g...|(22081,[4,16,111,...|(22081,[4,16,111,...|[wash, ngton, reu...|[wash, ngton, re

In [20]:
from pyspark.ml.feature import VectorAssembler
vec_assembler= VectorAssembler(inputCols=['tf_idf_title', 'tf_idf_text', 'subject_idx'],
                               outputCol= 'features')


In [21]:
df = vec_assembler.transform(df)

df.show(5)

+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|               title|                text|        subject|              date|label|         title_words|    title_sw_removed|            tf_title|        tf_idf_title|          text_words|     text_sw_removed|             tf_text|         tf_idf_text|subject_idx|            features|
+--------------------+--------------------+---------------+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews| December 1, 2017 |    1|[flynn, pleads, g...|[flynn, pleads, g...|(22081,[4,16,111

In [22]:
df = df.select("title", "text","subject","label")
df.show(5)


+--------------------+--------------------+---------------+-----+
|               title|                text|        subject|label|
+--------------------+--------------------+---------------+-----+
|Flynn pleads guil...|WASHINGTON (Reute...|   politicsNews|    1|
|Greek prime minis...|WASHINGTON (Reute...|   politicsNews|    1|
|Myanmar to grant ...|YANGON (Reuters) ...|      worldnews|    1|
|Mongolian parliam...|ULAANBAATAR (Reut...|      worldnews|    1|
|GUN GRABBING JUDG...|In a 10-4 ruling,...|Government News|    0|
+--------------------+--------------------+---------------+-----+
only showing top 5 rows



In [23]:
train, test= df.randomSplit([0.8, 0.2], seed=42)

In [24]:
train.show(5)

+--------------------+--------------------+-------+-----+
|               title|                text|subject|label|
+--------------------+--------------------+-------+-----+
|\r\r\r\r\r\r\r\nD...|A 10-year-old gir...|   News|    0|
|\r\r\r\r\r\r\r\nE...|At this point, ev...|   News|    0|
| #AfterTrumpImplo...|What will the wor...|   News|    0|
| #BlackLivesMatte...|The police shooti...|   News|    0|
| #BringBackObama ...|The six months si...|   News|    0|
+--------------------+--------------------+-------+-----+
only showing top 5 rows



In [25]:
test.show(5)

+--------------------+--------------------+-------+-----+
|               title|                text|subject|label|
+--------------------+--------------------+-------+-----+
|\r\r\r\r\r\r\r\nW...|Massachusetts Rep...|   News|    0|
| #FreeChrisChrist...|Last Friday, New ...|   News|    0|
| #NeverTrump Cons...|Donald Trump is b...|   News|    0|
| #THERESISTANCE I...|Over the weekend,...|   News|    0|
| 10 Reasons Donal...|There are no two ...|   News|    0|
+--------------------+--------------------+-------+-----+
only showing top 5 rows



In [27]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(featuresCol='features', labelCol='label')
rf_pipe= Pipeline(stages=[
                title_tokenizer,
                title_sw_remover,
                title_count_vectorizer,
                title_tfidf,
                text_tokenizer,
                text_sw_remover,
                text_count_vectorizer,
                text_tfidf,
                subject_str_indexer,
                vec_assembler,
                rf])


"""
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100])
             .addGrid(rf.maxDepth, [3, 5])
             .build())


evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=rf_pipe,
                    evaluator=evaluator,
                    numFolds=5
                    estimatorParamMaps=paramGrid)

cvModel = cv.fit(train)
"""

rf_model= rf_pipe.fit(train)

train

In [51]:
# modeli değerlendirmek için kullanılacak fonksiyon
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator, BinaryClassificationEvaluator

accuracy= MulticlassClassificationEvaluator(predictionCol= 'prediction', metricName= 'accuracy')
f1= MulticlassClassificationEvaluator(predictionCol= 'prediction', metricName= 'f1')
areaUnderROC= BinaryClassificationEvaluator(metricName= 'areaUnderROC')

def classification_evaluator(data_result):
    #data_result.crosstab(col1='predictions',col2="label").show()
    data_result.groupBy('prediction', 'label').count().show(5)
    print('accuracy:', accuracy.evaluate(data_result))
    print('f1:', f1.evaluate(data_result))
    print('areaUnderROC (AUC):', areaUnderROC.evaluate(data_result))

In [48]:
# train setini tahmin et
rf_train_result= rf_model.transform(train)

In [52]:
classification_evaluator(rf_train_result)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|  327|
|       0.0|    0|18489|
|       0.0|    1| 1563|
|       1.0|    1|15571|
+----------+-----+-----+

accuracy: 0.9474269819193324
f1: 0.9472792958255327
areaUnderROC (AUC): 0.9937758438958345


test

In [53]:
# test setini tahmin et
rf_test_result= rf_model.transform(test)

In [54]:
classification_evaluator(rf_test_result)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|   83|
|       0.0|    0| 4582|
|       0.0|    1|  409|
|       1.0|    1| 3874|
+----------+-----+-----+

accuracy: 0.9450156459544032
f1: 0.9448561441809127
areaUnderROC (AUC): 0.993195987326455
