## Import Libraries (the `spark` session is provided by the notebook runtime)

In [0]:
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, year, month, dayofmonth, hour, rand, from_unixtime, date_format, floor, expr
from pyspark.sql.types import DoubleType
from functools import reduce
import math

### Read data from S3 bucket and extract the year, month, day and hour

In [0]:
# Root of the partitioned dataset; change this to point at your own bucket.
s3_path = "s3a://computeractivity/"


def _load_activity(activity_type):
    """Read every year/month/day/hour partition of one activity type as JSON.

    The reader's ``basePath`` option is set to the dataset root (``s3_path``).
    """
    partition_glob = f"activity_type={activity_type}/year=*/month=*/day=*/hour=*"
    return spark.read.option("basePath", s3_path).json(s3_path + partition_glob)


# One DataFrame per recorded activity type.
app_change = _load_activity("app_change")
idle_time = _load_activity("idle_time")
mouse_click = _load_activity("mouse_click")
mouse_movement = _load_activity("mouse_movement")
word_completed = _load_activity("word_completed")


In [0]:
# Pattern of the ISO-style timestamp string stored on the mouse frames.
_ISO_MICROS_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"


def _with_minute_and_iso_timestamp(events):
    """Add a ``minute`` column and rewrite ``timestamp`` as an ISO-style string.

    ``timestamp`` is interpreted as seconds since the Unix epoch. The value is
    converted with ``timestamp_seconds`` (Spark 3.1+) rather than
    ``from_unixtime``: ``from_unixtime`` yields a second-resolution string, so
    the ``.SSSSSS`` part of the output was always ``000000`` and events inside
    the same second could no longer be ordered. ``timestamp_seconds`` keeps any
    fractional part of the input; for whole-second inputs the output is the
    same as before.
    """
    event_time = F.timestamp_seconds(col("timestamp"))
    return (
        events
        # Two-digit minute of the hour, used later as a grouping key.
        .withColumn("minute", date_format(event_time, "mm"))
        # Replaces the numeric epoch value with its formatted string.
        .withColumn("timestamp", date_format(event_time, _ISO_MICROS_FORMAT))
    )


mouse_click = _with_minute_and_iso_timestamp(mouse_click)
mouse_movement = _with_minute_and_iso_timestamp(mouse_movement)

In [0]:
# Preview the app-change events without truncating long column values.
app_change.show(truncate=False)

### Count the app-change events recorded for each application

In [0]:
# Number of app-change events per application name.
app_counts = (
    app_change
    .groupBy("app_name")
    .agg(F.count("*").alias("count"))
)
app_counts.show()

In [0]:
# Per-minute typing summary: how many words were completed and their mean length.
_word_group_cols = ["year", "month", "day", "hour", "minute"]
_word_metrics = [
    F.count("*").alias("words_typed"),
    F.avg("word_length").alias("avg_word_length"),
]

words_by_date = word_completed.groupBy(*_word_group_cols).agg(*_word_metrics)
words_by_date.show()

## Calculate the Distance and Angle of the mouse movements 

In [0]:
# A turn sharper than this many degrees is flagged as a direction change.
ANGLE_CHANGE_THRESHOLD = 70  # Degrees
# Segments shorter than this are ignored when measuring the turn angle.
MIN_DISTANCE_THRESHOLD = 50  # Minimum distance to consider as movement


def calculate_distance(x1, y1, x2, y2):
    """Return the Euclidean distance between (x1, y1) and (x2, y2).

    Returns 0.0 when any of the four coordinates is None.
    """
    if any(coord is None for coord in (x1, y1, x2, y2)):
        return 0.0
    dx = x2 - x1
    dy = y2 - y1
    return math.sqrt(dx**2 + dy**2)


def calculate_angle(x1, y1, x2, y2, x3, y3):
    """Return the turn angle in degrees at (x2, y2) on the path p1 -> p2 -> p3.

    The angle is measured between the segment p1 -> p2 and the segment
    p2 -> p3, so 0 means the path continues straight and 180 means it
    reverses. Returns 0.0 when any coordinate is None or when either segment
    is shorter than MIN_DISTANCE_THRESHOLD.
    """
    if any(coord is None for coord in (x1, y1, x2, y2, x3, y3)):
        return 0.0

    # Incoming segment (p1 -> p2) and outgoing segment (p2 -> p3).
    in_dx, in_dy = x2 - x1, y2 - y1
    out_dx, out_dy = x3 - x2, y3 - y2

    in_len = math.sqrt(in_dx**2 + in_dy**2)
    out_len = math.sqrt(out_dx**2 + out_dy**2)
    if in_len < MIN_DISTANCE_THRESHOLD or out_len < MIN_DISTANCE_THRESHOLD:
        # Too short to give a meaningful direction.
        return 0.0

    cosine = (in_dx * out_dx + in_dy * out_dy) / (in_len * out_len)
    # Rounding can push the ratio just outside [-1, 1], which acos rejects.
    cosine = min(1, cosine)
    cosine = max(-1, cosine)
    return math.degrees(math.acos(cosine))

# Expose the pure-Python geometry helpers to Spark as double-returning UDFs.
distance_udf = F.udf(calculate_distance, DoubleType())
angle_udf = F.udf(calculate_angle, DoubleType())

# Neighbouring samples are looked up inside the same minute, ordered by timestamp.
window_spec = (
    Window
    .partitionBy("year", "month", "day", "hour", "minute")
    .orderBy("timestamp")
)

mouse_movement = (
    mouse_movement
    # Cursor position of the previous and of the next sample.
    .withColumn("x_prev", F.lag("x", 1).over(window_spec))
    .withColumn("y_prev", F.lag("y", 1).over(window_spec))
    .withColumn("x_next", F.lead("x", 1).over(window_spec))
    .withColumn("y_next", F.lead("y", 1).over(window_spec))
    # Distance travelled since the previous sample (0.0 when there is none).
    .withColumn("distance", distance_udf("x", "y", "x_prev", "y_prev"))
    # Turn angle at this sample between the incoming and outgoing segments.
    .withColumn("angle", angle_udf("x_prev", "y_prev", "x", "y", "x_next", "y_next"))
    # Angle of the previous sample; kept as a column, but the flag below is
    # derived from `angle` alone.
    .withColumn("angle_prev", F.lag("angle").over(window_spec))
    # 1 when the turn angle exceeds the threshold, otherwise 0.
    .withColumn(
        "direction_change",
        F.when(F.col("angle") > ANGLE_CHANGE_THRESHOLD, 1).otherwise(0),
    )
)

# Aggregation to calculate total distance and direction changes by minute

In [0]:
# Columns identifying one minute of activity, and the columns used for joining.
_minute_cols = ["year", "month", "day", "hour", "minute"]
_minute_join_cols = ["year", "month", "day", "hour", "minute_key"]


def _with_minute_key(events):
    """Add ``minute_key``, an integer copy of ``minute`` that is safe to join on.

    The mouse frames carry ``minute`` as a two-digit string; casting to int
    lets "05" and 5 match if another frame stores the minute as a number.
    """
    return events.withColumn("minute_key", F.col("minute").cast("int"))


# Mouse-movement metrics per minute.
_movement_by_minute = _with_minute_key(
    mouse_movement.groupBy(*_minute_cols).agg(
        F.sum("distance").alias("total_distance"),
        F.sum("direction_change").alias("direction_changes"),
    )
)

# `mouse_movement` only holds rows read from the activity_type=mouse_movement
# partition, so filtering it for "word_completed" / "mouse_click" rows can never
# match. Word and click metrics are therefore computed from their own frames.
_words_by_minute = _with_minute_key(word_completed).groupBy(*_minute_join_cols).agg(
    F.count("*").alias("total_words"),
    F.avg("word_length").alias("avg_word_length"),
)
_clicks_by_minute = _with_minute_key(mouse_click).groupBy(*_minute_join_cols).agg(
    F.count("*").alias("clicks_count"),
)

# Left joins keep exactly the minutes that have mouse movement (the same row set
# as before); minutes without typing or clicks get 0 counts and a null average.
result = (
    _movement_by_minute
    .join(_words_by_minute, on=_minute_join_cols, how="left")
    .join(_clicks_by_minute, on=_minute_join_cols, how="left")
    .fillna(0, subset=["total_words", "clicks_count"])
    .select(
        *_minute_cols,
        "total_distance",
        "direction_changes",
        "total_words",
        "avg_word_length",
        "clicks_count",
    )
)

# "yyyy-M-d" accepts both padded ("2024-03-05") and unpadded ("2024-3-5") parts;
# the unpadded form is what concatenating numeric partition columns produces.
result = result.withColumn(
    "date",
    F.to_date(
        F.concat(F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day')),
        "yyyy-M-d",
    ),
)

result.show()

print(f'Number of rows in groupby = {result.count()}')

## Activity on each app

#### Left join the mouse movements onto the app_change dataset when the mouse movement timestamp is in the range of app_changes

In [0]:
# Each app-change event is active until the next one starts; the last event
# has no successor, so its `timestamp_end` is null.
_app_timeline = Window.orderBy('timestamp')
app_change = app_change.withColumn("timestamp_end", F.lead("timestamp").over(_app_timeline))

# Alias both frames so their identically named columns can be told apart.
mouse_movement = mouse_movement.alias("mouse_movement")
app_change = app_change.alias("app_change")

# A movement belongs to an app interval when start <= movement time < end.
_at_or_after_start = F.col("mouse_movement.timestamp") >= F.col("app_change.timestamp")
_before_end = F.col("mouse_movement.timestamp") < F.col("app_change.timestamp_end")

# Left join: intervals without any movement are kept with null movement columns.
app_change_mouse_movement = app_change.join(
    mouse_movement,
    _at_or_after_start & _before_end,
    how="left",
)


In [0]:
# Inspect the columns currently present on the mouse-movement frame.
mouse_movement.printSchema()

In [0]:
# Report the size of both join inputs and of the joined frame.
for _label, _frame in (
    ("app_change", app_change),
    ("mouse_movements", mouse_movement),
    ("app_change_mouse_movements", app_change_mouse_movement),
):
    print(f'Number of rows of {_label} = {_frame.count()}')

In [0]:
# Inspect the columns of the app-change / mouse-movement join.
app_change_mouse_movement.printSchema()

In [0]:
# Pattern used to parse the app-change timestamp strings.
_app_ts_format = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
_interval_start = "app_change.timestamp"
_interval_end = "app_change.timestamp_end"

app_change_mouse_movement = (
    app_change_mouse_movement
    # Interval boundaries as Unix epoch seconds.
    .withColumn("timestamp_start_unix", F.unix_timestamp(_interval_start, _app_ts_format))
    .withColumn("timestamp_end_unix", F.unix_timestamp(_interval_end, _app_ts_format))
    # Length of the interval in seconds, minutes and hours.
    .withColumn("time_diff_secs", F.col('timestamp_end_unix') - F.col('timestamp_start_unix'))
    .withColumn("time_diff_mins", F.col("time_diff_secs") / 60)
    .withColumn("time_diff_hours", F.col("time_diff_secs") / 3600)
    # Human-readable "<start> to <end>" label for the interval.
    .withColumn(
        "timestamp_range",
        F.concat(F.col(_interval_start), F.lit(" to "), F.col(_interval_end)),
    )
)

In [0]:
# Preview each movement next to the app interval it was joined to.
app_change_mouse_movement.select("app_change.app_name" ,"timestamp_range", "time_diff_secs", "time_diff_mins","mouse_movement.timestamp", "distance").show(truncate=False)

In [0]:
# Interval length in seconds; it is the same on every row of a group, so
# `first` simply picks it up.
_interval_secs = F.first("time_diff_secs")
# Divisor for the per-second rates: falls back to 1 when the length is 0 or null.
_rate_divisor = F.when(_interval_secs != 0, _interval_secs).otherwise(1)
# Count only rows that actually carry a movement. The left join emits one
# all-null placeholder row for an interval without movements, which
# `count("*")` would wrongly report as one move.
_move_count = F.count(F.col("mouse_movement.timestamp"))

# One row per (application, active interval) with its movement statistics.
app_change_timeperiod_groupby = app_change_mouse_movement.groupBy("app_change.app_name", "timestamp_range").agg(
    _move_count.alias("move_count"),
    _interval_secs.alias("seconds"),
    F.sum("distance").alias("total_distance"),
    (F.sum("distance") / _rate_divisor).alias("speed"),
    (_move_count / _rate_divisor).alias("moves_per_sec")
)

In [0]:
# Display the per-interval statistics without truncating column values.
app_change_timeperiod_groupby.show(truncate=False)

In [0]:
# Roll the per-interval figures up to one row per application: totals are
# summed, while the per-second rates are averaged across that app's intervals.
_per_app = app_change_timeperiod_groupby.groupBy("app_change.app_name")
_per_app_metrics = [
    F.sum("seconds").alias("total_time"),
    F.sum("move_count").alias("total_moves"),
    F.sum("total_distance").alias("total_distance"),
    F.avg("moves_per_sec").alias("moves_per_sec"),
    F.avg("speed").alias("speed"),
]
app_change_groupby = _per_app.agg(*_per_app_metrics)

In [0]:
# Display the per-application summary without truncating column values.
app_change_groupby.show(truncate=False)

## Then write the result to a table that can be read by a dashboard

In [0]:
# Persist the per-minute summary as a Delta table, replacing any existing
# contents and allowing the stored schema to be overwritten.
(
    result.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mouse_activity_summary")
)
