In [None]:
import asyncio
import nest_asyncio
import logging
from aiocoap import *
from aiocoap.numbers.codes import Code
import aiocoap.resource as resource
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient
from influxdb_client import Point
from datetime import datetime
import sys
import os
import numpy as np
import pandas as pd
import requests
import json
import telebot

RED = '\033[91m'
GREEN = '\033[92m'
BLUE = '\033[94m'
YELLOW = '\033[93m'
RESET = '\033[0m'

file_path = 'C:/Users/User/Desktop/UNI/MAGISTRALE/IoT/PROGETTO/conf.py'
directory_path = os.path.dirname(file_path)
sys.path.append(directory_path)
import conf

def server_print(protocol, message):
    prot = ['MQTT', 'CoAP', 'HTTP']
    col = ['\033[94m', '\033[92m', '\033[93m']
    now = datetime.now().replace(microsecond=0)
    if protocol in prot:
        i = prot.index(protocol)
        print(f'[{col[i]}{now} - {protocol}{RESET}] | {message}')
    else:
        print(f'[{now}-{protocol}] | {message}')

Server_IP = '192.168.1.9'
CoaPort = 5684

MQTTPort = 1883
alert_topic = "pump1/alert"
sampling_topic = "pump1/sampling"
mqtt_client_id = 'PC_ema'
mqtt_username = 'proxy'
mqtt_password = 'emanuele'

clientINFL = InfluxDBClient(url=conf.InfluxURL, token=conf.token, org=conf.org)
write_api = clientINFL.write_api()
query_api = clientINFL.query_api()

plants_df = pd.read_excel('../plants.xlsx')

Sens_IDs = plants_df['Sensor ID'].tolist()
Pump_IDs = plants_df['Pump ID'].tolist()
resources = ['moisture']

class CallBackFunctions(resource.Resource):
    def __init__(self):
        super().__init__()

        self.put_response_p = 'OK'
        self.put_response_n = 'NO'
        
    
    async def render_put(self, request):
        self.put_payload = int(request.payload)
        path = str(request.get_request_uri()).split('/')
        ID, topic = path[-2:]
        p = Point(ID).field(topic, self.put_payload).tag("Sensor ID", ID)
        write_api.write(bucket=conf.bucket,org=conf.org,record=p)
        stampa = f'PUT REQUEST from {RED}{ID}{RESET} on topic {RED}{topic}{RESET}: {self.put_payload}%'
        server_print('CoAP', stampa)
        response = Message(code=Code.CHANGED, payload= self.put_response_p.encode('utf-8'))
        return response

async def check_time():
    target_time = '20:32'
    target_hour, target_minute = map(int, target_time.split(':'))
    time = 60
    while True:
        now = datetime.now()
        current_hour = now.hour
        current_minute = now.minute
        if current_hour == 0:
            time = 60
        
        if (current_hour > target_hour) or (current_hour == target_hour and current_minute >= target_minute):
            time = (60-target_minute)*60 + (24 - (target_hour + 1))*3600
            stampa = 'Time to Irrigate!'
            server_print('ServerApp', stampa)
            await execute_procedure()
        await asyncio.sleep(time)

async def execute_procedure():
    messages = []
    try:
        get_req = requests.get('http://pump1/Water')
    except Exception as ewa:
        get_water_fail = True
        available_water = 0
        message = 'FAILED to fetch water availability'
        messages.append(message)
        server_print('HTTP', message)
    else:
        get_water_fail = False
        response = get_req.json()
        available_water = response['Water']
        stampa = f'Available water in the cistern: {available_water} cm^3'
        server_print('HTTP', stampa)

    city_water_tot = 0
    my_water_tot = 0
    stampa = 'Start Evaluation Procedure'
    server_print('ServerApp', stampa)
    for i, sensor in enumerate(Sens_IDs):
        print(f'Sensor under analysis: {RED}{sensor}{RESET}')
        to_water = plants_df.loc[i, 'To Be Watered']
        scheduled_irrigation = plants_df.loc[i, 'Scheduled Irrigation']
        if to_water == True and scheduled_irrigation <= datetime.now():
            try:
                query = f'''
                from(bucket: "{conf.bucket}")
                |> range(start: -1d)
                |> filter(fn: (r) => r._measurement == "sens1")
                |> filter(fn: (r) => r["Sensor ID"] == "{sensor}")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                '''
                DF = clientINFL.query_api().query_data_frame(org=conf.org, query=query)
            except Exception as InfluxError:
                message = 'FAILED to fetch Influx data for ' + sensor
                messages.append(message)
                last_moisture = np.nan
                server_print('HTTP', message)
                server_print('HTTP', InfluxError)
            else:  
                if len(DF) >= 1:
                    last_moisture = DF.tail(1)['moisture'].values[0]
                else:
                    message = 'No Influx data are available for '+sensor
                    messages.append(message)
                    server_print('HTTP', message)
                    last_moisture = np.nan
                    
                if np.isnan(last_moisture) == False:
                    roots_D = plants_df.loc[i, 'Roots Depth [m]']
                    roots_A = plants_df.loc[i, 'Roots Area [m^2]']
                    roots_V = roots_D * roots_A
                    max_moisture = plants_df.loc[i, 'Max-Moisture [%]']
                    
                    water_need = roots_V * (100**2) * (max_moisture - last_moisture)
                    print(f'The plant needs to watered:\n  - current soil moisture: {GREEN}{last_moisture}%{RESET}\n  - water requirements: {BLUE}{water_need}cm^3{RESET}')
                    if water_need <= available_water:
                        plant_mywater = water_need
                        my_water_tot = my_water_tot + plant_mywater
                        city_water = 0
                        print(f'The plant will be irrigated with your water')
                        headers = {"Content-Type": "application/json"}
                        data = {"water": water_need}
                        url = 'http://'+ plants_df.loc[i, 'Pump ID'] + '.local/irrigation'
                        try:
                            stampa = f'Executing PUT request at {url}'
                            server_print('HTTP', stampa)
                            put_req = requests.put(url, headers=headers, data=json.dumps(data))
                        except Exception as PUTerror:
                            message = 'FAILED tu execute PUT request for sensor '+ sensor
                            messages.append(message)
                            server_print('HTTP', message)
                        else:
                            stampa = f'The plant has been watered'
                            server_print('HTTP', stampa)
                            to_water = False
                            available_water = available_water - water_need
                    elif available_water < water_need and available_water > 0:
                        available_water = 0
                        plant_mywater = water_need - available_water
                        my_water_tot = my_water_tot + plant_my_water
                        city_water = water_need - plant_mywater
                        city_water_tot = city_water_tot + city_water
                    else:
                        plant_mywater = 0
                        city_water = water_need
                        city_water_tot = city_water_tot + city_water
                        
                    plants_df.loc[i, 'To Be Watered'] = to_water
                    plants_df.loc[i, 'Last Irrigation'] = pd.Timestamp(datetime.now())
                    plants_df.loc[i, 'Water [cm^3] - R'] = plant_mywater
                    plants_df.loc[i, 'City Water [m^3] - R'] = city_water/(100**3)
                    message = sensor + ' | Irrigation Status: PASS'
                    messages.append(message)

    server_print('ServerApp', 'Updating Plants Specification')
    plants_df.to_excel('../plants.xlsx', index=False)
    message_f1 = 'My Water usage: ' + str(np.round(my_water_tot, 2)) + 'cm^3'
    message_f2 = 'City Water usage: ' + str(np.round(city_water_tot, 2)) + 'cm^3'
    messages.append(message_f1)
    messages.append(message_f2)

    bot = telebot.TeleBot(conf.Telegram_TOKEN)
    date = datetime.now().date()
    file_path = 'Irrigation_' + str(date) +'.txt'
    with open(file_path, 'w') as file:
        for message in messages:
            file.write(message + "\n")

    try:
        with open(file_path, 'rb') as file:
            bot.send_document(chat_id=conf.Telegram_CHAT_ID, document=file)
    except Exception as Telegram_error:
        server_print('HTTP', Telegram_error)
    else:
        server_print('ServerApp', 'Sent data to the user via Telegram Bot')
        os.remove(file_path)

def on_connect(mqtt_client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        server_print('MQTT', f'Connected to the Broker')
        mqtt_client.subscribe(alert_topic)
        server_print('MQTT', f'Subscription to topic: {alert_topic}')
        mqtt_client.subscribe(sampling_topic)
        server_print('MQTT', f'Subscription to topic: {sampling_topic}')

        for sensor in Sens_IDs:
            topic = sensor + "/sleep_time"
            mqtt_client.subscribe(topic)
            server_print('MQTT', f'Subscription to topic: {topic}')
        
    else:
        server_print('MQTT', f'\n{RED}FAILED{RESET} to connect, return code:\n{reason_code}\n')

def on_message(mqtt_client, userdata, msg):
    topic = msg.topic
    if topic == alert_topic:
        value = float(msg.payload.decode('utf-8'))
        server_print('MQTT', f'New Message on {RED}{alert_topic}{RESET}: {value}%')
        if value == 100:
            bot = telebot.TeleBot(conf.Telegram_TOKEN)
            message_text = "Reservoir reached the maximum limit!"
            bot.send_message(chat_id=conf.Telegram_CHAT_ID, text=message_text)
    if topic == sampling_topic:
        sampling_p = float(msg.payload.decode('utf-8'))
        server_print('MQTT', f'New Message on {RED}{sampling_topic}{RESET}: {sampling_p} [s]')

    if topic[-10:] == 'sleep_time':
        sleep_time = float(msg.payload.decode('utf-8'))
        server_print('MQTT', f'New sampling period for {YELLOW}{topic[:-11]}{RESET}: {sleep_time} [s]')

async def MQTT_loop():
    mqtt_client = mqtt.Client(client_id= mqtt_client_id, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    mqtt_client.username_pw_set(mqtt_username, mqtt_password)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    try: 
        mqtt_client.connect(Server_IP, MQTTPort)
    except Exception as mqtt_error:
        server_print('MQTT', f'{RED}ERROR{RESET}: {mqtt_error}')
    else:
        server_print('MQTT', f'CONNECTED to the Broker')
        mqtt_client.loop_start()
        while True:
            await asyncio.sleep(1)


async def main():
    root = resource.Site()
    for ID in Sens_IDs:
        for res in resources:
            root.add_resource([ID, res], CallBackFunctions())
    
    await Context.create_server_context(root, bind=(Server_IP, CoaPort))
    
    date_time_task = asyncio.create_task(check_time())
    mqtt_task = asyncio.create_task(MQTT_loop())

    await mqtt_task
    await date_time_task
    await asyncio.get_running_loop().create_future()

if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(main())