In [1]:
MIN_TEMPERATURE = 97
BASE_TEMPERATURE = 98
MAX_TEMPERATURE = 103

MIN_AGE = 1
NEW_BORN_AGE = 5
ADOLESCENT_AGE = 15

MIN_WEIGHT = 4
MAX_WEIGHT = 120

MIN_HEIGHT = 50
MAX_HEIGHT = 200

MORNING_HOUR = 6
NOON_HOUR = 12
EVENING_HOUR = 18

MAX_STEP_COUNT = 30_000

MAX_CALORIES_BURNT = 3_000

MIN_REM_SLEEP_HOURS = 0

MIN_HEART_RATE = 50
MAX_HEART_RATE = 150

MIN_SYS_BP = 50
MAX_SYS_BP = 90

MIN_DIA_BP = 100
MAX_DIA_BP = 140

MIN_BREATH_FREQUENCY = 10

MIN_O2_AVG = 90

HEART_RATE_LB = 55.0
HEART_RATE_UB = 144.0

SYS_BP_LB = 51.0
SYS_BP_UB = 87.0

DIA_BP_LB = 101.0
DIA_BP_UB = 138.0

BODY_TEMP_LB = 97.0
BODY_TEMP_UB = 102.6

BF_LB = 10.0
BF_UB = 21.0

DATE_FORMAT = "%d/%m/%Y"
DATE_AND_TIME_FORMAT = "%d/%m/%Y %H:%M:%S"


In [2]:
from datetime import timedelta, datetime
from random import randrange


def fetch_random_date(start, end):
    """
        This function will return a random datetime between two datetime
        objects.
    """
    if end is None:
        return start
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)


In [18]:
import pandas as pd
# from app.server.constants import *


def generate_labels_for_data(event_data, generate_user):
    df_records = pd.DataFrame(event_data).T.reset_index(drop=True)

    if generate_user:
        df_records = pd.concat([df_records,
                                df_records['generalHealthStats'].apply(lambda x: pd.Series(x)),
                                df_records['user'].apply(lambda x: pd.Series(x)),
                                df_records['vitals'].apply(lambda x: pd.Series(x))
                                ], axis=1)
        columns_to_retain = ['user', 'vitals', 'generalHealthStats', 'dataSource', 'eventId', 'eventTimestamp', 'healthState']
    else:
        df_records = pd.concat([df_records,
                                df_records['generalHealthStats'].apply(lambda x: pd.Series(x)),
                                df_records['vitals'].apply(lambda x: pd.Series(x))
                                ], axis=1)
        columns_to_retain = ['vitals', 'generalHealthStats', 'dataSource', 'eventId', 'eventTimestamp', 'healthState']

    df_records['healthState'] = ''
    df_records.loc[(df_records['heartRate'] <= HEART_RATE_LB) |
                   (df_records['heartRate'] >= HEART_RATE_UB) |

                   (df_records['systolicBloodPressure'] <= SYS_BP_LB) |
                   (df_records['systolicBloodPressure'] >= SYS_BP_UB) |

                   (df_records['diastolicBloodPressure'] <= DIA_BP_LB) |
                   (df_records['diastolicBloodPressure'] >= DIA_BP_UB) |

                   (df_records['bodyTemperature'] < BODY_TEMP_LB) |
                   (df_records['bodyTemperature'] > BODY_TEMP_UB) |

                   (df_records['breathingFrequency'] <= BF_LB) |
                   (df_records['breathingFrequency'] >= BF_UB)
        , 'healthState'] = 'bad'
    df_records.loc[df_records['healthState'] == '', 'healthState'] = 'good'

    return df_records[columns_to_retain].to_dict('records')



In [19]:
import random
import secrets
# from app.server.constants import *


def synthesise_vitals():
    vitals = {}

    heartRate = synthesise_heart_rate()
    vitals['heartRate'] = heartRate

    systolicBloodPressure = synthesise_systolic_blood_pressure()
    vitals['systolicBloodPressure'] = systolicBloodPressure

    diastolicBloodPressure = synthesise_diastolic_blood_pressure()
    vitals['diastolicBloodPressure'] = diastolicBloodPressure

    bodyTemperature = synthesise_body_temperature()
    vitals['bodyTemperature'] = bodyTemperature

    breathingFrequency = synthesise_breathing_frequency()
    vitals['breathingFrequency'] = breathingFrequency

    return vitals


def synthesise_breathing_frequency():
    breathingFrequency = MIN_BREATH_FREQUENCY + secrets.randbelow(12)
    return breathingFrequency


def synthesise_body_temperature():
    bodyTemperature = BASE_TEMPERATURE + secrets.randbelow(3)
    random_sign = secrets.randbelow(2)
    random_sign = random_sign if random_sign == 1 else -1
    bodyTemperature += random_sign * round(random.random(), 2)
    bodyTemperature = round(bodyTemperature, 2)
    if bodyTemperature <= MIN_TEMPERATURE:
        bodyTemperature = MIN_TEMPERATURE
    elif bodyTemperature >= MAX_TEMPERATURE:
        bodyTemperature = MAX_TEMPERATURE
    return bodyTemperature


def synthesise_diastolic_blood_pressure():
    return MIN_DIA_BP + secrets.randbelow(40)


def synthesise_systolic_blood_pressure():
    return MIN_SYS_BP + secrets.randbelow(40)


def synthesise_heart_rate():
    return MIN_HEART_RATE + secrets.randbelow(100)

In [20]:
import random
import secrets
# from app.server.constants import *


def synthesise_health_stats(hour_of_the_day):
    generalHealthStats = {}

    stepCount = synthesise_step_count(hour_of_the_day)
    min_increment = secrets.randbelow(10)
    additional_steps = (min_increment + secrets.randbelow(100))
    stepCount = stepCount + additional_steps
    generalHealthStats['stepCount'] = stepCount

    bloodOxygenAvg = synthesise_blood_oxygen_level()
    generalHealthStats['bloodOxygenAvg'] = bloodOxygenAvg

    stress = synthesise_stress_score()
    generalHealthStats['stress'] = stress

    sleepHours = synthesise_sleep_hours()
    generalHealthStats['sleepHours'] = sleepHours

    REMsleepHours = synthesise_rem_sleep_hours()
    generalHealthStats['REMsleepHours'] = REMsleepHours

    caloriesBurnt = synthesise_calories_count(hour_of_the_day)
    min_increment = secrets.randbelow(20)
    decimal_increment = 0
    random_sign = secrets.randbelow(2)
    random_sign = random_sign if random_sign == 1 else -1
    decimal_increment += random_sign * random.random()
    additional_calories = (round(decimal_increment, 2) + min_increment + secrets.randbelow(200))
    caloriesBurnt = caloriesBurnt + additional_calories

    generalHealthStats['caloriesBurnt'] = caloriesBurnt

    return generalHealthStats


def synthesise_step_count(hour_of_the_day):
    if hour_of_the_day <= MORNING_HOUR:
        return secrets.randbelow(500)

    elif (hour_of_the_day > MORNING_HOUR) and (hour_of_the_day <= NOON_HOUR):
        return secrets.randbelow(8000)

    elif (hour_of_the_day > NOON_HOUR) and (hour_of_the_day <= EVENING_HOUR):
        return secrets.randbelow(15_000)

    elif hour_of_the_day > EVENING_HOUR:
        return secrets.randbelow(MAX_STEP_COUNT)


def synthesise_calories_count(hour_of_the_day):
    if hour_of_the_day <= MORNING_HOUR:
        return secrets.randbelow(200)

    elif (hour_of_the_day > MORNING_HOUR) and (hour_of_the_day <= NOON_HOUR):
        return secrets.randbelow(800)

    elif (hour_of_the_day > NOON_HOUR) and (hour_of_the_day <= EVENING_HOUR):
        return secrets.randbelow(1500)

    elif hour_of_the_day > EVENING_HOUR:
        return secrets.randbelow(MAX_CALORIES_BURNT)


def synthesise_blood_oxygen_level():
    bloodOxygenAvg = MIN_O2_AVG + secrets.randbelow(10)
    return bloodOxygenAvg


def synthesise_stress_score():
    stress = secrets.randbelow(5)
    return stress


def synthesise_sleep_hours():
    sleepHours = 4 + secrets.randbelow(6)
    minutes = 0
    random_sign = secrets.randbelow(2)
    random_sign = random_sign if random_sign == 1 else -1
    minutes += random_sign * round(random.random(), 2)
    sleepHours = sleepHours + minutes
    return sleepHours


def synthesise_rem_sleep_hours():
    REMsleepHours = secrets.randbelow(4)
    minutes = 0
    random_sign = secrets.randbelow(2)
    random_sign = random_sign if random_sign == 1 else -1
    minutes += random_sign * round(random.random(), 2)
    REMsleepHours += minutes
    if REMsleepHours <= MIN_REM_SLEEP_HOURS:
        REMsleepHours = MIN_REM_SLEEP_HOURS
    return REMsleepHours


In [21]:
import uuid
import secrets
from faker import Faker
# from app.server.constants import *

fake = Faker()
fake.seed_instance(8)


def synthesise_user_data(user):
    user['user_id'] = str(uuid.uuid4())

    name = fake.name()
    user['firstname'] = name.split(' ')[0]
    user['lastname'] = name.split(' ')[1]

    random_no = secrets.randbelow(2)
    gender = 'male' if random_no == 1 else 'female'
    user['gender'] = gender

    age = MIN_AGE + secrets.randbelow(65)
    user['age'] = age

    height = 0
    weight = 0

    if age < NEW_BORN_AGE:
        weight = MIN_WEIGHT + secrets.randbelow(10)
        height = MIN_HEIGHT + secrets.randbelow(40)

    elif NEW_BORN_AGE <= age <= ADOLESCENT_AGE:
        weight = 20 + secrets.randbelow(20)
        height = 70 + secrets.randbelow(60)

    elif age > ADOLESCENT_AGE:
        weight = 40 + secrets.randbelow(80)
        height = 140 + secrets.randbelow(60)

    user['weight'] = weight
    user['height'] = height
    return user


In [22]:
import uuid
import secrets
from datetime import timedelta


def synthesise_event_details(event, start_time, end_time, event_ts):
    min_increment = 1
    additional_mins = (min_increment + secrets.randbelow(5))
    additional_sec = secrets.randbelow(30)
    event_ts = event_ts + timedelta(minutes=additional_mins, seconds=additional_sec)

    if event_ts > end_time:
        event_ts = end_time
    elif event_ts < start_time:
        event_ts = start_time

    event['eventTimestamp'] = event_ts

    random_no = secrets.randbelow(2)
    source = 'app' if random_no == 1 else 'manual'
    event['dataSource'] = source

    event_id = str(uuid.uuid4())
    event['eventId'] = event_id

    return event


In [31]:

def synthesise_data_for_ui(start_time, end_time):
    return synthesise_core(start_time=start_time, end_time=end_time, generate_multiple_records=False,
                          generate_labels=True)


def synthesise_data_for_training(start_time, end_time):
    return synthesise_core(start_time=start_time, end_time=end_time, generate_multiple_records=True,
                          generate_labels=True, generate_user=True)


def synthesise_data_auto_call(start_time):
    return synthesise_core(start_time=start_time, generate_multiple_records=False, generate_labels=False)


def synthesise_bulk(event_data, start_time, end_time, generate_labels, generate_user, n_users=100):

    for _ in range(n_users):
        user = {}
        # generate user
        if generate_user:
            user = synthesise_user_data(user=user)

        # intervals: number of periods where data is captured
        # pick random number of time intervals (1,10)
        intervals = 1 + secrets.randbelow(10)
        for _ in range(intervals):

            interval_records = secrets.randbelow(30)

            start_date = start_time
            end_date = end_time

            first_ts = fetch_random_date(start_date, end_date)
            event_ts = first_ts
            hour_of_the_day = event_ts.hour

            # These health stats are monotonically increasing for the day, so they are generated outside the loop
            stepCount = synthesise_step_count(hour_of_the_day)
            caloriesBurnt = synthesise_calories_count(hour_of_the_day)

            # These health stats remain the same for the same day, so they are generated outside the loop
            sleepHours = synthesise_sleep_hours()
            REMsleepHours = synthesise_rem_sleep_hours()
            bodyTemperature = synthesise_body_temperature()

            # interval_records: number of records present in each period
            # pick random number of timestamps between start and end date for all intervals
            for _ in range(interval_records):
                event = {}
                vitals = {}
                generalHealthStats = {}

                if generate_user:
                    event['user'] = user

                # generate event details
                event = synthesise_event_details(event, start_time, end_time, event_ts)

                # generate vitals
                heartRate = synthesise_heart_rate()
                vitals['heartRate'] = heartRate

                systolicBloodPressure = synthesise_systolic_blood_pressure()
                vitals['systolicBloodPressure'] = systolicBloodPressure

                diastolicBloodPressure = synthesise_diastolic_blood_pressure()
                vitals['diastolicBloodPressure'] = diastolicBloodPressure

                random_sign = secrets.randbelow(2)
                random_sign = random_sign if random_sign == 1 else -1
                bodyTemperature += random_sign * round(random.random(), 2)
                bodyTemperature = round(bodyTemperature, 2)
                if bodyTemperature <= MIN_TEMPERATURE:
                    bodyTemperature = MIN_TEMPERATURE
                elif bodyTemperature >= MAX_TEMPERATURE:
                    bodyTemperature = MAX_TEMPERATURE
                vitals['bodyTemperature'] = bodyTemperature

                breathingFrequency = synthesise_breathing_frequency()
                vitals['breathingFrequency'] = breathingFrequency

                event['vitals'] = vitals

                # generate generalHealthStats
                min_increment = secrets.randbelow(10)
                additional_steps = (min_increment + secrets.randbelow(100))
                stepCount = stepCount + additional_steps
                generalHealthStats['stepCount'] = stepCount

                bloodOxygenAvg = synthesise_blood_oxygen_level()
                generalHealthStats['bloodOxygenAvg'] = bloodOxygenAvg

                stress = synthesise_stress_score()
                generalHealthStats['stress'] = stress

                generalHealthStats['sleepHours'] = sleepHours

                generalHealthStats['REMsleepHours'] = REMsleepHours

                min_increment = secrets.randbelow(20)
                additional_calories = (min_increment + secrets.randbelow(200))
                caloriesBurnt = caloriesBurnt + additional_calories
                generalHealthStats['caloriesBurnt'] = caloriesBurnt

                event['generalHealthStats'] = generalHealthStats
                eventId = event.get('eventId')
                event_data[f'{eventId}'] = event

    if generate_labels:
        event_data = generate_labels_for_data(event_data, generate_user=generate_user)
    return event_data


def synthesise_core(start_time, end_time=None, generate_multiple_records=False, generate_labels=False, generate_user=False, n_users=100):
    event_data = {}
    user = {}

    if generate_multiple_records:
        return synthesise_bulk(event_data=event_data, start_time=start_time, end_time=end_time,
                                       generate_labels=generate_labels, generate_user=generate_user, n_users=n_users)

    else:
        # generate random timestamp between (start, end)
        first_ts = fetch_random_date(start=start_time, end=end_time)
        event_ts = first_ts
        hour_of_the_day = event_ts.hour

        event = {}
        # generate event related details
        if end_time is None:
            end_time = start_time
        event = synthesise_event_details(event, start_time, end_time, event_ts)

        # generate user information
        if generate_user:
            user = synthesise_user_data(user)

        event['user'] = user
        # generate vitals
        event['vitals'] = synthesise_vitals()

        # generate health stats
        event['generalHealthStats'] = synthesise_health_stats(hour_of_the_day)

        eventId = event.get('eventId')
        event_data[f'{eventId}'] = event

        if generate_labels:
            return generate_labels_for_data(event_data, generate_user=generate_user)[0]

        return event_data[eventId]

In [32]:
data = {"start_time": "31/01/2022 12:23:31"}
dd = datetime.strptime(data.get("start_time"), "%d/%m/%Y %H:%M:%S") or datetime.now() - timedelta(minutes=10)
print(dd)

2022-01-31 12:23:31


In [33]:
synthesise_data_auto_call(dd)

{'eventTimestamp': datetime.datetime(2022, 1, 31, 12, 23, 31),
 'dataSource': 'manual',
 'eventId': '9b1c4228-e51b-4cf8-b57b-041bda135bb2',
 'user': {},
 'vitals': {'heartRate': 78,
  'systolicBloodPressure': 54,
  'diastolicBloodPressure': 128,
  'bodyTemperature': 100.41,
  'breathingFrequency': 21},
 'generalHealthStats': {'stepCount': 3767,
  'bloodOxygenAvg': 93,
  'stress': 4,
  'sleepHours': 8.03,
  'REMsleepHours': 2.65,
  'caloriesBurnt': 743.55}}

In [34]:
data = {"start_time": "31/01/2022", "end_time": "01/02/2022"}

In [35]:
start_time = datetime.strptime(data.get("start_time"), "%d/%m/%Y") or datetime.now() - timedelta(minutes=10)
end_time = datetime.strptime(data.get("end_time"), "%d/%m/%Y") or datetime.now()
print(start_time, end_time)

2022-01-31 00:00:00 2022-02-01 00:00:00


In [36]:
synthesise_data_for_ui(start_time=start_time, end_time=end_time)

Index(['dataSource', 'eventId', 'eventTimestamp', 'generalHealthStats', 'user',
       'vitals', 'stepCount', 'bloodOxygenAvg', 'stress', 'sleepHours',
       'REMsleepHours', 'caloriesBurnt', 'heartRate', 'systolicBloodPressure',
       'diastolicBloodPressure', 'bodyTemperature', 'breathingFrequency'],
      dtype='object')


{'vitals': {'heartRate': 109,
  'systolicBloodPressure': 59,
  'diastolicBloodPressure': 112,
  'bodyTemperature': 100.99,
  'breathingFrequency': 12},
 'generalHealthStats': {'stepCount': 516,
  'bloodOxygenAvg': 90,
  'stress': 4,
  'sleepHours': 6.89,
  'REMsleepHours': 0.92,
  'caloriesBurnt': 750.1800000000001},
 'dataSource': 'app',
 'eventId': 'd45bca0b-0f26-4463-99a2-00370a592b73',
 'eventTimestamp': Timestamp('2022-01-31 12:15:45'),
 'healthState': 'good'}

In [37]:
synthesise_data_for_ui(datetime.now() - timedelta(minutes=10), datetime.now())

Index(['dataSource', 'eventId', 'eventTimestamp', 'generalHealthStats', 'user',
       'vitals', 'stepCount', 'bloodOxygenAvg', 'stress', 'sleepHours',
       'REMsleepHours', 'caloriesBurnt', 'heartRate', 'systolicBloodPressure',
       'diastolicBloodPressure', 'bodyTemperature', 'breathingFrequency'],
      dtype='object')


{'vitals': {'heartRate': 82,
  'systolicBloodPressure': 79,
  'diastolicBloodPressure': 114,
  'bodyTemperature': 98.07,
  'breathingFrequency': 10},
 'generalHealthStats': {'stepCount': 428,
  'bloodOxygenAvg': 95,
  'stress': 0,
  'sleepHours': 5.75,
  'REMsleepHours': 0.6599999999999999,
  'caloriesBurnt': 203.13},
 'dataSource': 'app',
 'eventId': '8e58ef36-5cf7-44cf-93d5-4d8c80561c9b',
 'eventTimestamp': Timestamp('2023-02-19 02:02:23.118266'),
 'healthState': 'bad'}

In [38]:
data = synthesise_core(start_time=start_time, end_time=end_time, generate_multiple_records=True,
                      generate_labels=True, generate_user=True, n_users=1000)

Index(['user', 'eventTimestamp', 'dataSource', 'eventId', 'vitals',
       'generalHealthStats', 'stepCount', 'bloodOxygenAvg', 'stress',
       'sleepHours', 'REMsleepHours', 'caloriesBurnt', 'user_id', 'firstname',
       'lastname', 'gender', 'age', 'weight', 'height', 'heartRate',
       'systolicBloodPressure', 'diastolicBloodPressure', 'bodyTemperature',
       'breathingFrequency'],
      dtype='object')


In [39]:
df_records = pd.DataFrame.from_records(data).reset_index(drop=True)

In [40]:
df_records = pd.concat([df_records,
                        df_records['generalHealthStats'].apply(lambda x: pd.Series(x)),
                        df_records['user'].apply(lambda x: pd.Series(x)),
                        df_records['vitals'].apply(lambda x: pd.Series(x))
                        ], axis=1)
columns_to_retain = ['user', 'vitals', 'generalHealthStats', 'dataSource', 'eventId', 'eventTimestamp', 'healthState']


In [41]:
df_records.columns

Index(['user', 'vitals', 'generalHealthStats', 'dataSource', 'eventId',
       'eventTimestamp', 'healthState', 'stepCount', 'bloodOxygenAvg',
       'stress', 'sleepHours', 'REMsleepHours', 'caloriesBurnt', 'user_id',
       'firstname', 'lastname', 'gender', 'age', 'weight', 'height',
       'heartRate', 'systolicBloodPressure', 'diastolicBloodPressure',
       'bodyTemperature', 'breathingFrequency'],
      dtype='object')

In [42]:
df_records.shape

(78120, 25)

In [43]:
df_records.to_csv('data/time_series_dump_v3.csv')

In [None]:
### rough ###

In [None]:
from enum import Enum
class Shake(Enum):
    VANILLA = "vanilla"
    CHOCOLATE = "choc"
    COOKIES = "cookie"
    MINT = "mint"

dct = {i.name: i.value for i in Shake}
print(dct)

{'VANILLA': 'vanilla', 'CHOCOLATE': 'choc', 'COOKIES': 'cookie', 'MINT': 'mint'}


In [None]:
Shake['VANILLA'].name, Shake['VANILLA'].value

('VANILLA', 'vanilla')