In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StopWordsRemover, Tokenizer, Word2Vec
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, when
import json

In [6]:
# spark = SparkSession.builder.master("spark://localhost:7077").appName("News Sentiment analysis").getOrCreate()
spark = SparkSession.builder.master("local[*]").appName("News Sentiment analysis").getOrCreate()
spark

## Load data from HDFS

In [7]:
hdfsPath = "hdfs://namenode:9000/user/spark/news_data_articles.txt"
df = spark.read.json(hdfsPath)

In [8]:
df.show()

+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
| author|             content|         description|       published_at|              source|               title|                 url|        url_to_image|
+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|bbcnews|Thirty years afte...|The BBC's Fergal ...|2023-12-03 22:21:04|[bbc-news, BBC News]|South Africa: The...|https://www.bbc.c...|https://ichef.bbc...|
|bbcnews|South Florida. A ...|The Grand Theft A...|2023-12-10 00:56:54|[bbc-news, BBC News]|Grand Theft Auto ...|https://www.bbc.c...|https://ichef.bbc...|
|bbcnews|The families of t...|Police in Vermont...|2023-11-27 00:31:15|[bbc-news, BBC News]|Vermont: Three Pa...|https://www.bbc.c...|https://ichef.bbc...|
|bbcnews|Homosexuality sho...|Cardinal Peter Tu...|2023-11-27 12

In [9]:
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- published_at: timestamp (nullable = true)
 |-- source: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- url_to_image: string (nullable = true)



# Data Preparation

In [10]:
labeled_data = df.withColumn("label", when(lower(df["description"]).contains("crime") |
                                            lower(df["description"]).contains("murder") |
                                            lower(df["description"]).contains("robbery"), 1).otherwise(0))
labeled_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|   65|
|    0|  546|
+-----+-----+



### Define a function to convert text to lowercase and tokenize it

In [11]:
def tokenize_text(df):
    tokenizer = Tokenizer(inputCol="description", outputCol="words")
    return tokenizer.transform(df)

In [12]:
tokenizer = Tokenizer(inputCol="description", outputCol="words")
tokenized_data = tokenize_text(labeled_data)
tokenized_data.show()

+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
| author|             content|         description|       published_at|              source|               title|                 url|        url_to_image|label|               words|
+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|bbcnews|Thirty years afte...|The BBC's Fergal ...|2023-12-03 22:21:04|[bbc-news, BBC News]|South Africa: The...|https://www.bbc.c...|https://ichef.bbc...|    0|[the, bbc's, ferg...|
|bbcnews|South Florida. A ...|The Grand Theft A...|2023-12-10 00:56:54|[bbc-news, BBC News]|Grand Theft Auto ...|https://www.bbc.c...|https://ichef.bbc...|    0|[the, grand, thef...|
|bbcnews|The families of t...|Police in Vermont...|2023-11-27 00:31:15|[bbc-news, BBC

### Define the StopWordsRemover and Create the "filteredWords" column

In [13]:
remover = StopWordsRemover(inputCol="words", outputCol="filteredWords")
filtered_data = remover.transform(tokenized_data)
filtered_data.show()

+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+
| author|             content|         description|       published_at|              source|               title|                 url|        url_to_image|label|               words|       filteredWords|
+-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+
|bbcnews|Thirty years afte...|The BBC's Fergal ...|2023-12-03 22:21:04|[bbc-news, BBC News]|South Africa: The...|https://www.bbc.c...|https://ichef.bbc...|    0|[the, bbc's, ferg...|[bbc's, fergal, k...|
|bbcnews|South Florida. A ...|The Grand Theft A...|2023-12-10 00:56:54|[bbc-news, BBC News]|Grand Theft Auto ...|https://www.bbc.c...|https://ichef.bbc...|    0|[the, grand, thef...|[g

### Create a Word2Vec model

In [14]:
word2Vec = Word2Vec(inputCol="filteredWords", outputCol="features", vectorSize=100, minCount=0)

# Model Training

### Create a RandomForestClassifier

In [15]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, rawPredictionCol="rawPrediction")


### Create a pipeline with the stages

In [16]:
pipeline = Pipeline(stages=[tokenizer, remover, word2Vec, rf])

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Create BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator() \
    .setLabelCol("label") \
    .setRawPredictionCol("prediction") \
    .setMetricName("areaUnderROC")

# Create CrossValidator
cv = CrossValidator() \
    .setEstimator(pipeline) \
    .setEvaluator(evaluator) \
    .setEstimatorParamMaps(paramGrid) \
    .setNumFolds(5)



### Split the data into training and testing sets

In [17]:
training_data, test_data = labeled_data.randomSplit([0.7, 0.3], seed=12345)

### Train the model

In [18]:
cvModel = cv.fit(training_data)

23/12/22 00:20:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/22 00:20:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


## Make predictions on the test set

In [19]:
predictions = cvModel.transform(test_data)

## Evaluate the model

In [20]:
accuracy =  evaluator.evaluate(predictions)
accuracy

0.7380952380952381

In [21]:
spark.stop()