# Simulation of an analog signal from a sensor and subsequent analysis

michael@databooth.com.au

## Initial setup

Load the required packages (libraries)

In [1]:
# DONE: !pip install RPi.GPIO - only works on Raspberry Pi

In [2]:
import time
import random
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import uuid

In [3]:
# Optional hardware setup: configure the Raspberry Pi GPIO library when it is available.
# The functions defined in this notebook simulate the sensor reading, so a failure here is not fatal.
try:
    import RPi.GPIO as GPIO

    GPIO.setmode(GPIO.BCM)  # select the BCM pin-numbering mode
    GPIO.setup(18, GPIO.IN)  # configure pin 18 as an input
except Exception as e:
    # Best-effort: any exception raised above (failed import or failed pin setup)
    # is reported and execution continues. Note the message always reads
    # "Import error", even when the failure came from setmode/setup.
    print(f"Import error: RPi.GPIO - {e}")

Import error: RPi.GPIO - No module named 'RPi'


In [4]:
# Default maximum number of readings collected in one experiment.
N_OBSERVATION = 50
# Default maximum duration of one experiment, in seconds (2 minutes).
MAX_TIME_EXPERIMENT_SECONDS = 2 * 60
# Upper bound of the simulated sensor reading (readings are drawn from 0.0 up to this value).
MAX_SENSOR_VALUE = 100.0
# Upper bound, in seconds, of the random delay between consecutive readings.
MAX_TIME_DIFF_SECONDS = 10
# Base path for the saved data files: <parent of the current working directory>/data/sensor_data.csv
DATA_FILE = Path.cwd().parent / "data" / "sensor_data.csv"

Define any custom (user-defined) functions

In [5]:
def generate_uuid_from_datetime(current_time):
    """Derive a deterministic UUID from a datetime.

    The datetime's POSIX timestamp is rendered as text and hashed into a
    version-5 UUID under the DNS namespace, so the same instant always maps
    to the same UUID.

    Args:
        current_time: A ``datetime`` object (anything exposing ``timestamp()``).

    Returns:
        uuid.UUID: The version-5 UUID for that timestamp.
    """
    seed_text = str(current_time.timestamp())
    return uuid.uuid5(uuid.NAMESPACE_DNS, seed_text)

In [6]:
def read_analog_sensor(max_sensor_value=MAX_SENSOR_VALUE):
    """Return one simulated analog sensor reading.

    Stands in for a real sensor read by drawing a float uniformly at random
    from the (arbitrary) range 0.0 to ``max_sensor_value``.

    Args:
        max_sensor_value: Upper bound of the simulated reading.

    Returns:
        float: The simulated sensor value.
    """
    lower_bound = 0.0
    return random.uniform(lower_bound, max_sensor_value)

In [7]:
def acquire_data(dt, data, sensor_data):
    """Record one reading by appending it to the running collection.

    Args:
        dt: Time at which the reading was taken.
        data: The reading itself.
        sensor_data: List that accumulates ``(dt, data)`` tuples; mutated in place.

    Returns:
        None
    """
    reading = (dt, data)
    sensor_data.append(reading)  # kept for later use by the caller

## Data Collection

Simulate the collection (capture) of data from the sensor for up to `N_OBSERVATION` observations (including random times between observations) or until the `MAX_TIME_EXPERIMENT_SECONDS` is reached.

In [8]:
def collect_simulated_data(n_observation=None, max_time=None):
    """Simulate collecting timestamped readings from the analog sensor.

    Readings are taken one at a time. After each reading the function sleeps,
    in real time, for a random interval drawn uniformly from 0 to
    ``MAX_TIME_DIFF_SECONDS`` seconds, and advances the recorded acquisition
    time by the same amount. Collection stops once ``n_observation`` readings
    have been taken or the acquisition time reaches the start time plus
    ``max_time`` seconds, whichever comes first. The time limit is checked
    before each reading, so the final reading's timestamp may fall slightly
    past the limit.

    Args:
        n_observation: Maximum number of readings to collect. Defaults to
            ``N_OBSERVATION`` when ``None``.
        max_time: Maximum experiment duration in seconds. Defaults to
            ``MAX_TIME_EXPERIMENT_SECONDS`` when ``None``.

    Returns:
        tuple: ``(sensor_data, file_uuid)`` where ``sensor_data`` is a list of
        ``(acquisition_datetime, sensor_value)`` tuples and ``file_uuid`` is a
        UUID derived from the collection start time.
    """
    sensor_data = []
    acquisition_datetime = datetime.now()
    # The UUID is derived from the start time, before any readings advance it.
    file_uuid = generate_uuid_from_datetime(acquisition_datetime)
    if n_observation is None:
        n_observation = N_OBSERVATION
    if max_time is None:
        max_time = MAX_TIME_EXPERIMENT_SECONDS
    print(
        f"Starting data collection at {acquisition_datetime} for maximum of {n_observation} observations or {max_time} seconds\n"
    )
    stop_datetime = acquisition_datetime + timedelta(seconds=max_time)
    counter = 0
    # Honour the caller-supplied limit (previously the global N_OBSERVATION was
    # used here, so the n_observation argument had no effect on the loop).
    while counter < n_observation and acquisition_datetime < stop_datetime:
        counter += 1
        sensor_value = read_analog_sensor()
        # Random gap between readings: 0 to MAX_TIME_DIFF_SECONDS seconds.
        time_ran = random.uniform(0, MAX_TIME_DIFF_SECONDS)
        acquisition_datetime += timedelta(seconds=time_ran)
        print(f"{counter: 3d}: {acquisition_datetime}; Time since last: {time_ran:.3f} s; Sensor value: {sensor_value:.7f}")
        acquire_data(acquisition_datetime, sensor_value, sensor_data)
        # Wait in real time so wall-clock time tracks the recorded timestamps.
        time.sleep(time_ran)
    return sensor_data, file_uuid

In [9]:
# Example of running 1 experiment

# sensor_data, file_uuid = collect_simulated_data()

## Save data to file

Each experiment is saved under a unique filename: a UUID derived from the experiment's start date and time is appended to the base file name.

In [11]:
def save_data_df(data_df, uuid, file_name=None):
    """Write a DataFrame to UUID-suffixed CSV and Parquet files.

    The output files share a base path built from ``file_name`` with a
    trailing ``.csv`` extension (if any) removed and ``_<uuid>`` appended,
    e.g. ``data/sensor_data.csv`` -> ``data/sensor_data_<uuid>.csv`` and
    ``data/sensor_data_<uuid>.parquet``. The target directory is created if
    it does not exist yet.

    Args:
        data_df: The pandas DataFrame to save.
        uuid: Identifier appended to the file name. Note that this parameter
            shadows the ``uuid`` module inside this function.
        file_name: Base path (``Path`` or ``str``). Defaults to ``DATA_FILE``
            when ``None``.

    Returns:
        None
    """
    if file_name is None:
        file_name = DATA_FILE
    base_path = Path(file_name)
    # Strip only a real ".csv" extension on the file name itself; a blanket
    # str.replace would also mangle ".csv" appearing elsewhere in the path.
    base_name = base_path.stem if base_path.suffix == ".csv" else base_path.name
    target = base_path.with_name(f"{base_name}_{uuid}")
    # Make sure the output directory exists so the writes below do not fail.
    target.parent.mkdir(parents=True, exist_ok=True)
    target_stem = target.as_posix()
    data_df.to_csv(f"{target_stem}.csv", index=True)
    data_df.to_parquet(f"{target_stem}.parquet")
    return None

In [12]:
def create_save_data_df(sensor_data, file_uuid):
    """Build a DataFrame from the collected readings, save it, and return it.

    Args:
        sensor_data: Sequence of ``(collection time, sensor value)`` records.
        file_uuid: Identifier passed to ``save_data_df`` for the file name.

    Returns:
        pandas.DataFrame: Columns ``"Collection time"``, ``"Sensor value"`` and
        ``"time_diff"`` (gap to the previous row's collection time, in seconds).
    """
    one_second = pd.Timedelta(seconds=1)
    readings_df = pd.DataFrame(sensor_data, columns=["Collection time", "Sensor value"])
    # Dividing the timedelta differences by one second yields the gap in seconds.
    readings_df["time_diff"] = readings_df["Collection time"].diff() / one_second
    save_data_df(readings_df, file_uuid)
    return readings_df

In [None]:
# Run two experiments back to back with the default settings. Each experiment
# collects a fresh set of simulated readings and saves them to its own
# UUID-suffixed files. Collection sleeps between readings, so this cell runs
# in real time.
# After the loop, sensor_data, file_uuid and data_df hold the last experiment's results.
for n_experiment in range(2):
    sensor_data, file_uuid = collect_simulated_data()
    print(f"\nExperiment #{n_experiment}: {file_uuid}\n")
    data_df = create_save_data_df(sensor_data, file_uuid)

## Check data

In [None]:
# TODO: Add some data reasonableness check here