<pre>
Table: Activity

+----------------+---------+
| Column Name    | Type    |
+----------------+---------+
| machine_id     | int     |
| process_id     | int     |
| activity_type  | enum    |
| timestamp      | float   |
+----------------+---------+
The table shows the user activities for a factory website.
(machine_id, process_id, activity_type) is the primary key (combination of columns with unique values) of this table.
machine_id is the ID of a machine.
process_id is the ID of a process running on the machine with ID machine_id.
activity_type is an ENUM (category) of type ('start', 'end').
timestamp is a float representing the current time in seconds.
'start' means the machine starts the process at the given timestamp and 'end' means the machine ends the process at the given timestamp.
The 'start' timestamp will always be before the 'end' timestamp for every (machine_id, process_id) pair.
It is guaranteed that each (machine_id, process_id) pair has a 'start' and 'end' timestamp.
 

There is a factory website that has several machines each running the same number of processes. Write a solution to find the average time each machine takes to complete a process.

The time to complete a process is the 'end' timestamp minus the 'start' timestamp. The average time is calculated by the total time to complete every process on the machine divided by the number of processes that were run.

The resulting table should have the machine_id along with the average time as processing_time, which should be rounded to 3 decimal places.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Activity table:
+------------+------------+---------------+-----------+
| machine_id | process_id | activity_type | timestamp |
+------------+------------+---------------+-----------+
| 0          | 0          | start         | 0.712     |
| 0          | 0          | end           | 1.520     |
| 0          | 1          | start         | 3.140     |
| 0          | 1          | end           | 4.120     |
| 1          | 0          | start         | 0.550     |
| 1          | 0          | end           | 1.550     |
| 1          | 1          | start         | 0.430     |
| 1          | 1          | end           | 1.420     |
| 2          | 0          | start         | 4.100     |
| 2          | 0          | end           | 4.512     |
| 2          | 1          | start         | 2.500     |
| 2          | 1          | end           | 5.000     |
+------------+------------+---------------+-----------+
Output: 
+------------+-----------------+
| machine_id | processing_time |
+------------+-----------------+
| 0          | 0.894           |
| 1          | 0.995           |
| 2          | 1.456           |
+------------+-----------------+
Explanation: 
There are 3 machines running 2 processes each.
Machine 0's average time is ((1.520 - 0.712) + (4.120 - 3.140)) / 2 = 0.894
Machine 1's average time is ((1.550 - 0.550) + (1.420 - 0.430)) / 2 = 0.995
Machine 2's average time is ((4.512 - 4.100) + (5.000 - 2.500)) / 2 = 1.456
</pre>
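
As a quick sanity check on the arithmetic in the example, the averages can be reproduced in plain Python without Spark. This is only a sketch: the `rows` list and the `average_processing_time` helper are hypothetical names introduced here, and the data is copied from Example 1.

```python
import builtins
import math
from collections import defaultdict

# Rows copied from Example 1: (machine_id, process_id, activity_type, timestamp)
rows = [
    (0, 0, "start", 0.712), (0, 0, "end", 1.520),
    (0, 1, "start", 3.140), (0, 1, "end", 4.120),
    (1, 0, "start", 0.550), (1, 0, "end", 1.550),
    (1, 1, "start", 0.430), (1, 1, "end", 1.420),
    (2, 0, "start", 4.100), (2, 0, "end", 4.512),
    (2, 1, "start", 2.500), (2, 1, "end", 5.000),
]


def average_processing_time(rows):
    """Return {machine_id: mean of (end - start), rounded to 3 decimals}."""
    starts, ends = {}, {}
    for machine_id, process_id, activity_type, ts in rows:
        target = starts if activity_type == "start" else ends
        target[(machine_id, process_id)] = ts

    # Collect one duration per (machine_id, process_id) pair
    durations = defaultdict(list)
    for (machine_id, process_id), start_ts in starts.items():
        durations[machine_id].append(ends[(machine_id, process_id)] - start_ts)

    # builtins.round is spelled out so the sketch still works in a session
    # where `round` refers to the PySpark column function instead.
    return {
        machine_id: builtins.round(math.fsum(values) / len(values), 3)
        for machine_id, values in durations.items()
    }


result = average_processing_time(rows)
print(result)
```

The printed dictionary should agree with the `processing_time` column of the expected output table.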

In [0]:
spark

In [0]:
# Import only the PySpark SQL functions used below.
# Note: this `round` is the Spark column function and shadows Python's built-in round.
from pyspark.sql.functions import avg, col, round

# Import the SQL types needed for the schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Import SparkSession
from pyspark.sql import SparkSession


In [0]:
# creating spark session and providing app name
spark = SparkSession.builder.appName("leetcode-top-50-sql-solution-with-pyspark").getOrCreate()

In [0]:
# Define the schema for the Activity table
activity_schema = StructType([
    StructField("machine_id", IntegerType(), nullable=False),
    StructField("process_id", IntegerType(), nullable=False),
    StructField("activity_type", StringType(), nullable=False),
    StructField("timestamp", FloatType(), nullable=False)
])
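
The schema stores `timestamp` as `FloatType`, which is a 4-byte single-precision float, so a value such as 0.712 is held only approximately. The sketch below uses the standard `struct` module to emulate that rounding in plain Python. `to_float32` is a hypothetical helper, and modelling Spark's arithmetic as "round to 32 bits after each step" is an assumption, not a description of Spark internals.

```python
import struct


def to_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", value))[0]


start_ts, end_ts = 0.712, 1.520

# Values as they would be held in a 32-bit float column (assumed model)
stored_start = to_float32(start_ts)
stored_end = to_float32(end_ts)

# Error introduced purely by storing the start timestamp in 32 bits
storage_error = stored_start - start_ts

# Duration computed from 32-bit values versus the 64-bit computation
duration_32 = to_float32(stored_end - stored_start)
duration_64 = end_ts - start_ts

print(f"stored start:        {stored_start!r}")
print(f"storage error:       {storage_error:.3e}")
print(f"duration difference: {duration_32 - duration_64:.3e}")
```

The discrepancies are far smaller than the 0.001 resolution required by the problem, so `FloatType` should be adequate here; `DoubleType` would be the option to consider if more precision were needed.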





In [0]:

activity_df = spark.createDataFrame([
    (0, 0, "start", 0.712),
    (0, 0, "end", 1.520),
    (0, 1, "start", 3.140),
    (0, 1, "end", 4.120),
    (1, 0, "start", 0.550),
    (1, 0, "end", 1.550),
    (1, 1, "start", 0.430),
    (1, 1, "end", 1.420),
    (2, 0, "start", 4.100),
    (2, 0, "end", 4.512),
    (2, 1, "start", 2.500),
    (2, 1, "end", 5.000)
], schema=activity_schema)






In [0]:
activity_df.display()

machine_id,process_id,activity_type,timestamp
0,0,start,0.712
0,0,end,1.52
0,1,start,3.14
0,1,end,4.12
1,0,start,0.55
1,0,end,1.55
1,1,start,0.43
1,1,end,1.42
2,0,start,4.1
2,0,end,4.512
2,1,start,2.5
2,1,end,5.0


In [0]:
# LeetCode solution in Spark SQL
# Create a temporary view of the activity DataFrame so it can be queried with SQL
activity_df.createOrReplaceTempView('activity')
sql_result = spark.sql(
    '''
    SELECT
        tab_one.machine_id,
        ROUND(AVG(tab_two.timestamp - tab_one.timestamp), 3) AS processing_time
    FROM activity AS tab_one
    JOIN activity AS tab_two
        ON tab_one.machine_id = tab_two.machine_id
        AND tab_one.process_id = tab_two.process_id
        AND tab_one.activity_type = 'start'
        AND tab_two.activity_type = 'end'
    GROUP BY tab_one.machine_id
    ORDER BY tab_one.machine_id
    '''
)

# Displaying Result
sql_result.display()

machine_id,processing_time
0,0.894
1,0.995
2,1.456
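
The self-join is not the only way to express this. Because every (machine_id, process_id) pair has exactly one 'start' and one 'end' row, the total processing time per machine equals the sum of its 'end' timestamps minus the sum of its 'start' timestamps. The sketch below illustrates that single-pass formulation. It runs on Python's built-in `sqlite3` so it can be tried without a Spark session; the query uses only `CASE`, `SUM`, `COUNT(DISTINCT ...)`, and `ROUND`, which Spark SQL also provides, but it has not been run through `spark.sql` here, so treat portability as an assumption.

```python
import sqlite3

# Rows copied from Example 1: (machine_id, process_id, activity_type, timestamp)
rows = [
    (0, 0, "start", 0.712), (0, 0, "end", 1.520),
    (0, 1, "start", 3.140), (0, 1, "end", 4.120),
    (1, 0, "start", 0.550), (1, 0, "end", 1.550),
    (1, 1, "start", 0.430), (1, 1, "end", 1.420),
    (2, 0, "start", 4.100), (2, 0, "end", 4.512),
    (2, 1, "start", 2.500), (2, 1, "end", 5.000),
]

# In-memory database standing in for the Spark temporary view
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE activity ("
    "machine_id INTEGER, process_id INTEGER, activity_type TEXT, timestamp REAL)"
)
conn.executemany("INSERT INTO activity VALUES (?, ?, ?, ?)", rows)

# 'end' rows count positively and 'start' rows negatively, so the SUM is the
# total processing time; dividing by the number of processes gives the average.
query = """
    SELECT
        machine_id,
        ROUND(
            SUM(CASE WHEN activity_type = 'end' THEN timestamp ELSE -timestamp END)
            / COUNT(DISTINCT process_id),
            3
        ) AS processing_time
    FROM activity
    GROUP BY machine_id
    ORDER BY machine_id
"""
result = conn.execute(query).fetchall()
conn.close()

for machine_id, processing_time in result:
    print(machine_id, processing_time)
```

This version scans the table once instead of joining it to itself. It relies on the guarantee that each process has both a 'start' and an 'end' row; with incomplete pairs the signed sum would be wrong, whereas the inner join would simply drop the unmatched rows.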


In [0]:
# Create separate DataFrames for the 'start' and 'end' activity types
start_df = activity_df.filter(col("activity_type") == "start").select(
    col("machine_id"),
    col("process_id"),
    col("timestamp").alias("start_timestamp")
)

end_df = activity_df.filter(col("activity_type") == "end").select(
    col("machine_id"),
    col("process_id"),
    col("timestamp").alias("end_timestamp")
)

# Join start and end DataFrames on (machine_id, process_id)
joined_df = start_df.join(end_df, on=["machine_id", "process_id"], how="inner")

# Calculate processing time for each process
joined_df = joined_df.withColumn("processing_time", col("end_timestamp") - col("start_timestamp"))

# Group by machine_id and calculate average processing time
filter_result = joined_df.groupBy("machine_id") \
    .agg(round(avg("processing_time"), 3).alias("processing_time"))

# Show the result
filter_result.show()

+----------+---------------+
|machine_id|processing_time|
+----------+---------------+
|         1|          0.995|
|         2|          1.456|
|         0|          0.894|
+----------+---------------+

