In [0]:
import random
from datetime import datetime, timedelta
from pyspark.sql.functions import from_unixtime, col
import time

# Generate synthetic heart-rate readings with Unix timestamps.
# Seeding the RNG makes the sampled minute offsets and heart rates repeatable
# across "Restart & Run All"; the absolute timestamps still follow the wall clock.
SEED = 42
RECORDS_PER_PERSON = 5
MAX_MINUTES_BACK = 10000  # readings fall within roughly the last week
random.seed(SEED)

num_people = 30
names = [f'Person_{i+1}' for i in range(num_people)]
device_ids = [f'DEV{1000+i}' for i in range(num_people)]

# Capture "now" once so every reading is offset from the same reference instant.
reference_time = datetime.now()

data = []
for i in range(num_people):
    for j in range(RECORDS_PER_PERSON):
        dt = reference_time - timedelta(minutes=random.randint(0, MAX_MINUTES_BACK))
        record = {
            # Sequential ids starting at 1; tied to RECORDS_PER_PERSON so the
            # formula cannot drift from the inner loop bound.
            'id': i * RECORDS_PER_PERSON + j + 1,
            'time': int(dt.timestamp()),  # whole seconds since the Unix epoch
            'heartrate': random.randint(60, 100),
            'name': names[i],
            'device_id': device_ids[i]
        }
        data.append(record)

# No schema is passed, so Spark derives one from the record dicts.
df = spark.createDataFrame(data=data)
display(df)

In [0]:
from pyspark.sql.functions import from_unixtime, col

def process_health_tracker_data(dataframe):
    """Shape raw health-tracker readings into the processed layout.

    Parameters
    ----------
    dataframe : pyspark.sql.DataFrame
        Must expose the columns ``name``, ``time``, ``heartrate`` and
        ``device_id``. ``time`` is interpreted as seconds since the Unix epoch.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``name``, ``time`` (timestamp), ``dte`` (date of ``time``),
        ``heartrate`` and ``p_device_id`` (``device_id`` renamed).
    """
    # Imported locally so this definition does not depend on an import cell
    # having been edited or re-run.
    from pyspark.sql.functions import timestamp_seconds

    # Convert epoch seconds straight to a timestamp. Going through
    # from_unixtime() first renders a local-time string that is then parsed
    # back, which is ambiguous for instants in a DST fall-back hour.
    event_time = timestamp_seconds(col('time'))

    return dataframe.select(
        col('name'),
        event_time.alias('time'),
        event_time.cast('date').alias('dte'),
        col('heartrate'),
        col('device_id').alias('p_device_id'),
    )
# Apply the transformation to the generated readings and render the result.
processed_df = process_health_tracker_data(dataframe=df)
display(processed_df)

In [0]:
# Replace whatever is at the target path with a Parquet dataset that is
# partitioned on disk by p_device_id.
(
    processed_df.write
    .format('parquet')
    .partitionBy('p_device_id')
    .mode('overwrite')
    .save('/Volumes/workspace/default/streaming_checkpoints/healthtracker')
)

In [0]:
# Read the Parquet dataset back from the same path and render it.
display(
    spark.read
    .format('parquet')
    .load('/Volumes/workspace/default/streaming_checkpoints/healthtracker')
)

In [0]:
%sql
-- List volumes; presumably a check that the `streaming_checkpoints` volume
-- used by the paths in this notebook exists (confirm intent).
SHOW VOLUMES;

In [0]:
# Re-register the table over the Parquet files at the healthtracker path:
# drop any previous definition, then create it again. No column list is given,
# so the table schema comes from the files at LOCATION.
# NOTE(review): LOCATION points inside a Unity Catalog volume path; confirm the
# workspace allows registering a table there.
spark.sql("DROP TABLE IF EXISTS workspace.default.health_tracker_processed")
spark.sql("""
    CREATE TABLE IF NOT EXISTS workspace.default.health_tracker_processed
    USING parquet
    LOCATION '/Volumes/workspace/default/streaming_checkpoints/healthtracker'
""")
# Alternative kept for reference: the same table with an explicit schema.
# Column names and types must match the written files. The column is named
# `heartrate` (not `heart_rate`) and is presumably BIGINT because it is built
# from Python ints; confirm with processed_df.printSchema().
# spark.sql("""
#           CREATE TABLE workspace.default.health_tracker_processed (
#                 name STRING,
#                 time TIMESTAMP,
#                 dte DATE,
#                 heartrate BIGINT,
#                 p_device_id STRING
#           )
#           USING parquet
#           PARTITIONED BY (p_device_id)
#           LOCATION '/Volumes/workspace/default/streaming_checkpoints/healthtracker'
#           """)