In [2]:
# Execute the companion notebooks before anything else in this notebook runs.
# NOTE(review): send_email, sender_email, password and reciver_email are used
# further down but never defined in this notebook — presumably one of these
# %run targets (likely 11_Email_Notification_and_Report) provides them; confirm.
%run 03_HDFS_Data_lake.ipynb
%run 05_utils.ipynb
%run 11_Email_Notification_and_Report.ipynb

import datetime
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, min, max, round
# from pyspark import SparkConf, SparkContext
import csv
import os 



In [3]:
# Spark entry point for this notebook; getOrCreate() hands back an already
# active session when one exists instead of building a second one.
spark = (
    SparkSession.builder
    .appName("GreenhouseSensorData")
    .getOrCreate()
)

In [3]:
# Timestamp of this run; its year/month/day pick the file to load and are
# reused later for the report date.
now = datetime.now()

# Daily sensor file layout: <root>/<year>/<month>/day_<day>.csv
# (values are inserted as plain integers, i.e. without zero padding).
path_template = "hdfs://localhost:9000/Sensors_DL/{}/{}/day_{}.csv"
hdfs_path = path_template.format(now.year, now.month, now.day)

# The file is read without a header row, so column names are attached
# explicitly once the data is loaded.
column_names = ["sensor_id","timestamp", "sensor_type", "value", "location"]
csv_f = spark.read.csv(hdfs_path, inferSchema=True, header=False)
df = csv_f.toDF(*column_names)

# Quick visual check of the raw readings, grouped by greenhouse then sensor.
df.sort("location", "sensor_id").show()

+---------+--------------------+-------------+------+--------+
|sensor_id|           timestamp|  sensor_type| value|location|
+---------+--------------------+-------------+------+--------+
|    CO2_1|2025-06-25 18:48:...|          CO2| 672.0|   GH_1 |
|     PH_1|2025-06-25 18:48:...|      Soil_pH|   7.0|   GH_1 |
|     SM_1|2025-06-25 18:48:...|Soil_Moisture|  72.0|   GH_1 |
|     SM_1|2025-06-25 18:48:...|Soil_Moisture|  96.9|   GH_1 |
|    CO2_2|2025-06-25 18:48:...|          CO2|1467.0|   GH_2 |
|      T_2|2025-06-25 18:48:...|  Temperature|  32.9|   GH_2 |
|      H_3|2025-06-25 18:48:...|     Humidity|  22.8|   GH_3 |
|      H_3|2025-06-25 18:48:...|     Humidity|  78.3|   GH_3 |
|      L_3|2025-06-25 18:48:...|        Light|1200.0|   GH_3 |
|     SM_3|2025-06-25 18:48:...|Soil_Moisture|  79.2|   GH_3 |
|     SM_3|2025-06-25 18:48:...|Soil_Moisture|  81.8|   GH_3 |
|      T_3|2025-06-25 18:48:...|  Temperature| 27.06|   GH_3 |
+---------+--------------------+-------------+------+--------+

In [4]:
# Per-sensor daily statistics: rounded mean plus min/max of "value".
# The same three columns are used for grouping and for ordering the result.
group_keys = ["sensor_id", "location", "sensor_type"]

avg_readings = (
    df.groupBy(*group_keys)
    .agg(
        round(avg("value"), 2).alias("avg_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
    )
    .sort(*group_keys)
)

avg_readings.show()

+---------+--------+-------------+---------+---------+---------+
|sensor_id|location|  sensor_type|avg_value|min_value|max_value|
+---------+--------+-------------+---------+---------+---------+
|    CO2_1|   GH_1 |          CO2|    672.0|    672.0|    672.0|
|    CO2_2|   GH_2 |          CO2|   1467.0|   1467.0|   1467.0|
|      H_3|   GH_3 |     Humidity|    50.55|     22.8|     78.3|
|      L_3|   GH_3 |        Light|   1200.0|   1200.0|   1200.0|
|     PH_1|   GH_1 |      Soil_pH|      7.0|      7.0|      7.0|
|     SM_1|   GH_1 |Soil_Moisture|    84.45|     72.0|     96.9|
|     SM_3|   GH_3 |Soil_Moisture|     80.5|     79.2|     81.8|
|      T_2|   GH_2 |  Temperature|     32.9|     32.9|     32.9|
|      T_3|   GH_3 |  Temperature|    27.06|    27.06|    27.06|
+---------+--------+-------------+---------+---------+---------+



In [5]:
# Build the plain-text KPI section of the daily report: one numbered sentence
# per row of avg_readings, with a ruler line after every third entry to keep
# the list readable.
report_lines = []
counter = 0  # stays 0 (and the report stays empty) when there are no rows
for counter, row in enumerate(avg_readings.toLocalIterator(), start=1):
    # Location values arrive padded (e.g. "GH_1 "). Strip them and put the
    # separating space in the sentence itself, so the wording no longer
    # depends on stray whitespace in the data.
    location = str(row.location).strip()
    report_lines.append(
        f"{counter}) The Average {row.sensor_type} level in {location} during the day is ----> {row.avg_value}\n"
    )
    if counter % 3 == 0:
        report_lines.append("--" * 40 + "|\n")

# Join once instead of re-allocating the whole string on every iteration.
report = "".join(report_lines)
     

In [6]:
# Echo the report in the notebook; the same `report` text is embedded in the
# email body further down. (Fixes the "repory" typo in the heading.)
print(f"This is your daily report for {now.year}-{now.month}-{now.day}")
print(report)

This is your daily repory for 2025-6-25
1) The Average CO2 level in GH_1 during the day is ----> 672.0
2) The Average CO2 level in GH_2 during the day is ----> 1467.0
3) The Average Humidity level in GH_3 during the day is ----> 50.55
--------------------------------------------------------------------------------|
4) The Average Light level in GH_3 during the day is ----> 1200.0
5) The Average Soil_pH level in GH_1 during the day is ----> 7.0
6) The Average Soil_Moisture level in GH_1 during the day is ----> 84.45
--------------------------------------------------------------------------------|
7) The Average Soil_Moisture level in GH_3 during the day is ----> 80.5
8) The Average Temperature level in GH_2 during the day is ----> 32.9
9) The Average Temperature level in GH_3 during the day is ----> 27.06
--------------------------------------------------------------------------------|



In [7]:

# Subject line of the daily email, stamped with the run date taken from `now`.
# NOTE(review): the literal keeps a leading and a trailing space inside the
# triple quotes — confirm whether the padded subject is intentional.
subject = f""" Greenhouse Performance Report - [{now.year}-{now.month}-{now.day}] """

# Plain-text email body; {report} injects the KPI lines built earlier.
# NOTE(review): "[Recipient Name/Team]" and "[Contact Person/Support Team]"
# look like unfilled template placeholders — confirm before sending to real
# recipients.
body = f"""
Dear [Recipient Name/Team],

This email provides an automated report on the performance of the greenhouse system .

Key Performance Indicators (KPIs):

{report}


This report is automatically generated by the greenhouse monitoring system. For any questions or concerns, please contact [Contact Person/Support Team].

Sincerely,

[Greenhouse Monitoring System]"""

In [8]:
# Send the report built above. send_email and its sender/password/recipient
# arguments are not defined in this notebook — presumably supplied by one of
# the %run'd notebooks; confirm.
send_email(sender_email, password, reciver_email, subject, body)

In [9]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()