# 👨‍🔬 Mr. Owlf POC
By Anthony Vilarim Caliani

[![#](https://img.shields.io/badge/licence-MIT-lightseagreen.svg)](#) [![#](https://img.shields.io/badge/python-3.7.x-yellow.svg)](#)

## Description
Mr. Owlf Proof of Concept: a headline classifier that tells satirical news (r/TheOnion) apart from real news (r/nottheonion).


## Related Links
- [Pantech Solutions: Fake News Detector](https://www.pantechsolutions.net/fake-news-detection-using-machine-learning)
- [Towards Data Science: Fake News Detector using NLP](https://towardsdatascience.com/i-built-a-fake-news-detector-using-natural-language-processing-and-classification-models-da180338860e)
- [Towards Data Science: Training Fake News Detection AI](https://towardsdatascience.com/i-trained-fake-news-detection-ai-with-95-accuracy-and-almost-went-crazy-d10589aa57c)
- [GitHub Project (1)](https://github.com/jfantell/Fake-News-Detection)
- [GitHub Project (2)](https://github.com/jasminevasandani/NLP_Classification_Model_FakeNews)

---

_You can find [@avcaliani](#) at [GitHub](https://github.com/avcaliani) or [GitLab](https://gitlab.com/avcaliani)._

In [1]:
# Mount Google Drive at /content/drive so the CSV files read later in this
# notebook (under '/content/drive/My Drive/...') are reachable as local paths.
# Only works inside Google Colab; the call prompts for an authorization code.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
from typing import Any, List, Set, Tuple

import numpy as np
import pandas as pd

from nltk import download
from nltk.corpus import stopwords
from pandas import DataFrame, Series, to_datetime, read_csv
from sklearn import metrics
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

In [0]:
#  _____        _          _    _ _   _ _     
# |  __ \      | |        | |  | | | (_) |    
# | |  | | __ _| |_ __ _  | |  | | |_ _| |___ 
# | |  | |/ _` | __/ _` | | |  | | __| | / __|
# | |__| | (_| | || (_| | | |__| | |_| | \__ \
# |_____/ \__,_|\__\__,_|  \____/ \__|_|_|___/
#


def read(file_name: str) -> DataFrame:
    """Load a CSV file into a DataFrame and print a short preview of it.

    Prints a banner, the file name, the shape of the loaded frame and its
    first and last two rows.

    :param file_name: Path of the CSV file, handed to ``read_csv`` as is.
    :return: The loaded DataFrame.
    """
    banner = (
        r'+-----------------------------------+',
        r'|           Reading File            |',
        r'+-----------------------------------+',
    )
    for row in banner:
        print(row)

    print(f'\nReading file: "{file_name}"')
    frame = read_csv(file_name)

    first_rows, last_rows = frame.head(2), frame.tail(2)
    print(f'Shape: {frame.shape}')
    print(f'Sample...\n{first_rows}\n...\n{last_rows}\n')
    return frame


def clean_data(df: DataFrame) -> None:
    """Normalise the ``title`` column of *df* in place.

    Steps, in order:

    1. Drop rows whose ``title`` duplicates an earlier row.
    2. Replace every run of characters that are not ASCII letters
       (punctuation, digits, underscores, whitespace, ...) with one space.
    3. Lowercase the titles.
    4. Drop rows that contain a null value in any column.

    The old and new shapes are printed so the effect of the clean-up is
    visible.

    :param df: Frame with a ``title`` column; it is modified in place.
    :return: None.
    """
    print(r'+-----------------------------------+')
    print(r'|        Data Frame Clean Up        |')
    print(r'+-----------------------------------+')

    print(f'Old shape: {df.shape}')
    # Drop duplicate rows
    df.drop_duplicates(subset='title', inplace=True)

    # Remove punctuation and numbers and collapse the resulting gaps in one
    # pass. `regex=True` is explicit because pandas >= 2.0 treats the pattern
    # as a literal string by default, which would silently skip the clean-up.
    # Matching whole runs (`+`) guarantees single spaces; the former approach
    # of replacing '  ' twice left double spaces behind for runs of 5+ blanks
    # (e.g. after stripping "$1,050").
    titles = df['title'].str.replace(r'[^A-Za-z]+', ' ', regex=True)

    # Transform all text to lowercase
    df['title'] = titles.str.lower()

    # Remove null values (rows with a null in any column)
    df.dropna(inplace=True)
    print(f'New shape: {df.shape}')


def show_statistics(df: DataFrame) -> None:
    """Print the date range, the most active authors and the top domains.

    The ``timestamp`` column is converted from Unix seconds to datetimes and
    written back to *df*, so the frame is modified in place.

    :param df: Frame with ``timestamp``, ``author`` and ``domain`` columns.
    :return: None.
    """
    for row in (
        r'+-----------------------------------+',
        r'|       Data Frame Statistics       |',
        r'+-----------------------------------+',
    ):
        print(row)

    # Unix timestamp (seconds) -> datetime, stored back on the frame
    df['timestamp'] = to_datetime(df['timestamp'], unit='s')
    posted_at: Series = df['timestamp']
    print(f'\nDate range of posts...')
    print(f'* Start date:\t{posted_at.min()}')
    print(f'* End date:\t{posted_at.max()}')

    # Posts per author, restricted to authors with more than 100 posts
    posts_per_author: Series = df['author'].value_counts()
    busy_authors: Series = posts_per_author[posts_per_author > 100]
    busy_authors = busy_authors.sort_values(ascending=False)
    print(
        f'\nMost Active Authors...\n{busy_authors.head(2)}\n...\n{busy_authors.tail(2)}\n')

    # Posts per domain, five most frequent only
    posts_per_domain: Series = df['domain'].value_counts()
    top_domains: Series = posts_per_domain.sort_values(ascending=False).head(5)
    print(
        f'\nMost referenced domains...\n{top_domains.head(2)}\n...\n{top_domains.tail(2)}\n')


#           _____   _    _ _   _ _     
#     /\   |_   _| | |  | | | (_) |    
#    /  \    | |   | |  | | |_ _| |___ 
#   / /\ \   | |   | |  | | __| | / __|
#  / ____ \ _| |_  | |__| | |_| | \__ \
# /_/    \_\_____|  \____/ \__|_|_|___/
#


def count_vectorizer(df: DataFrame, filter_value: int, ngram_range: Tuple[int, int] = (1, 1)) -> DataFrame:
    """Build a dense term-count matrix for the titles of one category.

    Only rows whose ``subreddit`` equals *filter_value* are vectorized.
    English stop words are removed by the vectorizer.

    :param df: Frame with ``subreddit`` and ``title`` columns.
    :param filter_value: ``subreddit`` value selecting the rows to vectorize.
    :param ngram_range: ``(min_n, max_n)`` passed to ``CountVectorizer``.
    :return: One row per selected title, one column per n-gram, cells are counts.
    """
    print(r'+-----------------------------------+')
    print(r'|         Count Vectorizer          |')
    print(r'+-----------------------------------+')

    # Set variables to show only one category titles
    titles = df[df['subreddit'] == filter_value]['title']

    cv = CountVectorizer(stop_words='english', ngram_range=ngram_range)
    # Fit and transform the vectorizer on our corpus
    counts = cv.fit_transform(titles)

    # `get_feature_names()` was removed in scikit-learn 1.2 in favour of
    # `get_feature_names_out()` (added in 1.0); use whichever this install has
    # so the notebook runs on both old and current releases.
    if hasattr(cv, 'get_feature_names_out'):
        feature_names = cv.get_feature_names_out()
    else:
        feature_names = cv.get_feature_names()

    # NOTE: `.toarray()` densifies the sparse matrix (rows x vocabulary).
    df_cvec = DataFrame(counts.toarray(), columns=feature_names)

    print(f'Count Vectorizer Result Shape: {df_cvec.shape}')
    print(f'Sample...\n{df_cvec.head(2)}\n...\n{df_cvec.tail(2)}\n')
    return df_cvec


def unigrams(df: DataFrame, df_2: DataFrame = None) -> Set:
    """Return the most frequent column names of one or two count matrices.

    Each frame is summed column-wise and its five largest columns are kept.
    With a single frame those five names are returned; with two frames only
    the names present in both top-five lists are returned. Despite its name
    the function works for any n-gram size, since it only ranks columns.

    :param df: Count matrix (one column per term).
    :param df_2: Optional second count matrix to intersect with.
    :return: Set of column names.
    """
    for row in (
        r'+-----------------------------------+',
        r'|             Unigrams              |',
        r'+-----------------------------------+',
    ):
        print(row)

    def top_five(frame: DataFrame) -> Series:
        # Column totals, largest first, five best only
        totals = frame.sum(axis=0)
        return totals.sort_values(ascending=False).head(5)

    first_top = top_five(df)
    print(f'\nDF:\n{first_top}')
    common = set(first_top.index)

    if df_2 is not None:
        second_top = top_five(df_2)
        print(f'\nDF 2:\n{second_top}')
        common = common.intersection(set(second_top.index))

    print(f'Unigrams: {common}')
    return common


def get_stop_words(unigrams: List, bigrams: List) -> List:
    """Build the custom stop-word list used for modelling.

    Downloads the NLTK ``stopwords`` corpus, then returns the English stop
    words followed by every item of *unigrams* and every space-separated word
    of each item in *bigrams*. Duplicates are not removed.

    :param unigrams: Single words to add.
    :param bigrams: Space-separated phrases whose words are added one by one.
    :return: The combined list, which is also printed together with its length.
    """
    for row in (
        r'+-----------------------------------+',
        r'|            Stop Words             |',
        r'+-----------------------------------+',
    ):
        print(row)

    download('stopwords')
    words = list(stopwords.words('english'))
    words.extend(unigrams)
    words.extend(token for phrase in bigrams for token in phrase.split(" "))

    print(f'Stop Words: {len(words)}\n{words}')
    return words


#           _____   ______         _                   
#     /\   |_   _| |  ____|       | |                  
#    /  \    | |   | |__ __ _  ___| |_ ___  _ __ _   _ 
#   / /\ \   | |   |  __/ _` |/ __| __/ _ \| '__| | | |
#  / ____ \ _| |_  | | | (_| | (__| || (_) | |  | |_| |
# /_/    \_\_____| |_|  \__,_|\___|\__\___/|_|   \__, |
#                                                 __/ |
#                                                |___/ 


class AIFactory:
    """Algorithms factory!

    We are expecting a model that is better than 54% and the majority class is 1 (TheOnion).
    If the model is not better than 54%, we know the model is not performing well.

    Model 01: Grid Search using 'Count Vectorizer' and 'Logistic Regression'
    Model 02: Grid Search using 'Tfidf Vectorizer' and 'Logistic Regression'
    Model 03: Grid Search using 'Count Vectorizer' and 'Multinomial Naive Bayes'
    Model 04: Grid Search using 'Tfidf Vectorizer' and 'Multinomial Naive Bayes'

    Every ``model_XX`` method returns ``(classifier, vectorizer, gs_score)``:
    an unfitted classifier and an unfitted vectorizer configured with the
    winning grid-search parameters, plus the score dictionary produced by
    ``get_gs_score``.
    """

    def __init__(self, x: Series, y: Series, stop_words: List):
        """Split the data once and remember the custom stop words.

        :param x: Predictor (the titles).
        :param y: Target labels; also used to stratify the split.
        :param stop_words: Custom stop-word list offered to the vectorizers.
        """
        self.best_model = None
        self.stop_words = stop_words
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            x, y, random_state=42, stratify=y
        )

    def get_classifier(self) -> Tuple[Any, Any, Any]:
        """Run all four searches and return the one with the best CV score.

        The result is cached in ``self.best_model``, so the searches only run
        on the first call.
        """
        if self.best_model is not None:
            return self.best_model

        models = [
            self.model_01(),
            self.model_02(),
            self.model_03(),
            self.model_04()
        ]
        # `max` returns the first maximal entry, i.e. ties go to the earlier model.
        self.best_model = max(models, key=lambda v: v[2]['best_score'])

        print(f'Best Classifier: {type(self.best_model[0]).__name__}')
        print(f'Best Vectorizer: {type(self.best_model[1]).__name__}')

        return self.best_model

    def _grid_search(self, steps: List, param_grid: dict) -> dict:
        """Fit a 3-fold grid search over a pipeline of *steps* and report it."""
        gs = GridSearchCV(Pipeline(steps), param_grid=param_grid, cv=3)
        gs.fit(self.X_train, self.y_train)
        return self.get_gs_score(gs)

    def model_01(self) -> Tuple[Any, Any, Any]:
        """Grid search: CountVectorizer + LogisticRegression."""
        print('\n[ LogisticRegression + CountVectorizer ]')
        gs_score = self._grid_search(
            steps=[
                ('cvec', CountVectorizer()),
                ('lr', LogisticRegression(solver='liblinear'))
            ],
            param_grid={
                'cvec__stop_words': [None, 'english', self.stop_words],
                'cvec__ngram_range': [(1, 1), (2, 2), (1, 3)],
                'lr__C': [0.01, 1]
            }
        )
        params = gs_score['best_params']

        clf = LogisticRegression(C=params['lr__C'], solver='liblinear')
        vectorizer = CountVectorizer(
            ngram_range=params['cvec__ngram_range'],
            # Use the stop-word option the search selected; always forcing the
            # custom list made the final model differ from the scored one.
            stop_words=params['cvec__stop_words']
        )
        return clf, vectorizer, gs_score

    def model_02(self) -> Tuple[Any, Any, Any]:
        """Grid search: TfidfVectorizer + LogisticRegression."""
        print('\n[ LogisticRegression + TfidfVectorizer ]')
        gs_score = self._grid_search(
            steps=[
                # Search with the same stop words the returned vectorizer uses,
                # so the score ranks the configuration that is handed back.
                ('tvect', TfidfVectorizer(stop_words=self.stop_words)),
                ('lr', LogisticRegression(solver='liblinear'))
            ],
            param_grid={
                'tvect__max_df': [.75, .98, 1.0],
                'tvect__min_df': [2, 3, 5],
                'tvect__ngram_range': [(1, 1), (1, 2), (1, 3)],
                'lr__C': [1]
            }
        )
        params = gs_score['best_params']

        clf = LogisticRegression(C=params['lr__C'], solver='liblinear')
        vectorizer = TfidfVectorizer(
            max_df=params['tvect__max_df'],
            min_df=params['tvect__min_df'],
            ngram_range=params['tvect__ngram_range'],
            stop_words=self.stop_words
        )
        return clf, vectorizer, gs_score

    def model_03(self) -> Tuple[Any, Any, Any]:
        """Grid search: CountVectorizer + MultinomialNB."""
        print('\n[ MultinomialNB + CountVectorizer ]')
        gs_score = self._grid_search(
            steps=[
                ('cvec', CountVectorizer()),
                ('nb', MultinomialNB())
            ],
            param_grid={
                'cvec__stop_words': [None, 'english', self.stop_words],
                'cvec__ngram_range': [(1, 1), (1, 3)],
                'nb__alpha': [.36, .6]
            }
        )
        params = gs_score['best_params']

        clf = MultinomialNB(alpha=params['nb__alpha'])
        vectorizer = CountVectorizer(
            ngram_range=params['cvec__ngram_range'],
            # Honour the stop-word option picked by the search (see model_01).
            stop_words=params['cvec__stop_words']
        )
        return clf, vectorizer, gs_score

    def model_04(self) -> Tuple[Any, Any, Any]:
        """Grid search: TfidfVectorizer + MultinomialNB."""
        print('\n[ MultinomialNB + TfidfVectorizer ]')
        gs_score = self._grid_search(
            steps=[
                # Same stop words as the returned vectorizer (see model_02).
                ('tvect', TfidfVectorizer(stop_words=self.stop_words)),
                ('nb', MultinomialNB())
            ],
            param_grid={
                'tvect__max_df': [.75, .98],
                'tvect__min_df': [4, 5],
                'tvect__ngram_range': [(1, 2), (1, 3)],
                'nb__alpha': [0.1, 1]
            }
        )
        params = gs_score['best_params']

        clf = MultinomialNB(alpha=params['nb__alpha'])
        vectorizer = TfidfVectorizer(
            max_df=params['tvect__max_df'],
            min_df=params['tvect__min_df'],
            ngram_range=params['tvect__ngram_range'],
            stop_words=self.stop_words
        )
        return clf, vectorizer, gs_score

    def get_gs_score(self, gs: GridSearchCV) -> Any:
        """Summarise a fitted grid search and print its scores.

        :param gs: A fitted ``GridSearchCV``.
        :return: Dict with ``best_score`` (mean CV score as a percentage,
            rounded to two decimals) and ``best_params``.
        """
        score = {
            'best_score': round(gs.best_score_ * 100, 2),
            'best_params': gs.best_params_,
        }

        print(f'Best Score  : {score["best_score"]}%')
        print(f'Train Score : {round(gs.score(self.X_train, self.y_train) * 100, 2)}%')
        # Printed for information only; model selection uses `best_score`.
        print(f'Test Score  : {round(gs.score(self.X_test, self.y_test) * 100, 2)}%')
        print(f'Best Params : {score["best_params"]}\n')
        return score


#           _____   __  __           _      _ _             
#     /\   |_   _| |  \/  |         | |    | (_)            
#    /  \    | |   | \  / | ___   __| | ___| |_ _ __   __ _ 
#   / /\ \   | |   | |\/| |/ _ \ / _` |/ _ \ | | '_ \ / _` |
#  / ____ \ _| |_  | |  | | (_) | (_| |  __/ | | | | | (_| |
# /_/    \_\_____| |_|  |_|\___/ \__,_|\___|_|_|_| |_|\__, |
#                                                      __/ |
#                                                     |___/ 


def get_model(df: DataFrame, stop_words: List) -> Tuple[Any, Any]:
    """Select, fit and report the best classifier for the combined data set.

    :param df: Frame with a ``title`` (predictor) and ``subreddit`` (target) column.
    :param stop_words: Custom stop-word list handed to ``AIFactory``.
    :return: ``(classifier, vectorizer)``, both fitted on the training split.
    """
    x, y = df['title'], df['subreddit']

    # Baseline: the share of each class. This used to be computed and thrown
    # away; print it so the majority-class accuracy to beat is visible.
    print(f'Class balance (baseline)...\n{y.value_counts(normalize=True)}\n')

    factory = AIFactory(x, y, stop_words)
    clf, vectorizer, gs_score = factory.get_classifier()

    # Reuse the factory's split instead of recomputing it here, so selection
    # and final fit are guaranteed to see the same train/test partition.
    X_train, X_test = factory.X_train, factory.X_test
    y_train, y_test = factory.y_train, factory.y_test

    Xcvec_train = vectorizer.fit_transform(X_train)
    Xcvec_test = vectorizer.transform(X_test)

    clf.fit(Xcvec_train, y_train)
    show_details(clf, vectorizer, gs_score, Xcvec_train, y_train, Xcvec_test, y_test, clf.predict(Xcvec_test))
    return clf, vectorizer


def show_details(clf, vectorizer, gs_score, Xcvec_train, y_train, Xcvec_test, y_test, preds) -> None:
    """Print an evaluation report for a fitted binary classifier.

    :param clf: Fitted classifier exposing ``score``.
    :param vectorizer: Vectorizer used to build the feature matrices (name only is printed).
    :param gs_score: Dict with ``best_params`` and ``best_score`` from the grid search.
    :param Xcvec_train: Vectorized training features.
    :param y_train: Training labels.
    :param Xcvec_test: Vectorized test features.
    :param y_test: Test labels.
    :param preds: Predictions for ``Xcvec_test``.
    :return: None.
    """
    cnf_matrix = metrics.confusion_matrix(y_test, preds)
    # 2x2 layout: [[tn, fp], [fn, tp]]; `.tolist()` yields plain Python ints.
    (tn, fp), (fn, tp) = np.array(cnf_matrix).tolist()
    total = tn + fp + fn + tp

    print(f'Classifier               : {type(clf).__name__}')
    print(f'Vectorizer               : {type(vectorizer).__name__}')
    print(f'Best Params              : {gs_score["best_params"]}')
    print(f'Best Score (Grid Search) : {gs_score["best_score"]}%')
    print(f'Train Score              : {round(clf.score(Xcvec_train, y_train) * 100, 2)}%')
    print(f'Test Score               : {round(clf.score(Xcvec_test, y_test) * 100, 2)}%')
    print(f'Accuracy                 : {round(metrics.accuracy_score(y_test, preds) * 100, 2)}%')
    print(f'Precision                : {round(metrics.precision_score(y_test, preds) * 100, 2)}%')
    print(f'Recall                   : {round(metrics.recall_score(y_test, preds) * 100, 2)}%')
    print(f'Specificity              : {round((tn / (tn + fp)) * 100, 2)}%')
    # Errors over ALL samples. The denominator previously counted `tn` twice
    # and left out `tp`, so the rate did not equal 1 - accuracy.
    print(f'Misclassification Rate   : {round((fp + fn) / total * 100, 2)}%')
    print(f'Confusion Matrix\n{DataFrame(cnf_matrix).head()}\n')


#           _____   _____                             
#     /\   |_   _| |  __ \                            
#    /  \    | |   | |__) | __ ___   ___ ___  ___ ___ 
#   / /\ \   | |   |  ___/ '__/ _ \ / __/ _ \/ __/ __|
#  / ____ \ _| |_  | |   | | | (_) | (_|  __/\__ \__ \
# /_/    \_\_____| |_|   |_|  \___/ \___\___||___/___/
#


class Process:
    """Score single sentences with a fitted binary classifier and vectorizer."""

    def __init__(self, clf: Any, vectorizer: Any, fake_label: int = 1):
        """Store the fitted estimators.

        :param clf: Fitted binary classifier exposing ``predict_proba`` and ``classes_``.
        :param vectorizer: Fitted vectorizer exposing ``transform``.
        :param fake_label: Class label that means "fake". Defaults to 1, the
            label this notebook assigns to r/TheOnion (satire).
        """
        self.clf = clf
        self.vectorizer = vectorizer
        self.fake_label = fake_label

    def run(self, sentence: str) -> Any:
        """Print and return the probability that *sentence* is not fake.

        :param sentence: Raw headline text.
        :return: Probability of the "not fake" class, formatted with two decimals.
        :raises ValueError: If ``fake_label`` is not one of ``clf.classes_``.
        """
        print(f'Sentence => "{sentence}"')
        data = DataFrame({'title': [sentence]})

        data_cvec = self.vectorizer.transform(data['title'])
        preds_prob = self.clf.predict_proba(data_cvec)

        # predict_proba columns follow `clf.classes_`. Look the "fake" column
        # up instead of assuming column 0: with the notebook's encoding
        # (nottheonion -> 0, TheOnion -> 1) column 0 is REAL news, so the
        # former hard-coded indices printed the two probabilities swapped.
        fake_idx = list(self.clf.classes_).index(self.fake_label)
        not_fake_idx = 1 - fake_idx  # binary classifier: the other column

        fake = '{0:.2f}'.format(preds_prob[0][fake_idx])
        not_fake = '{0:.2f}'.format(preds_prob[0][not_fake_idx])
        print(f'Prob. to be Fake "{fake}" / Not Fake "{not_fake}"')

        return not_fake


In [5]:
#  __  __       _       
# |  \/  |     (_)      
# | \  / | __ _ _ _ __  
# | |\/| |/ _` | | '_ \ 
# | |  | | (_| | | | | |
# |_|  |_|\__,_|_|_| |_|
#
# Console labels wrapped in ANSI escape sequences (coloured, bold text).
# `AI` is defined but not used in the visible part of this cell.
THE_ONION = f'\033[1;32;40m[r/The Onion]\033[0m'
NOT_THE_ONION = f'\033[1;31;40m[r/Not The Onion]\033[0m'
AI = f'\033[1;35;40m[AI]\033[0m'
ME = f'\033[1;36;40m[ME]\033[0m'

# Not The Onion: load, clean (in place) and describe the data set
print(f'\n{NOT_THE_ONION}')
df_not_onion: DataFrame = read('/content/drive/My Drive/02 - IGTI/TCC/POC/not-the-onion.csv')
clean_data(df_not_onion)
show_statistics(df_not_onion)

# The Onion: same three steps
print(f'\n{THE_ONION}')
df_onion: DataFrame = read('/content/drive/My Drive/02 - IGTI/TCC/POC/the-onion.csv')
clean_data(df_onion)
show_statistics(df_onion)

# Combine df_onion & df_not_onion with only 'subreddit' (target) and 'title' (predictor) columns
print(f'\n{THE_ONION} {NOT_THE_ONION} {"[Natural Language Processing]"}')

df = pd.concat([df_onion[['subreddit', 'title']], df_not_onion[['subreddit', 'title']]], axis=0)
print(f'Combined DF shape: {df.shape}\n')
print(f'Combined DF Sample...\n{df.head(2)}\n...\n{df.tail(2)}\n\n')

df = df.reset_index(drop=True)  # Reset the index
# Encode the target as integers: nottheonion -> 0, TheOnion -> 1
df["subreddit"] = df["subreddit"].map({"nottheonion": 0, "TheOnion": 1})
print(f'Prepared DF Sample...\n{df.head(2)}\n...\n{df.tail(2)}')

# Count Vectorize - ngram_range = (1,1)
print(f'\n{THE_ONION}')
onion_cvec_df: DataFrame = count_vectorizer(df, filter_value=1)

print(f'\n{NOT_THE_ONION}')
not_onion_cvec_df: DataFrame = count_vectorizer(df, filter_value=0)

# Unigrams: words that are in the top 5 of both subreddits
print(f'\n{THE_ONION} {NOT_THE_ONION}')
common_unigrams = list(unigrams(onion_cvec_df, not_onion_cvec_df))

# Count Vectorize - ngram_range = (2,2)
# (rebinds onion_cvec_df / not_onion_cvec_df to the bigram matrices)
print(f'\n{THE_ONION}')
onion_cvec_df: DataFrame = count_vectorizer(df, filter_value=1, ngram_range=(2, 2))

print(f'\n{NOT_THE_ONION}')
not_onion_cvec_df: DataFrame = count_vectorizer(df, filter_value=0, ngram_range=(2, 2))

# Bigrams: `unigrams()` is reused here because it only ranks the columns of
# the count matrices, whatever n-gram size they hold
print(f'\n{THE_ONION} {NOT_THE_ONION}')
common_bigrams = list(unigrams(onion_cvec_df, not_onion_cvec_df))

# Stop Words
# ------------------
# Take out {'man', 'new', 'old', 'people', 'say', 'trump', 'woman', 'year'}
# from dataset when modeling, since these words occur frequently in both subreddits.
# (The actual words come from common_unigrams / common_bigrams computed above.)
print(f'\n{THE_ONION} {NOT_THE_ONION}')
custom = get_stop_words(common_unigrams, common_bigrams)

# Select the best model and fit it on the training split
print(f'\n{THE_ONION} {NOT_THE_ONION}')
clf, vectorizer = get_model(df, custom)

# Sample inputs fed to the fitted model: two headlines, filler text and a digit string
sentences = [
    'San Diego backyard shed rents for $1,050 a month',
    'Are You The Whistleblower? Trump Boys Ask White House Janitor After Giving Him Serum Of All The Sodas Mixed Together',
    'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean at diam ac orci pharetra scelerisque non sit amet turpis. Donec quis erat quam',
    '12356487984158641351568463213851684132168461'
]

# Score every sentence; the value returned by `run` is not used here
process = Process(clf, vectorizer)
for sentence in sentences:
    print(f'\n{ME}')
    process.run(sentence)



[1;31;40m[r/Not The Onion][0m
+-----------------------------------+
|           Reading File            |
+-----------------------------------+

Reading file: "/content/drive/My Drive/02 - IGTI/TCC/POC/not-the-onion.csv"
Shape: (15000, 8)
Sample...
   Unnamed: 0  ...                                              title
0           0  ...   San Diego backyard shed rents for $1,050 a month
1           1  ...  Orioles players send handwritten thank you not...

[2 rows x 8 columns]
...
       Unnamed: 0  ...                                              title
14998       14998  ...  Iowa Official Ousted After Bombarding Staffers...
14999       14999  ...  City hopes 'Baby Shark' song will drive homele...

[2 rows x 8 columns]

+-----------------------------------+
|        Data Frame Clean Up        |
+-----------------------------------+
Old shape: (15000, 8)
New shape: (11830, 8)
+-----------------------------------+
|       Data Frame Statistics       |
+--------------------------------