In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [None]:


# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# NOTE(review): `dframe` and `id` are expected to be defined in an earlier cell.
# `dframe` is assumed to be a PySpark DataFrame with at least the columns
# 'sensor', 'Clock', 'Kwh', 'R/Y/B_Voltage' and 'R/Y/B_Current' -- confirm.
# `id` is assumed to be an iterable of sensor/location values. It shadows the
# Python builtin `id`; if it was never assigned, the loop below fails with
# "'builtin_function_or_method' object is not iterable".

START_CLOCK = '2022-11-18 00:00:00'  # keep readings at or after this time
ROLLING_HOURS = 24                   # length of the trailing rolling window, in hourly rows
# Minimum number of non-null hourly Kwh values required in the window before a
# rolling mean is emitted. Equal to ROLLING_HOURS means full windows only, so the
# first ROLLING_HOURS - 1 hours get a null Kwh_r and are dropped by na.drop().
# Set to 1 to keep partial-window averages.
MIN_PERIODS = ROLLING_HOURS

# Parse 'Clock' and apply the start-date filter once, not once per location.
base = dframe.withColumn('Clock', F.to_timestamp(F.col('Clock')))
base = base.filter(F.col('Clock') >= START_CLOCK)

# A reading counts as an outage when at least one phase voltage AND at least
# one phase current are zero. Its Kwh is forced to 0. A null comparison falls
# through to `otherwise`, leaving Kwh unchanged.
outage = (
    ((F.col('R_Voltage') == 0) | (F.col('Y_Voltage') == 0) | (F.col('B_Voltage') == 0))
    & ((F.col('R_Current') == 0) | (F.col('Y_Current') == 0) | (F.col('B_Current') == 0))
)

# Trailing frame: the current hourly row plus the previous ROLLING_HOURS - 1 rows.
# NOTE(review): this counts rows, not hours. Hours with no readings produce no
# row, so a gap makes the frame span more than ROLLING_HOURS hours.
# Spark also warns that there is no partitionBy; each per-sensor DataFrame is
# sorted in a single partition.
rolling = Window.orderBy('window.start').rowsBetween(-(ROLLING_HOURS - 1), 0)

lst = []
for location in id:
    df = base.filter(F.col('sensor') == location)

    # Zero out Kwh on outage rows in place. A self-join on 'Clock' would add a
    # second 'Kwh' column (making df['Kwh'] ambiguous) and duplicate rows
    # whenever a Clock value repeats.
    df = df.withColumn('Kwh', F.when(outage, F.lit(0)).otherwise(F.col('Kwh')))

    # Hourly totals. `window` is a struct column with `start` and `end` fields.
    df = df.groupBy(F.window('Clock', '1 hour')).agg(F.sum('Kwh').alias('Kwh'))

    # Rolling mean, emitted only once the frame holds at least MIN_PERIODS
    # non-null values. Otherwise F.when without otherwise() yields null.
    df = df.withColumn(
        'Kwh_r',
        F.when(F.count('Kwh').over(rolling) >= MIN_PERIODS, F.avg('Kwh').over(rolling)),
    )

    # Drop rows with missing values (null hourly sums or incomplete windows).
    df = df.na.drop()

    lst.append(df)

# The session is intentionally NOT stopped here. Every DataFrame in `lst` is
# lazy and bound to this session, so stopping it now would make any later
# action on `lst` fail. Call spark.stop() once `lst` has been consumed.
