In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under the application name 'sandBox'.
spark = (
    SparkSession.builder
    .appName('sandBox')
    .getOrCreate()
)

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

In [3]:
import pandas as pd
import numpy as np

In [4]:
# Toy input: session 123 covers frame ranks 1-17 and session 456 covers
# ranks 5-20; every frame carries a signed power delta.
df = pd.DataFrame(dict(
    F_SESSION=[123] * 17 + [456] * 16,
    K_FRAME_RANK=[*range(1, 18), *range(5, 21)],
    delta_power=[
        # session 123
        -10, -10, 10, 20, 30, 30, 30, -20, -20, 20, 10, 10, 10, 0, 0, 0, 0,
        # session 456
        10, 10, 10, 0, 20, 30, 30, 30, -10, -10, -10, -50, -50, -50, 10, 20,
    ],
))

In [5]:
# Shuffle all rows before handing them to Spark so the downstream logic cannot
# lean on the input arriving pre-sorted. The fixed random_state makes the
# shuffle identical on every run, so the notebook's outputs are reproducible.
df_all = spark.createDataFrame(df.sample(n=df.shape[0], random_state=42))

In [6]:
def custom_sum(input_list: list, lower: float = 0, upper: float = 60):
    """Return the clamped running sum of ``input_list`` as a space-separated string.

    Each value is added to the previous (already clamped) total and the result
    is clipped to ``[lower, upper]`` before the next value is processed, so the
    accumulator saturates at the bounds instead of drifting past them. The
    running total starts from 0.

    Args:
        input_list: Numeric deltas, in the order they should be accumulated.
        lower: Smallest value the running total may take. Defaults to 0.
        upper: Largest value the running total may take. Defaults to 60.

    Returns:
        The clamped running totals converted with ``str`` and joined by single
        spaces (e.g. ``'0 10 30'``). An empty input yields an empty string.
    """
    clamped = []
    running = 0
    for delta in input_list:
        # Clamp after every step: the clipped total, not the raw sum, is what
        # the next delta is added to.
        running = min(max(delta + running, lower), upper)
        clamped.append(running)
    return ' '.join(map(str, clamped))

# Wrap custom_sum as a Spark UDF whose declared return type is a string.
udf_custom_sum = F.udf(f=custom_sum, returnType=T.StringType())

In [7]:
# Add a per-frame key of the form '<F_SESSION>_<K_FRAME_RANK>'.
df_all = df_all.withColumn(
    'FRAME_ID',
    F.concat_ws('_', 'F_SESSION', 'K_FRAME_RANK'),
)

In [8]:
# Gather each session's frames into two parallel arrays ordered by K_FRAME_RANK.
#
# collect_list gives no ordering guarantee after the shuffle that groupby
# triggers, so sorting the DataFrame beforehand is not a reliable way to order
# the collected lists. Instead, collect (rank, id, delta) structs, sort that
# array -- structs compare field by field, so K_FRAME_RANK drives the order --
# and then project the struct fields back out as arrays.
df_agg = (
    df_all
    .groupby('F_SESSION')
    .agg(
        F.sort_array(
            F.collect_list(F.struct('K_FRAME_RANK', 'FRAME_ID', 'delta_power'))
        ).alias('_frames')
    )
    .select(
        F.col('F_SESSION'),
        # Selecting a field of an array<struct> column yields an array of that field.
        F.col('_frames.FRAME_ID').alias('FRAME_ID'),
        F.col('_frames.delta_power').alias('delta_power'),
    )
)

In [9]:
# Feed each session's collected deltas through the UDF; the result is one
# space-separated string per session, stored as 'delta_power_trunc'.
df_agg = df_agg.withColumn(
    'delta_power_trunc',
    udf_custom_sum(df_agg['delta_power']),
)

In [10]:
def _explode_with_row_num(frame, array_col, value_name):
    """Explode a per-session array column into one numbered row per element.

    The row number is assigned over the explicit key (F_SESSION, position in
    the array) rather than over monotonically_increasing_id(), whose values
    depend on how the data happens to be partitioned. Two frames produced by
    this helper from arrays of equal per-session length therefore receive
    matching 'row_num' values for matching elements.

    Args:
        frame: DataFrame holding 'F_SESSION' and the array to explode.
        array_col: Column expression that evaluates to the array.
        value_name: Name given to the exploded element column.

    Returns:
        DataFrame with columns F_SESSION, <value_name>, original_order, row_num.
        'original_order' is the element's 0-based position inside its
        session's array.
    """
    exploded = frame.select(
        F.col('F_SESSION'),
        # posexplode emits (position, element) so the in-array order survives
        # as data instead of being implied by physical row order.
        F.posexplode(array_col).alias('original_order', value_name),
    )
    # No partitionBy: row_num must be unique across all sessions because the
    # frames are later matched on row_num alone. Spark evaluates such a window
    # in a single partition, which is acceptable for this small sample.
    session_then_position = Window.orderBy('F_SESSION', 'original_order')
    numbered = exploded.withColumn('row_num', F.row_number().over(session_then_position))
    return numbered.select('F_SESSION', value_name, 'original_order', 'row_num')


# One row per truncated value (still a string after the split on spaces).
df_delta_power = _explode_with_row_num(
    df_agg,
    F.split(F.col('delta_power_trunc'), ' '),
    'delta_power_trunc',
)

# One row per frame id, numbered with the same rule as df_delta_power.
df_frame_id = _explode_with_row_num(df_agg, F.col('FRAME_ID'), 'FRAME_ID')

In [11]:
# Pair every FRAME_ID with the truncated value that carries the same row
# number. Left join: every frame-id row is kept even when no value matches.
df_res = (
    df_frame_id
    .select('row_num', 'FRAME_ID')
    .join(
        df_delta_power.select('row_num', 'delta_power_trunc'),
        on='row_num',
        how='left',
    )
)

In [12]:
# Attach the per-frame results to the original rows, matching on FRAME_ID and
# keeping every row of df_all.
df_all_merge = df_all.join(
    other=df_res,
    on='FRAME_ID',
    how='left',
)

In [15]:
# Cast delta_power_trunc to a float column, then sort the rows by session and
# frame rank (both ascending).
df_all_merge = (
    df_all_merge
    .withColumn(
        'delta_power_trunc',
        F.col('delta_power_trunc').cast(T.FloatType()),
    )
    .orderBy('F_SESSION', 'K_FRAME_RANK', ascending=True)
)

In [16]:
# Collect the whole result to the driver as a pandas DataFrame; as the last
# expression of the cell it is what the notebook displays.
df_all_merge.toPandas()

Unnamed: 0,FRAME_ID,F_SESSION,K_FRAME_RANK,delta_power,row_num,delta_power_trunc
0,123_1,123,1,-10,1,0.0
1,123_2,123,2,-10,2,0.0
2,123_3,123,3,10,3,10.0
3,123_4,123,4,20,4,30.0
4,123_5,123,5,30,5,60.0
5,123_6,123,6,30,6,60.0
6,123_7,123,7,30,7,60.0
7,123_8,123,8,-20,8,40.0
8,123_9,123,9,-20,9,20.0
9,123_10,123,10,20,10,40.0
