In [1]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, IntegerType, StructField, FloatType
import pyspark.pandas as ps
import numpy as np
import pandas as pd

In [14]:
spark = SparkSession.builder.master('spark://localhost:7077').appName("demo").getOrCreate()

In [15]:
spark

In [4]:
user_id = 1234
np.random.seed(0)

# generate data range
date_range = pd.date_range(start="2024-01-01", end="2024-01-30", freq = "D", closed=None)
repeated_dates = pd.to_datetime(np.repeat(date_range, 5))

new_time_samples = [ "00:00:00", "08:00:00", "12:00:00", "16:00:00", "20:00:00"]
new_time_data = np.tile(new_time_samples, len(date_range))
# new_time_date_data = [pd.to_datetime(str(date) + ' ' + str(time)) for date, time in zip(repeated_dates, new_time_data)]
new_time_date_data = pd.to_datetime(repeated_dates) + pd.to_timedelta(new_time_data)
# generate time
time_samples = ["20:41:00", "14:48:00", "14:47:00", "14:46:00", "14:45:00", "09:32:00", "04:48:00", "09:27:00", "02:21:00", "09:34:00"]
time_data = np.random.choice(time_samples, size=len(date_range))

# heart rate
heart_rate_data = np.random.randint(low=60, high=120, size=len(repeated_dates)).tolist()

# calories
calories_data = np.concatenate([
    np.random.uniform(1, 2, size=int(len(repeated_dates) * 0.9)), 
    np.random.uniform(2, 9, size=int(len(repeated_dates) * 0.1)) 
])
# make data disorder
np.random.shuffle(calories_data)
calories_data = calories_data.tolist()

# weight
weight_data = np.random.uniform(62, 78, size=len(repeated_dates)).tolist()


  date_range = pd.date_range(start="2024-01-01", end="2024-01-30", freq = "D", closed=None)


In [8]:
heart_rate_df = spark.createDataFrame(data =
    [
       ("pps006@cancerbase.org", user_id, date_time.to_pydatetime(), heart_rate) 
        for _, _, date_time, heart_rate in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), new_time_date_data, heart_rate_data)
    ],
    schema=["email", "user_id", "date_time", "heart_rate"]
)

calorie_df = spark.createDataFrame(data =
    [
       ("pps006@cancerbase.org", user_id, date_time.to_pydatetime(), cal) 
        for _, _, date_time, cal in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), new_time_date_data, calories_data)
    ],
    schema=["email", "user_id", "date_time", "calories"]
)

weight_df = spark.createDataFrame(data =
    [
       ("pps006@cancerbase.org", user_id, date_time.to_pydatetime(), weight) 
        for _, _, date_time, weight in 
        zip(range(len(repeated_dates)), range(len(repeated_dates)), new_time_date_data, weight_data)
    ],
    schema=["email", "user_id", "date_time", "weight_kg"]
)
heart_rate_df.show()
calorie_df.show()
weight_df.show()

+--------------------+-------+-------------------+----------+
|               email|user_id|          date_time|heart_rate|
+--------------------+-------+-------------------+----------+
|pps006@cancerbase...|   1234|2024-01-01 00:00:00|        78|
|pps006@cancerbase...|   1234|2024-01-01 08:00:00|        95|
|pps006@cancerbase...|   1234|2024-01-01 12:00:00|        84|
|pps006@cancerbase...|   1234|2024-01-01 16:00:00|       109|
|pps006@cancerbase...|   1234|2024-01-01 20:00:00|       111|
|pps006@cancerbase...|   1234|2024-01-02 00:00:00|        89|
|pps006@cancerbase...|   1234|2024-01-02 08:00:00|        79|
|pps006@cancerbase...|   1234|2024-01-02 12:00:00|        79|
|pps006@cancerbase...|   1234|2024-01-02 16:00:00|        74|
|pps006@cancerbase...|   1234|2024-01-02 20:00:00|        99|
|pps006@cancerbase...|   1234|2024-01-03 00:00:00|        92|
|pps006@cancerbase...|   1234|2024-01-03 08:00:00|        61|
|pps006@cancerbase...|   1234|2024-01-03 12:00:00|        69|
|pps006@

# Calculate Mets

In [9]:
def get_mets(heart_rate_df, calorie_df, weight_df):
    merged_df = heart_rate_df.join(calorie_df, on='date_time', how='left').select(heart_rate_df['date_time'], 'heart_rate', 'calories') 
    
    merged_df = merged_df.withColumn('date', F.to_date(F.col('date_time')))
    weight_df = weight_df.withColumn('date', F.to_date(F.col('date_time')))    
    
    merged_df = merged_df.join(weight_df, on=merged_df.date==weight_df.date, how='left').select(merged_df['date_time'], merged_df['date'],'heart_rate', 'calories','weight_kg')
    
    merged_df = merged_df.withColumn('joule', F.col('calories')* 4.184)
    
    merged_df = merged_df.withColumn('mets', (F.col('joule')/F.col('weight_kg')))
    
    # if(merged_df.shape[0] == 0):
    #     print('no merged data')
    #     return merged_df
    
    times = 1.00 / merged_df.agg(F.mode('mets')).first()[0] # mode calculation returns sth different from pandas
    merged_df = merged_df.withColumn('mets', F.col('mets') * times)
    
    return merged_df

merged_df = get_mets(heart_rate_df, calorie_df,weight_df)
merged_df.show()


                                                                                

+-------------------+----------+----------+------------------+-----------------+------------------+-------------------+
|          date_time|      date|heart_rate|          calories|        weight_kg|             joule|               mets|
+-------------------+----------+----------+------------------+-----------------+------------------+-------------------+
|2024-01-12 20:00:00|2024-01-12|       102|1.3118602206821892|62.24969703114925|  5.48882316333428|0.22545701565321616|
|2024-01-12 20:00:00|2024-01-12|       102|1.3118602206821892|62.52919346360809|  5.48882316333428|0.22444925546861433|
|2024-01-12 20:00:00|2024-01-12|       102|1.3118602206821892|71.83143530362999|  5.48882316333428| 0.1953828551334898|
|2024-01-12 20:00:00|2024-01-12|       102|1.3118602206821892|65.72390606846491|  5.48882316333428|0.21353920905644042|
|2024-01-12 20:00:00|2024-01-12|       102|1.3118602206821892|67.22721410829216|  5.48882316333428| 0.2087641307782331|
|2024-01-04 08:00:00|2024-01-04|        

# Calculate Active hours

In [11]:
def categorize_mets(merged_df):
    def divide_mets(mets):
        if mets < 1.5:
            return 'sedentary'
        elif 1.5 <= mets < 3.0:
            return 'lightly_active'
        elif 3.0 <= mets < 6.0:
            return 'fairly_active'
        else:
            return 'very_active'
            
    divide_mets_udf = F.udf(lambda x:divide_mets(x), StringType()) 
    
    merged_df =  merged_df.withColumn('mets_category', divide_mets_udf(F.col('mets')))
    
    # categorized by date and level
    merged_df = merged_df.withColumn('time', F.substring(F.col('date_time'), 11, 9)) # get the time of day from date_time column
    mets_df = merged_df.groupby(['date', 'mets_category']).agg((F.count('time')/60).alias('sedentary'))
    # mets_df.columns.name = None

    if 'sedentary' not in mets_df.columns:
        mets_df = mets_df.withColumn('sedentary', F.lit(0)) 
    if 'lightly_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('lightly_active', F.lit(0)) 
    if 'fairly_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('fairly_active', F.lit(0)) 
    if 'very_active' not in mets_df.columns:
        mets_df = mets_df.withColumn('very_active', F.lit(0)) 
        
    mets_df = mets_df.withColumn('total_active', F.col('sedentary') + F.col('lightly_active') + F.col('fairly_active') + F.col('very_active'))
    mets_df = mets_df.withColumn('non-sedentary', F.col('lightly_active') + F.col('fairly_active') + F.col('very_active'))    
    mets_df = mets_df.orderBy('date')
    return mets_df
mets_df = categorize_mets(merged_df)
mets_df.show()


[Stage 69:>                                                         (0 + 1) / 1]

+----------+-------------+------------------+--------------+-------------+-----------+------------------+-------------+
|      date|mets_category|         sedentary|lightly_active|fairly_active|very_active|      total_active|non-sedentary|
+----------+-------------+------------------+--------------+-------------+-----------+------------------+-------------+
|2024-01-01|    sedentary|0.4166666666666667|             0|            0|          0|0.4166666666666667|            0|
|2024-01-02|    sedentary|0.4166666666666667|             0|            0|          0|0.4166666666666667|            0|
|2024-01-03|    sedentary|0.4166666666666667|             0|            0|          0|0.4166666666666667|            0|
|2024-01-04|    sedentary|0.4166666666666667|             0|            0|          0|0.4166666666666667|            0|
|2024-01-05|    sedentary|0.4166666666666667|             0|            0|          0|0.4166666666666667|            0|
|2024-01-06|    sedentary|0.416666666666

                                                                                

In [12]:
spark.stop()