# Simulating a sensor network

In [1]:
# Cloning the repository and installing dependencies (on google colab)
#!git clone https://github.com/alfkjartan/multithread_assesment.git multithread_assessment
#%cd multithread_assessment
#!pip3 install -r requirements.txt

### Add parent directory to path, to locate code 

In [2]:
import sys
sys.path.append("../") # add parent dir

### Imports

In [3]:
import numpy as np
import random
from datetime import datetime
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process
from threading import Thread
from threading import Event as ThreadEvent
from functools import partial
from service.repository.logging import Logger
from service.repository.repository import Repository
from utils.network import Connection
from service.model.message import Message
from sensors.base_sensor import Sensor
from sensors.probe import *
#import matplotlib.pyplot as plt
%matplotlib notebook

## Settings

In [4]:
connection_type = ("socket", "shared_memory", "pipe")[0]
system_sensor_data = True # If True gets data from psutil, otherwise from random.randint
num_sensors = 6 # Only used if system_sensor_data is False
log_to_plot = False # Matplotlib in separate Process will not show
log_to_screen = False # Mostly for debug
log_to_list = True # If True, also logs to a list 
csv_logfile = datetime.now().strftime("sensorlog-%Y-%m-%d.csv")
if connection_type == "socket":
    host = '127.0.0.1'
    port = 33330


## Setting up the logger

In [5]:
logger = Logger()
logger.add_repository(Repository.csv_repository(csv_logfile))
if log_to_screen: logger.add_repository(Repository.screen_dump())
if log_to_plot: logger.add_repository(Repository.plot(num_sensors))
if log_to_list: 
    log_list = []
    logger.add_repository(log_list) # Works because `Repository` objects are list-like 


## Setting up the connection between sensors and logger

In [6]:
if connection_type == 'socket':
    connection_factory = partial(Connection.create_socket_connection, host=host, port=port)
elif connection_type == 'shared_memory':
    connection_factory = Connection.create_memory_connection
elif connection_type == 'pipe':
    connection_factory = Connection.create_pipe_connection

# Creating Event objects to signal stop of simulation
proc_stop_event = ProcessEvent()
thread_stop_event = ThreadEvent()

sensor_connections = []
server_connections = [] # Need acces to these for closing down gracefully
server_threads = []
for _ in range(num_sensors):
    server_connection, client_connection = connection_factory(logger) 
    sensor_connections.append(client_connection)
        
    if server_connections != []:
        if connection_type == 'socket':
            # Singleton server connection used with socket communication
            # Avoid starting new thread to serve the same ip adress
            pass
        else:
            # Shared memory or pipe communications
            server_thread = Thread(target = server_connection.run, args = [thread_stop_event])
            server_thread.start()
            server_connections.append(server_connection)
            server_threads.append(server_thread)

    else: # Empty list server_connections, so append the first
        server_thread = Thread(target = server_connection.run, args = [thread_stop_event])
        server_thread.start()
        server_connections.append(server_connection)
        server_threads.append(server_thread)



Server 127.0.0.1:33330 is listening.


## Creating and spawning the sensors

In [7]:
if system_sensor_data:
    dts = [0.5, 5, 0.5, 0.5] # The sampling period of the sensors
    sensor_probes = {'CPU utilization (percent)' : cpu_utilization,
                     'Load average (divide with number of cpu cores)' : load_average,
                     'Memory available (Gb)' : memory_available,
                     'CPU temperature (Celcius)' : cpu_temp}
else:
    dts = np.arange(1, num_sensors+2) # The sampling period of the sensors
    sensor_probes = {}
    for dt_ in range(num_sensors):
        sensor_probes[f'Sensor-{dt_}'] = lambda : random.randint(-100, 100)
            
sensors = [Sensor(id, name, sampling_period = dt,
                  probe=sensor_probes[name],
                  connection = conn) \
               for id, name, dt, conn in zip(range(num_sensors), sensor_probes.keys(),
                                             dts, sensor_connections)]
print("Created sensor processes.")
sensor_processes = [Process(target=s.run, args=[proc_stop_event]) for s in sensors]
for p in sensor_processes:
    p.start()
print("Started sensors.")



Created sensor processes.
Started sensors.


Client connected. Type of return value  <class 'NoneType'>
Client connected. Type of return value  <class 'NoneType'>
Client connected. Type of return value  <class 'NoneType'>
Client connected. Type of return value  <class 'NoneType'>


CPU temperature (Celcius) received stop event. Sending None and closing connection.
Closing down socket.
CPU utilization (percent) received stop event. Sending None and closing connection.
Closing down socket.
Memory available (Gb) received stop event. Sending None and closing connection.
Closing down socket.
Load average (divide with number of cpu cores) received stop event. Sending None and closing connection.
Closing down socket.


## Closing down

In [8]:
input("Press return to end simulation")

thread_stop_event.set()
proc_stop_event.set()
    
for p in sensor_processes:
    p.join()

print("All sensor processes done.")

for sc in server_connections:
    sc.close()
        
for s in server_threads:
    s.join()

print("All server threads done. Exiting.")

    


Server accepting connection
Server accepting connection
Server accepting connection
Server accepting connection
Press return to end simulation
ServerConnection with socket <socket.socket fd=74, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 33330), raddr=('127.0.0.1', 47664)> received stop signal
ServerConnection with socket <socket.socket fd=64, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 33330), raddr=('127.0.0.1', 47650)> received stop signal
ServerConnection with socket <socket.socket fd=72, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 33330), raddr=('127.0.0.1', 47658)> received stop signal
ServerConnection with socket <socket.socket fd=76, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 33330), raddr=('127.0.0.1', 47674)> received stop signal
All sensor processes done.
id = 0,  name = CPU utilization (percent), dat

KeyboardInterrupt: 

## Print logged data

In [10]:
if log_to_list:
    for m in log_list:
        print(m)

id = 0,  name = CPU utilization (percent), data = 49.3, timestamp = 2023-03-12_19:31:48.935322
id = 2,  name = Memory available (Gb), data = 20.455563264000002, timestamp = 2023-03-12_19:31:48.955902
id = 3,  name = CPU temperature (Celcius), data = 25.0, timestamp = 2023-03-12_19:31:49.006507
id = 0,  name = CPU utilization (percent), data = 44.2, timestamp = 2023-03-12_19:31:49.453584
id = 2,  name = Memory available (Gb), data = 20.452274176, timestamp = 2023-03-12_19:31:49.480236
id = 3,  name = CPU temperature (Celcius), data = 25.0, timestamp = 2023-03-12_19:31:49.544226
id = 0,  name = CPU utilization (percent), data = 32.9, timestamp = 2023-03-12_19:31:49.954566
id = 2,  name = Memory available (Gb), data = 20.451561472, timestamp = 2023-03-12_19:31:49.981526
id = 3,  name = CPU temperature (Celcius), data = 25.0, timestamp = 2023-03-12_19:31:50.065755
id = 0,  name = CPU utilization (percent), data = 35.5, timestamp = 2023-03-12_19:31:50.455663
id = 2,  name = Memory available

Exception in thread Thread-7:
Traceback (most recent call last):
  File "/home/kjartan/.pyenv/versions/3.8.11/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/kjartan/.pyenv/versions/3.8.11/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kjartan/projects/multithread_assessment/notebooks/../utils/network.py", line 196, in run
    client_sock, addr = self.server_socket.accept()
  File "/home/kjartan/.pyenv/versions/3.8.11/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
OSError: [Errno 9] Bad file descriptor


Server accepting connection
ServerConnection with socket <socket.socket fd=77, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 33330), raddr=('127.0.0.1', 42108)> received stop signal
Server received stop signal.
