In [1]:
import time
import threading
import numpy as np

import ntplib
import paho.mqtt.client as mqtt
from datetime import datetime, timezone, timedelta

import pymongo

import pickle

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest


# Shared NTP client instance; get_ist_time() issues its time requests through it.
ntp_time_client = ntplib.NTPClient()

In [2]:
# Topics the server publishes its inference results on; all share one prefix.
publish_topic = "IOT/Server"
irrigation_topic = f"{publish_topic}/Irrigation"
anomaly_topic = f"{publish_topic}/Anomaly"
storm_topic = f"{publish_topic}/Storm"

# Topic on which the server broadcasts its current clock value.
publish_time = "IOT/Time/GlobalNetworkTime"

# Handshake topics: connect requests are received on connect_path and
# acknowledgements are published on ack_connect_path.
connect_path = "IOT/Connect"
ack_connect_path = "IOT/AckConnect"

In [3]:
# Load the pickled model that download_and_inference() uses for its
# "irrigation" prediction.
# SECURITY: pickle.load can execute arbitrary code while unpickling, so this
# file must only ever come from a trusted source.
with open('./model/svm_model.pkl', 'rb') as file:
    irr_model = pickle.load(file)

Note: pickled scikit-learn models have security and maintainability limitations; see https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [4]:
class LocalClock():
    """Software clock that extrapolates from the most recent network time fix.

    After ``sync_local_clock`` has stored a network timestamp, ``get_time``
    returns that timestamp plus the number of whole seconds that have elapsed
    locally since the sync.
    """

    def __init__(self):
        # Network timestamp (seconds) stored by the last sync; None until the first sync.
        self.last_ist_time = None
        # Monotonic-clock reading (whole seconds) taken at the last sync.
        self.local_time_at_prev_fetch = None

    def sync_local_clock(self, internet_time):
        """Store a fresh network timestamp and the local instant it was received.

        Args:
            internet_time: Timestamp in seconds obtained from the network time source.
        """
        self.last_ist_time = internet_time
        # Elapsed time is measured with the monotonic clock so that adjustments
        # of the machine's wall clock cannot distort the extrapolated time.
        self.local_time_at_prev_fetch = int(time.monotonic())

    def get_time(self):
        """Return the extrapolated network time in seconds.

        Returns:
            ``False`` if the clock has never been synced (callers rely on this
            sentinel); otherwise the last synced timestamp plus the whole
            seconds elapsed since that sync.
        """
        if self.last_ist_time is None:
            return False

        elapsed = int(time.monotonic()) - self.local_time_at_prev_fetch
        return elapsed + self.last_ist_time

In [5]:
def mongodb_upload(data_string):
    """Parse one sensor message and insert it as a document into MongoDB.

    The message is split on " :: ". The first field carries the eight sensor
    readings as a bracketed, comma/space separated list. Counting from the
    end, the fields are the device id, an index (not stored) and the
    timestamp.

    Args:
        data_string: Message text whose last " :: " field is the device id.

    Raises:
        IndexError: If the message has fewer than three " :: " separated fields.
        ValueError: If the first field does not split into exactly eight readings.
    """
    # Split once and parse before touching the database, so a malformed
    # message never opens a connection.
    fields = data_string.split(" :: ")
    device_id = fields[-1]
    time_stamp = fields[-3]

    air_temperature, air_moisture, water_depth, soil_moisture,\
    soil_ph, solar_radiation, wind_speed, wind_direction = \
        fields[0].replace('[','').replace(']','').replace(',','').split(' ')

    # Key order matters: download_and_inference() reads the stored readings
    # by position, not by name.
    data = {
        "device_id" : device_id,
        "time_stamp" : time_stamp,
        "air_temperature" : air_temperature,
        "air_moisture" : air_moisture,
        "water_depth" : water_depth,
        "soil_moisture" : soil_moisture,
        "soil_ph" : soil_ph,
        "solar_radiation" : solar_radiation,
        "wind_speed" : wind_speed,
        "wind_direction" : wind_direction
    }

    # NOTE(security): the connection string embeds credentials in source;
    # they should be moved to configuration or an environment variable.
    # The context manager closes the client so that connections are not
    # leaked on every uploaded message.
    with pymongo.MongoClient("mongodb+srv://user:abcd1234@cluster0.zvpuicf.mongodb.net/iot_assgn") as client:
        collection = client["iot_assgn"]["sensor_data"]
        result = collection.insert_one(data)
    print("Inserted document ID:", result.inserted_id)

In [6]:
def _features_from_document(document):
    """Convert one stored sensor document into a float array of its readings."""
    # Readings are taken by position (values 3..10); presumably the first
    # three values are `_id`, `device_id` and `time_stamp` - verify against
    # the documents actually stored in the collection.
    raw_values = list(document.values())[3:11]
    return np.array([float(value.replace('(', '').replace(')', '')) for value in raw_values])


def download_and_inference():
    """Fetch the newest sensor rows from MongoDB and derive the three alerts.

    Up to 50 documents are read, newest first (sorted by ``time_stamp``
    descending), so index 0 is always the most recent reading.

    Returns:
        dict with keys:
            "anomaly": whether an IsolationForest fitted on the fetched rows
                (after standard scaling and a 2-component PCA) labels the
                newest reading as an outlier; ``False`` when fewer than two
                rows exist.
            "irrigation": output of ``irr_model.predict`` for the newest reading.
            "storm": whether the mean of reading index 6 over the 10 newest
                rows is at least 65.

    Raises:
        ValueError: If the collection holds no sensor documents.
    """
    # NOTE(security): the connection string embeds credentials in source;
    # they should be moved to configuration or an environment variable.
    # The context manager closes the client so connections are not leaked on
    # every inference cycle.
    with pymongo.MongoClient("mongodb+srv://user:abcd1234@cluster0.zvpuicf.mongodb.net/iot_assgn") as client:
        collection = client["iot_assgn"]["sensor_data"]
        cursor = collection.find().sort("time_stamp", pymongo.DESCENDING).limit(50)
        data_list = [_features_from_document(item) for item in cursor]

    if not data_list:
        raise ValueError("No sensor data available for inference")

    # Newest reading, shaped (1, n_features) as expected by predict().
    features = data_list[0].reshape(1, -1)

    # Index 6 is the seventh reading of each row (stored as wind_speed by
    # mongodb_upload); only the 10 newest rows are averaged.
    wind = [row[6] for row in data_list[:10]]
    storm = (sum(wind)/len(wind) >= 65)

    if len(data_list) >= 2:
        scaled_data = StandardScaler().fit_transform(data_list)
        reduced_data = PCA(n_components=2).fit_transform(scaled_data)

        anomaly_model = IsolationForest(contamination=0.05)
        anomaly_model.fit(reduced_data)
        anomalies = anomaly_model.predict(reduced_data)

        # Rows are ordered newest-first, so the newest reading is index 0
        # (index -1 would be the oldest fetched row).
        is_anomaly = bool(anomalies[0] == -1)
    else:
        # A 2-component PCA needs at least two samples; with a single row
        # there is nothing to compare against.
        is_anomaly = False

    otpt = {
        "anomaly" : is_anomaly,
        "irrigation" : irr_model.predict(features),
        "storm" : storm
    }

    return otpt

In [7]:
def get_ist_time():
    """Query an NTP server and return its transmit time shifted by +5h30m.

    Returns:
        int: the reply's transmit timestamp (seconds), interpreted as UTC,
        advanced by 5 hours 30 minutes and truncated to whole seconds.
    """
    # Alternative server that could be queried instead: 'pool.ntp.org'
    ntp_reply = ntp_time_client.request('time.google.com', version=3)
    utc_moment = datetime.fromtimestamp(ntp_reply.tx_time, timezone.utc)
    ist_offset = timedelta(hours=5, minutes=30)
    shifted_moment = utc_moment + ist_offset
    ist_time = int(shifted_moment.timestamp())
    print("Fetched IST time!")
    return ist_time

def check_dead_devices():
    """Forget every device whose last recorded activity is more than 45 seconds old.

    Reads the global ``clock`` and removes stale entries from the global
    ``devices_ids_dict``, printing one line per removed device.
    """
    global clock, devices_ids_dict
    print("Checking for inactive devices")
    # Work on a snapshot of the keys so entries can be deleted afterwards.
    known_devices = list(devices_ids_dict.keys())
    dead = [
        device
        for device in known_devices
        if clock.get_time() - devices_ids_dict[device] > 45
    ]
    for device in dead:
        del devices_ids_dict[device]
        print(f"Device {device} is inactive")

In [8]:
def thread_15_seconds():
    """Background worker loop that drives the periodic server tasks.

    The loop polls once per second and fires a "tick" whenever more than 15
    seconds have passed since the previous tick. Ticks cycle in groups of
    three:

    * first and second tick: broadcast the clock, run
      ``download_and_inference`` and publish the irrigation, anomaly and
      storm messages;
    * third tick: re-sync the clock from NTP, broadcast it and purge
      inactive devices.

    Failures of the NTP sync or of the inference step are logged and the
    loop continues, so one transient error cannot stop the time broadcast.
    The function never returns.
    """
    global clock, client, devices_ids_dict
    sync = 0
    prev_time = int(time.time())
    while True:
        curr_time = int(time.time())
        if curr_time - prev_time > 15:
            prev_time = curr_time
            if sync == 2:
                print("45")
                sync = 0
                try:
                    internet_time = get_ist_time()
                    clock.sync_local_clock(internet_time)
                except Exception as exc:
                    # Best effort: the clock keeps extrapolating from the
                    # previous sync when the NTP request fails.
                    print(f"NTP sync failed, keeping previous time reference: {exc}")
                client.publish(publish_time, str(clock.get_time()))
                check_dead_devices()
            else:
                sync += 1
                print("15")
                client.publish(publish_time, str(clock.get_time()))
                try:
                    otpt = download_and_inference()
                except Exception as exc:
                    # A database or model error must not terminate this
                    # thread; skip publishing for this tick only.
                    print(f"Inference failed, skipping this cycle: {exc}")
                else:
                    irrigation_msg = f"{otpt['irrigation']} :: {str(clock.get_time())}"
                    anomaly_msg = f"{otpt['anomaly']} :: {str(clock.get_time())}"
                    storm_msg = f"{otpt['storm']} :: {str(clock.get_time())}"
                    client.publish(irrigation_topic, irrigation_msg)
                    client.publish(anomaly_topic, anomaly_msg)
                    client.publish(storm_topic, storm_msg)

        time.sleep(1)

In [9]:
def onboard_new_device(message):
    """Register a device the first time its connect request is seen.

    The device id is the last space-separated token of ``message``. For an
    unknown id the server broadcasts its clock, records the device with the
    current clock value, subscribes to ``IOT/Data/<device_id>``, publishes a
    welcome acknowledgement, waits two seconds and logs the registration.
    Requests from already known devices are ignored.

    Args:
        message: Decoded payload of the connect request.
    """
    global devices_ids_dict
    device_id = message.rsplit(' ', 1)[-1]

    # Already registered: nothing to do.
    if device_id in devices_ids_dict:
        return

    client.publish(publish_time, str(clock.get_time()))
    devices_ids_dict[device_id] = clock.get_time()
    client.subscribe(f"IOT/Data/{device_id}")
    client.publish(ack_connect_path, f"Welcome {device_id}")
    time.sleep(2)
    print(f"Recognized device {device_id} at {clock.get_time()}")

In [None]:
# Host and port of the MQTT broker that client.connect() is pointed at below.
broker_address = "192.168.138.238"
broker_port = 1883  

def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: (re)establish every subscription the server needs.

    Besides the connect-request topic, the data topic of every device that is
    currently registered in ``devices_ids_dict`` is subscribed again. Without
    this, data from already onboarded devices would stop arriving after a
    reconnect, because onboarding (which creates those subscriptions) is
    skipped for devices that are still registered.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data attached to the client (unused).
        flags: Connection flags sent by the broker (unused).
        rc: Connection result code, printed for diagnostics.
    """
    print("Connected with result code "+str(rc))
    client.subscribe(connect_path)
    # Iterate over a snapshot: another thread may remove inactive devices
    # from the dict while this loop runs.
    for device_id in list(devices_ids_dict):
        client.subscribe(f"IOT/Data/{device_id}")
 
def on_message(client, userdata, msg):
    """MQTT message callback: handle connect requests and device data.

    * A message on ``connect_path`` whose first space-separated token is
      "HIPC" is passed to ``onboard_new_device``.
    * A message on a topic whose second level is "Data" refreshes the
      sending device's last-seen time (the device id is the third topic
      level) and is stored through ``mongodb_upload`` with the device id
      appended.

    A failure while storing a message is logged instead of being raised, so
    that one malformed payload or a database outage does not propagate out
    of the callback into the MQTT network loop.

    Args:
        client: The MQTT client instance (unused; the global client is used elsewhere).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    global clock, devices_ids_dict
    message = msg.payload.decode()
    print("Received message: " + message + f" at {clock.get_time()} in {msg.topic}")
    if msg.topic == connect_path:
        if message.split(' ')[0] == "HIPC":
            onboard_new_device(message)

    # Split the topic once and check its depth before indexing into it.
    topic_levels = msg.topic.split('/')
    if len(topic_levels) > 2 and topic_levels[1] == 'Data':
        dev = topic_levels[2]
        devices_ids_dict[dev] = clock.get_time()
        message = message + " :: " + dev
        try:
            mongodb_upload(message)
        except Exception as exc:
            print(f"Failed to store message from device {dev}: {exc}")

        
# MQTT client created with callback API version 1, matching the
# (client, userdata, flags, rc) / (client, userdata, msg) callbacks above.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
# Maps device id -> server clock value recorded at that device's last activity.
devices_ids_dict = {}

client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, 60)

# Sync the software clock once from NTP before anything is published.
clock = LocalClock()
internet_time = get_ist_time()
clock.sync_local_clock(internet_time)

# Initial broadcast of the freshly synced time.
client.publish(publish_time, clock.get_time())

# Start the periodic worker (time broadcast, inference, inactive-device purge).
threading.Thread(target=thread_15_seconds).start()

# Run the MQTT network loop; NOTE(review): this call is expected to block,
# so it must stay the last statement of the cell - confirm against paho-mqtt docs.
client.loop_forever()