In [0]:
# create the widgets interactive form.

dbutils.widgets.removeAll()
dbutils.widgets.text("user_Name", "", "01 - Enter User_Name")
dbutils.widgets.text("password", "", "02 - Enter Password")
dbutils.widgets.text("user_id", "", "03 - Enter User ID (13–14 chars, A–Z, 0–9)")
dbutils.widgets.dropdown("product_id", "Loading...", ["Loading..."], "04 - Select Product ID")
dbutils.widgets.text("review", "", "05 - Enter Review")
dbutils.widgets.dropdown("day", "1", [str(i) for i in range(1, 32)], "06 - Select Day")
dbutils.widgets.dropdown("month", "January", [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December"
], "07 - Select Month")
dbutils.widgets.dropdown("year", "2024", ["2023", "2024", "2025"], "08 - Select Year")

In [0]:
# # validate the following columns:
# Get username and password
user_Name = dbutils.widgets.get("user_Name")
password = dbutils.widgets.get("password")

# Check credentials
if user_Name != "admin" or password != "password123":
    raise ValueError("❌ Invalid login credentials. Please enter correct username and password.")
else:
    print("✅ Login successful!")


import re

# Fetch the User ID from the widget
user_id = dbutils.widgets.get("user_id").strip()

# Validate User ID: must be 13-14 chars, only uppercase A-Z and digits 0-9
if not re.match(r'^[A-Z0-9]{13,14}$', user_id):
    raise ValueError("❌ Invalid User ID. It must be 13–14 characters long and only contain A–Z and 0–9.")
else:
    print("✅ Valid User ID.")    

✅ Login successful!


In [0]:
# populat product IDs from this dataset.
df_products = spark.read.parquet("dbfs:/saved_data/processed_data3.parquet")

# Get distinct product IDs as a Python list
product_ids = df_products.select("ProductId").distinct().rdd.flatMap(lambda x: x).collect()

# Remove the placeholder dropdown widget first
dbutils.widgets.remove("product_id")

# Re-create the dropdown widget with actual product IDs (limit to first 50 for UI performance)
dbutils.widgets.dropdown("product_id", product_ids[0], product_ids[:50], "Select Product ID")

In [0]:
# load the saved model and metadata.

import joblib
from tensorflow.keras.models import load_model

# Load punctuation removal metadata
punctuation_metadata = joblib.load("/dbfs/saved_columns/punctuation_removal_metadata.pkl")

# Load other metadata as needed
spacing_metadata = joblib.load("/dbfs/saved_columns/punctuation_spacing_metadata.pkl")
lowercase_metadata = joblib.load("/dbfs/saved_columns/lowercase_metadata.pkl")
tokenization_metadata = joblib.load("/dbfs/saved_columns/tokenization_metadata.pkl")
stopword_metadata = joblib.load("/dbfs/saved_columns/stopword_removal_metadata.pkl")
lemmatization_metadata = joblib.load("/dbfs/saved_columns/lemmatization_metadata.pkl")
concat_metadata = joblib.load("/dbfs/saved_columns/concat_summary_tokens.pkl")
tokenizer_metadata = joblib.load("/dbfs/saved_columns/keras_tokenizer_metadata.pkl")
# **New: Save the actual tokenizer object**
tokenizer_object = joblib.load("/dbfs/saved_columns/keras_tokenizer.pkl")
label_encoding_metadata_metadata = joblib.load("/dbfs/saved_columns/label_encoding_Sentiment.pkl")

# Load the trained CNN sentiment model
model = load_model("/dbfs/models/cnn_sentiment_model.keras")

print("Metadata and model loaded successfully!")

Metadata and model loaded successfully!


In [0]:
# Apply the loaded models and metadata to the review text and make prediction.

import re
# Define a text input widget in Databricks
dbutils.widgets.text("review", "Type your review here")

# Get user input from the widget
review_text = dbutils.widgets.get("review")
# 3. Apply preprocessing (manual logic)
# Now review_text is safe to use
print("Review entered:", review_text)
# a. Remove punctuation using saved regex
regex_pattern = punctuation_metadata["regex_used"]
text = re.sub(regex_pattern, "", review_text)

# b. Fix spacing (simple fix: collapse multiple spaces)
text = re.sub(r"\s+", " ", text).strip()

# c. Lowercase
text = text.lower()

# d. Tokenize (split by space)
tokens = text.split()

# e. Remove stopwords
stopwords = set(stopword_metadata.get("stopwords", [])) or set([
    # fallback standard English stopwords
    "a", "an", "the", "and", "is", "it", "to", "in", "that", "this", "of", "for", "on", "with"
])
tokens = [t for t in tokens if t not in stopwords]

# f. Lemmatize (based on lemmatization dict if available)
lemmatizer_dict = lemmatization_metadata.get("lemma_dict", {})
tokens = [lemmatizer_dict.get(token, token) for token in tokens]

# g. Join tokens into single string
clean_text = " ".join(tokens)

# 4. Tokenize with Keras tokenizer
from keras.preprocessing.sequence import pad_sequences

sequences = tokenizer_object.texts_to_sequences([clean_text])
padded_input = pad_sequences(sequences, maxlen=100)  # Adjust maxlen to match training

# 5. Predict

prediction_probs = model.predict(padded_input)
predicted_index = prediction_probs.argmax(axis=1)[0]

# 6. Decode predicted label

inverse_label_encoding = {v: k for k, v in label_encoding_metadata_metadata["encoding"].items()}
predicted_label = inverse_label_encoding[predicted_index]

# 7. Output result

print(f"✅ Predicted Sentiment: {predicted_label}")

Review entered: Waoo! this product is very nice, it improved the growth of my dog.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 118ms/step[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 136ms/step
✅ Predicted Sentiment: positive


In [0]:
# convert the prediction into a Spark DataFrame for saving to Delta Lake
from pyspark.sql import Row
from pyspark.sql.functions import current_date, dayofmonth, date_format, year, sha2, concat_ws, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

# Get dynamic inputs from widgets
user_id = dbutils.widgets.get("user_id")        # Expects 13-14 char alphanumeric string
product_id = dbutils.widgets.get("product_id")  # From dropdown widget populated from your dataset
review_text = dbutils.widgets.get("review")
predicted_label = predicted_label  

# Define schema explicitly (add prediction_id)
schema = StructType([
    StructField("prediction_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("review", StringType(), True),
    StructField("sentiment", StringType(), True),
    StructField("day", IntegerType(), True),
    StructField("month", StringType(), True),  # Full month name as string
    StructField("year", IntegerType(), True),
])

# Create a Row object with your data and placeholder date values
prediction_row = Row(
    prediction_id=None,
    user_id=user_id,
    product_id=product_id,
    review=review_text,
    sentiment=predicted_label,
    day=None,
    month=None,
    year=None
)

# Create DataFrame with explicit schema
df_prediction = spark.createDataFrame([prediction_row], schema=schema)

# Add date columns (using current_date)
df_prediction = df_prediction.withColumn("prediction_date", current_date())
df_prediction = df_prediction.withColumn("day", dayofmonth("prediction_date"))
df_prediction = df_prediction.withColumn("month", date_format("prediction_date", "MMMM"))  # Month full name
df_prediction = df_prediction.withColumn("year", year("prediction_date"))

# Generate unique prediction_id by hashing key fields (user_id, product_id, review, prediction_date)
df_prediction = df_prediction.withColumn(
    "prediction_id",
    sha2(concat_ws("||", "user_id", "product_id", "review", "prediction_date"), 256)
)

# Delta table path
delta_table_path = "/mnt/delta/sentiment_predictions"

# If table exists, ensure 'prediction_id' column exists in the schema
if DeltaTable.isDeltaTable(spark, delta_table_path):
    df_existing = spark.read.format("delta").load(delta_table_path)
    if "prediction_id" not in df_existing.columns:
        # Add prediction_id column with nulls to existing table DataFrame
        df_existing = df_existing.withColumn("prediction_id", lit(None).cast("string"))
        # Overwrite table to update schema with new column
        df_existing.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(delta_table_path)

    # Load the DeltaTable object for merge
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Perform merge/upsert on prediction_id to avoid duplicates
    delta_table.alias("tgt").merge(
        df_prediction.alias("src"),
        "tgt.prediction_id = src.prediction_id"
    ).whenNotMatchedInsertAll().execute()

else:
    # Table doesn't exist: create new Delta table
    df_prediction.write.format("delta").mode("overwrite").save(delta_table_path)

print("✅ Prediction saved with deduplication using prediction_id!")

✅ Prediction saved with deduplication using prediction_id!


In [0]:
# import the dataset from delta lake.
df = spark.read.format("delta").load("/mnt/delta/sentiment_predictions")
df.show(truncate=False)

+-------------+----------+------------------------------------------------------------------+---------+---+-----+----+---------------+----------------------------------------------------------------+
|user_id      |product_id|review                                                            |sentiment|day|month|year|prediction_date|prediction_id                                                   |
+-------------+----------+------------------------------------------------------------------+---------+---+-----+----+---------------+----------------------------------------------------------------+
|BB234TYR689JJ|B001BF7PV0|Waoo! this product is very nice, it improved the growth of my dog.|positive |23 |May  |2025|2025-05-23     |506005a9567fa162a34592df864d145e920b878a8718525bd544306b002a9aac|
|AR234TYR689JJ|B0001PB99K|This is very bad product, my dog didn't like it.                  |negative |23 |May  |2025|2025-05-23     |9169e33721e4c73c3a0d3a80d541ce70963c9e2fecf04fcc47457ec98e495f4e|


In [0]:
%sql
/*
Register the delta lake table as a SQL table.
*/
CREATE TABLE IF NOT EXISTS sentiment_predictions
USING DELTA
LOCATION '/mnt/delta/sentiment_predictions';

In [0]:
%sql
/*
View all the columns in the  SQL table.
*/
SELECT * FROM sentiment_predictions;

user_id,product_id,review,sentiment,day,month,year,prediction_date,prediction_id
BB234TYR689JJ,B001BF7PV0,"Waoo! this product is very nice, it improved the growth of my dog.",positive,23,May,2025,2025-05-23,506005a9567fa162a34592df864d145e920b878a8718525bd544306b002a9aac
AR234TYR689JJ,B0001PB99K,"This is very bad product, my dog didn't like it.",negative,23,May,2025,2025-05-23,9169e33721e4c73c3a0d3a80d541ce70963c9e2fecf04fcc47457ec98e495f4e


In [0]:
# To delete the data in the delta table and start recording new data, run the following code.

# delete the existing data in the delta table for a new ones.
delta_table_path = "/mnt/delta/sentiment_predictions"

# Load empty DataFrame with the same schema as your delta table
empty_df = spark.createDataFrame([], spark.read.format("delta").load(delta_table_path).schema)

# Overwrite the delta table with empty DataFrame to clear data
empty_df.write.format("delta").mode("overwrite").save(delta_table_path)

print("✅ Delta table data cleared, ready for fresh inserts.")

✅ Delta table data cleared, ready for fresh inserts.
