  Week 2 



In [0]:
from pyspark.sql.functions import col

# Location of the silver-layer feedback data, read as Parquet from DBFS.
input_silver_path = "dbfs:/mnt/silver-feedback/"

# Load the silver layer and render it for a visual check.
# NOTE(review): no later cell visible here references df_silver — confirm it is still needed.
df_silver = spark.read.format("parquet").load(input_silver_path)
display(df_silver)


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hand-written sample feedback rows; each tuple follows the column order of `schema` below.
dummy_data = [
    ("P001","S001","safety",3,7,"Felt much safer after the session","2025-01-05"),
    ("P001","S001","connection",4,8,"Loved the group energy","2025-01-05"),
    ("P001","S002","clarity",5,6,"Instructions were clear","2025-01-12"),
    ("P001","S002","safety",2,7,"Very calming and grounding","2025-01-12"),
    ("P002","S003","connection",6,6,"Neutral experience","2025-01-15"),
    ("P002","S003","clarity",5,4,"A bit confusing at times","2025-01-15"),
    ("P002","S004","safety",3,8,"Felt supported and safe","2025-01-20"),
    ("P002","S004","connection",4,9,"Strong sense of community","2025-01-20"),
    ("P001","S005","clarity",6,9,"Everything clicked today","2025-01-22"),
    ("P001","S005","connection",5,8,"Instructor was very present","2025-01-22")
]

# (column name, Spark type) pairs. Feedback_Date is kept as a string here and
# parsed into a date later in the notebook.
schema_columns = [
    ("PractitionerID", StringType),
    ("SessionID", StringType),
    ("Question", StringType),
    ("Emotion_Before", IntegerType),
    ("Emotion_After", IntegerType),
    ("Comment_Cleaned", StringType),
    ("Feedback_Date", StringType),
]

# Build the explicit schema from the pairs above.
schema = StructType([
    StructField(column_name, spark_type())
    for column_name, spark_type in schema_columns
])

# Materialise the sample rows as a Spark DataFrame with the explicit schema.
raw_data = spark.createDataFrame(data=dummy_data, schema=schema)


Handling text normalization and profanity words

In [0]:
from pyspark.sql.functions import lower, trim, regexp_replace, col

# Words to mask in comments. They are written in lower case because the comment
# text is lower-cased before matching.
profanity_words = ["damn", "shit"]

# One alternation of whole-word matches, i.e. \bdamn\b|\bshit\b.
# NOTE(review): the words are inserted into the regex unescaped — fine for the
# current list, but a word containing regex metacharacters would need escaping.
pattern = "|".join("\\b" + word + "\\b" for word in profanity_words)

# Normalise first (trim, then lower-case), then replace each matched word with "***".
normalized_comment = lower(trim(col("Comment_Cleaned")))
masked_comment = regexp_replace(normalized_comment, pattern, "***")

df_text_clean = raw_data.withColumn("Comment_Normalized", masked_comment)

# Show the original and normalised text side by side.
display(df_text_clean.select("Comment_Cleaned", "Comment_Normalized"))


Comment_Cleaned,Comment_Normalized
Felt much safer after the session,felt much safer after the session
Loved the group energy,loved the group energy
Instructions were clear,instructions were clear
Very calming and grounding,very calming and grounding
Neutral experience,neutral experience
A bit confusing at times,a bit confusing at times
Felt supported and safe,felt supported and safe
Strong sense of community,strong sense of community
Everything clicked today,everything clicked today
Instructor was very present,instructor was very present


In [0]:

%pip install scikit-learn
# NOTE(review): the scikit-learn version is not pinned, so a later re-run may install a
# different release than the one that produced the saved outputs — consider pinning it.


Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


Lightweight Sentiment Analysis

In [0]:
from pyspark.sql import Row

# Tiny hand-labelled corpus: three comments per sentiment class.
labeled_comments = [
    ("i felt great and calm", "Positive"),
    ("instructor was very supportive", "Positive"),
    ("session was good and relaxing", "Positive"),
    ("it was okay overall", "Neutral"),
    ("i felt fine", "Neutral"),
    ("nothing special", "Neutral"),
    ("i was stressed and confused", "Negative"),
    ("room was too crowded and loud", "Negative"),
    ("i did not like the class", "Negative"),
]

# One Row per example, with the fields Comment and Sentiment.
training_examples = [
    Row(Comment=comment_text, Sentiment=sentiment_label)
    for comment_text, sentiment_label in labeled_comments
]

train_df = spark.createDataFrame(training_examples)
display(train_df)


Comment,Sentiment
i felt great and calm,Positive
instructor was very supportive,Positive
session was good and relaxing,Positive
it was okay overall,Neutral
i felt fine,Neutral
nothing special,Neutral
i was stressed and confused,Negative
room was too crowded and loud,Negative
i did not like the class,Negative


Train a 3-class model

In [0]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# scikit-learn works on in-memory data, so collect the labelled examples to pandas.
train_pdf = train_df.toPandas()

# TF-IDF features feeding a logistic-regression classifier.
tfidf_step = ("tfidf", TfidfVectorizer())
classifier_step = ("clf", LogisticRegression(max_iter=200))
sk_pipeline = Pipeline(steps=[tfidf_step, classifier_step])

# Fit the 3-class model (Positive / Neutral / Negative) on the labelled comments.
comments = train_pdf["Comment"]
sentiment_labels = train_pdf["Sentiment"]
sk_pipeline.fit(comments, sentiment_labels)

# NOTE(review): no cell visible here applies sk_pipeline to the feedback comments,
# yet later cells select a Sentiment column — confirm where the predictions are produced.
print("Model trained on small labeled dataset.")
display(train_pdf)

🏃 View run bouncy-carp-224 at: https://adb-1809393011858999.19.azuredatabricks.net/ml/experiments/3197278153789376/runs/3ef816578524491e9945a067cb8178c6
🧪 View experiment at: https://adb-1809393011858999.19.azuredatabricks.net/ml/experiments/3197278153789376
Model trained on small labeled dataset.


Comment,Sentiment
i felt great and calm,Positive
instructor was very supportive,Positive
session was good and relaxing,Positive
it was okay overall,Neutral
i felt fine,Neutral
nothing special,Neutral
i was stressed and confused,Negative
room was too crowded and loud,Negative
i did not like the class,Negative


In [0]:
from pyspark.sql.functions import to_date, weekofyear, month, col, when, year

# Tag each row by its survey question; anything other than "safety" or
# "connection" falls through to "Clarity".
class_tag = (
    when(col("Question") == "safety", "Intensity")
    .when(col("Question") == "connection", "Community-building")
    .otherwise("Clarity")
)

# Derived columns, applied in this order. The calendar columns read the parsed
# feedback_date, so it has to be added first. In the displayed output,
# feedback_date takes the place of the original Feedback_Date string column.
# NOTE(review): weekofyear presumably returns the ISO week while year returns the
# calendar year, so the two may disagree for dates around New Year — confirm.
derived_columns = [
    ("feedback_date", to_date("Feedback_Date")),
    ("EmotionalDelta", col("Emotion_After") - col("Emotion_Before")),
    ("YearNumber", year("feedback_date")),
    ("WeekNumber", weekofyear("feedback_date")),
    ("MonthNumber", month("feedback_date")),
    ("ClassTag", class_tag),
]

# Start from the text-cleaned frame each time so the cell can be re-run safely.
df_enriched = df_text_clean
for column_name, column_expr in derived_columns:
    df_enriched = df_enriched.withColumn(column_name, column_expr)

display(df_enriched)

PractitionerID,SessionID,Question,Emotion_Before,Emotion_After,Comment_Cleaned,feedback_date,Comment_Normalized,EmotionalDelta,YearNumber,WeekNumber,MonthNumber,ClassTag
P001,S001,safety,3,7,Felt much safer after the session,2025-01-05,felt much safer after the session,4,2025,1,1,Intensity
P001,S001,connection,4,8,Loved the group energy,2025-01-05,loved the group energy,4,2025,1,1,Community-building
P001,S002,clarity,5,6,Instructions were clear,2025-01-12,instructions were clear,1,2025,2,1,Clarity
P001,S002,safety,2,7,Very calming and grounding,2025-01-12,very calming and grounding,5,2025,2,1,Intensity
P002,S003,connection,6,6,Neutral experience,2025-01-15,neutral experience,0,2025,3,1,Community-building
P002,S003,clarity,5,4,A bit confusing at times,2025-01-15,a bit confusing at times,-1,2025,3,1,Clarity
P002,S004,safety,3,8,Felt supported and safe,2025-01-20,felt supported and safe,5,2025,4,1,Intensity
P002,S004,connection,4,9,Strong sense of community,2025-01-20,strong sense of community,5,2025,4,1,Community-building
P001,S005,clarity,6,9,Everything clicked today,2025-01-22,everything clicked today,3,2025,4,1,Clarity
P001,S005,connection,5,8,Instructor was very present,2025-01-22,instructor was very present,3,2025,4,1,Community-building


In [0]:
# Destination of the gold-layer Parquet output on DBFS.
output_gold_path = "dbfs:/mnt/gold-feedback/"

# NOTE(review): gold_df is not defined in any cell visible here (the enrichment
# cell produces df_enriched) — confirm where gold_df is built so the notebook
# survives a restart and run-all.
gold_writer = gold_df.write.mode("overwrite")  # replaces whatever is already at the path
gold_writer.parquet(output_gold_path)
print("Gold layer written to:", output_gold_path)


Creating dimensions and facts

In [0]:
%sql
-- Practitioner dimension
-- One row per practitioner. PractitionerKey is the surrogate key referenced by the
-- fact table; PractitionerID is the business identifier the fact load joins on.
-- NOTE(review): IDENTITY, BIT, DATETIME2 and SYSUTCDATETIME() are SQL Server (T-SQL)
-- constructs; presumably this DDL is run against the database the notebook later
-- reads over JDBC rather than by Spark SQL — confirm.
CREATE TABLE dbo.Practitioner_Dim (
    PractitionerKey INT IDENTITY(1,1) PRIMARY KEY,
    PractitionerID  VARCHAR(50) NOT NULL,
    ActiveFlag      BIT NOT NULL DEFAULT 1,
    CreatedDate     DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);


In [0]:
%sql
-- Session dimension
-- One row per session. SessionKey is the surrogate key referenced by the fact table;
-- SessionID is the business identifier the fact load joins on.
-- NOTE(review): T-SQL syntax (IDENTITY, DATETIME2, SYSUTCDATETIME()) — presumably
-- executed against the JDBC target database, not Spark SQL; confirm.
CREATE TABLE dbo.Session_Dim (
    SessionKey   INT IDENTITY(1,1) PRIMARY KEY,
    SessionID    VARCHAR(50) NOT NULL,
    SessionDate  DATE        NOT NULL,
    CreatedDate  DATETIME2   NOT NULL DEFAULT SYSUTCDATETIME()
);


In [0]:
%sql
-- Survey Question dimension
-- One row per survey question. QuestionKey is the surrogate key referenced by the
-- fact table; the fact load matches QuestionID case-insensitively against the
-- feedback Question value.
-- NOTE(review): T-SQL syntax (IDENTITY, BIT, DATETIME2, SYSUTCDATETIME()) — presumably
-- executed against the JDBC target database, not Spark SQL; confirm.
CREATE TABLE dbo.SurveyQuestion_Dim (
    QuestionKey     INT IDENTITY(1,1) PRIMARY KEY,
    QuestionID      VARCHAR(50) NOT NULL,      -- e.g. safety, connection, clarity
    QuestionText    VARCHAR(200) NOT NULL,     -- full question wording
    QuestionCategory VARCHAR(100),             -- grouping once analytics grows
    ActiveFlag      BIT NOT NULL DEFAULT 1,
    CreatedDate     DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);


In [0]:
%sql
-- Fact table
-- One row per feedback answer, keyed to the practitioner, session and question
-- dimensions through their surrogate keys.
-- NOTE(review): T-SQL syntax (IDENTITY, TINYINT, DATETIME2, SYSUTCDATETIME()) — presumably
-- executed against the JDBC target database, not Spark SQL; confirm.
CREATE TABLE dbo.ClientFeedback_Fact (
    FeedbackKey        INT IDENTITY(1,1) PRIMARY KEY,
    PractitionerKey    INT NOT NULL,
    SessionKey         INT NOT NULL,
    QuestionKey        INT NOT NULL,    -- updated: FK instead of text
    LikertScore        TINYINT,         -- NOTE(review): no visible notebook cell produces a LikertScore column — confirm source
    -- NOTE(review): the notebook's sample data holds the before/after emotion values as
    -- integers, while these two columns are VARCHAR — confirm the intended type.
    EmotionBefore      VARCHAR(50),
    EmotionAfter       VARCHAR(50),
    EmotionalDelta     INT,             -- Emotion_After minus Emotion_Before in the notebook
    Sentiment          VARCHAR(20),     -- Positive / Neutral / Negative labels used by the model cell
    YearNumber         INT,
    WeekNumber         INT,
    MonthNumber        INT,
    ClassTag           VARCHAR(50),
    CreatedDate        DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),

    -- Each dimension key must exist in its dimension table.
    CONSTRAINT FK_ClientFeedback_Practitioner
        FOREIGN KEY (PractitionerKey) REFERENCES dbo.Practitioner_Dim(PractitionerKey),

    CONSTRAINT FK_ClientFeedback_Session
        FOREIGN KEY (SessionKey) REFERENCES dbo.Session_Dim(SessionKey),

    CONSTRAINT FK_ClientFeedback_Question
        FOREIGN KEY (QuestionKey) REFERENCES dbo.SurveyQuestion_Dim(QuestionKey)
);


In [0]:
# Number of gold rows used for the trial fact load.
sample_row_count = 10

# NOTE(review): limit() is applied without an ordering, so which rows are picked
# is presumably not guaranteed to be stable between runs — confirm if that matters.
gold_sample_df = gold_df.limit(sample_row_count)

# Expose the sample to Spark SQL under the name the fact query selects from.
gold_sample_df.createOrReplaceTempView("GoldFeedbackSample")

display(gold_sample_df)


In [0]:
# Temp-view name -> source table in the JDBC database.
# NOTE(review): jdbc_url and connection_props are not defined in any cell visible
# here — confirm they are set (without hard-coded credentials) before this cell runs.
dimension_tables = {
    "Practitioner_Dim": "dbo.Practitioner_Dim",
    "Session_Dim": "dbo.Session_Dim",
    "SurveyQuestion_Dim": "dbo.SurveyQuestion_Dim",
}

# Read every dimension table first ...
dimension_frames = {
    view_name: spark.read.jdbc(jdbc_url, source_table, properties=connection_props)
    for view_name, source_table in dimension_tables.items()
}

# ... then register each one as a temp view so Spark SQL can join against it.
for view_name, dimension_df in dimension_frames.items():
    dimension_df.createOrReplaceTempView(view_name)

# Keep the individual DataFrame handles available under their usual names.
dim_pract = dimension_frames["Practitioner_Dim"]
dim_sess = dimension_frames["Session_Dim"]
dim_qn = dimension_frames["SurveyQuestion_Dim"]


In [0]:
# Swap the business identifiers in the gold sample for dimension surrogate keys.
# All three joins are inner joins, so a sample row with no match in any one
# dimension is dropped from the result.
# NOTE(review): the query reads g.practitioner_id, g.session_id and g.LikertScore,
# whereas the frames built in the visible cells name their columns PractitionerID /
# SessionID and have no LikertScore — confirm the schema of the gold data.
fact_sample_query = """
    SELECT
        p.PractitionerKey,
        s.SessionKey,
        q.QuestionKey,
        g.LikertScore,
        g.emotion_before AS EmotionBefore,
        g.emotion_after AS EmotionAfter,
        g.EmotionalDelta,
        g.Sentiment,
        g.YearNumber,
        g.WeekNumber,
        g.MonthNumber,
        g.ClassTag
    FROM GoldFeedbackSample g

    JOIN Practitioner_Dim p
        ON g.practitioner_id = p.PractitionerID

    JOIN Session_Dim s
        ON g.session_id = s.SessionID

    JOIN SurveyQuestion_Dim q
        ON lower(g.Question) = lower(q.QuestionID)
"""

fact_sample_df = spark.sql(fact_sample_query)

display(fact_sample_df)


In [0]:
# Materialise the joined sample once so the row count reported below describes the
# rows that are written. The inner joins and the sample limit upstream can yield
# fewer than 10 rows, so the count must not be assumed.
fact_sample_df.cache()
rows_to_load = fact_sample_df.count()

# Append the sample to the fact table over JDBC.
# NOTE: mode="append" means re-running this cell inserts the same rows again.
fact_sample_df.write.jdbc(
    url=jdbc_url,
    table="dbo.ClientFeedback_Fact",
    mode="append",
    properties=connection_props
)

# Report the number of rows actually loaded instead of a hard-coded 10.
print(f"Successfully loaded {rows_to_load} sample rows into Fact table!")


Create Reporting Views

In [0]:
%sql
-- Average emotional change per session date, practitioner and calendar bucket
-- (year / week / month), for trend reporting.
CREATE VIEW dbo.vw_EmotionalStateTrends AS
SELECT
    s.SessionDate,
    p.PractitionerID,
    f.YearNumber,
    f.WeekNumber,
    f.MonthNumber,
    -- EmotionalDelta is an INT column; in T-SQL, AVG over an integer returns a
    -- truncated integer, so cast first to keep the fractional part.
    AVG(CAST(f.EmotionalDelta AS FLOAT)) AS AvgEmotionalDelta
FROM dbo.ClientFeedback_Fact f
JOIN dbo.Session_Dim s
    ON f.SessionKey = s.SessionKey
JOIN dbo.Practitioner_Dim p
    ON f.PractitionerKey = p.PractitionerKey
GROUP BY
    s.SessionDate,
    p.PractitionerID,
    f.YearNumber,
    f.WeekNumber,
    f.MonthNumber;


In [0]:
%sql
-- Sentiment breakdown per practitioner: one row per (practitioner, sentiment) with
-- the number of feedback rows and that sentiment's share of the practitioner's feedback.
CREATE VIEW dbo.vw_SentimentRollups AS
SELECT
    p.PractitionerID,
    f.Sentiment,
    COUNT(*) AS FeedbackCount,
    -- Window functions run after GROUP BY, so SUM(COUNT(*)) OVER (PARTITION BY ...)
    -- adds up the group counts of one practitioner. Dividing this group's count by
    -- that total gives percentages that sum to 100 per practitioner.
    -- (The earlier form divided the number of sentiment groups per practitioner
    -- by the grand total, which is not a percentage of anything.)
    100.0 * COUNT(*)
        / SUM(COUNT(*)) OVER (PARTITION BY p.PractitionerID) AS SentimentPercentage
FROM dbo.ClientFeedback_Fact f
JOIN dbo.Practitioner_Dim p
    ON f.PractitionerKey = p.PractitionerKey
GROUP BY
    p.PractitionerID,
    f.Sentiment;


In [0]:
%sql
-- Per-practitioner impact summary: average emotional change and the fraction of
-- feedback rows whose sentiment is 'Positive'.
CREATE VIEW dbo.vw_PractitionerPublicImpact AS
SELECT
    p.PractitionerID,
    -- EmotionalDelta is an INT column; in T-SQL, AVG over an integer returns a
    -- truncated integer, so cast first to keep the fractional part.
    AVG(CAST(f.EmotionalDelta AS FLOAT)) AS AvgEmotionalDelta,
    -- Multiply by 1.0 so the share is computed with decimal, not integer, division.
    SUM(CASE WHEN f.Sentiment = 'Positive' THEN 1 ELSE 0 END) * 1.0 
        / COUNT(*) AS PositiveShare
FROM dbo.ClientFeedback_Fact f
JOIN dbo.Practitioner_Dim p
    ON f.PractitionerKey = p.PractitionerKey
GROUP BY
    p.PractitionerID;
