In [12]:
# Common Packages

import os
import sys
import argparse
import warnings
from contextlib import suppress
import traceback
import logging
from glob import glob

# Pyspark Packages
from pyspark.sql import SparkSession, Window
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import functions as F, DataFrameReader
from pyspark.sql.types import (
    StringType,
    ArrayType,
    IntegerType,
    FloatType,
    StructType,
    StructField,
    BooleanType,
    MapType
)

from pyspark.ml.feature import Word2Vec, Tokenizer
from pyspark.ml.pipeline import Pipeline

# NLTK
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from string import punctuation

# Tokens dropped during text preprocessing: NLTK's "indonesian" stopword list
# followed by each character of string.punctuation as its own entry.
sw_indo = [*stopwords.words("indonesian"), *punctuation]

In [3]:
def config():
    """Build the SparkConf used to create this notebook's Spark session.

    Returns:
        SparkConf: a fresh configuration object with the settings listed
        below applied in order.
    """
    # COMMON CONFIGURATION (key, value) pairs, applied in this order.
    settings = (
        ("spark.sql.caseSensitive", "true"),
        ("spark.debug.maxToStringFields", 200),
        ("spark.sql.files.ignoreCorruptFiles", "true"),
        ('spark.sql.session.timeZone', 'Asia/Jakarta'),
    )

    spark_conf = SparkConf()
    for key, value in settings:
        spark_conf.set(key, value)

    return spark_conf

In [28]:
def read_dataframe(folder_name):
    """Load the fake and real news CSV files into one labelled DataFrame.

    Relies on the module-level ``spark`` session, which must exist before
    this function is called.

    Args:
        folder_name: A two-item sequence ``(fake_news, true_news)`` of CSV
            paths, in exactly that order. Each path is handed to
            ``spark.read.csv`` with ``header=True``.

    Returns:
        DataFrame: the real-news rows (``target`` = 1) unioned with the
        fake-news rows (``target`` = 0), plus a ``metadata`` column built as
        ``title + ". " + text``. Rows whose ``metadata`` is null are dropped.

    Raises:
        ValueError: if ``folder_name`` does not unpack into exactly two items.
    """
    fake_news, true_news = folder_name

    fake_df = spark.read.csv(fake_news, header=True).withColumn("target", F.lit(0))
    # The real-news frame must come from `true_news`. Reading `fake_news` here
    # again would label every fake article as both 0 and 1 and never load the
    # real articles at all.
    real_df = spark.read.csv(true_news, header=True).withColumn("target", F.lit(1))

    # NOTE(review): union() matches columns by position, so both CSV files are
    # assumed to share the same column order -- confirm against the data.
    metadata = F.concat(F.col("title"), F.lit(". "), F.col("text"))
    return (
        real_df
        .union(fake_df)
        .withColumn("metadata", metadata)
        .dropna(subset="metadata")
    )

In [29]:
def text_preprocessing(text: str) -> str:
    """Lower-case ``text`` and remove every token listed in ``sw_indo``.

    The text is lower-cased, split with ``word_tokenize``, filtered against
    the module-level ``sw_indo`` collection, and re-joined with single spaces.

    Args:
        text: The raw string to clean. ``None`` is accepted because a Spark
            UDF receives ``None`` for null cells.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None`` so that a
        null input stays null instead of raising ``AttributeError``.
    """
    if text is None:
        return None

    # Hash-based membership avoids rescanning the whole stopword list for
    # every token of the document.
    blocked = frozenset(sw_indo)
    kept = [token for token in word_tokenize(text.lower()) if token not in blocked]
    return " ".join(kept)

def dataframe_cleaning(df):
    """Return ``df`` with a ``teks`` column holding the preprocessed ``metadata``.

    Args:
        df: A DataFrame that has a ``metadata`` column.

    Returns:
        The result of ``df.withColumn("teks", ...)``, where the new column is
        ``text_preprocessing`` applied to ``metadata`` through a
        string-returning UDF.
    """
    # Wrap the Python cleaner as a UDF declared to return StringType.
    clean_text = F.udf(lambda raw: text_preprocessing(raw), StringType())
    source_column = F.col("metadata")
    return df.withColumn("teks", clean_text(source_column))

In [31]:
if __name__ == "__main__":
    # Driver cell: create the Spark session, load and clean the news data,
    # then split it into train and test sets.

    try:

        conf = config()
        spark = (
            SparkSession.builder
            .config(conf=conf)
            .appName("Real Fake News Classifier")
            .getOrCreate()
        )
        sc = spark.sparkContext

        # glob() returns paths in arbitrary order, but read_dataframe() assigns
        # labels by position (fake first, real second). Sorting makes the
        # order deterministic.
        # NOTE(review): assumes the fake-news file name sorts before the
        # real-news one (e.g. "Fake.csv" < "True.csv") -- confirm.
        csv_paths = sorted(glob("./datasets/fake-and-real-news-dataset/*.csv"))
        if len(csv_paths) != 2:
            raise ValueError(
                "Expected exactly 2 CSV files (fake and real news), found "
                f"{len(csv_paths)}: {csv_paths}"
            )

        df = read_dataframe(csv_paths)
        df = dataframe_cleaning(df)
        df = df.orderBy(F.rand(seed=42))

        # Build {target value: 0.8} for every distinct label, then draw the
        # training sample per label with a fixed seed.
        fractions = (
            df.select("target")
            .distinct()
            .withColumn("fraction", F.lit(0.8))
            .rdd.collectAsMap()
        )
        train = df.stat.sampleBy("target", fractions, 42)
        # NOTE(review): subtract() is a distinct set difference, so duplicate
        # rows collapse and any row equal to a training row is excluded from
        # the test set -- confirm this is the intended split.
        test = df.subtract(train)
        train.show(vertical=True)

        # Feature pipeline (tokenizer + Word2Vec), currently disabled:
        # tokenizer = Tokenizer(inputCol="metadata", outputCol="word_tokenize")
        # word2Vec = Word2Vec(
        #     vectorSize=200,
        #     seed=42,
        #     inputCol="word_tokenize",
        #     outputCol="word_vector"
        # )
        #
        # model = Pipeline(stages=[tokenizer, word2Vec])
        # model = model.fit(train)
        #
        # train = model.transform(train)
    except Exception:

        # Log the full traceback, then re-raise the active exception unchanged.
        logging.error(traceback.format_exc())
        raise

KeyboardInterrupt: 