# Real-Time Stock Sentiment Analysis

Dataset Link for training the model: https://huggingface.co/datasets/TimKoornstra/financial-tweets-sentiment



## A. Data Preparation

### 1. Install hugging face datasets

using  `%pip install datasets`

In [0]:
%pip install datasets

Python interpreter will be restarted.
Collecting datasets
  Using cached datasets-3.5.0-py3-none-any.whl (491 kB)
Collecting huggingface-hub>=0.24.0
  Using cached huggingface_hub-0.30.1-py3-none-any.whl (481 kB)
Collecting multiprocess<0.70.17
  Using cached multiprocess-0.70.16-py39-none-any.whl (133 kB)
Collecting xxhash
  Using cached xxhash-3.5.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (193 kB)
Collecting aiohttp
  Using cached aiohttp-3.11.16-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
Collecting pyarrow>=15.0.0
  Using cached pyarrow-19.0.1-cp39-cp39-manylinux_2_28_x86_64.whl (42.1 MB)
Collecting dill<0.3.9,>=0.3.0
  Using cached dill-0.3.8-py3-none-any.whl (116 kB)
Collecting requests>=2.32.2
  Using cached requests-2.32.3-py3-none-any.whl (64 kB)
Collecting tqdm>=4.66.3
  Using cached tqdm-4.67.1-py3-none-any.whl (78 kB)
Collecting pyyaml>=5.1
  Using cached PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (737 kB)
Co

In [0]:
# Import necessary PySpark functions
from pyspark.sql.functions import col, length, current_timestamp

# Import feature engineering APIs: Tokenizer, StopWordsRemover, SQLTransformer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, SQLTransformer

# Import feature engineering APIs for text to vector conversion: Word2Vec, HashingTF, IDF, CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, CountVectorizer, Word2Vec

# Import the following multi-class classifier: RandomForestClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier

# Import the Evaluator for F1 and Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# import Pipeline and PipelineModel
from pyspark.ml import Pipeline, PipelineModel

# import other python libraries, including `json`, `time`
import json
import time

# Import `load_dataset` from `datasets`
from datasets import load_dataset

### 3. Load data into PySpark DataFrame

Run the following to download the dataset

```
dataset = load_dataset("TimKoornstra/financial-tweets-sentiment")
```


In [0]:
dataset = load_dataset("TimKoornstra/financial-tweets-sentiment")
train_data = dataset['train']
df = spark.createDataFrame(train_data)
df.cache()
df.printSchema()
df.show(10)

root
 |-- sentiment: long (nullable = true)
 |-- tweet: string (nullable = true)
 |-- url: string (nullable = true)

+---------+--------------------+--------------------+
|sentiment|               tweet|                 url|
+---------+--------------------+--------------------+
|        2|$BYND - JPMorgan ...|https://huggingfa...|
|        2|$CCL $RCL - Nomur...|https://huggingfa...|
|        2|$CX - Cemex cut a...|https://huggingfa...|
|        2|$ESS: BTIG Resear...|https://huggingfa...|
|        2|$FNKO - Funko sli...|https://huggingfa...|
|        2|$FTI - TechnipFMC...|https://huggingfa...|
|        2|$GM - GM loses a ...|https://huggingfa...|
|        2|$GM: Deutsche Ban...|https://huggingfa...|
|        2|$GTT: Cowen cuts ...|https://huggingfa...|
|        2|$HNHAF $HNHPD $AA...|https://huggingfa...|
+---------+--------------------+--------------------+
only showing top 10 rows



- Load the dataset['train'] into a Spark DataFrame `spark_df`
- cache the spark dataframe for faster processing later
- Display the schema and first 10 rows.

### 3. Check the distribution of sentiment values

Query DataFrame to show the counts of different sentiment values


In [0]:
df.groupBy("sentiment").count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
|        0|12181|
|        1|17368|
|        2| 8542|
+---------+-----+



## B. Build the Feature Engineering Pipeline

In the following, we will carry out feature engineering steps that will be become stages of a pipeline. The goal is to build a feature vector that is most suitable for predict the sentiment of the tweet.

To prepare the text data for training:
- Augment data with timestamp
- Cleaning tweet texts, remove identifiers, URLs, etc.
- Tokenize the tweet text.
- Remove stop words from the tokenized text.
- Convert the list of words into numerical feature vectors using Word2Vec or TF-IDF.


I ask you to 
- test Transformer/Estimators as you go, by displaying, say, the first 30 rows.
-  But keep in mind, for them to work together, input columns must already exist in the dataframe, which could be generated in the previous step.



### 1. augment original data 

augment the dataframe by adding `cleaned_tweet` and `tweet_time` columns, where 

- `cleaned_tweet` is a copy of tweet for now (we will change this in later steps)
- `tweet_time` is calculated using `current_timestamp`

> Hints: Using `SQLTransformer`

In [0]:
from pyspark.sql.functions import current_timestamp

augment = SQLTransformer(
    statement="SELECT *, tweet as cleaned_tweet, current_timestamp() as tweet_time FROM __THIS__"
)
df_augmented = augment.transform(df)
df_augmented.show(5, truncate=False)

+---------+------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+-----------------------+
|sentiment|tweet                                                                                                 |url                                                                      |cleaned_tweet                                                                                         |tweet_time             |
+---------+------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+-----------------------+
|2        |$BYND - JPMorgan reels in expectations on

### 2.  Text cleaning

I have written a few additional text cleaning steps. They mostly use regular expressions to remove/replace unwanted segments

> Hints: Nowadays, you can ask GPT to write these regular expressions for you, and you can test them. 

- Each transformer replace `cleaned_tweet` with a transformed one.
- Your job is to choose that ones that make most sense to you (keep in mind, your job is to use these words to predict sentiment)
- Sequence matters, e.g. you want to remove ALL CAP words before converting them to lower case. 
- Test your favoriate steps with a pipeline  `cleaning_pipeline` instead of each one individually. 
- feel free to come up new ones that make sense to you.



In [0]:

st2 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'https?://\S+', '') as cleaned_tweet, sentiment from __THIS__") #remove URLs https:// and http:// \S means a non-space character.
st3 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'[^\w\s]','') as cleaned_tweet, sentiment from __THIS__") # remove non-alphanumeric, non space characters [^\w\s]  
st4 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'\\b[A-Z]*\\b','') as cleaned_tweet, sentiment from __THIS__")  #remove all cap words,  [A-Z] is a cap character, \b marks word boundary 
st5 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'\\b\w\\b',' ') as cleaned_tweet, sentiment from __THIS__") # remove word that is exactly one-character long
st6 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'\\b\d+\\b','') as cleaned_tweet, sentiment from __THIS__") # remove all digits words, \d is a digit
st7 = SQLTransformer(statement="select tweet_time, tweet, REGEXP_REPLACE(cleaned_tweet, r'\s+',' ') as cleaned_tweet, sentiment from __THIS__") # repace consecutive space with one, \s is a space or a tab.
st8 = SQLTransformer(statement="select tweet_time, tweet, trim(lower(cleaned_tweet)) as cleaned_tweet, sentiment from __THIS__") # convert to lower case and remove trailing/leading spaces. 


In [0]:
cleaning_pipeline = Pipeline(stages=[augment, st1, st2, st3, st4, st5, st6, st7, st8])
df_cleaned = cleaning_pipeline.fit(df).transform(df)
df_cleaned.select("tweet", "cleaned_tweet").show(5, truncate=False)

+------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|tweet                                                                                                 |cleaned_tweet                                                     |
+------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|$BYND - JPMorgan reels in expectations on Beyond Meat https://t.co/bd0xbFGjkT                         |jpmorgan reels in expectations on beyond meat                     |
|$CCL $RCL - Nomura points to bookings weakness at Carnival and Royal Caribbean https://t.co/yGjpT2ReD3|nomura points to bookings weakness at carnival and royal caribbean|
|$CX - Cemex cut at Credit Suisse, J.P. Morgan on weak building outlook https://t.co/KN1g4AWFIb        |cemex cut at credit suisse morgan on

### 3. Tokenization

that is, to convert a sentence into a list of words

In [0]:
tokenizer = Tokenizer(inputCol="cleaned_tweet", outputCol="words")
df_tokenized = tokenizer.transform(df_cleaned)
df_tokenized.select("cleaned_tweet", "words").show(5, truncate=False)

+------------------------------------------------------------------+-----------------------------------------------------------------------------+
|cleaned_tweet                                                     |words                                                                        |
+------------------------------------------------------------------+-----------------------------------------------------------------------------+
|jpmorgan reels in expectations on beyond meat                     |[jpmorgan, reels, in, expectations, on, beyond, meat]                        |
|nomura points to bookings weakness at carnival and royal caribbean|[nomura, points, to, bookings, weakness, at, carnival, and, royal, caribbean]|
|cemex cut at credit suisse morgan on weak building outlook        |[cemex, cut, at, credit, suisse, morgan, on, weak, building, outlook]        |
|research cuts to neutral                                          |[research, cuts, to, neutral]                     

### 4. Remove stop words

A standard StopWordsRemover is defined using:
```python
StopWordsRemover(inputCol=, outputCol=)
```
It will use a standard stop word list.

If you want to modify/augment the list of stop words:

```python
# Get the default stopword list
stopwords_list = StopWordsRemover().getStopWords()
# modify or augment it 
stopwords_list_enhanced = ...
# build a remover with custom word list
remover = StopWordsRemover(inputCol=, outputCol=, stopWords=stopwords_list_enhanced)
```


In [0]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_filtered = remover.transform(df_tokenized)
df_filtered.select("filtered_words").show(5, truncate=False)

+----------------------------------------------------------------+
|filtered_words                                                  |
+----------------------------------------------------------------+
|[jpmorgan, reels, expectations, beyond, meat]                   |
|[nomura, points, bookings, weakness, carnival, royal, caribbean]|
|[cemex, cut, credit, suisse, morgan, weak, building, outlook]   |
|[research, cuts, neutral]                                       |
|[funko, slides, piper, jaffray, cut]                            |
+----------------------------------------------------------------+
only showing top 5 rows




### 5. Vectorization text

You can convert list of words into numercal vectors in a few different ways.

- `Word2Vec`: neural network based word embedding developed by Google in 2013
  - `Word2Vec([vectorSize=], [minCount=], inputCol=, outputCol=, ....)`
  - `[fittedModel].getVectors()`: will return a list of keyword and associated vectors
- `CountVectorizer`: a simple way to vectorize based on word count.
  - `CountVectorizer([minTF=], [vocabSize=], inputCol=, outputCol=, ...)` 
- `TF-IDF`: based on term frequency, but weighted by inverse document frequence (penalize words that appear too often, like stop words). In pySpark, this is done in two steps via HashTF and IDF
  -  `HashingTF(inputCol=, outputCol=, [numFeatures=])`: the last one is how many terms/words
  -  `IDF(inputCol=, outputCol=)`: weigh term frequency by IDF

Implement one of the approaches

In [0]:
# TF-IDF
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="features")

df_featurized = hashingTF.transform(df_filtered)
df_tfidf = idf.fit(df_featurized).transform(df_featurized)
df_tfidf.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(1000,[336,408,82...|
|(1000,[42,166,213...|
|(1000,[206,348,50...|
|(1000,[78,432,437...|
|(1000,[206,346,37...|
+--------------------+
only showing top 5 rows



## C. Prepare for Classifier training

### 1. Split the prepared dataframe

- into training/testing (`train_df`, `test_df`) using 80-20 random split. 

In [0]:
train_df, test_df = df_tfidf.randomSplit([0.8, 0.2], seed=42)



### 2. Train a Model(s)

Now, we'll train machine learning models on the processed dataset and evaluate their performance.

Because the sentiment has three values,  **(1 for bullish, 2 for bearish, and 0 for neutral)**, we need to only use Classifier that works with multiple classes. 

You may try:

- Random Forest
- Logist Regression:  by default, it can automatically choose between binomial (two classes)/multinomial (multiple classes)
- Decision Tree

make sure you save the fitted model as a variable, which may use in the Pipeline.

In [0]:
# Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="sentiment")
lr_model = lr.fit(train_df)

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="sentiment")
rf_model = rf.fit(train_df)

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="sentiment", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="sentiment", metricName="accuracy")

dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="sentiment",
    maxDepth=15, 
    minInstancesPerNode=20,  
    impurity='gini'  
)

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="sentiment",
    numTrees=100, 
    maxDepth=12,
    featureSubsetStrategy='sqrt', 
    subsamplingRate=0.8
)

lr = LogisticRegression(
    featuresCol="features",
    labelCol="sentiment",
    maxIter=20, 
    regParam=0.1, 
    elasticNetParam=0.5, 
    family='multinomial'
)

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.3]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_f1,
    numFolds=3
)

dt_model = dt.fit(train_df)
rf_model = rf.fit(train_df)
lr_model = crossval.fit(train_df).bestModel  

### 3. Model evaluation

define an F1 evalutor and an accuracy evaluator.

- use the train model to make a prediction on the `test_df` dataset.
- Evaluate/print its F1-score and accuracy score.
- try to obtain an >0.5 F1-score and a >0.5 accuracy (>0.6 is possiable but not required)
    - try differnet feature engineering steps/models or their parameters and see if it leads to better result.




In [0]:
def evaluate_model(model, name):
    predictions = model.transform(test_df)
    f1 = evaluator_f1.evaluate(predictions)
    acc = evaluator_acc.evaluate(predictions)
    print(f"\n{name} Performance:")
    print(f"- F1 Score: {f1:.3f}")
    print(f"- Accuracy: {acc:.3f}")
    predictions.groupBy("prediction").count().show()

evaluate_model(dt_model, "Decision Tree")
evaluate_model(rf_model, "Random Forest")
evaluate_model(lr_model, "Logistic Regression")


Decision Tree Performance:
- F1 Score: 0.423
- Accuracy: 0.507
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  452|
|       1.0| 6544|
|       2.0|  482|
+----------+-----+


Random Forest Performance:
- F1 Score: 0.393
- Accuracy: 0.505
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  509|
|       1.0| 6915|
|       2.0|   54|
+----------+-----+


Logistic Regression Performance:
- F1 Score: 0.552
- Accuracy: 0.579
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 1696|
|       1.0| 5059|
|       2.0|  723|
+----------+-----+



**Logistic Regression** performs better than other models and give >0.5 in f1 and accuracy

### 4. inpect predictions

- select `tweet`, `sentiment`, and `predition` to compare them. 
- show the distribution of `sentment`
- compare it withe the distribution of `prediction`

In [0]:
predictions.select("tweet", "sentiment", "prediction").show(10)
predictions.groupBy("sentiment").count().show()
predictions.groupBy("prediction").count().show()

+--------------------+---------+----------+
|               tweet|sentiment|prediction|
+--------------------+---------+----------+
|"Be nice to peopl...|        0|       1.0|
|"Frozen II" is ex...|        0|       0.0|
|"I almost burst i...|        0|       1.0|
|"In my time at th...|        0|       1.0|
|"No water. No wat...|        0|       0.0|
|"The Committee al...|        0|       2.0|
|"They All Knew!" ...|        0|       1.0|
|"We Totally Faile...|        2|       0.0|
|#FridayReads: Wha...|        0|       2.0|
|#HOLIDAY #SPECIAL...|        0|       0.0|
+--------------------+---------+----------+
only showing top 10 rows

+---------+-----+
|sentiment|count|
+---------+-----+
|        0| 2362|
|        1| 3415|
|        2| 1701|
+---------+-----+

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 1727|
|       1.0| 5181|
|       2.0|  570|
+----------+-----+




### 5. define, train, and deploy a pipeline

- define a pipeline that has your chosen feature engineering steps and the model. 
- test the pipeline on the original data

In [0]:
from pyspark.ml import Pipeline

full_pipeline = Pipeline(stages=[
    augment,
    st1, st2, st3, st4, st5, st6, st7, st8,
    tokenizer,
    remover,
    hashingTF,       
    idf,
    lr               
])


In [0]:
pipeline_model = full_pipeline.fit(df)
transformed_df = pipeline_model.transform(df)
transformed_df.select("tweet", "sentiment", "prediction").show(10, truncate=False)

+------------------------------------------------------------------------------------------------------+---------+----------+
|tweet                                                                                                 |sentiment|prediction|
+------------------------------------------------------------------------------------------------------+---------+----------+
|$BYND - JPMorgan reels in expectations on Beyond Meat https://t.co/bd0xbFGjkT                         |2        |1.0       |
|$CCL $RCL - Nomura points to bookings weakness at Carnival and Royal Caribbean https://t.co/yGjpT2ReD3|2        |1.0       |
|$CX - Cemex cut at Credit Suisse, J.P. Morgan on weak building outlook https://t.co/KN1g4AWFIb        |2        |1.0       |
|$ESS: BTIG Research cuts to Neutral https://t.co/MCyfTsXc2N                                           |2        |1.0       |
|$FNKO - Funko slides after Piper Jaffray PT cut https://t.co/z37IJmCQzB                               |2        |1.0 

### 6. Save the pipeline model

- save to `/databricks/driver/tmp/sentiment_model` on the driver node's local file system using PipelineModel's `write().overwrite().save(path)` 
    - `.overwrite()` will set it to overwrite mode (if already exists)
    - `.write()` returns an **MLWriter** instance
- we later will use this.

In [0]:
pipeline_model.write().overwrite().save("/databricks/driver/tmp/sentiment_model")

## D. Set up Streaming


### 1. Develop the streaming processing logic in the batch mode
Since we don't have **real-time tweets**, we will simulate streaming using **historical financial tweets** from the dataset.

- Please first run the data generator step 2 for a while to put some files in the target folder on the driver node.
- load the data into a Spark Dataframe `df` from `/databricks/driver/streaming_source` (in the local file system)
- load the saved pipeline and apply it on the data frame.
- creates a `counts_df` dataframe that counts the number of predicted sentiment values by value type (i.e., 0, 1, 2) and window (using 1 minute tumbling window)
- validate the results by displaying them

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType() \
    .add("tweet", StringType()) \
    .add("sentiment", IntegerType()) \
    .add("tweet_time", StringType())

df = spark.read.schema(schema).json("file:/databricks/driver/streaming_source")

df.show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-------------------+
|tweet                                                                                                                                                                                      |sentiment|tweet_time         |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-------------------+
|Amazon Announces First-Ever ‘Amazon Future Engineer Teacher of the Year Awards,’ Awarding Seven All-Star Teachers $25,000 Prize Packages for Exemplar Work with Students Across the Country|0        |2025-04-06 00:15:01|
|Samenvatting: Japanse NEDO en Panasonic behalen ‘s werelds hoogste conversie-efficiëntie van 16,09% voor grootste zonne

In [0]:
from pyspark.ml import PipelineModel

pipeline_model = PipelineModel.load("/databricks/driver/tmp/sentiment_model")


In [0]:
df_clean = df.drop("tweet_time")
predictions = pipeline_model.transform(df_clean)
predictions.select("tweet", "sentiment", "prediction").show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+
|tweet                                                                                                                                                                                      |sentiment|prediction|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+
|Amazon Announces First-Ever ‘Amazon Future Engineer Teacher of the Year Awards,’ Awarding Seven All-Star Teachers $25,000 Prize Packages for Exemplar Work with Students Across the Country|0        |1.0       |
|Samenvatting: Japanse NEDO en Panasonic behalen ‘s werelds hoogste conversie-efficiëntie van 16,09% voor grootste zonnecelmodule met Perovskite per gebied 

In [0]:
from pyspark.sql.functions import col, window, to_timestamp

predictions_ts = predictions.withColumn("tweet_time", to_timestamp("tweet_time"))

counts_df = predictions_ts.groupBy(
    window(col("tweet_time"), "1 minute"),
    col("prediction")
).count()

counts_df.show(truncate=False)

+------------------------------------------+----------+-----+
|window                                    |prediction|count|
+------------------------------------------+----------+-----+
|{2025-04-06 00:48:00, 2025-04-06 00:49:00}|1.0       |2992 |
+------------------------------------------+----------+-----+



In [0]:
display(predictions.select("tweet_time", "tweet", "sentiment", "prediction").limit(10))


tweet_time,tweet,sentiment,prediction
2025-04-06T00:55:21.678+0000,"Amazon Announces First-Ever ‘Amazon Future Engineer Teacher of the Year Awards,’ Awarding Seven All-Star Teachers $25,000 Prize Packages for Exemplar Work with Students Across the Country",0,1.0
2025-04-06T00:55:21.678+0000,"Samenvatting: Japanse NEDO en Panasonic behalen ‘s werelds hoogste conversie-efficiëntie van 16,09% voor grootste zonnecelmodule met Perovskite per gebied",0,1.0
2025-04-06T00:55:21.678+0000,"Medical Properties Trust, Inc. Completes 2019 With Record $4.5 Billion in Acquisitions for 64% Growth Rate and Delivers Market-Leading Shareholder Returns",1,1.0
2025-04-06T00:55:21.678+0000,The stricken Bank of Jinzhou will unload $21 billion of assets to the central bank for less than a third of theirÃ‚Â r… https://t.co/xGuCP9lBlH,0,1.0
2025-04-06T00:55:21.678+0000,"Some gamers accessing Google's new cloud gaming platform, Stadia, through a Chromecast Ultra dongle are reporting trouble… https://t.co/t6dCbMr0C3",2,1.0
2025-04-06T00:55:21.678+0000,"“Vaping is taking us backward,” CVS Health CEO Larry Merlo says. “Something has to be done.” https://t.co/CwxbCtXvWW https://t.co/NBv10IejXx",2,1.0
2025-04-06T00:55:21.678+0000,“I think it’s a time of great transformation in retail right now.” @stitchfix CEO Katrina Lake discusses how the co… https://t.co/gQgzOBNaUg,0,1.0
2025-04-06T00:55:21.678+0000,"Greenlane Renewables Signs New $7.0 Million System Supply Contract with the Renewable Natural Gas Company, a Leader in Landfill Gas to RNG Projects",0,1.0
2025-04-06T00:55:21.678+0000,Apple delaying the theatrical release of “The Banker” after one of the movie’s producers was accused of assault by… https://t.co/rqPyMuLZob,0,1.0
2025-04-06T00:55:21.678+0000,St. Louis Fed President Jim Bullard recommends declaring a “National Pandemic Adjustment Period' and discusses thre… https://t.co/o942U7BKPI,0,1.0



### 2. Develop the Stream Processing Application
1. Create a **structured streaming DataFrame** by reading from a folder where **new files** are added over time (`readStream`).
2. using the same logic developed in the above
3. save the `counts_df` to a memory table `counts`
4. Don't start the streaming processing yet

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType() \
    .add("tweet", StringType()) \
    .add("sentiment", IntegerType()) \
    .add("tweet_time", StringType())

streaming_df = spark.readStream.schema(schema).json("file:/databricks/driver/streaming_source")

In [0]:
streaming_df_clean = streaming_df.drop("tweet_time")

streaming_predictions = pipeline_model.transform(streaming_df_clean)

In [0]:
from pyspark.sql.functions import to_timestamp

streaming_predictions_ts = streaming_predictions.withColumn(
    "tweet_time", to_timestamp("tweet_time")
)

In [0]:
from pyspark.sql.functions import window, col

counts_df = streaming_predictions_ts.groupBy(
    window(col("tweet_time"), "1 minute"),
    col("prediction")
).count()

In [0]:
query = counts_df.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("counts") 

### 4. Fetch results from counts table and visualize it

 - fetch `prediction`, `time`, and `count` from `counts` table
    - where `time` is based on the "end" of the window, and reformated using `date_format` with a format of `MMM-dd HH:mm" 
 - to prevent slow down, don't do any order by statements
 - choose an appropriate visualization to show the counts of different predictions in each window over time.


In [0]:
stream_handle = query.start()

Had to start the query or else I will get error in next code that counts table not found

In [0]:
from pyspark.sql.functions import date_format, col

counts_query_df = spark.sql("SELECT * FROM counts")

formatted_counts_df = counts_query_df.select(
    date_format(col("window.end"), "MMM-dd HH:mm").alias("time"),
    col("prediction").cast("int").alias("sentiment_prediction"),
    col("count")
)

In [0]:
display(formatted_counts_df, 10)


time,sentiment_prediction,count
Apr-06 01:12,1,40
Apr-06 01:10,1,56
Apr-06 01:08,1,4393
Apr-06 01:09,1,65
Apr-06 01:11,1,64


Databricks visualization. Run in Databricks to view.


### 5. Start the streaming pipeline system

- first start the data generator
- then start the streaming processing app
- then refresh the visualization from time to time to see the changes


In [0]:
stream_handle = query.start()

In [0]:
display(formatted_counts_df)

time,sentiment_prediction,count
Apr-06 01:14,1,59
Apr-06 01:12,1,52
Apr-06 01:15,1,62
Apr-06 01:13,1,62
Apr-06 01:10,1,56
Apr-06 01:08,1,4393
Apr-06 01:09,1,65
Apr-06 01:11,1,64


Databricks visualization. Run in Databricks to view.


### 6. Stop the data generator and the stream processing engine

In [0]:
stream_handle.stop()

In [0]:
spark.streams.active

Out[55]: []

All closed