In [0]:
[Total Utilization Time - DataLemur](https://datalemur.com/questions/total-utilization-time)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Initialize Spark session.
# getOrCreate() returns the already-active session when the environment
# provides one, and starts a new one otherwise, so this cell also works on a
# fresh kernel where `spark` has not been predefined.
spark = SparkSession.builder.getOrCreate()

# Schema of the raw events. status_time is declared as a string because the
# sample values use the MM/dd/yyyy layout and are parsed explicitly below.
schema = StructType([
    StructField("server_id", IntegerType(), True),
    StructField("session_status", StringType(), True),
    StructField("status_time", StringType(), True),
])

# Raw events as (server_id, session_status, status_time) tuples.
# The rows are deliberately unordered; later cells order them per server.
data = [
    (1, "start", "08/02/2022 10:00:00"),
    (1, "stop", "08/04/2022 10:00:00"),
    (1, "stop", "08/13/2022 19:00:00"),
    (1, "start", "08/13/2022 10:00:00"),
    (3, "stop", "08/19/2022 10:00:00"),
    (3, "start", "08/18/2022 10:00:00"),
    (5, "stop", "08/19/2022 10:00:00"),
    (4, "stop", "08/19/2022 14:00:00"),
    (4, "start", "08/16/2022 10:00:00"),
    (3, "stop", "08/14/2022 10:00:00"),
    (3, "start", "08/06/2022 10:00:00"),
    (2, "stop", "08/24/2022 10:00:00"),
    (2, "start", "08/17/2022 10:00:00"),
    (5, "start", "08/14/2022 21:00:00"),
]

# Build the DataFrame and convert status_time from string to timestamp in a
# single assignment, so `df` always refers to the parsed frame.
df = spark.createDataFrame(data, schema=schema).withColumn(
    "status_time", to_timestamp("status_time", "MM/dd/yyyy HH:mm:ss")
)

# Show the DataFrame without truncating the timestamp column
df.show(truncate=False)

In [0]:
# Order the events by server, then chronologically, so each server's
# start/stop sequence can be read top to bottom in the displayed output.
df1 = df.orderBy("server_id", "status_time")
df1.show()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, sum as spark_sum, floor, datediff

# Window partitioned by server_id, ordered by status_time, so lag() returns
# the event that immediately precedes the current one on the same server.
w = Window.partitionBy("server_id").orderBy("status_time")

SECONDS_PER_DAY = 24 * 60 * 60

# Keep only "stop" events whose previous event on the same server is a
# "start", and measure each such session's exact length in seconds.
# datediff() is intentionally not used: it compares calendar dates only, so a
# 23:00 -> 01:00 session would count as a whole day and the hours of partial
# days would never add up across sessions.
# Casting a timestamp to long yields seconds since the Unix epoch.
# NOTE(review): the difference is real elapsed time; if the Spark session
# time zone observes DST, a session spanning a transition will differ from
# the wall-clock difference by the shift -- confirm that is acceptable.
df2 = (
    df1.withColumn("prev_status", lag("session_status").over(w))
    .withColumn("prev_time", lag("status_time").over(w))
    .where((col("session_status") == "stop") & (col("prev_status") == "start"))
    .withColumn(
        "uptime_seconds",
        col("status_time").cast("long") - col("prev_time").cast("long"),
    )
)

# Sum the uptime of all sessions across all servers first, then convert to
# full days once, so leftover hours from different sessions accumulate.
# The result is NULL when no start/stop pair exists.
result = df2.agg(
    floor(spark_sum("uptime_seconds") / SECONDS_PER_DAY).alias("total_uptime_days")
)

display(result)