In [0]:
# Title banner for the notebook, rendered as raw HTML via Databricks' displayHTML.
# The markup imports the "Avenir Next LT Pro" web font and draws a pill-shaped,
# pink-bordered (#C32A68) box containing the "Streaming (Simulation)" heading.
# NOTE(review): the first line of the markup begins with Markdown heading syntax ("# ")
# inside an HTML string; it is presumably emitted as literal (white) text rather than
# rendered as a heading — confirm whether that line is still wanted.
title_banner_html = '''
# <span style="color: #ffffff;">Streaming (Simulation)</span>
<style>
@import url('https://fonts.cdnfonts.com/css/avenir-next-lt-pro?styles=29974');
</style>

<div style="background: transparent;
            padding: 10px; color: white; border-radius: 300px; text-align: center;
            border: 2px solid #C32A68;">
    <center><h2 style="margin-left: 120px;margin-top: 10px; margin-bottom: 4px; color: #C32A68;
                       font-size: 34px; font-family: 'Avenir Next LT Pro', sans-serif;"><b>Streaming (Simulation)</b></h2></center>
</div>
'''
displayHTML(title_banner_html)

In [0]:
# To re-run the simulation from scratch, uncomment the three lines below. They drop the
# results table, delete the whole simulation directory, and recreate the empty input directory.
# spark.sql("DROP TABLE IF EXISTS live_sentiment_results_table")
# dbutils.fs.rm("dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/", recurse=True)
# dbutils.fs.mkdirs("dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/input_texts/")

Out[32]: True

# **☁️ Advanced Streaming Simulation (Demonstration)** 

## **Chunk 1:** New Reviews (January 1, 2023)
This cell writes a CSV file with two new reviews to the input directory. Run this cell first, then wait 10–15 seconds before running the next chunk.

In [0]:
# Chunk 1: write the first simulated batch of reviews as a CSV file into the
# "input_texts" directory under the simulation root.
# (presumably a streaming query started elsewhere watches this directory — not shown here)
import os

advanced_streaming_path = "dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation"
# Directory that receives the simulated input files.
advanced_streaming_input_path = os.path.join(advanced_streaming_path, "input_texts")

# CSV payload: a header row followed by one 5-star and one 1-star review dated 2023-01-01.
new_review_data = """review_id,rating,review_text,parent_asin,user_id,timestamp,helpful_vote,timestamp_dt,year,month,day,day_of_week,hour
new_review_001,5.0,"This is an amazing new product I just received and it works perfectly!",new_asin_001,new_user_001,1672531200,2,"2023-01-01T00:00:00.000+0000",2023,1,1,1,0
new_review_002,1.0,"Unfortunately this item broke after one day very disappointing.",new_asin_002,new_user_002,1672531201,0,"2023-01-01T00:00:01.000+0000",2023,1,1,1,0
"""

# Build the destination once so the write and the confirmation message always agree.
batch_file_path = os.path.join(advanced_streaming_input_path, "new_single_review_batch.csv")
# overwrite=True lets the cell be re-run without failing on an existing file.
dbutils.fs.put(batch_file_path, new_review_data, overwrite=True)
print(f"Simulated new review file written to {batch_file_path}")

Wrote 460 bytes.
Simulated new review file written to dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/input_texts/new_single_review_batch.csv


In [0]:
%sql
-- Inspect every row currently in the sentiment results table.
-- There is no ORDER BY, so the order in which rows are displayed is not guaranteed.
SELECT *
FROM live_sentiment_results_table;

review_id,SentimentLabel,SentimentScore
new_review_001,Positive,0.9916028380393982
new_review_002,Negative,0.9736137986183168


In [0]:
%sql
-- Count the rows in the results table per sentiment label, largest count first.
-- Labels with equal counts have no defined relative order.
SELECT
  SentimentLabel,
  COUNT(*) AS Review_Count
FROM live_sentiment_results_table
GROUP BY SentimentLabel
ORDER BY Review_Count DESC;

SentimentLabel,Review_Count
Negative,1
Positive,1


---

## **Chunk 2:** New Reviews (January 3, 2023)
This cell writes another CSV file with two new reviews. Run this after the first chunk has been processed (wait 10–15 seconds).

In [0]:
# Chunk 2: write the second simulated batch of reviews as a CSV file into the
# "input_texts" directory under the simulation root.
# `os` is not imported in this cell; it relies on an earlier cell having imported it.
advanced_streaming_path = "dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation"
# Directory that receives the simulated input files.
advanced_streaming_input_path = os.path.join(advanced_streaming_path, "input_texts")

# CSV payload: a header row followed by one 5-star and one 3-star review dated 2023-01-03.
new_review_data = """review_id,rating,review_text,parent_asin,user_id,timestamp,helpful_vote,timestamp_dt,year,month,day,day_of_week,hour
new_review_005,5.0,"This monitor has vibrant colors and is perfect for gaming!",new_asin_005,new_user_005,1672704000,5,"2023-01-03T00:00:00.000+0000",2023,1,3,3,0
new_review_006,3.0,"Decent product but the setup was a bit confusing.",new_asin_006,new_user_006,1672704001,2,"2023-01-03T00:00:01.000+0000",2023,1,3,3,0
"""

# Build the destination once so the write and the confirmation message always agree.
batch_file_path = os.path.join(advanced_streaming_input_path, "new_single_review_batch_2.csv")
# overwrite=True lets the cell be re-run without failing on an existing file.
dbutils.fs.put(batch_file_path, new_review_data, overwrite=True)
print(f"Simulated new review file written to {batch_file_path}")

Wrote 434 bytes.
Simulated new review file written to dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/input_texts/new_single_review_batch_2.csv


In [0]:
%sql
-- Inspect every row currently in the sentiment results table.
-- There is no ORDER BY, so the order in which rows are displayed is not guaranteed.
SELECT *
FROM live_sentiment_results_table;

review_id,SentimentLabel,SentimentScore
new_review_001,Positive,0.9916028380393982
new_review_002,Negative,0.9736137986183168
new_review_005,Positive,0.9478766918182372
new_review_006,Negative,0.8762170672416687


In [0]:
%sql
-- Count the rows in the results table per sentiment label, largest count first.
-- Labels with equal counts have no defined relative order.
SELECT
  SentimentLabel,
  COUNT(*) AS Review_Count
FROM live_sentiment_results_table
GROUP BY SentimentLabel
ORDER BY Review_Count DESC;

SentimentLabel,Review_Count
Positive,2
Negative,2


---

## **Chunk 3:** New Reviews (January 4, 2023)
This cell writes a third CSV file with two new reviews. Run this after the second chunk has been processed.

In [0]:
# Chunk 3: write the third simulated batch of reviews as a CSV file into the
# "input_texts" directory under the simulation root.
# `os` is not imported in this cell; it relies on an earlier cell having imported it.
advanced_streaming_path = "dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation"
# Directory that receives the simulated input files.
advanced_streaming_input_path = os.path.join(advanced_streaming_path, "input_texts")

# CSV payload: a header row followed by one 1-star and one 4-star review dated 2023-01-04.
new_review_data = """review_id,rating,review_text,parent_asin,user_id,timestamp,helpful_vote,timestamp_dt,year,month,day,day_of_week,hour
new_review_007,1.0,"Terrible experience, the device stopped working after a week.",new_asin_007,new_user_007,1672790400,0,"2023-01-04T00:00:00.000+0000",2023,1,4,4,0
new_review_008,4.0,"Pretty good headphones, sound quality is impressive.",new_asin_008,new_user_008,1672790401,4,"2023-01-04T00:00:01.000+0000",2023,1,4,4,0
"""

# Build the destination once so the write and the confirmation message always agree.
batch_file_path = os.path.join(advanced_streaming_input_path, "new_single_review_batch_3.csv")
# overwrite=True lets the cell be re-run without failing on an existing file.
dbutils.fs.put(batch_file_path, new_review_data, overwrite=True)
print(f"Simulated new review file written to {batch_file_path}")

Wrote 440 bytes.
Simulated new review file written to dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/input_texts/new_single_review_batch_3.csv


In [0]:
%sql
-- Inspect every row currently in the sentiment results table.
-- There is no ORDER BY, so the order in which rows are displayed is not guaranteed.
SELECT *
FROM live_sentiment_results_table;

review_id,SentimentLabel,SentimentScore
new_review_001,Positive,0.9916028380393982
new_review_002,Negative,0.9736137986183168
new_review_005,Positive,0.9478766918182372
new_review_007,Negative,0.9797434210777284
new_review_008,Positive,0.987636148929596
new_review_006,Negative,0.8762170672416687


In [0]:
%sql
-- Count the rows in the results table per sentiment label, largest count first.
-- Labels with equal counts have no defined relative order.
SELECT
  SentimentLabel,
  COUNT(*) AS Review_Count
FROM live_sentiment_results_table
GROUP BY SentimentLabel
ORDER BY Review_Count DESC;

SentimentLabel,Review_Count
Positive,3
Negative,3


---

## **Chunk 4:** New Reviews (January 5, 2023)
This cell writes a fourth CSV file with two new reviews. Run this after the third chunk has been processed.

In [0]:
# Chunk 4: write the fourth simulated batch of reviews as a CSV file into the
# "input_texts" directory under the simulation root.
# `os` is not imported in this cell; it relies on an earlier cell having imported it.
advanced_streaming_path = "dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation"
# Directory that receives the simulated input files.
advanced_streaming_input_path = os.path.join(advanced_streaming_path, "input_texts")

# CSV payload: a header row followed by one 3-star and one 5-star review dated 2023-01-05.
new_review_data = """review_id,rating,review_text,parent_asin,user_id,timestamp,helpful_vote,timestamp_dt,year,month,day,day_of_week,hour
new_review_009,3.0,"The keyboard is okay but the keys feel a bit cheap.",new_asin_009,new_user_009,1672876800,1,"2023-01-05T00:00:00.000+0000",2023,1,5,5,0
new_review_010,5.0,"Love this smart speaker, integrates well with my home setup!",new_asin_010,new_user_010,1672876801,6,"2023-01-05T00:00:01.000+0000",2023,1,5,5,0
"""

# Build the destination once so the write and the confirmation message always agree.
batch_file_path = os.path.join(advanced_streaming_input_path, "new_single_review_batch_4.csv")
# overwrite=True lets the cell be re-run without failing on an existing file.
dbutils.fs.put(batch_file_path, new_review_data, overwrite=True)
print(f"Simulated new review file written to {batch_file_path}")

Wrote 438 bytes.
Simulated new review file written to dbfs:/FileStore/BDAProject/AdvancedStreamingSimulation/input_texts/new_single_review_batch_4.csv


In [0]:
%sql
-- Inspect every row currently in the sentiment results table.
-- There is no ORDER BY, so the order in which rows are displayed is not guaranteed.
SELECT *
FROM live_sentiment_results_table;

review_id,SentimentLabel,SentimentScore
new_review_001,Positive,0.9916028380393982
new_review_002,Negative,0.9736137986183168
new_review_005,Positive,0.9478766918182372
new_review_007,Negative,0.9797434210777284
new_review_008,Positive,0.987636148929596
new_review_009,Negative,0.7507771253585815
new_review_010,Positive,0.9913573265075684
new_review_006,Negative,0.8762170672416687


In [0]:
%sql
-- Count the rows in the results table per sentiment label, largest count first.
-- Labels with equal counts have no defined relative order.
SELECT
  SentimentLabel,
  COUNT(*) AS Review_Count
FROM live_sentiment_results_table
GROUP BY SentimentLabel
ORDER BY Review_Count DESC;

SentimentLabel,Review_Count
Positive,4
Negative,4


### **Final Notes**
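Run the chunk cells in order, from Chunk 1 to Chunk 4; Chunks 2–4 rely on the `import os` executed in the Chunk 1 cell. Wait 10–15 seconds after each write so the streaming query has time to process the new file, then run the `%sql` cells to confirm that the new rows have arrived. You can extend this notebook by adding more chunks with different review data if needed.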