In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
spark

In [4]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf, struct, array, col, lit, lower, regexp_replace, when
from pyspark.sql.types import StringType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, SQLTransformer, IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LinearSVC

Load in data

In [5]:
df = spark.read.json('C:/Users/lenne/anaconda3/envs/AA/Advanced_Analytics/Assignment_3/spark/scripts/data_full.json')

# Drop url column
df = df.drop("url", "posted_at", "domain", "user")

# Convert True to 1 and False to 0 in the "frontpage" column
df = df.withColumn("label", when(df["frontpage"] == True, 1).otherwise(0))

# Drop the original "frontpage" column
df = df.drop("frontpage")

df.show(5)

+--------+--------+--------------------+--------------------+--------------------+-----+-----+
|     aid|comments|         source_text|        source_title|               title|votes|label|
+--------+--------+--------------------+--------------------+--------------------+-----+-----+
|39958086|       0|Large Hadron Coll...|Large Hadron Coll...|Large Hadron Coll...|    1|    0|
|39958094|       0|Web Mash\n\n<\---...|            Web Mash|An editor for mak...|    1|    0|
|39958109|       0|Blocked\n\n# whoa...|             Blocked|You shouldn't hos...|    1|    0|
|39958127|       0|Isaac Asimov obit...|Isaac Asimov obit...|Isaac Asimov obit...|    1|    0|
|39958129|       0|Building Computin...|Building Computin...|Do people general...|    1|    0|
+--------+--------+--------------------+--------------------+--------------------+-----+-----+
only showing top 5 rows



In [6]:
num_rows = df.count()
num_cols = len(df.columns)
print("Shape of DataFrame: {} rows, {} columns".format(num_rows, num_cols))

Shape of DataFrame: 5747 rows, 7 columns


In [7]:
print("Column Names:")
for col in df.columns:
    print(col)

Column Names:
aid
comments
source_text
source_title
title
votes
label


Clean the data

In [8]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

text_cols = ["title", "source_title", "source_text"]

In [9]:
# Filter out rows where column has an empty string
for column in text_cols:
    df = df.filter(F.col(column) != '')

# Drop duplicates 
df = df.dropDuplicates(['aid'])
df = df.dropDuplicates(['source_text'])
df = df.dropDuplicates(['title'])

In [10]:
df.count()

4876

In [11]:
df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|  815|
|    0| 4061|
+-----+-----+



In [12]:
# Apply lowercase transformation to specified columns
for column in text_cols:
    df = df.withColumn(column, F.lower(F.col(column)))

# Remove newline characters (\n), hashtags (#) and double spaces
#df = df.withColumn("source_text", regexp_replace(regexp_replace(regexp_replace("source_text", "\\n", " "), "#", ""), "\\s+", " "))

In [13]:
df.show(5)

+--------+--------+--------------------+--------------------+--------------------+-----+-----+
|     aid|comments|         source_text|        source_title|               title|votes|label|
+--------+--------+--------------------+--------------------+--------------------+-----+-----+
|40017804|       0|“highly capable” ...|“highly capable” ...|"highly capable" ...|    4|    1|
|40048863|       0|"open source" sta...|"open source" sta...|"open source" sta...|    1|    0|
|40022702|       0|www.cbc.ca\n\n# t...|          www.cbc.ca|'flâneuse' honour...|    1|    0|
|40077919|       0|'human-induced' c...|'human-induced' c...|'human-induced' c...|    3|    1|
|40077522|       0|‘i’ve got a bridg...|‘i’ve got a bridg...|'i've got a bridg...|    1|    0|
+--------+--------+--------------------+--------------------+--------------------+-----+-----+
only showing top 5 rows



Create a balanced datasset (class-balanced sampling - undersampling of the majority class)

In [14]:
from pyspark.sql.functions import col

In [15]:
n = 800
seed = 42

fractions = df.groupBy("label").count().withColumn("required_n", n/col("count"))\
                .drop("count").rdd.collectAsMap()

df_balanced = df.stat.sampleBy("label", fractions, seed)
df_balanced.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|  802|
|    0|  820|
+-----+-----+



In [16]:
(trainingData, testData) = df_balanced.randomSplit([0.7, 0.3], seed = 42)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 1145
Test Dataset Count: 467


Pipeline

In [17]:
def create_preprocessing_pipeline(inputCol):
    # Regular expression tokenizer: Tokenizes input text into words using a regex pattern
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=inputCol + "_words", pattern="\\W")
    
    # Stop words: Loads default English stop words and removes them from tokenized words
    stops = StopWordsRemover.loadDefaultStopWords('english')
    stopwordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(), outputCol=inputCol + "_filtered", stopWords=stops)
    
    # Bag of words count: Converts tokenized words into a numerical feature vector
    countVectors = CountVectorizer(inputCol=stopwordsRemover.getOutputCol(), outputCol=inputCol + "_rawFeatures", vocabSize=10000, minDF=10)
    
    # IDF (Inverse Document Frequency): Calculates the Inverse Document Frequency of words
    idf = IDF(inputCol=countVectors.getOutputCol(), outputCol=inputCol + "_features", minDocFreq=10)
    
    return [regexTokenizer, stopwordsRemover, countVectors, idf]

# Create preprocessing pipelines for input columns
preprocessing_pipelines = {}
for col_name in ["source_text", "source_title", "title"]:
    preprocessing_pipelines[col_name] = create_preprocessing_pipeline(col_name)

# Combine preprocessing pipelines
all_columns_preprocessing = []
for pipeline in preprocessing_pipelines.values():
    all_columns_preprocessing.extend(pipeline)
    
# Index categorical columns
#indexers = [
#    StringIndexer(inputCol="domain", outputCol="domain_indexed"),
#    StringIndexer(inputCol="user", outputCol="user_indexed")
#]

# Use VectorAssembler to combine the feature columns into a single column
assembler = VectorAssembler(
    inputCols=[col_name + "_features" for col_name in text_cols] + ["comments", "votes"], #"domain_indexed", "user_indexed",
    outputCol="features"
)
    
# @maxIter: max nr of iterations; @regParam: Regularization parameter; @elasticNetParam: 0 for L2, 1 for L1 penalty)
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Append model to the preprocessing pipelines to create the final pipeline
final_pipeline = Pipeline(stages=all_columns_preprocessing + [assembler, lsvc])

In [18]:
paramGrid = ParamGridBuilder().addGrid(lsvc.regParam, [0.3, 0.1, 0.05]).build()

In [19]:
# Define a binary classification evaluator
evaluation = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [20]:
crossval = CrossValidator(estimator=final_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluation,
                          numFolds=3)

In [21]:
# Fit the model and find the best set of parameters
model = crossval.fit(trainingData)

In [22]:
best = model.bestModel
print(best.stages)

[RegexTokenizer_720fe6fb98dc, StopWordsRemover_d5ec88f93c02, CountVectorizerModel: uid=CountVectorizer_960792bc84c3, vocabularySize=9300, IDFModel: uid=IDF_bf11e17d32d2, numDocs=1145, numFeatures=9300, RegexTokenizer_ceee4f6222da, StopWordsRemover_d69fc0255db2, CountVectorizerModel: uid=CountVectorizer_b6c6618d0b1a, vocabularySize=52, IDFModel: uid=IDF_1bec5b7d4e8f, numDocs=1145, numFeatures=52, RegexTokenizer_7b74e54b9a37, StopWordsRemover_82b3a4bfed24, CountVectorizerModel: uid=CountVectorizer_8dc8aaf32e52, vocabularySize=53, IDFModel: uid=IDF_050d68c74334, numDocs=1145, numFeatures=53, VectorAssembler_851f48e6ae10, LinearSVCModel: uid=LinearSVC_f70111964df8, numClasses=2, numFeatures=9407]


In [23]:
print("Optimal hyperparameters:", best.stages[-1].extractParamMap())

Optimal hyperparameters: {Param(parent='LinearSVC_f70111964df8', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearSVC_f70111964df8', name='featuresCol', doc='features column name.'): 'features', Param(parent='LinearSVC_f70111964df8', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearSVC_f70111964df8', name='labelCol', doc='label column name.'): 'label', Param(parent='LinearSVC_f70111964df8', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.'): 0.0, Param(parent='LinearSVC_f70111964df8', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LinearSVC_f70111964df8', name='predictionCol', doc='prediction column name.'): 'prediction', P

Obtain predictions for the test data

In [24]:
prediction = model.transform(testData)
prediction.columns

['aid',
 'comments',
 'source_text',
 'source_title',
 'title',
 'votes',
 'label',
 'source_text_words',
 'source_text_filtered',
 'source_text_rawFeatures',
 'source_text_features',
 'source_title_words',
 'source_title_filtered',
 'source_title_rawFeatures',
 'source_title_features',
 'title_words',
 'title_filtered',
 'title_rawFeatures',
 'title_features',
 'features',
 'rawPrediction',
 'prediction']

Evaluate the model predictions

In [25]:
eva = evaluation.evaluate(prediction)
eva

0.6092769891053152

In [26]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(prediction)

0.6092769891053152

In [27]:
accuracy = prediction.filter(prediction.label == prediction.prediction).count() / float(testData.count())
accuracy

0.5845824411134903

Save the model locally to access later

In [28]:
#model.write().overwrite().save('C:/Users/lenne/anaconda3/envs/AA/Advanced_Analytics/Assignment_3/spark/models')