In [1]:
# Execute the companion notebooks inside this kernel so the names they define are
# available below.
# NOTE(review): send_email, sender_email, password and reciver_email are used later but
# never defined in this notebook; presumably they come from one of these runs (most
# likely 11_Email_Notification_and_Report.ipynb) -- confirm.
%run 03_HDFS_Data_lake.ipynb
%run 05_utils.ipynb
%run 11_Email_Notification_and_Report.ipynb
# findspark.init() runs before the pyspark imports so that pyspark can be located.
import findspark
findspark.init() 
# The second import rebinds the name `datetime` from the module to the class, which is
# why `datetime.now()` works further down.
import datetime
from datetime import datetime
from pyspark.sql import SparkSession
# Caution: these Spark column functions shadow Python's built-in min, max and round
# for the remainder of the notebook.
from pyspark.sql.functions import avg, min, max, round
# from pyspark import SparkConf, SparkContext
import csv
import os 

In [2]:
# Reuse the active Spark session if there is one, otherwise start a new session
# under this application name.
spark = (
    SparkSession.builder
    .appName("GreenhouseSensorData")
    .getOrCreate()
)

In [3]:
# Timestamp captured once: it selects today's file here and dates the report and
# e-mail subject in later cells.
now = datetime.now()

# Daily files are addressed as <year>/<month>/day_<day>.csv; the numbers are inserted
# as plain integers (no zero padding).
hdfs_path = f"hdfs://localhost:9000/Sensors_DL/{now.year}/{now.month}/day_{now.day}.csv"

# The CSV is read without a header row and with inferred column types; the
# auto-generated column names are then replaced by meaningful ones.
csv_f = spark.read.csv(hdfs_path, inferSchema=True, header=False)
column_names = ["sensor_id", "timestamp", "sensor_type", "value", "location"]
df = csv_f.toDF(*column_names)

# Preview the readings ordered by location, then by sensor.
df.orderBy("location", "sensor_id").show()

+---------+--------------------+-----------+------+--------+
|sensor_id|           timestamp|sensor_type| value|location|
+---------+--------------------+-----------+------+--------+
|    CO2_1|2025-06-23 15:26:...|        CO2|1160.0|   GH_1 |
|    CO2_1|2025-06-23 15:25:...|        CO2| 701.0|   GH_1 |
|    CO2_1|2025-06-23 15:24:...|        CO2|1799.0|   GH_1 |
|    CO2_1|2025-06-23 15:27:...|        CO2| 699.0|   GH_1 |
|    CO2_1|2025-06-23 15:25:...|        CO2|1119.0|   GH_1 |
|    CO2_1|2025-06-23 15:27:...|        CO2|1146.0|   GH_1 |
|    CO2_1|2025-06-23 15:27:...|        CO2| 966.0|   GH_1 |
|    CO2_1|2025-06-23 15:29:...|        CO2|1345.0|   GH_1 |
|    CO2_1|2025-06-23 15:31:...|        CO2| 565.0|   GH_1 |
|    CO2_1|2025-06-23 15:31:...|        CO2|1699.0|   GH_1 |
|    CO2_1|2025-06-23 15:31:...|        CO2| 949.0|   GH_1 |
|    CO2_1|2025-06-23 15:33:...|        CO2| 595.0|   GH_1 |
|    CO2_1|2025-06-23 15:33:...|        CO2| 885.0|   GH_1 |
|    CO2_1|2025-06-23 15

In [4]:
# Daily summary per (sensor_id, location, sensor_type): mean rounded to two decimals,
# plus the minimum and maximum reading.
# avg, min, max and round are the pyspark.sql.functions versions imported in the first
# cell, not the Python built-ins.
avg_readings = (
    df.groupBy("sensor_id", "location", "sensor_type")
    .agg(
        round(avg("value"), 2).alias("avg_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
    )
    .orderBy("sensor_id", "location", "sensor_type")
)
avg_readings.show()

+---------+--------+-------------+---------+---------+---------+
|sensor_id|location|  sensor_type|avg_value|min_value|max_value|
+---------+--------+-------------+---------+---------+---------+
|    CO2_1|   GH_1 |          CO2|  1073.65|    565.0|   1799.0|
|    CO2_2|   GH_2 |          CO2|  1091.42|    549.0|   1731.0|
|    CO2_3|   GH_3 |          CO2|   1194.8|    417.0|   1669.0|
|      H_1|   GH_1 |     Humidity|     49.8|     20.3|     91.5|
|      H_2|   GH_2 |     Humidity|    67.18|     31.7|     92.8|
|      H_3|   GH_3 |     Humidity|    53.27|     25.6|     75.3|
|      L_1|   GH_1 |        Light|  1287.18|   1209.0|   1339.0|
|      L_2|   GH_2 |        Light|  1332.92|   1281.0|   1381.0|
|      L_3|   GH_3 |        Light|   1286.5|   1213.0|   1397.0|
|     PH_1|   GH_1 |      Soil_pH|     6.88|      3.2|      9.7|
|     PH_2|   GH_2 |      Soil_pH|     6.24|      3.7|      9.6|
|     PH_3|   GH_3 |      Soil_pH|      6.4|      3.1|      8.3|
|     SM_1|   GH_1 |Soil_

In [5]:
# Build the plain-text daily report: one numbered line per summary row, followed by a
# ruler after every third line.
# NOTE(review): the "every third line" rule assumes three rows per sensor type, as in
# the current data -- confirm if sensors are added or removed.
report_lines = []
counter = 0  # stays 0 when the summary is empty
for counter, row in enumerate(avg_readings.toLocalIterator(), start=1):
    # The raw location values carry trailing whitespace (e.g. "GH_1 "). Strip it and add
    # the separating space explicitly so the sentence does not depend on dirty data.
    location = str(row.location).strip()
    report_lines.append(
        f"{counter}) The Average {row.sensor_type} level in {location} during the day is ----> {row.avg_value}\n"
    )
    if counter % 3 == 0:
        report_lines.append("--" * 40 + "|\n")

# Join once at the end instead of growing the string on every iteration.
report = "".join(report_lines)
     

In [6]:
# Show the report in the notebook, headed by the date captured in `now`.
# (Header typo "repory" corrected to "report".)
print(f"This is your daily report for {now.year}-{now.month}-{now.day}")
print(report)

This is your daily repory for 2025-6-23
1) The Average CO2 level in GH_1 during the day is ----> 1073.65
2) The Average CO2 level in GH_2 during the day is ----> 1091.42
3) The Average CO2 level in GH_3 during the day is ----> 1194.8
--------------------------------------------------------------------------------|
4) The Average Humidity level in GH_1 during the day is ----> 49.8
5) The Average Humidity level in GH_2 during the day is ----> 67.18
6) The Average Humidity level in GH_3 during the day is ----> 53.27
--------------------------------------------------------------------------------|
7) The Average Light level in GH_1 during the day is ----> 1287.18
8) The Average Light level in GH_2 during the day is ----> 1332.92
9) The Average Light level in GH_3 during the day is ----> 1286.5
--------------------------------------------------------------------------------|
10) The Average Soil_pH level in GH_1 during the day is ----> 6.88
11) The Average Soil_pH level in GH_2 during the d

In [7]:

# E-mail subject, dated with the timestamp captured in `now`. The surrounding padding
# spaces were removed so the subject line does not start or end with whitespace.
subject = f"Greenhouse Performance Report - [{now.year}-{now.month}-{now.day}]"

# E-mail body wrapping the generated report text.
# NOTE(review): the bracketed placeholders ([Recipient Name/Team],
# [Contact Person/Support Team]) are still unfilled template text -- replace them with
# real values before relying on this in production.
body = f"""
Dear [Recipient Name/Team],

This email provides an automated report on the performance of the greenhouse system.

Key Performance Indicators (KPIs):

{report}


This report is automatically generated by the greenhouse monitoring system. For any questions or concerns, please contact [Contact Person/Support Team].

Sincerely,

[Greenhouse Monitoring System]"""

In [8]:
# Send the assembled report by e-mail.
# NOTE(review): send_email and the sender/password/recipient values are not defined in
# this notebook; presumably they are provided by the notebooks %run in the first cell.
send_email(
    sender_email,
    password,
    reciver_email,
    subject,
    body,
)

In [9]:
# Shut down the Spark session created at the top of the notebook now that the report
# has been built and sent.
spark.stop()