In [1]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import psycopg2

Setup database connection:

In [3]:
conn = psycopg2.connect(database="postgres",
                        host="localhost",
                        user="admin",
                        password="admin",
                        port="5432")
cursor = conn.cursor()

In [4]:
def create_tables():
    """ Create tables in the PostgreSQL database"""
    commands = (
        """
        CREATE TABLE IF NOT EXISTS heart_rates (
            id SERIAL PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL,
            email VARCHAR(50) NOT NULL,
            date VARCHAR(50),
            time VARCHAR(50),
            heart_rate INTEGER
        )
        """,
        """ CREATE TABLE IF NOT EXISTS calories (
                id SERIAL PRIMARY KEY,
                user_id VARCHAR(50) NOT NULL,
                email VARCHAR(50) NOT NULL,
                date VARCHAR(50),
                time VARCHAR(50),
                calories DOUBLE PRECISION
                )
        """,
        """
        CREATE TABLE IF NOT EXISTS weights (
                id SERIAL PRIMARY KEY,
                user_id VARCHAR(50) NOT NULL,
                email VARCHAR(50) NOT NULL,
                date VARCHAR(50),
                time VARCHAR(50),
                weight_kg DOUBLE PRECISION
        )
        """)
    try:

        # execute the CREATE TABLE statement
        for command in commands:
            cursor.execute(command)
    except (psycopg2.DatabaseError, Exception) as error:
        print(error)

create_tables()
conn.commit()

# Synthetic data generation

In [5]:
user_id = 1234
np.random.seed(0)

# generate data range
date_range = pd.date_range(start="2024-01-01", end="2024-01-30", freq = "D", closed=None)
repeated_dates = pd.to_datetime(np.repeat(date_range, 5))

new_time_samples = [ "00:00:00", "08:00:00", "12:00:00", "16:00:00", "20:00:00"]
new_time_data = np.tile(new_time_samples, len(date_range))
# new_time_date_data = [pd.to_datetime(str(date) + ' ' + str(time)) for date, time in zip(repeated_dates, new_time_data)]
new_time_date_data = pd.to_datetime(repeated_dates) + pd.to_timedelta(new_time_data)
# generate time
time_samples = ["20:41:00", "14:48:00", "14:47:00", "14:46:00", "14:45:00", "09:32:00", "04:48:00", "09:27:00", "02:21:00", "09:34:00"]
time_data = np.random.choice(time_samples, size=len(date_range))

# heart rate
heart_rate_data = np.random.randint(low=60, high=120, size=len(repeated_dates)).tolist()

# calories
calories_data = np.concatenate([
    np.random.uniform(1, 2, size=int(len(repeated_dates) * 0.9)), 
    np.random.uniform(2, 9, size=int(len(repeated_dates) * 0.1)) 
])
# make data disorder
np.random.shuffle(calories_data)
calories_data = calories_data.tolist()

# weight
weight_data = np.random.uniform(62, 78, size=len(repeated_dates)).tolist()


Adding the synthetic data to the database...

In [6]:
heart_rates_records =  [
       (user_id, "pps006@cancerbase.org", date.strftime('%Y-%m-%d'), str(time), heart_rate) 
        for _, _, date, time, heart_rate in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), repeated_dates, new_time_data, heart_rate_data)
    ]

calories_records = [
       (user_id, "pps006@cancerbase.org", date.strftime('%Y-%m-%d'), str(time), cal) 
        for _, _, date, time, cal in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), repeated_dates, new_time_data, calories_data)
    ]

weights_records = [
       (user_id, "pps006@cancerbase.org", date.strftime('%Y-%m-%d'), str(time), weight) 
        for _, _, date, time, weight in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), repeated_dates, new_time_data, weight_data)
    ]

In [7]:
heart_rates_records_sql = "INSERT INTO heart_rates(user_id, email, date, time, heart_rate) VALUES(%s,%s,%s,%s,%s) "
calories_records_sql = "INSERT INTO calories(user_id, email, date, time, calories) VALUES (%s,%s,%s,%s,%s)"
weights_records_sql = "INSERT INTO weights(user_id, email, date, time, weight_kg) VALUES (%s,%s,%s,%s,%s)"

In [8]:
try:
    # execute the INSERT statement
    cursor.executemany(heart_rates_records_sql, heart_rates_records)
    cursor.executemany(calories_records_sql, calories_records)
    cursor.executemany(weights_records_sql, weights_records)
    
    # commit the changes to the database
    conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
    print(error)  

# Setting up spark for mets calculation

In [9]:
spark = SparkSession.builder.appName("demo").config("spark.jars", "postgresql-42.7.2.jar").getOrCreate()

24/06/02 15:27:58 WARN Utils: Your hostname, node8 resolves to a loopback address: 127.0.1.1; using 192.168.0.27 instead (on interface eno1)
24/06/02 15:27:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/06/02 15:27:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/02 15:27:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [11]:
heart_rate_df = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5432/postgres', # jdbc:postgresql://<host>:<port>/<database>
         dbtable='heart_rates',
         user='admin',
         password='admin',
         driver='org.postgresql.Driver').\
load()

calorie_df = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5432/postgres', # jdbc:postgresql://<host>:<port>/<database>
         dbtable='calories',
         user='admin',
         password='admin',
         driver='org.postgresql.Driver').\
load()

weight_df = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5432/postgres', # jdbc:postgresql://<host>:<port>/<database>
         dbtable='weights',
         user='admin',
         password='admin',
         driver='org.postgresql.Driver').\
load()

# Calculates Mets

In [12]:
def get_mets(heart_rate_df, calorie_df, weight_df):
    merged_df = heart_rate_df.join(calorie_df, on=['date', 'time'], how='left').select('date', 'time', 'heart_rate', 'calories')  
    
    merged_df = merged_df.join(weight_df[['date', 'weight_kg']], on='date', how='left').select('date', 'time', 'heart_rate', 'calories','weight_kg')
    
    merged_df = merged_df.withColumn('joule', F.col('calories')* 4.184)
    
    merged_df = merged_df.withColumn('mets', (F.col('joule')/F.col('weight_kg')))
    
    # if(merged_df.shape[0] == 0):
    #     print('no merged data')
    #     return merged_df
    
    mode = 0.055210 # for the sake of consistency between pandas version
    # mode = merged_df.groupby('mets').count().orderBy('count', ascending=True).first()[0]
    times = 1.00 /mode
    merged_df = merged_df.withColumn('mets', F.col('mets') * times)
    
    return merged_df

merged_df = get_mets(heart_rate_df, calorie_df,weight_df)
merged_df.show()

+----------+--------+----------+-----------------+-----------------+----------------+------------------+
|      date|    time|heart_rate|         calories|        weight_kg|           joule|              mets|
+----------+--------+----------+-----------------+-----------------+----------------+------------------+
|2024-01-16|12:00:00|        83|1.097259927063174|65.86269792122519|4.59093553483232|1.2625364658819478|
|2024-01-16|12:00:00|        83|1.097259927063174| 66.6143759573039|4.59093553483232|1.2482899775299414|
|2024-01-16|12:00:00|        83|1.097259927063174|74.53175101302101|4.59093553483232|1.1156863583197796|
|2024-01-16|12:00:00|        83|1.097259927063174|63.04486731442849|4.59093553483232|1.3189663395148945|
|2024-01-16|12:00:00|        83|1.097259927063174| 77.8305564365509|4.59093553483232|1.0683986042770088|
|2024-01-16|12:00:00|        83|1.097259927063174|65.86269792122519|4.59093553483232|1.2625364658819478|
|2024-01-16|12:00:00|        83|1.097259927063174| 66.6

# Calculate Active hours

In [13]:
def categorize_mets(merged_df):
    def divide_mets(mets):
        if mets < 1.5:
            return 'sedentary'
        elif 1.5 <= mets < 3.0:
            return 'lightly_active'
        elif 3.0 <= mets < 6.0:
            return 'fairly_active'
        else:
            return 'very_active'
            
    divide_mets_udf = F.udf(lambda x:divide_mets(x), StringType()) 
    
    merged_df =  merged_df.withColumn('mets_category', divide_mets_udf(F.col('mets')))

    # categorized by date and level
    mets_df = merged_df.groupby('date', 'mets_category').pivot('mets_category').agg((F.count('time')/60)).fillna(0) 
    
    if 'sedentary' not in mets_df.columns:
        mets_df = mets_df.withColumn('sedentary', F.lit(0)) 
    if 'lightly_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('lightly_active', F.lit(0)) 
    if 'fairly_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('fairly_active', F.lit(0)) 
    if 'very_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('very_active', F.lit(0)) 
    
    mets_df.drop('mets_category')
    mets_df = mets_df.groupby('date').agg(F.sum('fairly_active').alias('fairly_active'),
                                           F.sum('lightly_active').alias('lightly_active'),
                                           F.sum('sedentary').alias('sedentary'),
                                           F.sum('very_active').alias('very_active'))      
    
    mets_df = mets_df.withColumn('total_active', F.col('sedentary') + F.col('lightly_active') + F.col('fairly_active') + F.col('very_active'))
    mets_df = mets_df.withColumn('non-sedentary', F.col('lightly_active') + F.col('fairly_active') + F.col('very_active'))    
    
    return mets_df
mets_df = categorize_mets(merged_df)
mets_df.show()
mets_df.count()

                                                                                

+----------+------------------+------------------+------------------+------------------+------------------+------------------+
|      date|     fairly_active|    lightly_active|         sedentary|       very_active|      total_active|     non-sedentary|
+----------+------------------+------------------+------------------+------------------+------------------+------------------+
|2024-01-19|               0.0|17.066666666666666|               9.6|               0.0|26.666666666666664|17.066666666666666|
|2024-01-13|               0.0|21.333333333333332| 5.333333333333333|               0.0|26.666666666666664|21.333333333333332|
|2024-01-06| 5.333333333333333|              16.0| 5.333333333333333|               0.0|26.666666666666664|21.333333333333332|
|2024-01-14|2.1333333333333333|14.933333333333334|1.0666666666666667| 8.533333333333333|26.666666666666664|              25.6|
|2024-01-10|               0.0|              22.4| 4.266666666666667|               0.0|26.666666666666664|    

30