In [1]:
#### install java & pyspark - You just have to do it one time, so you can skip that

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark -q

ERROR: Operation cancelled by user

In [4]:
#### Environment setup

import os

# Point JAVA_HOME at the OpenJDK 11 installation so a JVM can be located.
# NOTE(review): presumably the directory created by the openjdk-11 apt package — confirm.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64")

In [5]:
#### Start the Spark session

from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one,
# otherwise it builds a new session under this application name.
spark = (
    SparkSession.builder
    .appName("FakeNewsStreamingSim")
    .getOrCreate()
)

In [6]:
#### Mount Google Drive (see the phase 1 notebook)

from google.colab import drive

# Make the Drive contents reachable as ordinary paths under /content/drive.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [7]:
#### Project paths

# Root folder of the project on the mounted Drive.
project_path = "/content/drive/MyDrive/fake_news_project"

# Location of the model produced in phase 1.
model_path = project_path + "/best_model"

# CSV file used for the streaming simulation.
stream_file = project_path + "/stream1.csv"

print("Model:", model_path)
print("Simulated stream:", stream_file)

Model: /content/drive/MyDrive/fake_news_project/best_model
Simulated stream: /content/drive/MyDrive/fake_news_project/stream1.csv


In [8]:
#### Load the phase 1 model (already trained and tested)

from pyspark.ml import PipelineModel

# Restore the saved pipeline from model_path.
model = PipelineModel.load(model_path)
print("Model succesfully loaded!")

Model succesfully loaded!


In [9]:
#### Stream reading simulation (CSV file with new messages)

# The schema is declared explicitly instead of using inferSchema=True:
# inference costs an extra pass over the file and could type an
# all-numeric "text" column as a number rather than a string.
# NOTE(review): assumes the file holds a single "text" column, as the preview shows — confirm.
df_stream = (
    spark.read
    .csv(stream_file, header=True, schema="text STRING")
    .na.drop()  # discard rows that contain nulls
)
df_stream.show(5)

+--------------------+
|                text|
+--------------------+
|Government confir...|
|BREAKING: Aliens ...|
|Study shows coffe...|
|President announc...|
|Scientists discov...|
+--------------------+
only showing top 5 rows



In [10]:
#### Score the new messages with the loaded model

predictions = model.transform(df_stream)

# Preview the first 10 messages next to their predicted label.
(
    predictions
    .select("text", "prediction")
    .show(n=10)
)

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
|Government confir...|       0.0|
|BREAKING: Aliens ...|       0.0|
|Study shows coffe...|       0.0|
|President announc...|       0.0|
|Scientists discov...|       0.0|
|NASA finds water ...|       0.0|
|Click here to win...|       0.0|
|World leaders mee...|       0.0|
|This one weird tr...|       0.0|
|Experts warn abou...|       0.0|
+--------------------+----------+



In [11]:
#### Persist the results

output_path = f"{project_path}/stream_results.csv"

# Spark's CSV writer creates a directory of part files at this path;
# mode("overwrite") replaces whatever an earlier run left there.
(
    predictions
    .select("text", "prediction")
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)
print(f"Saved results on: {output_path}")

Saved results on: /content/drive/MyDrive/fake_news_project/stream_results.csv


In [12]:
#### Register a temporary view and run one SQL query against it

predictions.createOrReplaceTempView("stream_results")

# Count how many messages received each predicted label.
label_counts_query = "SELECT prediction, COUNT(*) as total FROM stream_results GROUP BY prediction"
spark.sql(label_counts_query).show()

+----------+-----+
|prediction|total|
+----------+-----+
|       0.0|   10|
+----------+-----+

