In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [91.1 kB]
Get:12 https://developer.download.nvidia.com/compute/cud

In [None]:
import findspark

# Initialise findspark before importing pyspark so this cell does not depend on
# an earlier cell having already put the Spark install on the import path.
findspark.init()

from pyspark.ml.feature import Tokenizer
from pyspark.sql import SparkSession

In [None]:
# Mount Google Drive so the dataset stored there is readable from the runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Tokens")
    .getOrCreate()
)

In [None]:
# Load the dataset from Drive. Despite the .csv extension the file is read as
# tab-separated, with the first row used as the header.
news_spark = (
    spark.read
    .options(sep='\t', header=True)
    .csv("/content/drive/My Drive/fake.csv")
)

In [None]:
# Peek at the first three rows to confirm the columns were parsed.
news_spark.show(n=3)

+---+--------------------+--------------------+-----+
|_c0|               title|                text|label|
+---+--------------------+--------------------+-----+
|  0|LAW ENFORCEMENT O...|No comment is exp...|    1|
|  1|                ghgg|Did they post the...|    1|
|  2|UNBELIEVABLE! OBA...| Now, most of the...|    1|
+---+--------------------+--------------------+-----+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import concat,col,lit

# Add `text1` = title + "-" + text, the combined field the body tokenizer reads.
# withColumn replaces an existing column of the same name, so re-running this
# cell does not produce a second, ambiguous `text1` column.
# NOTE(review): the "-" separator has no surrounding whitespace, so whitespace
# tokenization fuses the last title word with the first body word
# (e.g. "ghgg-did"); consider lit(" ") if that is not intended.
news_spark = news_spark.withColumn(
    "text1",
    concat(col("title"), lit("-"), col("text")),
)

In [None]:
# Tokenizers: one for the headline, one for the combined title+body column.
tok_title = Tokenizer(outputCol="Tok_title", inputCol="title")
tok_text = Tokenizer(outputCol="Tok_text", inputCol="text1")

In [None]:
# List the distinct label values present in the raw data.
label_values = news_spark.select('label').distinct()
label_values.collect()

[Row(label='0'), Row(label='1')]

In [None]:
def word_list_length(word_list):
    """Return the number of tokens in ``word_list``.

    Args:
        word_list: A sequence of tokens, or ``None`` (what a Python UDF
            receives for a null array column).

    Returns:
        The length of ``word_list``; ``0`` when it is ``None`` so a null row
        does not raise ``TypeError`` inside the UDF.
    """
    if word_list is None:
        return 0
    return len(word_list)
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf

# Wrap word_list_length as a Spark UDF that yields an integer column.
count_tokens = udf(f=word_list_length, returnType=IntegerType())

In [None]:
# Tokenize the title and the combined text, then record each token count
# as its own integer column.
tokenized_df = (
    tok_text.transform(tok_title.transform(news_spark))
    .withColumn("title tokens", count_tokens(col("Tok_title")))
    .withColumn("text tokens", count_tokens(col("Tok_text")))
)
tokenized_df.show(2)

+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+
|_c0|               title|                text|label|               text1|           Tok_title|            Tok_text|title tokens|text tokens|
+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+
|  0|LAW ENFORCEMENT O...|No comment is exp...|    1|LAW ENFORCEMENT O...|[law, enforcement...|[law, enforcement...|          18|        958|
|  1|                ghgg|Did they post the...|    1|ghgg-Did they pos...|              [ghgg]|[ghgg-did, they, ...|           1|          8|
+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+
only showing top 2 rows



In [None]:
from pyspark.ml.feature import StopWordsRemover

# Stop-word filters for the title tokens and the combined-text tokens.
stop_title = StopWordsRemover(outputCol="stop_title", inputCol="Tok_title")
stop_text = StopWordsRemover(outputCol="stop_text", inputCol="Tok_text")

# Apply the title filter first, then the text filter.
filtered = stop_text.transform(stop_title.transform(tokenized_df))
filtered.show()

+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+
|_c0|               title|                text|label|               text1|           Tok_title|            Tok_text|title tokens|text tokens|          stop_title|           stop_text|
+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+
|  0|LAW ENFORCEMENT O...|No comment is exp...|    1|LAW ENFORCEMENT O...|[law, enforcement...|[law, enforcement...|          18|        958|[law, enforcement...|[law, enforcement...|
|  1|                ghgg|Did they post the...|    1|ghgg-Did they pos...|              [ghgg]|[ghgg-did, they, ...|           1|          8|              [ghgg]|[ghgg-did, post, ...|
|  2|UNBELIEVABLE! OBA...| Now, most of the...|    1|UNBELIEVABLE! OBA...|[unbel

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Hash the filtered tokens into term-frequency vectors, one per field.
hashingTF1 = HashingTF(outputCol="hash_title", inputCol="stop_title")
hashingTF2 = HashingTF(outputCol="hash_text", inputCol="stop_text")

# Title hashing first, then text hashing on its result.
hashed_df = hashingTF2.transform(hashingTF1.transform(filtered))

hashed_df.show()

+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+
|_c0|               title|                text|label|               text1|           Tok_title|            Tok_text|title tokens|text tokens|          stop_title|           stop_text|          hash_title|           hash_text|
+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+
|  0|LAW ENFORCEMENT O...|No comment is exp...|    1|LAW ENFORCEMENT O...|[law, enforcement...|[law, enforcement...|          18|        958|[law, enforcement...|[law, enforcement...|(262144,[19684,22...|(262144,[619,992,...|
|  1|                ghgg|Did they post the...|    1|ghgg-Did they pos...|              [ghgg]|[

In [None]:
# IDF stages for the title and text term-frequency vectors.
idf1 = IDF(outputCol="idf_title", inputCol="hash_title")
idf2 = IDF(outputCol="idf_text", inputCol="hash_text")

# Fit and apply each stage in turn, previewing the columns produced so far.
# NOTE(review): both IDF models are fitted on the full dataset, before the
# train/test split performed later in the notebook.
preview_cols = ["hash_title", "hash_text"]
rescaledData = hashed_df
for idf_stage in (idf1, idf2):
    idfModel = idf_stage.fit(rescaledData)
    rescaledData = idfModel.transform(rescaledData)
    preview_cols.append(idf_stage.getOutputCol())
    rescaledData.select(*preview_cols).show()

+--------------------+--------------------+--------------------+
|          hash_title|           hash_text|           idf_title|
+--------------------+--------------------+--------------------+
|(262144,[19684,22...|(262144,[619,992,...|(262144,[19684,22...|
|(262144,[45523],[...|(262144,[84100,10...|(262144,[45523],[...|
|(262144,[17893,31...|(262144,[3564,538...|(262144,[17893,31...|
|(262144,[54679,11...|(262144,[511,1546...|(262144,[54679,11...|
|(262144,[3571,171...|(262144,[161,921,...|(262144,[3571,171...|
|(262144,[30367,72...|(262144,[2162,227...|(262144,[30367,72...|
|(262144,[36217,83...|(262144,[29066,36...|(262144,[36217,83...|
|(262144,[20326,31...|(262144,[20326,52...|(262144,[20326,31...|
|(262144,[31895,66...|(262144,[960,6957...|(262144,[31895,66...|
|(262144,[41421,59...|(262144,[1546,160...|(262144,[41421,59...|
|(262144,[47685,92...|(262144,[751,1512...|(262144,[47685,92...|
|(262144,[18923,38...|(262144,[115,1772...|(262144,[18923,38...|
|(262144,[36525,44...|(26

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

# Model features: the text TF-IDF vector plus the text token count.
feature_inputs = ['idf_text', 'text tokens']
clean = VectorAssembler(outputCol='features', inputCols=feature_inputs)

In [None]:
# Assemble the `features` vector column for every row.
cleaned = clean.transform(dataset=rescaledData)

In [None]:
# Confirm which label values survive feature assembly.
cleaned_labels = cleaned.select("label").distinct()
cleaned_labels.collect()

[Row(label='0'), Row(label='1')]

In [None]:
# The CSV reader produced `label` as a string ('0'/'1'); the classifier needs a
# numeric column. Cast to int explicitly: the 'numeric' type name is Spark SQL's
# alias for DECIMAL(10,0), which would turn the labels into decimals.
cleaned = cleaned.withColumn("label", cleaned["label"].cast("int"))

In [None]:
# Keep only the columns the model and the exported results need.
model_columns = ["text", "features", "label"]
cleaned = cleaned.select(*model_columns)

In [None]:
# Scratch check of the message format used when reporting per-seed results.
test = 5
i = 3
message = "Accuracy : %f" % test
print(str(i) + message)

3Accuracy : 5.000000


In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Repeat the 70/30 split with several seeds to see how stable the score is.
# The estimator and evaluator do not depend on the seed, so build them once.
nb = NaiveBayes()
# BinaryClassificationEvaluator defaults to areaUnderROC, which is not accuracy;
# request the accuracy metric explicitly so the printed label is truthful.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')

for i in range(45,48):
  # `i` is the random seed for this split.
  training, testing = cleaned.randomSplit([0.7, 0.3], i)
  # Fit Naive Bayes on the training part and score the held-out part.
  predictor = nb.fit(training)
  test_results = predictor.transform(testing)
  acc = acc_eval.evaluate(test_results)
  print("%d Accuracy : %f" % (i, acc))

45Accuracy : 0.939212
46Accuracy : 0.940787
47Accuracy : 0.938911


In [None]:
# Final 70/30 train/test split, using a fixed seed.
training, testing = cleaned.randomSplit(weights=[0.7, 0.3], seed=48)

In [None]:
from pyspark.ml.classification import NaiveBayes

# Train a Naive Bayes classifier (default parameters) on the training split.
nb = NaiveBayes()
predictor = nb.fit(dataset=training)

In [None]:
# Score the held-out split and preview the first few predictions.
test_results = predictor.transform(dataset=testing)
test_results.show(n=5)

+--------------------+--------------------+-----+--------------------+--------------------+----------+
|                text|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------------------+-----+--------------------+--------------------+----------+
|    Barbra Streis...|(262145,[3105,538...|    0|[-13003.492272400...|[1.0,6.0623875751...|       0.0|
|    actor Daniel ...|(262145,[1745,178...|    0|[-10754.384884971...|[1.0,4.5012626818...|       0.0|
|    months after ...|(262145,[511,1546...|    0|[-6381.7968171430...|[1.0,2.8393452960...|       0.0|
|   France  —   Th...|(262145,[161,324,...|    0|[-43216.029242995...|           [1.0,0.0]|       0.0|
|   James Franco s...|(262145,[329,3657...|    0|[-6652.8472764883...|[1.0,7.8546315542...|       0.0|
+--------------------+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Report true accuracy (fraction of correct predictions) on the test split.
# BinaryClassificationEvaluator defaults to areaUnderROC, so using it here
# would print an AUC value under an "Accuracy" label.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting fake news was : %f" % acc)

Accuracy of model at predicting fake news was : 0.944615


In [None]:
from google.colab import files

# Collect the scored test rows into pandas, write them to a CSV file
# (utf-8-sig encoding), then trigger a browser download from Colab.
results_pdf = test_results.toPandas()
results_pdf.to_csv('Confusion_matrix_data.csv', encoding='utf-8-sig')
files.download('Confusion_matrix_data.csv')

In [None]:
# Word frequencies across all titles.
# `news_df` was never defined in this notebook, so build it here from the Spark
# frame; only the `title` column is collected to keep the transfer small.
# NOTE(review): assumes `news_df` was meant to be a pandas copy of the loaded
# dataset — confirm.
news_df = news_spark.select("title").toPandas()
news_df["title"].str.split(expand=True).stack().value_counts()