In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("smokeDetecton") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")\
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")\
    .getOrCreate()

spark.conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")



:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c949eca7-ccaa-465b-a6f9-5c5ca557849e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 374ms :: artifacts dl 8ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-----------------------------

In [2]:
# Path to the downloaded file
file_path = "/home/ec2-user/smoke_detection_iot.csv"

# Load the dataset into PySpark
raw_data = spark.read.csv(file_path, header=True, inferSchema=True)

# Drops the unnecessary first column
cleaned_data = raw_data.drop("_c0")

# Show the cleaned data
cleaned_data.show(5)
cleaned_data.printSchema()

                                                                                

+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+---+----------+
|       UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5|NC1.0|NC2.5|CNT|Fire Alarm|
+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+---+----------+
|1654733331|          20.0|      57.36|        0|      400| 12306|      18520|      939.735|  0.0|  0.0|  0.0|  0.0|  0.0|  0|         0|
|1654733332|        20.015|      56.67|        0|      400| 12345|      18651|      939.744|  0.0|  0.0|  0.0|  0.0|  0.0|  1|         0|
|1654733333|        20.029|      55.96|        0|      400| 12374|      18764|      939.738|  0.0|  0.0|  0.0|  0.0|  0.0|  2|         0|
|1654733334|        20.044|      55.28|        0|      400| 12390|      18849|      939.736|  0.0|  0.0|  0.0|  0.0|  0.0|  3|         0|
|1654733335|        20.059|      5

In [3]:
pip install pandas

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [4]:
from pyspark.sql.functions import from_unixtime, month, year
import pandas as pd

# Convert UTC to timestamp format
data_with_datetime = cleaned_data.withColumn("Timestamp", from_unixtime("UTC"))

# Extract Year and Month from the Timestamp column
data_with_datetime = data_with_datetime.withColumn("Year", year("Timestamp")) \
                                         .withColumn("Month", month("Timestamp"))

# Convert the PySpark DataFrame to a Pandas DataFrame for better presentation
pandas_data = data_with_datetime.toPandas()

# Display the data in a clear format using Pandas
print("\nTransformed Data (with Year and Month columns):")
print(pandas_data.head())


                                                                                


Transformed Data (with Year and Month columns):
          UTC  Temperature[C]  Humidity[%]  TVOC[ppb]  eCO2[ppm]  Raw H2  \
0  1654733331          20.000        57.36          0        400   12306   
1  1654733332          20.015        56.67          0        400   12345   
2  1654733333          20.029        55.96          0        400   12374   
3  1654733334          20.044        55.28          0        400   12390   
4  1654733335          20.059        54.69          0        400   12403   

   Raw Ethanol  Pressure[hPa]  PM1.0  PM2.5  NC0.5  NC1.0  NC2.5  CNT  \
0        18520        939.735    0.0    0.0    0.0    0.0    0.0    0   
1        18651        939.744    0.0    0.0    0.0    0.0    0.0    1   
2        18764        939.738    0.0    0.0    0.0    0.0    0.0    2   
3        18849        939.736    0.0    0.0    0.0    0.0    0.0    3   
4        18921        939.744    0.0    0.0    0.0    0.0    0.0    4   

   Fire Alarm            Timestamp  Year  Month  
0    

In [5]:
# Rename columns to remove spaces or special characters (if needed)
data_with_datetime = data_with_datetime.withColumnRenamed("Raw H2", "Raw_H2") \
                                       .withColumnRenamed("Raw Ethanol", "Raw_Ethanol") \
                                       .withColumnRenamed("TVOC[ppb]", "TVOC_ppb") \
                                       .withColumnRenamed("Pressure[hPa]", "Pressure_hPa") \
                                       .withColumnRenamed("PM2.5", "PM2_5") \
                                       .withColumnRenamed("Fire Alarm", "Fire_Alarm")


In [6]:
# Compute Total Raw Ethanol and TVOC per Month
from pyspark.sql import functions as F

monthly_emissions = data_with_datetime.groupBy("Year", "Month") \
                                      .agg(
                                          F.sum("Raw_Ethanol").alias("Total_Raw_Ethanol"),
                                          F.sum("TVOC_ppb").alias("Total_TVOC")
                                      )
monthly_emissions.show()




+----+-----+-----------------+----------+
|Year|Month|Total_Raw_Ethanol|Total_TVOC|
+----+-----+-----------------+----------+
|2022|    6|       1237209173| 121631063|
+----+-----+-----------------+----------+



                                                                                

In [7]:
# Compute Average Temperature, Humidity, and Pressure per Month
monthly_environmental_trends = data_with_datetime.groupBy("Year", "Month") \
                                                 .agg(
                                                     F.avg("Temperature[C]").alias("Avg_Temperature"),
                                                     F.avg("Humidity[%]").alias("Avg_Humidity"),
                                                     F.avg("Pressure_hPa").alias("Avg_Pressure")
                                                 )
monthly_environmental_trends.show()





+----+-----+----------------+----------------+-----------------+
|Year|Month| Avg_Temperature|    Avg_Humidity|     Avg_Pressure|
+----+-----+----------------+----------------+-----------------+
|2022|    6|15.9704235829474|48.5394994411625|938.6276494651149|
+----+-----+----------------+----------------+-----------------+



                                                                                

In [8]:
# Top 10 Days with Highest TVOC Levels
top_10_tvoc_days = data_with_datetime.orderBy(F.desc("TVOC_ppb")).limit(10)
top_10_tvoc_days.show()

# Top 10 Days with Highest Raw Ethanol Levels
top_10_ethanol_days = data_with_datetime.orderBy(F.desc("Raw_Ethanol")).limit(10)
top_10_ethanol_days.show()


+----------+--------------+-----------+--------+---------+------+-----------+------------+------+------+-------+-------+-----+----+----------+-------------------+----+-----+
|       UTC|Temperature[C]|Humidity[%]|TVOC_ppb|eCO2[ppm]|Raw_H2|Raw_Ethanol|Pressure_hPa| PM1.0| PM2_5|  NC0.5|  NC1.0|NC2.5| CNT|Fire_Alarm|          Timestamp|Year|Month|
+----------+--------------+-----------+--------+---------+------+-----------+------------+------+------+-------+-------+-----+----+----------+-------------------+----+-----+
|1654717046|         43.81|       32.2|   60000|     1354| 12321|      18192|     936.858|  83.3| 86.54|  573.3| 89.399|2.019|4859|         0|2022-06-08 19:37:26|2022|    6|
|1654717055|          43.8|      32.29|   60000|     1480| 12256|      18096|     936.873|221.58|230.21|1525.05|237.813|5.371|4868|         0|2022-06-08 19:37:35|2022|    6|
|1654717047|         43.78|      32.38|   60000|     1363| 12318|      18181|     936.867| 69.22| 71.91| 476.39| 74.287|1.678|4860

In [9]:
# Compute Average PM2.5 per Month
avg_pm25_per_month = data_with_datetime.groupBy("Year", "Month") \
                                       .agg(F.avg("PM2_5").alias("Avg_PM2_5"))
avg_pm25_per_month.show()


+----+-----+------------------+
|Year|Month|         Avg_PM2_5|
+----+-----+------------------+
|2022|    6|184.46777023790509|
+----+-----+------------------+



In [10]:
# Compute Total Fire Alarms per Month
fire_alarm_count = data_with_datetime.groupBy("Year", "Month") \
                                     .agg(F.sum("Fire_Alarm").alias("Total_Fire_Alarms"))
fire_alarm_count.show()


+----+-----+-----------------+
|Year|Month|Total_Fire_Alarms|
+----+-----+-----------------+
|2022|    6|            44757|
+----+-----+-----------------+



In [11]:
# Compute Mean and Standard Deviation of PM2.5
pm25_stats = data_with_datetime.agg(
    F.mean("PM2_5").alias("Mean_PM2_5"),
    F.stddev("PM2_5").alias("StdDev_PM2_5")
).collect()

mean_pm25 = pm25_stats[0]["Mean_PM2_5"]
stddev_pm25 = pm25_stats[0]["StdDev_PM2_5"]

# Filter Outliers (greater than mean + 3*stddev)
outliers_pm25 = data_with_datetime.filter(
    F.col("PM2_5") > mean_pm25 + 3 * stddev_pm25
)
outliers_pm25.show()


+----------+--------------+-----------+--------+---------+------+-----------+------------+--------+--------+--------+---------+---------+---+----------+-------------------+----+-----+
|       UTC|Temperature[C]|Humidity[%]|TVOC_ppb|eCO2[ppm]|Raw_H2|Raw_Ethanol|Pressure_hPa|   PM1.0|   PM2_5|   NC0.5|    NC1.0|    NC2.5|CNT|Fire_Alarm|          Timestamp|Year|Month|
+----------+--------------+-----------+--------+---------+------+-----------+------------+--------+--------+--------+---------+---------+---+----------+-------------------+----+-----+
|1654903053|         22.11|      47.94|       0|     1405| 12961|      20049|     931.186| 7914.14| 9019.41| 52467.6| 9447.012|  965.387| 48|         1|2022-06-10 23:17:33|2022|    6|
|1654903054|         21.94|      50.24|       0|     1402| 12962|      20043|     931.195| 9608.86| 12702.7|59301.19| 13565.18| 2872.672| 49|         1|2022-06-10 23:17:34|2022|    6|
|1654903055|         21.81|      52.44|       0|     1448| 12942|      20036|   

In [12]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col


In [13]:

# Assemble environmental variables and pollutants into a vector
vector_col = "features"
assembler = VectorAssembler(
    inputCols=["Temperature[C]", "Humidity[%]", "TVOC_ppb", "Raw_Ethanol"], 
    outputCol=vector_col
)

# Transform data into vector format
data_vector = assembler.transform(data_with_datetime)

# Compute the correlation matrix for these features
correlation_matrix = Correlation.corr(data_vector, vector_col).head()[0]

# Display the correlation matrix
print(f"Correlation Matrix:\n{correlation_matrix}")


24/12/07 23:52:09 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Correlation Matrix:
DenseMatrix([[ 1.        , -0.24398559,  0.08244183, -0.03734264],
             [-0.24398559,  1.        , -0.4888779 ,  0.06878211],
             [ 0.08244183, -0.4888779 ,  1.        , -0.67371463],
             [-0.03734264,  0.06878211, -0.67371463,  1.        ]])


                                                                                

In [13]:

# Convert the correlation matrix into a pandas DataFrame for easier manipulation
correlation_matrix_df = pd.DataFrame(correlation_matrix.toArray(), columns=["Temperature[C]", "Humidity[%]", "TVOC_ppb", "Raw_Ethanol"],
                                      index=["Temperature[C]", "Humidity[%]", "TVOC_ppb", "Raw_Ethanol"])

# Prepare the reshaped DataFrame with Feature 1, Feature 2, and Correlation Value
reshaped_data = []

# Loop through the DataFrame to extract the correlations between pairs of features
for feature1 in correlation_matrix_df.columns:
    for feature2 in correlation_matrix_df.index:
        reshaped_data.append({
            "Feature 1": feature1,
            "Feature 2": feature2,
            "Correlation Value": correlation_matrix_df.loc[feature2, feature1]
        })

# Convert the list of dictionaries into a DataFrame
correlation_matrix_reshaped_df = pd.DataFrame(reshaped_data)

In [14]:
# Correlate Fire Alarms with environmental conditions and chemical emissions
assembler_fire_alarm = VectorAssembler(
    inputCols=["Temperature[C]", "Humidity[%]", "TVOC_ppb", "Raw_Ethanol", "PM2_5"],
    outputCol="fire_alarm_features"
)

# Transform data
data_with_fire_alarm_features = assembler_fire_alarm.transform(data_with_datetime)

# Calculate correlation matrix
correlation_matrix_fire_alarm = Correlation.corr(data_with_fire_alarm_features, "fire_alarm_features").head()[0]

# Display the correlation matrix
print(f"Correlation Matrix with Fire Alarms:\n{correlation_matrix_fire_alarm}")


Correlation Matrix with Fire Alarms:
DenseMatrix([[ 1.        , -0.24398559,  0.08244183, -0.03734264,  0.03208418],
             [-0.24398559,  1.        , -0.4888779 ,  0.06878211, -0.17888169],
             [ 0.08244183, -0.4888779 ,  1.        , -0.67371463,  0.47742447],
             [-0.03734264,  0.06878211, -0.67371463,  1.        , -0.39319215],
             [ 0.03208418, -0.17888169,  0.47742447, -0.39319215,  1.        ]])


In [16]:
pip install s3fs

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [17]:
pip install fsspec s3fs

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [19]:
from pyspark.sql import functions as F
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# Define the S3 path for processed data
s3_bucket = "s3a://smoke-detection-data-pipeline/task3Transformed/"

# Save Transformed Data (with Year and Month columns)
data_with_datetime.coalesce(1).write.csv(f"{s3_bucket}transformed_data/", header=True, mode="overwrite")

# Save Aggregated Monthly Emissions Data
monthly_emissions.write.csv(f"{s3_bucket}monthly_emissions/", header=True, mode="overwrite")

# Save Monthly Environmental Trends
monthly_environmental_trends.write.csv(f"{s3_bucket}monthly_environmental_trends/", header=True, mode="overwrite")

# Save Top 10 Days with Highest TVOC Levels
top_10_tvoc_days.write.csv(f"{s3_bucket}top_10_tvoc_days/", header=True, mode="overwrite")

# Save Top 10 Days with Highest Raw Ethanol Levels
top_10_ethanol_days.write.csv(f"{s3_bucket}top_10_ethanol_days/", header=True, mode="overwrite")

# Save Average PM2.5 Levels Per Month
avg_pm25_per_month.write.csv(f"{s3_bucket}avg_pm25_per_month/", header=True, mode="overwrite")

# Save Total Fire Alarms Per Month
fire_alarm_count.write.csv(f"{s3_bucket}fire_alarm_count/", header=True, mode="overwrite")

# Save PM2.5 Outliers Data
outliers_pm25.write.csv(f"{s3_bucket}pm25_outliers/", header=True, mode="overwrite")

# Convert Correlation Matrices to Pandas DataFrame
correlation_matrix_pd = pd.DataFrame(correlation_matrix.toArray())
correlation_matrix_fire_alarm_pd = pd.DataFrame(correlation_matrix_fire_alarm.toArray())

# Save Correlation Matrices as CSV Files in S3
correlation_matrix_pd.to_csv(f"{s3_bucket}corrMatrixEnvVar.csv", index=False, header=True, storage_options={"anon": False})
correlation_matrix_fire_alarm_pd.to_csv(f"{s3_bucket}corrMatrixFireAlarm.csv", index=False, header=True, storage_options={"anon": False})



24/12/05 22:57:52 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:55 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:55 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:57 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:57 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:58 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/05 22:57:59 WA

In [16]:
s3_bucket = "s3a://smoke-detection-data-pipeline/task3Transformed/"
correlation_matrix_reshaped_df.to_csv(f"{s3_bucket}correlation.csv", index=False, header=True, storage_options={"anon": False})


In [15]:
# Register the DataFrames as SQL views
data_with_datetime.createOrReplaceTempView("transformed_data")
monthly_emissions.createOrReplaceTempView("monthly_emissions")
monthly_environmental_trends.createOrReplaceTempView("monthly_environmental_trends")
top_10_tvoc_days.createOrReplaceTempView("top_10_tvoc_days")
top_10_ethanol_days.createOrReplaceTempView("top_10_ethanol_days")
fire_alarm_count.createOrReplaceTempView("fire_alarm_count")


In [16]:
query1 = """
SELECT DAY(FROM_UNIXTIME(UTC)) AS Day,
       SUM(TVOC_ppb) AS Total_TVOC,
       SUM(Raw_Ethanol) AS Total_Raw_Ethanol
FROM transformed_data
GROUP BY Day
ORDER BY Total_TVOC DESC
LIMIT 5
"""
highest_emissions_days = spark.sql(query1)
highest_emissions_days.show()


+---+----------+-----------------+
|Day|Total_TVOC|Total_Raw_Ethanol|
+---+----------+-----------------+
|  8|  40947898|        115780034|
| 13|  40947898|        115780034|
|  9|  39054608|        982481786|
| 10|    680659|         23167319|
+---+----------+-----------------+



In [17]:
query2 = """
SELECT DAY(FROM_UNIXTIME(UTC)) AS Day,
       AVG(`Temperature[C]`) AS Avg_Temperature,
       AVG(`Humidity[%]`) AS Avg_Humidity
FROM transformed_data
GROUP BY Day
ORDER BY Avg_Temperature DESC
"""
daily_temp_humidity = spark.sql(query2)
daily_temp_humidity.show()


+---+------------------+------------------+
|Day|   Avg_Temperature|      Avg_Humidity|
+---+------------------+------------------+
| 10|  34.2332235701906|23.258587521663777|
|  8|  33.7831023676881| 38.12003830083569|
|  9|14.629292450187956| 51.51767304153025|
| 13| 6.160041434540405| 38.12003830083566|
+---+------------------+------------------+



In [18]:
query3 = """
SELECT DAY(FROM_UNIXTIME(UTC)) AS Day,
       COUNT(Fire_Alarm) AS Total_Fire_Alarms
FROM transformed_data
WHERE Fire_Alarm = 1
GROUP BY Day
ORDER BY Total_Fire_Alarms DESC
LIMIT 5
"""
most_fire_alarms_days = spark.sql(query3)
most_fire_alarms_days.show()


+---+-----------------+
|Day|Total_Fire_Alarms|
+---+-----------------+
|  9|            43632|
| 10|             1121|
|  8|                2|
| 13|                2|
+---+-----------------+



In [19]:
query4 = """
SELECT DAY(FROM_UNIXTIME(UTC)) AS Day,
       AVG(PM2_5) AS Avg_PM2_5
FROM transformed_data
GROUP BY Day
ORDER BY Avg_PM2_5 DESC
LIMIT 5
"""
highest_pm25_days = spark.sql(query4)
highest_pm25_days.show()




+---+------------------+
|Day|         Avg_PM2_5|
+---+------------------+
| 10|2973.7339428076302|
|  8| 699.6721552924782|
| 13| 699.6721552924782|
|  9|1.6742770264863744|
+---+------------------+



                                                                                

In [20]:
query5 = """
SELECT DAY(FROM_UNIXTIME(UTC)) AS Day,
       AVG(`Temperature[C]`) AS Avg_Temperature,
       AVG(`Humidity[%]`) AS Avg_Humidity,
       AVG(PM2_5) AS Avg_PM2_5,
       COUNT(Fire_Alarm) AS Fire_Alarm_Count
FROM transformed_data
WHERE Fire_Alarm = 1
GROUP BY Day
ORDER BY Fire_Alarm_Count DESC
"""
fire_alarm_conditions = spark.sql(query5)
fire_alarm_conditions.show()


+---+------------------+-----------------+------------------+----------------+
|Day|   Avg_Temperature|     Avg_Humidity|         Avg_PM2_5|Fire_Alarm_Count|
+---+------------------+-----------------+------------------+----------------+
|  9|13.965375618811771| 51.5017766776682|1.8266909607627617|           43632|
| 10| 34.60313113291699|22.69262265834077|3059.8000267618245|            1121|
|  8|            27.295|            43.91|             2.335|               2|
| 13|           20.2225|            43.91|             2.335|               2|
+---+------------------+-----------------+------------------+----------------+

