<a href="https://colab.research.google.com/github/0AlphaZero0/IBM-Coursera-AdvancedAI/blob/master/Advanced_Data_Science_Capstone_(IBM_Coursera).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

### Kaggle

In [1]:
!pip install kaggle



In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [3]:
!cp '/content/gdrive/My Drive/Credentials/kaggle.json' 'kaggle.json'

In [4]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!ls ~/.kaggle
!chmod 600 /root/.kaggle/kaggle.json  # set permission

kaggle.json


### Spark

In [5]:
!pip install --upgrade --force-reinstall pyspark

Collecting pyspark
  Using cached pyspark-3.1.2-py2.py3-none-any.whl
Collecting py4j==0.10.9
  Using cached py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9
    Uninstalling py4j-0.10.9:
      Successfully uninstalled py4j-0.10.9
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.1.2
    Uninstalling pyspark-3.1.2:
      Successfully uninstalled pyspark-3.1.2
Successfully installed py4j-0.10.9 pyspark-3.1.2


## Dataset

In [6]:
!kaggle datasets download -d clmentbisaillon/fake-and-real-news-dataset
!mkdir fakenews
!unzip fake-and-real-news-dataset.zip -d fakenews/
!rm fake-and-real-news-dataset.zip

Downloading fake-and-real-news-dataset.zip to /content
100% 41.0M/41.0M [00:00<00:00, 129MB/s]

mkdir: cannot create directory ‘fakenews’: File exists
Archive:  fake-and-real-news-dataset.zip
replace fakenews/Fake.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace fakenews/True.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n


In [7]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import col,lit,to_date

In [8]:
spark = (SparkSession.builder
                  .appName('Fake news detector')
                  .enableHiveSupport()
                  .config("spark.executor.memory", "4G")
                  .config("spark.driver.memory","18G")
                  .config("spark.executor.cores","7")
                  .config("spark.python.worker.memory","4G")
                  .config("spark.driver.maxResultSize","0")
                  .config("spark.sql.crossJoin.enabled", "true")
                  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism","2")
                  .getOrCreate())

In [9]:
spark.sparkContext.setLogLevel('INFO')

In [10]:
df_true = spark.read.csv('./fakenews/True.csv',header=True)
df_true = df_true.withColumn("Label",lit("True"))
print("There are",df_true.count(),"rows in the True dataset.")
df_false = spark.read.csv('./fakenews/Fake.csv',header=True)
df_false = df_false.withColumn("Label",lit("False"))
print("There are",df_false.count(),"rows in the False dataset.")

df = df_true.union(df_false)
df = df.withColumn("Datetime",to_date(df.date,"MMMM dd, YYYY"))
df = df.drop("date")
count = df.count()
print("In total, there are",count,"rows in the whole dataset.")

columns = ['text','title']
df = df.na.drop(subset=columns)
print(count-df.count(),"rows has been deleted because of Nan values in columns : "+" and ".join(columns),"=>",df.count(),"rows in the dataset")

# df.show()

There are 21417 rows in the True dataset.
There are 23489 rows in the False dataset.
In total, there are 44906 rows in the whole dataset.
8 rows has been deleted because of Nan values in columns : text and title => 44898 rows in the dataset


In [11]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- Label: string (nullable = false)
 |-- Datetime: date (nullable = true)



In [12]:
df_tmp = df.groupBy('subject').count().where("count>500")
df_tmp = df_tmp.sort(df_tmp['count'].desc())

df_tmp.show()



+---------------+-----+
|        subject|count|
+---------------+-----+
|   politicsNews|11209|
|      worldnews|10115|
|           News| 8501|
|       politics| 6525|
|      left-news| 4216|
|Government News| 1543|
|        US_News|  767|
|    Middle-east|  762|
+---------------+-----+



In [13]:
# Remove corrupted subjects
count = df.count()
subjects = [str(x.subject) for x in df_tmp.select('subject').collect()]
res_df = df.where(df.subject.isin(subjects))
    
print(count-res_df.count(),"rows has been deleted because of corrupted subjects =>",res_df.count(),"rows in the dataset")

1260 rows has been deleted because of corrupted subjects => 43638 rows in the dataset


In [14]:
df = res_df

## Model

In [15]:
df = df.select('text','Label')
df.show(5)

+--------------------+-----+
|                text|Label|
+--------------------+-----+
|WASHINGTON (Reute...| True|
|WASHINGTON (Reute...| True|
|WASHINGTON (Reute...| True|
|WASHINGTON (Reute...| True|
|SEATTLE/WASHINGTO...| True|
+--------------------+-----+
only showing top 5 rows



In [16]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.ml.feature import StringIndexer

In [17]:
tokenizer = Tokenizer(inputCol='text',outputCol='mytokens')
stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')

In [None]:
labelEncoder = StringIndexer(inputCol='Label',outputCol='label').fit(df)
labelEncoder.transform(df).show(5)
label_dict = {"True":1.0,"False":0.0}

In [19]:
df = labelEncoder.transform(df)

In [20]:
trainDF,testDF = df.randomSplit((0.7,0.3),seed=42)

In [21]:
from pyspark.ml.classification import LogisticRegression

In [22]:
lr = LogisticRegression(featuresCol='vectorizedFeatures',labelCol='label')

In [23]:
from pyspark.ml import Pipeline

In [24]:
pipeline = Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,lr])

In [25]:
lr_model = pipeline.fit(trainDF)

In [29]:
predictions = lr_model.transform(testDF)
predictions.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| ((This December ...|  1.0|[, ((this, decemb...|[, ((this, decemb...|(262144,[0,1,3,4,...|(262144,[0,1,3,4,...|[-55.063478500804...|[1.21964975450048...|       1.0|
| (Corrects Comey ...|  1.0|[, (corrects, com...|[, (corrects, com...|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|[-56.355976599189...|[3.34896293297034...|       1.0|
| (Corrects Feb. 2...|  1.0|[, (corrects, feb...|[, (corrects, feb...|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|[-69.085824949656...|[9.91761960331462...|       1.0|
| (C

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [31]:
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')

In [32]:
accuracy = evaluator.evaluate(predictions)
accuracy

0.9888983774551665