In [1]:
import pandas as pd
from tqdm import tqdm
import numpy as np
import io
tqdm.pandas()

In [2]:
prefix = 'slim_'
colnames=['timestamp', 'entity_id', 'entity_value', 'activity_annotation'] 
df = pd.read_csv("./data/data", names=colnames, sep="\t")

In [56]:
df

Unnamed: 0,timestamp,entity_id,entity_value,activity_annotation
0,2009-06-10 00:00:00.024668,T003,19,
1,2009-06-10 00:00:46.069471,T005,18.5,
2,2009-06-10 00:00:47.047655,T003,18.5,
3,2009-06-10 00:01:17.070215,T005,18,
4,2009-06-10 00:01:18.036049,T004,19.5,
...,...,...,...,...
726529,2009-08-05 23:44:43.054933,T001,24,
726530,2009-08-05 23:44:59.058871,T001,23.5,
726531,2009-08-05 23:45:15.047153,T001,24,
726532,2009-08-05 23:50:00.062322,T001,23.5,


In [3]:
activities_df = df[~df['activity_annotation'].isnull()]
activities_df

Unnamed: 0,timestamp,entity_id,entity_value,activity_annotation
270,2009-06-10 03:20:59.087874,M006,ON,Night wandering begin
293,2009-06-10 03:25:24.070558,M012,OFF,Night wandering end
342,2009-06-10 03:45:16.046068,M009,ON,Bed to toilet begin
379,2009-06-10 03:49:29.073763,M005,OFF,Bed to toilet end
380,2009-06-10 03:54:23.058206,M002,ON,Night wandering begin
...,...,...,...,...
726050,2009-08-05 20:20:34.081274,M005,OFF,R2 sleep end
726226,2009-08-05 20:41:40.049991,M006,ON,R1 sleep begin
726267,2009-08-05 20:44:54.000313,M007,OFF,R1 sleep end
726404,2009-08-05 23:30:02.031666,M005,ON,Night wandering begin


since there's no "switch" kind of events, i will construct those events for making full fram predictions. 

this is done by introducing activity entities for each of the event type:

Bed to toilet (30)
Breakfast (48)
R1 sleep (50)
R1 wake (53)
R1 work in office (46)
Dinner (42)
Laundry (10)
Leave home (69)
Lunch (37)
Night wandering (67)
R2 sleep (52)
R2 take medicine (44)
R2 wake (52)

In [4]:
def map_activity_name_to_type(act_name:str) -> str:
    return act_name.replace(' begin', '').replace(' end', '')

In [5]:
# find unique activities
activity_entity_names = df['activity_annotation'].dropna().unique().tolist()
# remove the begin/end remarks, they will be come entity values
activity_entity_type = [map_activity_name_to_type(entity_name) for entity_name in activity_entity_names]
# create an activity entity_id to name mapping
activity_entity_map = d = {s:f'A{str(i).zfill(3)}' for i, s in enumerate(activity_entity_type, 1)}
print(activity_entity_map)

{'Night wandering': 'A002', 'Bed to toilet': 'A004', 'R1 wake': 'A006', 'R2 wake': 'A008', 'R2 take medicine': 'A010', 'Breakfast': 'A012', 'Leave home': 'A014', 'Lunch': 'A016', 'Dinner': 'A018', 'R2 sleep': 'A020', 'R1 sleep': 'A022', 'R1 work in office': 'A024', 'Laundry': 'A026'}


In [6]:
_tmp_act_df = pd.DataFrame()
_tmp_act_df['entity_id'] = activities_df['activity_annotation'].map(lambda act_name: activity_entity_map[map_activity_name_to_type(act_name)])
_tmp_act_df['timestamp'] = activities_df['timestamp']
_tmp_act_df['entity_value'] = activities_df['activity_annotation'].map(lambda act_name: 'begin' if act_name.endswith('begin') else 'end')

In [7]:
_tmp_act_df

Unnamed: 0,entity_id,timestamp,entity_value
270,A002,2009-06-10 03:20:59.087874,begin
293,A002,2009-06-10 03:25:24.070558,end
342,A004,2009-06-10 03:45:16.046068,begin
379,A004,2009-06-10 03:49:29.073763,end
380,A002,2009-06-10 03:54:23.058206,begin
...,...,...,...
726050,A020,2009-08-05 20:20:34.081274,end
726226,A022,2009-08-05 20:41:40.049991,begin
726267,A022,2009-08-05 20:44:54.000313,end
726404,A002,2009-08-05 23:30:02.031666,begin


In [8]:
merged_act_sensor_df = pd.concat([df.drop('activity_annotation', axis=1), _tmp_act_df], ignore_index=True)
merged_act_sensor_df['timestamp'] = pd.to_datetime(merged_act_sensor_df['timestamp'], format='ISO8601', utc=True)
merged_act_sensor_df['entity_id'] = merged_act_sensor_df['entity_id'].astype(str)
merged_act_sensor_df.sort_values(by='timestamp')
# factorize the categorical features
temp_sensor_mask = merged_act_sensor_df['entity_id'].str.startswith('T')
merged_act_sensor_df['sensor_change'] = ~merged_act_sensor_df['entity_id'].str.startswith('A')
mapped_categories, state_dict = pd.factorize(merged_act_sensor_df['entity_value'][~temp_sensor_mask])
mapped_entities, entity_dict = pd.factorize(merged_act_sensor_df['entity_id'])
merged_act_sensor_df.loc[~temp_sensor_mask, 'entity_value'] = mapped_categories
merged_act_sensor_df['entity_id'] = mapped_entities
merged_act_sensor_df['entity_value'] = merged_act_sensor_df['entity_value'].astype(str)


In [9]:
print(f'total {len(state_dict)}: {state_dict}')
print(f'total {len(entity_dict)}: {entity_dict}')
print(merged_act_sensor_df.head(10))

total 4: Index(['ON', 'OFF', 'begin', 'end'], dtype='object')
total 45: Index(['T003', 'T005', 'T004', 'T001', 'T002', 'M005', 'M006', 'M002', 'M009',
       'M011', 'M012', 'M022', 'M008', 'M007', 'M003', 'M010', 'M014', 'M015',
       'M023', 'M001', 'M024', 'M021', 'M016', 'M018', 'M020', 'M019', 'M013',
       'M025', 'M027', 'M017', 'M004', 'M026', 'A002', 'A004', 'A006', 'A008',
       'A010', 'A012', 'A014', 'A016', 'A018', 'A020', 'A022', 'A024', 'A026'],
      dtype='object')
                         timestamp  entity_id entity_value  sensor_change
0 2009-06-10 00:00:00.024668+00:00          0           19           True
1 2009-06-10 00:00:46.069471+00:00          1         18.5           True
2 2009-06-10 00:00:47.047655+00:00          0         18.5           True
3 2009-06-10 00:01:17.070215+00:00          1           18           True
4 2009-06-10 00:01:18.036049+00:00          2         19.5           True
5 2009-06-10 00:01:48.008924+00:00          2           20        

In [10]:
# save the dataframe
merged_act_sensor_df.to_parquet(f'./data/{prefix}merged_act_sensor_df.parquet')

In [11]:
# unique entities
unique_entities = merged_act_sensor_df['entity_id'].unique()

print(f'{len(unique_entities)} unique entities: {unique_entities}')

45 unique entities: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44]


In [12]:
first_values = initial_values = merged_act_sensor_df.groupby('entity_id').first().sort_values(by='timestamp').reset_index()
print(first_values)

    entity_id                        timestamp entity_value  sensor_change
0           0 2009-06-10 00:00:00.024668+00:00           19           True
1           1 2009-06-10 00:00:46.069471+00:00         18.5           True
2           2 2009-06-10 00:01:18.036049+00:00         19.5           True
3           3 2009-06-10 00:41:04.052911+00:00         21.5           True
4           4 2009-06-10 00:41:35.035429+00:00         21.5           True
5           5 2009-06-10 01:28:39.066357+00:00            0           True
6          32 2009-06-10 03:20:59.087874+00:00            2          False
7           6 2009-06-10 03:20:59.087874+00:00            0           True
8           7 2009-06-10 03:21:01.038931+00:00            0           True
9           8 2009-06-10 03:21:08.033939+00:00            0           True
10          9 2009-06-10 03:22:13.008367+00:00            0           True
11         10 2009-06-10 03:22:14.000398+00:00            0           True
12         11 2009-06-10 

In [13]:
# create default values for each sensor before they receive their first update
# for temp sensors, the init values are their first values
# for Movement sensors, the init values are opposite of their first values, which is OFF
# for Act type, the init values are opposite of their first values, which is end
def map_init_value(x):
    try:
        float(x)
        return x
    except ValueError:
        if x == str(state_dict.get_loc('ON')):
            return str(state_dict.get_loc('OFF'))
        elif x == str(state_dict.get_loc('begin')):
            return str(state_dict.get_loc('end'))
    return x

initial_frame = pd.DataFrame()
initial_frame['entity_id'] = first_values['entity_id']
initial_frame['entity_value'] = first_values['entity_value'].map(map_init_value)
print(initial_frame)


    entity_id entity_value
0           0           19
1           1         18.5
2           2         19.5
3           3         21.5
4           4         21.5
5           5            0
6          32            2
7           6            0
8           7            0
9           8            0
10          9            0
11         10            0
12         11            0
13         33            2
14         12            0
15         13            0
16         14            0
17         15            0
18         16            0
19         17            0
20         18            0
21         34            2
22         19            0
23         20            0
24         21            0
25         22            0
26         23            0
27         24            0
28         25            0
29         35            2
30         36            2
31         26            0
32         27            0
33         28            0
34         37            2
35         29            0
3

# create snapshot data frames

In [16]:
# Pivot the dataframe so each entity_id becomes a column, and entity_value are the values
pivot_df = merged_act_sensor_df.pivot(index='timestamp', columns='entity_id', values='entity_value')
# Reset the index to have 'timestamp' as a column again
snapshot_df = pivot_df.reset_index()
# Prepend the initial states to the snapshot df
initial_state = initial_frame.set_index('entity_id').T
initial_state['timestamp'] = merged_act_sensor_df['timestamp'].min() - pd.Timedelta(seconds=1)  # Assume initial timestamp before first event

# prefix the dummy initial state to the dataset, offset by 1 second before the actual head of the events
snapshot_df = pd.concat([initial_state, snapshot_df], ignore_index=True)
# Forward-fill to propagate the last known state for each entity over time
snapshot_df = snapshot_df.ffill()

snapshot_df = snapshot_df.merge(merged_act_sensor_df, on='timestamp')
snapshot_df.rename(columns={'entity_id': 'changed_entity_id', 'entity_value': 'changed_entity_value'}, inplace=True)
# snapshot_df['sensor_change'] = ~snapshot_df['changed_entity_id'].str.startswith('A')
snapshot_df['second'] = snapshot_df['timestamp'].dt.second
snapshot_df['minute'] = snapshot_df['timestamp'].dt.minute
snapshot_df['hour'] = snapshot_df['timestamp'].dt.hour
snapshot_df['dayofweek'] = snapshot_df['timestamp'].dt.day
snapshot_df['weekofmonth'] = (snapshot_df['timestamp'].dt.day - 1) // 7 + 1
snapshot_df['monthofyear'] = snapshot_df['timestamp'].dt.month
snapshot_df['timedelta_from_last_event'] = snapshot_df['timestamp'].diff().dt.seconds
# drop the first row as it's dummy initial values
snapshot_df = snapshot_df.drop(index=0)

In [17]:
snapshot_df.head(10)

Unnamed: 0,0,1,2,3,4,5,32,6,7,8,...,changed_entity_id,changed_entity_value,sensor_change,second,minute,hour,dayofweek,weekofmonth,monthofyear,timedelta_from_last_event
1,19.0,18.5,19.5,21.5,21.5,0,2,0,0,0,...,1,18.5,True,46,0,0,10,2,6,46.0
2,18.5,18.5,19.5,21.5,21.5,0,2,0,0,0,...,0,18.5,True,47,0,0,10,2,6,0.0
3,18.5,18.0,19.5,21.5,21.5,0,2,0,0,0,...,1,18.0,True,17,1,0,10,2,6,30.0
4,18.5,18.0,19.5,21.5,21.5,0,2,0,0,0,...,2,19.5,True,18,1,0,10,2,6,0.0
5,18.5,18.0,20.0,21.5,21.5,0,2,0,0,0,...,2,20.0,True,48,1,0,10,2,6,29.0
6,19.0,18.0,20.0,21.5,21.5,0,2,0,0,0,...,0,19.0,True,49,1,0,10,2,6,1.0
7,19.0,18.0,19.5,21.5,21.5,0,2,0,0,0,...,2,19.5,True,4,2,0,10,2,6,14.0
8,18.5,18.0,19.5,21.5,21.5,0,2,0,0,0,...,0,18.5,True,4,2,0,10,2,6,0.0
9,18.5,18.0,20.0,21.5,21.5,0,2,0,0,0,...,2,20.0,True,20,2,0,10,2,6,15.0
10,19.0,18.0,20.0,21.5,21.5,0,2,0,0,0,...,0,19.0,True,20,2,0,10,2,6,0.0


In [18]:
snapshot_df[~snapshot_df['sensor_change']].head(100)

Unnamed: 0,0,1,2,3,4,5,32,6,7,8,...,changed_entity_id,changed_entity_value,sensor_change,second,minute,hour,dayofweek,weekofmonth,monthofyear,timedelta_from_last_event
271,16.5,16,17.5,21,20.5,1,2,0,0,0,...,32,2,False,59,20,3,10,2,6,0.0
295,16.5,16,17.5,21,20.5,1,3,1,1,1,...,32,3,False,24,25,3,10,2,6,0.0
345,16.5,15.5,17,20.5,20,1,3,1,1,0,...,33,2,False,16,45,3,10,2,6,0.0
383,16.5,15.5,17,20.5,20,1,3,1,1,1,...,33,3,False,29,49,3,10,2,6,0.0
385,16.5,15.5,17,20.5,20,1,2,1,0,1,...,32,2,False,23,54,3,10,2,6,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
57528,22.5,23,23.5,22.5,23,1,3,1,1,1,...,39,3,False,34,23,12,14,2,6,0.0
59487,23,23.5,23.5,24,24,1,3,1,1,1,...,38,2,False,24,30,15,14,2,6,0.0
59501,23,23.5,23.5,24,24,1,3,1,1,1,...,38,3,False,43,30,15,14,2,6,0.0
59970,23,22.5,23.5,24.5,24.5,1,3,1,1,1,...,40,2,False,32,42,17,14,2,6,0.0


In [19]:
snapshot_df.to_parquet(f'./data/{prefix}snapshot_dataset.parquet')

  table = self.api.Table.from_pandas(df, **from_pandas_kwargs)


# create sequence dataframe

In [55]:
import io
import torch
buffer = io.BytesIO()
# Helper function to convert NumPy arrays to bytes
def tensor_to_bytes(single_tensor):
    buffer.seek(0)
    buffer.truncate(0)
    torch.save(single_tensor, buffer)
    return buffer.getvalue()

# Define the window size (T minutes)
window_size = pd.Timedelta(minutes=5)
# Filter the dataframe to start T minutes after the first timestamp
start_time = snapshot_df['timestamp'].min() + window_size
filtered_df = snapshot_df[snapshot_df['timestamp'] >= start_time].copy()

# Define the Parquet file where we will write the output
output_file_prefix = './data/training_act/training_data_chunk_'

# Create an empty list to hold the rows (for batch processing)
rows = []

# Chunk size for writing Parquet in batches (adjust this based on your machine's memory)
chunk_size = 256

# Keep track of file count to generate unique file names
file_count = 1

last_processed = 0
from_row = last_processed - last_processed % chunk_size

# Iterate through each row in the filtered_df
for i, row in tqdm(filtered_df.iterrows()):
    if i < from_row: continue
    if row['sensor_change']: continue
    end_time = row['timestamp']
    if end_time < start_time: continue
    start_time_window = end_time - window_size
    
    # Get the past T minutes of snapshots (keep multi-dimensional structure)
    sequence = snapshot_df[(snapshot_df['timestamp'] > start_time_window) & 
                           (snapshot_df['timestamp'] < end_time)]
    # adding a new column to provide time relationship to the action event 
    sequence = sequence.copy()
    event_time = row['timestamp']
    sequence['time_from'] = (event_time - sequence['timestamp']).dt.seconds
    # Convert the sequence tensor to bytes
    # slim down
    columns_to_drop = ['second', 'minute', 'hour', 'dayofweek', 'weekofmonth', 'monthofyear','timestamp','changed_entity_id','changed_entity_value','sensor_change','timedelta_from_last_event']
    single_sequence = np.array(sequence.drop(columns=columns_to_drop).values)
    single_sequence_tensor = torch.tensor(single_sequence.astype(np.float32))
    sequence_bytes = tensor_to_bytes(single_sequence_tensor)
    
    # Add the sequence (as bytes) and the timestamp to the list of rows
    rows.append({
        'timestamp': end_time,
        'sequence': sequence_bytes,  # Storing as bytes
        'changed_entity_id': row['changed_entity_id'],
        'changed_entity_value': row['changed_entity_value'],
        'sensor_change': row['sensor_change'],
        'month': row['monthofyear'],
        'week': row['weekofmonth'],
        'day': row['dayofweek'],
        'hour': row['hour'],
        'min': row['minute'],
        'secs_from_last': row['timedelta_from_last_event'],
    })
    
    # Write in chunks to Parquet to avoid holding too much in memory
    if len(rows) >= chunk_size:
        # Convert rows to a DataFrame
        df_chunk = pd.DataFrame(rows)
        
        # Save the chunk to a separate Parquet file
        df_chunk.to_parquet(f'{output_file_prefix}{prefix}{file_count}.parquet', index=False)
        
        # Increment the file counter
        file_count += 1
        
        # Clear the rows list for the next chunk
        rows = []

# Write any remaining rows after the loop
if rows:
    df_chunk = pd.DataFrame(rows)
    df_chunk.to_parquet(f'{output_file_prefix}{prefix}{file_count}.parquet', index=False)

727717it [00:42, 17171.76it/s]


In [38]:
row = df_chunk.iloc[0]

In [54]:
import torch
idx = row.index.get_indexer(['min', 'hour', 'day', 'week', 'month'])
npdata = row.iloc[row.index.get_indexer(['min', 'hour', 'day', 'week', 'month'])]
print(npdata)
print(torch.tensor(row[['min', 'hour', 'day', 'week', 'month']]))

min      35
hour      7
day      30
week      5
month     7
Name: 0, dtype: object
tensor([35,  7, 30,  5,  7])


  print(torch.tensor(row[['min', 'hour', 'day', 'week', 'month']]))
