In [None]:
import html

from IPython.display import display, clear_output

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql import functions as f
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf

# SETTINGS
# Directory with the tweet JSON files; read once below to infer the schema and
# again later as the input of the reader.
IN_PATH = "/home/jovyan/data-sets/twitter/"
# Location of the persisted pipeline model that is reloaded further down.
MODEL_PATH = "/home/jovyan/data-sets/sentiment-140-training-data/MODEL"
# Datetime pattern used to parse the tweets' `created_at` column.
# NOTE(review): supported pattern letters differ between Spark versions —
# confirm this pattern parses on the Spark version in use.
timestampformat = "EEE MMM dd HH:mm:ss zzzz yyyy"

# Reuse a running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("StreamingSentiment").getOrCreate()
# Infer the JSON schema with a static read so it can be passed explicitly to
# the reader that is configured later.
# NOTE(review): inference presumably happens when the JSON is read, so
# `.limit(10)` may not reduce its cost — confirm.
schema = spark.read.json(IN_PATH).limit(10).schema


### Reload our Pre-Trained Model 
Using ML persistence, we simply reload the pre-trained model that we saved earlier.

In [None]:
# Load the persisted PipelineModel from MODEL_PATH; it is applied to the
# cleaned tweets further down via `.transform`.
sentiment_model = PipelineModel.load(MODEL_PATH)

### SparkReader
Choose between a streaming reader (`DataStreamReader`) and a static reader (`DataFrameReader`).

In [None]:
# static spark reader (batch): uncomment this line and comment out the
# streaming reader below to run the notebook on a static DataFrame
# spark_reader = spark.read.schema(schema)

# streaming spark reader, configured with the schema inferred above
spark_reader = spark.readStream.schema(schema)


### Data Cleaning
Bring back our cleansing function from earlier (slightly extended with our findings from the modelling step)

In [None]:
@udf
def html_unescape(s: str):
    """Spark UDF that turns HTML character references in ``s`` into plain characters.

    Values that are not ``str`` (for example ``None``) are returned unchanged.
    """
    return html.unescape(s) if isinstance(s, str) else s


def clean_data(df: DataFrame) -> DataFrame:
    """Normalise the ``text`` column of ``df`` for the sentiment model.

    The untouched text is kept in a new ``original_text`` column. The ``text``
    column is then HTML-unescaped, stripped of URLs, e-mail addresses, user
    mentions and ``#`` signs, reduced to letters and apostrophes, and
    whitespace-normalised. Rows whose cleaned text is empty or null are dropped.

    HTML unescaping runs *before* any other replacement. Otherwise replacing
    ``#`` would break numeric character references such as ``&#39;`` before
    they can be decoded, and the URL pattern would stop at the ``;`` of
    ``&amp;`` and leave query-string fragments behind.

    NOTE(review): the model was presumably trained on text produced by an
    earlier version of this function — confirm the training-time cleaning
    uses the same order of steps.

    Args:
        df: DataFrame with a string column named ``text``.

    Returns:
        DataFrame with the cleaned ``text`` column and an added
        ``original_text`` column.
    """
    url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
    email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
    user_regex = r"(@\w{1,15})"

    return (
        df

        # Store the original text column in a new column for future reference
        .withColumn("original_text", f.col("text"))

        # Unescape any HTML first, while the character references are still intact
        .withColumn("text", html_unescape(f.col("text")))

        # Remove URLs, email addresses, and user mentions; turn "#" into a space
        # so the hashtag word itself is kept
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " "))

        # Replace everything that is not a letter or an apostrophe (digits,
        # punctuation, ...) with a space, collapse runs of spaces, and trim
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z']", " "))
        .withColumn("text", f.regexp_replace(f.col("text"), " +", " "))
        .withColumn("text", f.trim(f.col("text")))

        # Ensure we don't end up with empty rows
        .filter(f.col("text") != "").na.drop(subset="text")
    )

# Columns taken over from the raw tweets
tweet_columns = [
    "id",
    # parse the created_at string into a proper timestamp
    f.to_timestamp(f.col("created_at"), timestampformat).alias("timestamp"),
    # the author's screen name is nested inside the user struct
    f.col("user.screen_name").alias("user"),
    "text",
]

# Read the tweets, keep the columns above in a single partition, then clean the text
data_in = clean_data(spark_reader.json(IN_PATH).select(*tweet_columns).coalesce(1))

# A preview can only be collected when the input is a static DataFrame
if not data_in.isStreaming:
    display(data_in.limit(10).toPandas())

### Apply Machine Learning Model

In [None]:
# Score the cleaned tweets with the reloaded pipeline
raw_sentiment = sentiment_model.transform(data_in)

# Columns passed downstream; the model's prediction is exposed as `user_sentiment`
downstream_columns = [
    "id",
    "timestamp",
    "user",
    "text",
    f.col("prediction").alias("user_sentiment"),
]
sentiment = raw_sentiment.select(*downstream_columns)

# A preview can only be collected when the input is a static DataFrame
if not data_in.isStreaming:
    display(sentiment.limit(10).toPandas())

### Define Queries
Now let's define some (simple) queries

In [None]:
# Number of tweets whose predicted sentiment equals 0.0
negative_tweets = sentiment.where("user_sentiment == 0.0").select(
    f.col("user_sentiment").alias("negative_sentiment")
)
negative_sentiment_count = negative_tweets.agg(f.count("negative_sentiment"))

# Number of tweets whose predicted sentiment equals 4.0
positive_tweets = sentiment.where("user_sentiment == 4.0").select(
    f.col("user_sentiment").alias("positive_sentiment")
)
positive_sentiment_count = positive_tweets.agg(f.count("positive_sentiment"))

# Mean predicted sentiment over all tweets
average_sentiment = sentiment.agg(f.avg("user_sentiment"))

### Select what to stream
Spark is more than capable of handling multiple streams at once, but since we are running in a small lab environment, we'll do one at a time.

Make your selection by running only 1 of the next 3 cells:

In [None]:
# Option 1: stream the count of tweets predicted as 0.0
data_to_stream = negative_sentiment_count

In [None]:
# Option 2: stream the count of tweets predicted as 4.0
data_to_stream = positive_sentiment_count

In [None]:
# Option 3: stream the average predicted sentiment
data_to_stream = average_sentiment

### Start Streaming
Now we can create our stream...

In [None]:
# Guard on `data_in.isStreaming`, the same check every later cell uses, so that
# `query` exists exactly when those cells expect it — even if the reader cell
# was toggled without re-running the cells in between.
if data_in.isStreaming:
    stream_writer = (
        data_to_stream.writeStream.queryName("streaming_table")
        # process newly arrived data every 20 seconds
        .trigger(processingTime="20 seconds")
        # .trigger(once=True)
        # write out the full aggregated result on every trigger
        .outputMode("complete")
        # keep the result in memory; it is queried by the query's name further down
        .format("memory")
    )
    # Calling .start() on a DataStreamWriter returns an instance of StreamingQuery
    query = stream_writer.start()

### Stream Output
...and look at its output.

In [None]:
# .lastProgress shows information on the last processed batch
# (`query` only exists for a streaming run, hence the guard)
if data_in.isStreaming:
    display(query.lastProgress)

In [None]:
# Let's see what we are outputting
if not data_in.isStreaming:
    # Static run: there is no query to poll, so show the selected result directly
    print("Not streaming, showing static output instead")
    result = data_to_stream
    display(result.limit(10).toPandas())
else:
    from time import sleep

    # Poll the in-memory result table up to 200 times, pausing 10 seconds between polls
    for x in range(200):
        try:
            # stop polling once the streaming query is no longer running
            if not query.isActive:
                break
            print("Showing live view refreshed every 10 seconds")
            print(f"Seconds passed: {x*10}")
            # spark.sql can be used to request how the query is performing
            result = spark.sql(f"SELECT * from {query.name}")
            display(result.toPandas())
            sleep(10)
            # wait=True keeps the old output visible until the next one replaces it
            clear_output(wait=True)
        except KeyboardInterrupt:
            # interrupting the kernel ends the live view without an error
            break
    print("Live view ended...")

### Manage our stream

In [None]:
# .isActive tells whether the streaming query is still running
if data_in.isStreaming:
    display(query.isActive)

In [None]:
# Only a streaming run has a query that can be stopped
if data_in.isStreaming:
    # .stop() stops the query
    query.stop()

In [None]:
# start / restart our stream
if data_in.isStreaming:
    try:
        # query needs to be stopped before starting a new one.
        query.stop()
    except NameError:
        pass  # if query does not exist yet, we can ignore having to stop it
    # Start only after the stop attempt has succeeded or was skipped. If
    # stopping fails for any other reason, that error is raised as-is and no
    # second query is started on top of the one that is still running.
    # NOTE(review): `stream_writer` still wraps the `data_to_stream` that was
    # selected when the writer was created; a changed selection is not picked
    # up by this restart.
    query = stream_writer.start()

## Clean up behind ourselves
Once done, make sure to stop the SparkSession so that it releases the memory it is holding.

In [None]:
# Shut down the SparkSession created at the top of the notebook
spark.stop()