In [1]:
# Import the findspark module 
import findspark

# Initialize via the full spark path
findspark.init("/usr/local/spark/")

In [2]:
# Import the SparkSession and SQLContext modules
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("NLP Homework") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
# Main entry point for Spark functionality. 
sc = spark.sparkContext

# The entry point for working with structured data (rows and columns) in Spark
sqlContext = SQLContext(sc)

In [3]:
# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install nltk --no-cache-dir



In [4]:
# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install langid --no-cache-dir



In [5]:
# Download averaged_perceptron_tagger from the nltk
import nltk
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [6]:
# Download en averaged_perceptron_tagger from the nltk
import nltk
nltk.download('averaged_perceptron_tagger_eng')

[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!


True

In [7]:
# Download stopwords from the nltk
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [8]:
# Download wordnet from the nltk
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [9]:
# Import the udf, StringType, and pp modules
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
import csv

# Invoke the user-defined function `pp.check_lang` within the Spark UDF.
# Refer to https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html
# `pp.check_lang` is used to classify the language of our input text
check_lang_udf = udf(pp.check_lang, StringType())

# Invoke the user-defined function `pp.remove_stops` within the Spark UDF.
# Stop words usually refer to the most common words in a language, there is no single universal list of stop words used
# by all natural language processing tools.
# Reduces Dimensionality
# removes stop words of a single Tweets (cleaned_str/row/document)
remove_stops_udf = udf(pp.remove_stops, StringType())

# Invoke the user-defined function `pp.remove_features` within the Spark UDF.
# catch-all to remove other 'words' that I felt didn't add a lot of value
# Reduces Dimensionality, gets rid of a lot of unique urls
remove_features_udf = udf(pp.remove_features, StringType())

# Invoke the user-defined function `pp.tag_and_remove` within the Spark UDF.
# Process of classifying words into their parts of speech and labeling them accordingly is known as part-of-speech
# tagging, POS-tagging, or simply tagging. Parts of speech are also known as word classes or lexical categories. The
# collection of tags used for a particular task is known as a tagset. Our emphasis in this chapter is on exploiting
# tags, and tagging text automatically.
# http://www.nltk.org/book/ch05.html
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())

# Invoke the user-defined function `pp.lemmatize` within the Spark UDF.
# Tweets are going to use different forms of a word, such as organize, organizes, and
# organizing. Additionally, there are families of derivationally related words with similar meanings, such as democracy,
# democratic, and democratization. In many situations, it seems as if it would be useful for a search for one of these
# words to return documents that contain another word in the set.
# Reduces Dimensionality and boosts numerical measures like TFIDF
# http://nlp.stanford.edu/IR-book/html/htmledition/stemming-and-lemmatization-1.html
# lemmatization of a single Tweets (cleaned_str/row/document)
lemmatize_udf = udf(pp.lemmatize, StringType())

# Invoke the user-defined function `pp.check_blanks` within the Spark UDF.
# check to see if a row only contains whitespace
check_blanks_udf = udf(pp.check_blanks, StringType())

In [10]:
# Read a text file from HDFS
data_rdd = sc.textFile("./data/fake_or_real_news.csv")

# Use a lambda function to split lines on tabs
parts_rdd = data_rdd.map(lambda line: next(csv.reader([line], delimiter=',')))
parts_rdd.take(5)

[['8476', 'You Can Smell Hillary’s Fear', '1'],
 ['10294',
  'Watch The Exact Moment Paul Ryan Committed Political Suicide At A Trump Rally (VIDEO)',
  '1'],
 ['3608', 'Kerry to go to Paris in gesture of sympathy', '0'],
 ['10142',
  "Bernie supporters on Twitter erupt in anger against the DNC: 'We tried to warn you!'",
  '1'],
 ['875', 'The Battle of New York: Why This Primary Matters', '0']]

In [11]:
columns = ["id", "headline", "label"]
df = parts_rdd.map(lambda c: (int(c[0]), c[1], int(c[2]))).toDF(columns)
df.show(n=2)

+-----+--------------------+-----+
|   id|            headline|label|
+-----+--------------------+-----+
| 8476|You Can Smell Hil...|    1|
|10294|Watch The Exact M...|    1|
+-----+--------------------+-----+
only showing top 2 rows



In [12]:
from pyspark.sql.functions import col

# Appling preprocessing functions from the lab2
df = (df.withColumn("headline", remove_stops_udf(col("headline")))        # Remove stopwords
       .withColumn("headline", remove_features_udf(col("headline")))     # Remove unnecessary features
       .withColumn("headline", tag_and_remove_udf(col("headline")))      # POS tagging and filtering
       .withColumn("headline", lemmatize_udf(col("headline")))           # Lemmatization
       .withColumn("is_blank", check_blanks_udf(col("headline")))        # Check for blank rows
       .filter(col("is_blank") != "true")                                # Filter out blank headlines
)

final_df = df.select("headline", "label")
dedup_df = final_df.dropDuplicates(["headline", "label"])

In [13]:
# Randomly splits this :class:`DataFrame` with the provided weights.
splits = dedup_df.randomSplit([0.6, 0.4])
training_df = splits[0]
test_df= splits[1]

In [14]:
# Take the first num elements of the RDD.
training_df.show(5)
test_df.show(5)

+--------------------+-----+
|            headline|label|
+--------------------+-----+
|buffett run raise...|    0|
|hillary melt wein...|    1|
|assange donald tr...|    1|
|be long overdue p...|    1|
|historic climate ...|    0|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|            headline|label|
+--------------------+-----+
|          tehran usa|    1|
|funny republican ...|    0|
|putin congratulat...|    1|
|justice clarence ...|    1|
|latest patriot ac...|    0|
+--------------------+-----+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes

# tokenizer converts the input string to lowercase and then splits it by white spaces.
tokenizer = Tokenizer(inputCol="headline", outputCol="words")

# Converting the words into term frequency vectors
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)

# Computing the inverse document frequency (IDF) to downweight common words
idf = IDF(inputCol="raw_features", outputCol="features")

#Introducing Naive Bayes
nb = NaiveBayes(featuresCol="features", labelCol="label")

# Pipeline to chain these steps
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

# Fit and transform the training data to extract features
model = pipeline.fit(training_df)
featurized_df = model.transform(training_df)
featurized_df.show(5)


+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|            headline|label|               words|        raw_features|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|buffett run raise...|    0|[buffett, run, ra...|(1000,[253,266,52...|(1000,[253,266,52...|[-89.906244871585...|[0.99998739403789...|       0.0|
|hillary melt wein...|    1|[hillary, melt, w...|(1000,[214,314,38...|(1000,[214,314,38...|[-177.40078286069...|[1.91374778094053...|       1.0|
|assange donald tr...|    1|[assange, donald,...|(1000,[70,115,366...|(1000,[70,115,366...|[-219.05588645695...|[3.43553588986293...|       1.0|
|be long overdue p...|    1|[be, long, overdu...|(1000,[638,844,85...|(1000,[638,844,85...|[-120.13540184204...|[2.53078145030877.

In [16]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Parameter grid for cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(nb.smoothing, [0.0, 1.0]) \
    .build()

# Cross-validation
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction"),
                    numFolds=3,
                    seed=42)  # Set seed for cross-validation reproducibility


In [18]:
# Fiting the model using cross-validation on the training data
cvModel = cv.fit(training_df)

# Making predictions on the test set
predictions = cvModel.transform(test_df)
predictions.select("headline", "label", "prediction").show(10)

# Evaluating the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Accuracy evaluation
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print(f"Accuracy = {accuracy:.4f}")

# F1-score evaluation
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print(f"F1-Score = {f1:.4f}")

+--------------------+-----+----------+
|            headline|label|prediction|
+--------------------+-----+----------+
|          tehran usa|    1|       1.0|
|funny republican ...|    0|       1.0|
|putin congratulat...|    1|       1.0|
|justice clarence ...|    1|       0.0|
|latest patriot ac...|    0|       1.0|
|donald weak deleg...|    0|       0.0|
|effect substance ...|    1|       0.0|
|dems sue gop trum...|    1|       1.0|
|don blame immigra...|    0|       0.0|
|john kerry isi re...|    0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows

Accuracy = 0.4956
F1-Score = 0.4972
