In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 3.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=7c0e80b68cdc15024fafc6fb3952c8b8e86800c95193bfb6d845ef3f48548fc5
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,window,when
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType,TimestampType

# Reuse the running Spark session if there is one, otherwise start one.
spark = (
    SparkSession.builder
    .appName("EnergyConsumptionData")
    .getOrCreate()
)

# Declared column layout of the energy readings CSV; every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("timestamp", TimestampType()),
        ("device_id", StringType()),
        ("energy_consumed", DoubleType()),
    )
])

def process_stream(spark, schema, path="/content/energy_consumption_data.csv"):
  """Load the energy-consumption CSV into a Spark DataFrame.

  Despite the name, this is a one-shot batch read via ``spark.read``, not a
  Structured Streaming source.

  Args:
    spark: Active SparkSession used to read the file.
    schema: StructType describing the CSV columns. When given it is applied
      directly, so Spark does not need the extra pass over the data that
      ``inferSchema`` requires and column types match the declaration.
      Pass ``None`` to fall back to type inference.
    path: Location of the CSV file. Its first line is treated as a header.

  Returns:
    A DataFrame typed according to ``schema`` (or the inferred types when
    ``schema`` is None).
  """
  reader = spark.read.option("header", "true")
  if schema is None:
    # No declared schema: let Spark scan the file to guess column types.
    reader = reader.option("inferSchema", "true")
  else:
    # Honour the caller's schema (it used to be accepted but ignored).
    reader = reader.schema(schema)
  return reader.csv(path)

# Load the raw readings and preview the first rows.
df = process_stream(spark=spark, schema=schema)
df.show()

+-------------------+---------+---------------+
|          timestamp|device_id|energy_consumed|
+-------------------+---------+---------------+
|2024-08-26 12:00:00| device_1|            8.5|
|2024-08-26 11:05:00| device_2|            6.2|
|2024-08-26 12:10:00| device_3|            7.1|
|2024-08-26 10:15:00| device_4|           12.0|
|2024-08-26 18:20:00| device_1|            9.0|
|2024-08-26 12:25:00| device_2|            6.5|
|2024-08-26 12:30:00| device_3|            7.4|
|2024-08-26 20:35:00| device_4|            2.0|
|2024-08-26 12:40:00| device_1|            9.1|
|2024-08-26 23:45:00| device_2|           25.0|
|2024-08-26 12:50:00| device_3|            7.6|
|2024-08-26 22:55:00| device_4|            7.3|
|2024-08-26 13:00:00| device_1|            8.9|
|2024-08-26 13:05:00| device_2|            6.1|
|2024-08-26 13:10:00| device_3|            7.0|
|2024-08-26 18:15:00| device_4|            3.0|
|2024-08-26 13:20:00| device_1|           50.0|
|2024-08-26 13:25:00| device_2|         

In [10]:
# Detecting anomalies: flag readings whose value exceeds a fixed threshold.
def detect_anomalies(df, threshold=10, column="energy_consumed"):
  """Return only the rows of ``df`` whose ``column`` value exceeds ``threshold``.

  Args:
    df: Spark DataFrame containing a numeric ``column``.
    threshold: Values strictly greater than this are treated as anomalies.
    column: Name of the column to test. Defaults to ``"energy_consumed"`` so
      existing calls behave exactly as before.

  Returns:
    A DataFrame with the original columns plus an integer ``anomaly`` flag
    column, filtered to the flagged rows (so ``anomaly`` is always 1 in the
    result). Rows where ``column`` is null are never flagged: the comparison
    yields null, which falls through to ``otherwise(0)``.
  """
  flagged = df.withColumn(
    "anomaly", when(col(column) > threshold, 1).otherwise(0)
  )
  return flagged.filter(col("anomaly") == 1)

# Readings of energy_consumed strictly above this value are flagged as anomalies.
ANOMALY_THRESHOLD = 10
# Spark's CSV writer treats this path as an output *directory* of part files.
ANOMALIES_OUTPUT_PATH = "/content/anomalies_detected.csv"

anomalies_df = detect_anomalies(df, threshold=ANOMALY_THRESHOLD)
anomalies_df.show()

# Save the detected anomalies. coalesce(1) collapses the result into a single
# partition, so the output directory holds one part-*.csv (with a header row)
# instead of one file per partition. This assumes the anomaly set stays small
# enough for a single task; revisit if it grows.
(
    anomalies_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .csv(ANOMALIES_OUTPUT_PATH, header=True)
)
print(f"Detected anomalies saved to directory {ANOMALIES_OUTPUT_PATH} (single part-*.csv file)")

+-------------------+---------+---------------+-------+
|          timestamp|device_id|energy_consumed|anomaly|
+-------------------+---------+---------------+-------+
|2024-08-26 10:15:00| device_4|           12.0|      1|
|2024-08-26 23:45:00| device_2|           25.0|      1|
|2024-08-26 13:20:00| device_1|           50.0|      1|
+-------------------+---------+---------------+-------+

Detected anomalies saved to new csv file
