In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session named "delta_app" with the Delta Lake
# package, SQL extension and catalog implementation configured.
spark = (
    SparkSession.builder
    .appName("delta_app")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

print("Spark Started")


Spark Started


In [3]:
# Bare expression: the notebook displays the active SparkSession object.
spark

In [4]:
# Read the survey CSV: the first line is treated as the header and column
# types are inferred from the data.
csv_reader = spark.read.options(header="true", inferSchema="true")
df = csv_reader.csv("myFiles/Mental_Health_and_Social_Media_Balance_Dataset.csv")


In [5]:
# Preview the first five rows of the raw CSV frame.
df.show(n=5)

+-------+---+------+----------------------+-------------------+------------------+-------------------------+------------------------+---------------------+---------------------+
|User_ID|Age|Gender|Daily_Screen_Time(hrs)|Sleep_Quality(1-10)|Stress_Level(1-10)|Days_Without_Social_Media|Exercise_Frequency(week)|Social_Media_Platform|Happiness_Index(1-10)|
+-------+---+------+----------------------+-------------------+------------------+-------------------------+------------------------+---------------------+---------------------+
|   U001| 44|  Male|                   3.1|                7.0|               6.0|                      2.0|                     5.0|             Facebook|                 10.0|
|   U002| 30| Other|                   5.1|                7.0|               8.0|                      5.0|                     3.0|             LinkedIn|                 10.0|
|   U003| 23| Other|                   7.4|                6.0|               7.0|                      1.0|  

In [6]:
# Drop the parenthesised unit/range suffixes from the column names.
column_renames = {
    "Daily_Screen_Time(hrs)": "Daily_Screen_Time",
    "Sleep_Quality(1-10)": "Sleep_Quality",
    "Stress_Level(1-10)": "Stress_Level",
    "Exercise_Frequency(week)": "Exercise_Frequency",
    "Happiness_Index(1-10)": "Happiness_Index",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

In [7]:
# Persist the renamed frame in Delta format at "myDelta", replacing whatever
# is already stored at that path.
delta_writer = df.write.format("delta").mode("overwrite")
delta_writer.save("myDelta")


### Read Delta Table

In [8]:
# Load the Delta data stored at "myDelta" and print its leading rows.
delta_reader = spark.read.format("delta")
df_delta = delta_reader.load("myDelta")
df_delta.show()


+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|   U001| 44|  Male|              3.1|          7.0|         6.0|                      2.0|               5.0|             Facebook|           10.0|
|   U002| 30| Other|              5.1|          7.0|         8.0|                      5.0|               3.0|             LinkedIn|           10.0|
|   U003| 23| Other|              7.4|          6.0|         7.0|                      1.0|               3.0|              YouTube|            6.0|
|   U004| 36|Female|              5.7|          7.0|         8.0|                      1.0|               

### CREATE

In [10]:
# Print column names, types and nullability of the Delta-backed frame.
df_delta.printSchema()


root
 |-- User_ID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Daily_Screen_Time: double (nullable = true)
 |-- Sleep_Quality: double (nullable = true)
 |-- Stress_Level: double (nullable = true)
 |-- Days_Without_Social_Media: double (nullable = true)
 |-- Exercise_Frequency: double (nullable = true)
 |-- Social_Media_Platform: string (nullable = true)
 |-- Happiness_Index: double (nullable = true)



In [48]:
from pyspark.sql.types import IntegerType

# Re-cast the Age column to IntegerType on a copy of the Delta frame.
# NOTE(review): df_new is reassigned by a later cell before it is used, so
# this result appears to be unused; confirm whether this cell is still needed.
age_as_int = df_delta["Age"].cast(IntegerType())
df_new = df_delta.withColumn("Age", age_as_int)


In [32]:
from pyspark.sql import Row

# A single new record (User_ID "U501") with one value per table column.
u501_record = Row(
    User_ID="U501",
    Age=25,  # must be integer
    Gender="Male",
    Daily_Screen_Time=2.0,
    Sleep_Quality=8.0,
    Stress_Level=7.0,
    Days_Without_Social_Media=1.0,
    Exercise_Frequency=1.0,
    Social_Media_Platform="Twitter",
    Happiness_Index=6.0,
)

new_rows = [u501_record]


In [33]:
# Turn the new records into a DataFrame that reuses the Delta frame's schema.
df_new = spark.createDataFrame(data=new_rows, schema=df_delta.schema)

In [34]:
# Append the rows held in df_new to the Delta data at "myDelta".
append_writer = df_new.write.format("delta").mode("append")
append_writer.save("myDelta")

In [52]:
# Row count of the Delta-backed frame.
df_delta.count()

500

In [13]:
# Print the leading rows of the Delta-backed frame.
df_delta.show()

+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|   U001| 44|  Male|              3.1|          7.0|         6.0|                      2.0|               5.0|             Facebook|           10.0|
|   U002| 30| Other|              5.1|          7.0|         8.0|                      5.0|               3.0|             LinkedIn|           10.0|
|   U003| 23| Other|              7.4|          6.0|         7.0|                      1.0|               3.0|              YouTube|            6.0|
|   U004| 36|Female|              5.7|          7.0|         8.0|                      1.0|               

In [26]:
from delta.tables import DeltaTable

# Handle used to modify the Delta data at "myDelta" in place.
delta_table = DeltaTable.forPath(spark, "myDelta")

# Raise Happiness_Index by one for the U501 record. The new value is derived
# from the stored one, so every run of this cell adds another +1.
target_user = "User_ID = 'U501'"
happiness_bump = {"Happiness_Index": "Happiness_Index + 1"}
delta_table.update(condition=target_user, set=happiness_bump)


In [37]:
# Row count of the Delta-backed frame.
df_delta.count()

500

In [19]:
from pyspark.sql.functions import col

# Show only the record(s) whose User_ID is "U501".
u501_rows = df_delta.where(col("User_ID") == "U501")
u501_rows.show()


+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|   U501| 25|  Male|              2.0|          8.0|         7.0|                      1.0|               1.0|              Twitter|            8.0|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+



In [36]:
# Remove every record whose User_ID is "U501" from the Delta data.
delta_table.delete("User_ID = 'U501'")


In [54]:
from pyspark.sql import Row

# Two new records, one per user, each with a value for every table column.
u501_record = Row(
    User_ID="U501",
    Age=26,
    Gender="Male",
    Daily_Screen_Time=3.0,
    Sleep_Quality=7.0,
    Stress_Level=5.0,
    Days_Without_Social_Media=2.0,
    Exercise_Frequency=1.0,
    Social_Media_Platform="Twitter",
    Happiness_Index=2.0,
)
u502_record = Row(
    User_ID="U502",
    Age=22,
    Gender="Female",
    Daily_Screen_Time=4.0,
    Sleep_Quality=8.0,
    Stress_Level=6.0,
    Days_Without_Social_Media=1.0,
    Exercise_Frequency=2.0,
    Social_Media_Platform="Instagram",
    Happiness_Index=3.0,
)
new_data = [u501_record, u502_record]

# Build the frame with the same schema as the Delta-backed frame.
df_new = spark.createDataFrame(new_data, schema=df_delta.schema)


In [44]:
from pyspark.sql.types import IntegerType

# Cast Age on the new rows themselves. Rebuilding df_new from df_delta here
# would discard the new records and make the following append write the whole
# existing table back into itself when the notebook is run top to bottom.
df_new = df_new.withColumn("Age", df_new["Age"].cast(IntegerType()))

In [55]:
# Append the rows held in df_new to the Delta data at "myDelta".
append_writer = df_new.write.format("delta").mode("append")
append_writer.save("myDelta")

In [52]:
# Print the leading rows of the Delta-backed frame.
df_delta.show()

+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|   U001| 44|  Male|              3.1|          7.0|         6.0|                      2.0|               5.0|             Facebook|           10.0|
|   U002| 30| Other|              5.1|          7.0|         8.0|                      5.0|               3.0|             LinkedIn|           10.0|
|   U003| 23| Other|              7.4|          6.0|         7.0|                      1.0|               3.0|              YouTube|            6.0|
|   U004| 36|Female|              5.7|          7.0|         8.0|                      1.0|               

In [28]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# One window partition per User_ID. The ordering column is the partition key
# itself, so all rows in a partition tie and this spec does not define which
# of them receives rn = 1.
window_spec = Window.partitionBy("User_ID").orderBy("User_ID")

# Number the rows inside each partition, keep the row numbered 1, then remove
# the helper column again.
numbered = df_delta.withColumn("rn", row_number().over(window_spec))
df_clean = numbered.filter("rn = 1").drop("rn")


In [56]:
# Row count of the Delta-backed frame.
df_delta.count()

502

In [58]:
# Preview the first five values of the Age column.
age_only = df_delta.select("Age")
age_only.show(5)

+---+
|Age|
+---+
| 44|
| 30|
| 23|
| 36|
| 34|
+---+
only showing top 5 rows



In [59]:
# Show only the record(s) whose User_ID is "U502".
u502_rows = df_delta.where(col("User_ID") == "U502")
u502_rows.show()

+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+
|   U502| 22|Female|              4.0|          8.0|         6.0|                      1.0|               2.0|            Instagram|            3.0|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+



In [62]:
# Import only the names that are needed. A wildcard import of
# pyspark.sql.functions would rebind Python built-ins such as sum, min, max
# and round in the notebook namespace. avg is imported here because the next
# aggregation cell uses it.
from pyspark.sql.functions import avg, col, count

# Number of rows per Gender value.
df_delta.groupBy(col("Gender")).agg(count("*").alias("count")).show()


+------+-----+
|Gender|count|
+------+-----+
|Female|  230|
| Other|   23|
|  Male|  249|
+------+-----+



In [63]:

# Mean Happiness_Index for each Gender value.
happiness_by_gender = df_delta.groupBy("Gender").agg(avg("Happiness_Index"))
happiness_by_gender.show()


+------+--------------------+
|Gender|avg(Happiness_Index)|
+------+--------------------+
|Female|   8.343478260869565|
| Other|   8.782608695652174|
|  Male|    8.32128514056225|
+------+--------------------+



In [64]:
# Number of rows per Gender value, using the grouped frame's count().
rows_per_gender = df_delta.groupBy("Gender").count()
rows_per_gender.show()


+------+-----+
|Gender|count|
+------+-----+
|Female|  230|
| Other|   23|
|  Male|  249|
+------+-----+



### Window Function

In [69]:
from pyspark.sql.functions import count, col

# One row per Gender value with its row count in a "gender_count" column.
gender_groups = df_delta.groupBy("Gender")
df_gender_count = gender_groups.agg(count("*").alias("gender_count"))


In [82]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, count, col

# All rows sharing a Gender value form one window partition.
window_spec = Window.partitionBy("Gender")

# gender_count: number of rows in the row's Gender partition.
# rn: position of the row inside its partition. gender_count is identical for
# every row of a partition, so ordering by it would leave the numbering
# arbitrary; ordering by User_ID makes rn reproducible between runs.
df_final = (
    df_delta
    .withColumn("gender_count", count("*").over(window_spec))
    .withColumn("rn", row_number().over(window_spec.orderBy(col("User_ID").asc())))
)

df_final.show()


+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+------------+---+
|User_ID|Age|Gender|Daily_Screen_Time|Sleep_Quality|Stress_Level|Days_Without_Social_Media|Exercise_Frequency|Social_Media_Platform|Happiness_Index|gender_count| rn|
+-------+---+------+-----------------+-------------+------------+-------------------------+------------------+---------------------+---------------+------------+---+
|   U004| 36|Female|              5.7|          7.0|         8.0|                      1.0|               1.0|               TikTok|            8.0|         230|  1|
|   U005| 34|Female|              7.0|          4.0|         7.0|                      5.0|               1.0|          X (Twitter)|            8.0|         230|  2|
|   U007| 26|Female|              7.8|          4.0|         8.0|                      2.0|               0.0|               TikTok|            7.0|         230|  3|
|   

502