In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# Print the full path of every file mounted under the Kaggle input directory.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        print(os.path.join(root_dir, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install pyspark

In [None]:
import pyspark
from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
import pandas as pd
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
from pyspark.sql.streaming import DataStreamReader
import html

pd.options.display.max_columns = None
pd.options.display.max_rows = 30
pd.options.display.max_colwidth = 150



# SETTINGS
IN_PATH = "/kaggle/input/twitter-data-for-spark-streaming/"
#IN_PATH = "./twitterdata"
# Pattern for the created_at field (presumably Twitter's "Wed Oct 10 20:19:24 +0000 2018"
# style — confirm against the data); it is parsed with the legacy parser configured below.
timestampformat = "EEE MMM dd HH:mm:ss zzzz yyyy"

spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
# Use the legacy (SimpleDateFormat-based) datetime parser for the pattern above.
# One spark.conf.set is enough; the cell previously also issued an identical SQL `SET`.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# File-based streaming sources need an explicit schema, so infer it once with a batch read.
# Inference happens when the reader is created, before any `.limit()` could apply, so the
# previous `.limit(10)` did not shorten it and has been dropped as misleading.
schema = spark.read.json(IN_PATH).schema


spark_reader = spark.readStream.schema(schema)

In [None]:
# Version of the running Spark session (shown as this cell's output)
spark.version

In [None]:
#spark_reader.json(IN_PATH).show(2)
#spark_reader.json(IN_PATH).printSchema()

In [None]:
# Streaming DataFrame of tweets reduced to the four columns used in this notebook.
df = (
    spark_reader.json(IN_PATH)
    .select(
        f.col("id"),
        # parse created_at into a proper timestamp using the pattern from the settings cell
        f.to_timestamp("created_at", timestampformat).alias("timestamp"),
        # flatten the nested user struct down to the screen name
        f.col("user.screen_name").alias("user"),
        f.col("text"),
    )
    # merge everything into a single partition
    .coalesce(1)
)
# Aggregate: approximate number of distinct users, plus the current timestamp
distinct_user_count = df.select(
    f.approx_count_distinct("user"),
    f.current_timestamp(),
)

In [None]:
if df.isStreaming:
    print("We are streaming!")
    # Creating a DataStreamWriter and a StreamingQuery
    # ===
    # .writeStream on a streaming DataFrame yields a DataStreamWriter, configured fluently below
    stream_writer = (
        distinct_user_count.writeStream
        # the memory sink needs a query name; it becomes the name of the in-memory table
        .queryName("distinct_user_count")
        # once=True: process the stream a single time, then stop — great for debugging
        # (use processingTime="5 seconds" instead for a continuously refreshing query)
        .trigger(once=True)
        .outputMode("complete")
        .format("memory")
    )
    # .start() on a DataStreamWriter returns a running StreamingQuery
    query = stream_writer.start()
else:
    print("Plain old, basic DataFrame")
    # Actions such as show and toPandas are only allowed on non-streaming DataFrames
    distinct_user_count.show()
    display(df.limit(5).toPandas())

In [None]:
# .isStreaming tells whether a DataFrame is backed by a streaming source
# (True here, because df is built from spark_reader = spark.readStream...)
df.isStreaming

In [None]:
# .isActive shows whether the query is still running; a trigger(once=True) query
# stops on its own after processing its single batch
query.isActive

In [None]:
# Re-launch the query only when it is no longer running (a trigger-once query stops by
# itself after its single batch); an active query is kept as is. .start() turns the
# DataStreamWriter into a new, running StreamingQuery.
query = query if query.isActive else stream_writer.start()

# Calling .start on the DataStreamWriter while a query with the same name is still active
# raises an IllegalArgumentException
# -> 'Cannot start query with name {StreamingQuery.name} as a query with that name is already active'

In [None]:
# start() returns before a trigger(once=True) query has processed its batch, so wait for
# it; otherwise the in-memory table can still be empty (e.g. under "Run All").
query.processAllAvailable()
# Read the query's in-memory result table by its name
display(spark.sql(f"SELECT * from {query.name}").toPandas())

In [None]:
# Show live results for up to 2 minutes, refreshed every second; stops early once the
# query is no longer running, and can be interrupted with the kernel's stop button.
from time import sleep

try:
    for _ in range(120):
        # Sample isActive *before* reading the table: once it reports False, the
        # snapshot displayed below already contains the query's final result.
        finished = not query.isActive
        # Clear before drawing (not after sleeping) so the last snapshot stays visible
        # when the loop ends; a trailing clear_output(wait=True) wiped it as soon as
        # "Live view ended..." was printed.
        clear_output(wait=True)
        display(spark.sql(f"SELECT * from {query.name}").toPandas())
        if finished:
            # The result cannot change any more (e.g. a trigger-once query has completed).
            break
        sleep(1)
except KeyboardInterrupt:
    # Interrupting the kernel just ends the live view.
    pass
print("Live view ended...")

In [None]:
@udf
def html_unescape(s: str):
    """Spark UDF that decodes HTML character references (e.g. ``&amp;`` -> ``&``).

    Values that are not Python strings (a SQL NULL arrives as ``None``) are returned
    unchanged. No return type is passed to ``udf``, so the result is a string column.
    """
    return html.unescape(s) if isinstance(s, str) else s


def clean_data(df: DataFrame):
    """Clean the ``text`` column of a tweets DataFrame for sentiment scoring.

    Returns a new DataFrame in which:

    * ``original_text`` holds an untouched copy of ``text``;
    * ``text`` has HTML entities decoded; URLs, e-mail addresses and @mentions
      removed; ``#`` replaced by a space; every character other than ASCII letters
      and apostrophes replaced by a space; runs of spaces collapsed; and both ends
      trimmed;
    * rows whose cleaned ``text`` is empty or null are dropped.

    Only per-row column expressions and a filter are used, so ``df`` may be a
    streaming DataFrame.
    """
    # The patterns are evaluated by Spark's regexp_replace (Java regex syntax).
    # The dot after "www" is escaped. Unescaped, "(www.)+" matched "www" followed by ANY
    # character, so ordinary words containing "www" (e.g. "awwwww" -> "a") were mangled.
    url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
    email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
    user_regex = r"(@\w{1,15})"  # "@" + 1-15 word chars (presumably the Twitter handle limit)

    return (
        df

        # Store the original text column in a new column for future reference
        .withColumn("original_text", f.col("text"))

        # Unescape HTML first: numeric references such as "&#39;" contain "#",
        # which the hashtag step below would otherwise break before decoding
        .withColumn("text", html_unescape(f.col("text")))

        # Remove URLs, email addresses and user mentions; turn "#" into a space
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " "))

        # Replace every character that is not an ASCII letter or apostrophe (digits,
        # punctuation, emoji, ...) with a space, collapse repeated spaces, and trim
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z']", " "))
        .withColumn("text", f.regexp_replace(f.col("text"), " +", " "))
        .withColumn("text", f.trim(f.col("text")))

        # Drop rows left empty. `!= ""` is null for null text, so the filter already
        # removes nulls; na.drop makes that explicit.
        .filter(f.col("text") != "").na.drop(subset="text")
    )


In [None]:
# Same projection over the same source as `df` above (id, parsed timestamp, screen name,
# text): reuse it instead of re-declaring the identical select. A streaming DataFrame
# can back several queries.
streaming_data_raw = df
streaming_data_clean = clean_data(streaming_data_raw)

# Trigger-once append query writing the cleaned tweets to an in-memory table.
# The query gets its own name: start() raises while another query with the same name
# is still active, and the next cell reads the table via query.name anyway.
stream_writer = (
    streaming_data_clean.writeStream
    .queryName("cleaned_tweets")
    .trigger(once=True)
    .outputMode("append")
    .format("memory")
)

query = stream_writer.start()


In [None]:
# start() returns before the trigger-once batch is processed; wait so the table is filled.
query.processAllAvailable()
# .show() prints and returns None, so display(....show()) also rendered a stray "None";
# display the rows as a pandas table instead, capped at 20 rows like show().
display(spark.sql(f"SELECT * from {query.name}").limit(20).toPandas())

In [None]:
# Approximate distinct-user count over the *cleaned* stream. It gets its own variable so
# the earlier `distinct_user_count` (built on the uncleaned `df`) is not overwritten.
clean_distinct_user_count = streaming_data_clean.select(
    f.approx_count_distinct("user"), f.current_timestamp()
)

# Use a distinct query name: reusing "data" made start() raise whenever the previous
# "data" query (started just before) was still active. The next cell reads the table
# via query.name, so the exact name does not matter elsewhere.
stream_writer = (
    clean_distinct_user_count.writeStream
    .queryName("clean_distinct_user_count")
    .trigger(once=True)
    .outputMode("complete")
    .format("memory")
)

query = stream_writer.start()

In [None]:
# start() returns before the trigger-once batch is processed; wait so the table is filled.
query.processAllAvailable()
# .show() returns None (display() then rendered "None"); show a pandas table instead.
display(spark.sql(f"SELECT * from {query.name}").limit(20).toPandas())

In [None]:
# Load the saved Spark ML pipeline and apply it to the cleaned stream; its output
# includes a "prediction" column.
sentiment_model = PipelineModel.load("/kaggle/input/pyspark-nlp/MODEL")
raw_sentiment = sentiment_model.transform(streaming_data_clean)

# Keep only what downstream cells use, exposing the model output as `user_sentiment`
sentiment = raw_sentiment.select(
    f.col("id"),
    f.col("timestamp"),
    f.col("user"),
    f.col("text"),
    f.col("prediction").alias("user_sentiment"),
)


In [None]:
# Trigger-once append query writing each scored tweet to an in-memory table. It gets
# its own name because start() raises while a same-named query ("data" was reused by
# earlier cells) is still active; the next cell reads the table via query.name.
stream_writer = (
    sentiment.writeStream
    .queryName("tweet_sentiment")
    .trigger(once=True)
    .outputMode("append")
    .format("memory")
)

query = stream_writer.start()

In [None]:
# start() returns before the trigger-once batch is processed; wait so the table is filled.
query.processAllAvailable()
# .show() returns None (display() then rendered "None"); show up to 20 rows via pandas.
display(spark.sql(f"SELECT * from {query.name}").limit(20).toPandas())

In [None]:
# Streaming aggregates over the predicted sentiment. Judging by the variable names, the
# model labels negative tweets 0.0 and positive tweets 4.0 — TODO confirm with the model.
negative_sentiment_count = (
    sentiment
    .where(f.col("user_sentiment") == 0.0)
    .select(f.col("user_sentiment").alias("negative_sentiment"))
    .agg(f.count("negative_sentiment"))
)

positive_sentiment_count = (
    sentiment
    .where(f.col("user_sentiment") == 4.0)
    .select(f.col("user_sentiment").alias("positive_sentiment"))
    .agg(f.count("positive_sentiment"))
)

# Mean predicted label across all rows
average_sentiment = sentiment.agg(f.avg("user_sentiment"))

In [None]:
# Choose which aggregate the streaming cell below writes out. Run only ONE of the next
# three cells: each overwrites data_to_stream, so after "Run All" the last one
# (positive_sentiment_count) is the one that gets streamed.
data_to_stream = average_sentiment

In [None]:
# Alternative choice for data_to_stream (overwrites whatever the previous cell selected)
data_to_stream = negative_sentiment_count

In [None]:
# Alternative choice for data_to_stream; as the last of the three assignments, this is
# the one in effect after "Run All"
data_to_stream = positive_sentiment_count

In [None]:
# Continuously stream the selected aggregate into an in-memory table, every 20 seconds.
# writeStream is only valid on a streaming DataFrame, so check that directly.
if data_to_stream.isStreaming:
    # Make the cell re-runnable: a "streaming_table" query left running by an earlier
    # run of this cell would make start() raise, so stop it first.
    for active_query in spark.streams.active:
        if active_query.name == "streaming_table":
            active_query.stop()

    stream_writer = (
        data_to_stream.writeStream.queryName("streaming_table")
        .trigger(processingTime="20 seconds")
        # alternative for debugging: .trigger(once=True)
        .outputMode("complete")
        .format("memory")
    )
    # Calling .start on a DataStreamWriter returns an instance of StreamingQuery
    query = stream_writer.start()

In [None]:
# start() returns immediately; wait until the data currently available has been
# processed and committed so the table is not read empty.
query.processAllAvailable()
# .show() returns None (display() then rendered "None"); show up to 20 rows via pandas.
display(spark.sql(f"SELECT * from {query.name}").limit(20).toPandas())

In [None]:
# Sanity check: the cleaned data is still a streaming DataFrame
streaming_data_clean.isStreaming

In [None]:
# Let's see what we are outputting
from time import sleep

# Check the DataFrame that is actually written/displayed (the same condition that
# decides whether writeStream can be used on it).
if data_to_stream.isStreaming:
    try:
        for x in range(0, 200):
            if not query.isActive:
                break
            # Clear *before* drawing (not after sleeping) so the latest snapshot stays
            # visible after the loop; a trailing clear_output(wait=True) wiped the
            # table as soon as "Live view ended..." was printed.
            clear_output(wait=True)
            print("Showing live view refreshed every 10 seconds")
            print(f"Seconds passed: {x*10}")
            # spark.sql reads the query's in-memory result table
            result = spark.sql(f"SELECT * from {query.name}")
            display(result.toPandas())
            sleep(10)
    except KeyboardInterrupt:
        # Interrupting the kernel just ends the live view.
        pass
    # Report a failed query instead of ending the view silently.
    if query.exception() is not None:
        print(f"Query stopped with an error: {query.exception()}")
    print("Live view ended...")
else:
    print("Not streaming, showing static output instead")
    result = data_to_stream
    display(result.limit(10).toPandas())