In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 4.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=748db435b3f4c4bfc8d5ab9adf0247f846546f472dd182f666c703e450db7d06
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the Spark session for this notebook: getOrCreate() reuses an already
# running session if there is one, otherwise it starts a new one.
spark = (
    SparkSession.builder
    .appName("EnvironmentalData")
    .getOrCreate()
)


In [3]:
# Load the sensor readings; the first CSV row supplies the column names.
# No schema is given or inferred here, so every column is read as a string.
environment_stream = spark.read.csv("/content/environmental_data.csv", header=True)


In [4]:
# Sensor measurements that must be numeric before they can be compared with
# the anomaly thresholds (the CSV load leaves them as strings).
NUMERIC_COLUMNS = ["PM25", "PM10", "CO2", "Temperature", "Humidity", "WindSpeed"]

# withColumns applies every cast in a single projection instead of stacking one
# projection per chained withColumn call. Existing columns are replaced in
# place, so column names and order stay the same.
# NOTE(review): under Spark's default non-ANSI settings an unparseable value
# becomes NULL rather than raising — confirm this is the desired handling.
environment_stream = environment_stream.withColumns(
    {name: col(name).cast("float") for name in NUMERIC_COLUMNS}
)

In [13]:
# Define threshold values for anomalies
# Each constant is the upper limit for the column of the same name; a reading
# is flagged only when it is strictly greater than its limit (see the `>`
# comparisons in detect_anomalies).
# NOTE(review): units are not stated anywhere in this notebook — confirm them
# against the sensor specification before tuning these values.
PM25_THRESHOLD = 15.0  # limit for the "PM25" column
PM10_THRESHOLD = 20.0  # limit for the "PM10" column
CO2_THRESHOLD = 350.0  # limit for the "CO2" column
TEMP_THRESHOLD = 25.0  # limit for the "Temperature" column
HUMIDITY_THRESHOLD = 50.0  # limit for the "Humidity" column
WIND_THRESHOLD = 10.0  # limit for the "WindSpeed" column

# Function to detect anomalies
def detect_anomalies(df):
    """Return the rows of ``df`` in which at least one reading exceeds its limit.

    Args:
        df: Spark DataFrame with numeric ``PM25``, ``PM10``, ``CO2``,
            ``Temperature``, ``Humidity`` and ``WindSpeed`` columns.

    Returns:
        A DataFrame with the same columns as ``df``, restricted to rows where
        any of the six readings is strictly greater than its module-level
        ``*_THRESHOLD`` constant.

    Note:
        A comparison against NULL yields NULL, so a missing reading never
        flags a row by itself; the row is kept only if another reading
        exceeds its limit.
    """
    pm25_high = col("PM25") > PM25_THRESHOLD
    pm10_high = col("PM10") > PM10_THRESHOLD
    co2_high = col("CO2") > CO2_THRESHOLD
    temperature_high = col("Temperature") > TEMP_THRESHOLD
    humidity_high = col("Humidity") > HUMIDITY_THRESHOLD
    wind_high = col("WindSpeed") > WIND_THRESHOLD

    # One exceeded limit is enough to mark the whole row as anomalous.
    any_limit_exceeded = (
        pm25_high
        | pm10_high
        | co2_high
        | temperature_high
        | humidity_high
        | wind_high
    )
    return df.filter(any_limit_exceeded)

# Apply anomaly detection to the loaded readings. Despite its name,
# environment_stream is a batch DataFrame read from CSV, not a streaming source.
anomalies = detect_anomalies(environment_stream)

# Preview the flagged rows (show() prints at most the first 20 by default).
anomalies.show()

# Persist the flagged rows. Spark's CSV writer treats the path as a directory
# and writes part-*.csv files inside it; mode("overwrite") replaces any output
# left there by a previous run.
ANOMALIES_OUTPUT_PATH = "detected_anomalies.csv"
anomalies.write.mode("overwrite").csv(ANOMALIES_OUTPUT_PATH, header=True)
print(f"Detected anomalies saved as CSV part files under '{ANOMALIES_OUTPUT_PATH}/'")

+--------+-------------------+----+----+-----+-----------+--------+---------+
|SensorID|           DateTime|PM25|PM10|  CO2|Temperature|Humidity|WindSpeed|
+--------+-------------------+----+----+-----+-----------+--------+---------+
|       1|2024-09-01 08:00:00|12.5|18.3|400.0|       22.5|    60.0|     12.3|
|       1|2024-09-01 09:00:00|15.2|20.1|410.0|       23.1|    58.0|     11.7|
|       1|2024-09-01 10:00:00|13.0|19.5|405.0|       24.0|    57.0|     10.5|
|       2|2024-09-01 08:00:00|22.0|35.0|420.0|       25.0|    70.0|     14.8|
|       2|2024-09-01 09:00:00|23.5|36.5|430.0|       26.0|    69.0|     15.3|
|       2|2024-09-01 10:00:00|NULL|38.0|440.0|       27.0|    68.0|     15.0|
|       3|2024-09-01 08:00:00|18.5|25.0|395.0|       21.5|    55.0|      9.8|
|       3|2024-09-01 09:00:00|19.0|26.0|398.0|       22.0|    54.0|     10.2|
|       3|2024-09-01 10:00:00|17.8|25.5|400.0|       22.8|    53.0|     11.0|
+--------+-------------------+----+----+-----+-----------+------