In [1]:
import pandas as pd
import numpy as np
import gc
import time
import os
import joblib
from tqdm.auto import tqdm 

from math import pi, sqrt, exp
import torch
from torch.utils.data import Dataset
from functools import wraps
from pyarrow.parquet import ParquetFile
import pyarrow as pa 
from sklearn import preprocessing

In [2]:
def track_time(f):
    """Decorator that prints how long each call to ``f`` took.

    The wrapped function's return value is passed through unchanged. The
    duration is measured with ``time.time()`` around the call and reported
    on stdout together with the function's ``__name__``.
    """
    @wraps(f)
    def wrap(*args, **kw):
        started_at = time.time()
        outcome = f(*args, **kw)
        elapsed = time.time() - started_at
        print('func:%r took: %2.4f sec' % (f.__name__, elapsed))
        return outcome

    return wrap

In [4]:
class PATHS:
    """Locations of the input files, all resolved relative to ``MAIN_DIR``."""

    MAIN_DIR = "../data/"
    # MAIN_DIR already ends with a slash, so file names are appended directly.
    SUBMISSION = f"{MAIN_DIR}sample_submission.csv"
    TRAIN_EVENTS = f"{MAIN_DIR}train_events.csv"
    TRAIN_SERIES = f"{MAIN_DIR}train_series.parquet"
    TEST_SERIES = f"{MAIN_DIR}test_series.parquet"
class CFG:
    """Notebook-wide configuration flags."""
    # NOTE(review): nothing in the visible cells reads this flag; the reader
    # below is constructed with an explicit demo_mode argument — confirm intent.
    DEMO_MODE = True
class data_reader:
    """Load the competition CSV/parquet files, clean them and shrink their dtypes.

    In demo mode only the first ``DEMO_ROWS`` rows of each file are read.
    """

    # Number of rows read from each file when demo_mode is enabled.
    DEMO_ROWS = 20_000

    def __init__(self, demo_mode):
        """Store the dataset registry and the demo-mode switch.

        Args:
            demo_mode: when truthy, ``load_data`` reads only ``DEMO_ROWS`` rows.
        """
        super().__init__()
        self.names_mapping = {
            "submission" : {"path" : PATHS.SUBMISSION, "is_parquet" : False, "has_timestamp" : False},
            "train_events" : {"path" : PATHS.TRAIN_EVENTS, "is_parquet" : False, "has_timestamp" : True},
            "train_series" : {"path" : PATHS.TRAIN_SERIES, "is_parquet" : True, "has_timestamp" : True},
            "test_series" : {"path" : PATHS.TEST_SERIES, "is_parquet" : True, "has_timestamp" : True}
        }
        self.valid_names = ["submission", "train_events", "train_series", "test_series"]
        self.demo_mode = demo_mode

    def verify(self, data_name):
        """Check that ``data_name`` is a known dataset.

        Raises:
            KeyError: if the name is not in ``self.valid_names``. KeyError is
                used because that is what the registry lookup in ``load_data``
                raised for an unknown name before this check failed loudly.
        """
        if data_name not in self.valid_names:
            raise KeyError(
                "PLEASE ENTER A VALID DATASET NAME, VALID NAMES ARE : {}".format(self.valid_names)
            )

    def cleaning(self, data):
        """Drop rows whose ``timestamp`` is missing and report how many were removed.

        Args:
            data: DataFrame with a ``timestamp`` column.

        Returns:
            A new DataFrame without the rows that had a missing timestamp.
        """
        before_cleaning = len(data)
        print("Number of missing timestamps : ", int(data["timestamp"].isna().sum()))
        data = data.dropna(subset=["timestamp"])
        after_cleaning = len(data)
        # An empty frame would otherwise divide by zero.
        removed_pct = 100 * (before_cleaning - after_cleaning) / before_cleaning if before_cleaning else 0.0
        print("Percentage of removed steps : {:.1f}%".format(removed_pct))
        return data

    @staticmethod
    def reduce_memory_usage(data):
        """Downcast columns in place to smaller dtypes and report the memory saved.

        * object / pandas string columns become ``category``;
        * signed and unsigned integer columns get the narrowest integer type
          of the same signedness that holds their min and max;
        * float columns get float16/float32/float64 chosen by value range;
        * every other dtype (bool, datetime, category, nullable extension
          types, ...) is left untouched.

        Args:
            data: DataFrame to shrink; its columns are reassigned in place.

        Returns:
            The same DataFrame object.
        """
        start_mem = data.memory_usage().sum() / 1024**2
        print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))

        signed_types = (np.int8, np.int16, np.int32, np.int64)
        unsigned_types = (np.uint8, np.uint16, np.uint32, np.uint64)

        for col in data.columns:
            col_type = data[col].dtype

            if col_type == object or isinstance(col_type, pd.StringDtype):
                data[col] = data[col].astype('category')
                continue

            # Only plain numpy int/uint/float columns are range-checked. Routing
            # unsigned ints or bools through the float branch would silently
            # turn e.g. a uint32 counter into a lossy float16 column.
            if not isinstance(col_type, np.dtype) or col_type.kind not in "iuf":
                continue

            c_min = data[col].min()
            c_max = data[col].max()
            if pd.isna(c_min) or pd.isna(c_max):
                # Empty or all-NaN column: nothing to base a decision on.
                continue

            if col_type.kind == "f":
                # NOTE: the choice is by range only; float16 keeps roughly three
                # significant decimal digits, so this downcast is lossy.
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    data[col] = data[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    data[col] = data[col].astype(np.float32)
                else:
                    data[col] = data[col].astype(np.float64)
            else:
                candidates = signed_types if col_type.kind == "i" else unsigned_types
                for candidate in candidates:
                    bounds = np.iinfo(candidate)
                    if bounds.min <= c_min and c_max <= bounds.max:
                        data[col] = data[col].astype(candidate)
                        break

        end_mem = data.memory_usage().sum() / 1024**2
        print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
        decreased_pct = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
        print('Decreased by {:.1f}%'.format(decreased_pct))
        return data

    def load_data(self, data_name):
        """Load one registered dataset, clean it and reduce its memory footprint.

        Args:
            data_name: one of ``self.valid_names``.

        Returns:
            The loaded DataFrame (truncated to ``DEMO_ROWS`` rows in demo mode).

        Raises:
            KeyError: if ``data_name`` is not a registered dataset.
        """
        self.verify(data_name)
        data_props = self.names_mapping[data_name]
        if data_props["is_parquet"]:
            if self.demo_mode:
                # Read just the first record batch instead of the whole file.
                pf = ParquetFile(data_props["path"])
                demo_steps = next(pf.iter_batches(batch_size=self.DEMO_ROWS))
                data = pa.Table.from_batches([demo_steps]).to_pandas()
            else:
                data = pd.read_parquet(data_props["path"])
        else:
            if self.demo_mode:
                # `nrows` is the read_csv keyword that limits the rows read.
                data = pd.read_csv(data_props["path"], nrows=self.DEMO_ROWS)
            else:
                data = pd.read_csv(data_props["path"])

        gc.collect()
        if data_props["has_timestamp"]:
            print('cleaning')
            data = self.cleaning(data)
            gc.collect()
        data = self.reduce_memory_usage(data)
        return data

In [17]:
# Module-level scaler shared by DataParser._transform, which calls fit_transform on it.
scaler = preprocessing.MinMaxScaler()
# Columns handed to the scaler; the order matters because the scaled output
# is read back by column position.
columns_to_scale = ['anglez', 'enmo']
class DataParser:
    """Load a data file and apply the cleaning / feature transforms used downstream."""

    def __init__(self, reader, data_dir: str = "../data/") -> None:
        """
        Args:
            reader: object exposing ``load_data(data_name=...)``; used when
                ``load_data`` is called with ``flag=True``.
            data_dir: directory that file names are joined to when the file is
                read directly (``flag=False``).
        """
        self.reader = reader
        self.data_dir = data_dir

    @track_time
    def _clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows with a missing ``step`` or ``timestamp``.

        Raises:
            KeyError: if either column is absent.
        """
        if "step" not in df.columns or "timestamp" not in df.columns:
            raise KeyError("Missing columns: either `step` or `timestamp` not exist.")
        return df.dropna(subset=["step", "timestamp"])

    @track_time
    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Cast key columns and add derived features; ``df`` is modified in place.

        Adds ``anglez_norm`` / ``enmo_norm`` when all of ``columns_to_scale``
        are present, and ``hour`` when ``step`` and ``timestamp`` are present.

        Returns:
            The same DataFrame with the new/retyped columns.
        """
        if "night" in df.columns:
            df["night"] = df["night"].astype(np.int16)

        if "step" in df.columns and "timestamp" in df.columns:
            df["step"] = df["step"].astype(np.int32)
            df["timestamp"] = pd.to_datetime(df["timestamp"], format="%Y-%m-%dT%H:%M:%S%z", utc=True)
            # utc=True above means this is the hour in UTC, not in the
            # original local offset of the timestamp string.
            df['hour'] = df['timestamp'].dt.hour

        # Every scaled column must be present. The previous test
        # (`"anglez" and "enmo" in df.columns`) only checked "enmo", because a
        # non-empty string literal is always truthy.
        if all(col in df.columns for col in columns_to_scale):
            # NOTE(review): fit_transform refits the shared module-level scaler
            # on every call, so each frame is scaled by its own min/max.
            normalized_data = scaler.fit_transform(df[columns_to_scale])
            df['anglez_norm'] = normalized_data[:, 0]
            df['enmo_norm'] = normalized_data[:, 1]

        return df

    def load_data(self, file_name: str, file_type: str, flag: bool) -> pd.DataFrame:
        """Load ``file_name`` and return the cleaned (and usually transformed) frame.

        Args:
            file_name: file name including extension; with ``flag=True`` the
                part before the first dot is used as the reader's dataset name.
            file_type: ``"parquet"`` or anything else (treated as CSV); only
                consulted when ``flag`` is false.
            flag: when true, load through ``self.reader``; otherwise read the
                file from ``self.data_dir`` directly.

        Returns:
            The cleaned DataFrame. Files read directly as CSV are cleaned but
            not transformed (behaviour kept from the original implementation).
        """
        if flag:
            df = self.reader.load_data(data_name=file_name.split(".")[0])
            return self._transform(self._clean(df))

        path = os.path.join(self.data_dir, file_name)
        if file_type == "parquet":
            return self._transform(self._clean(pd.read_parquet(path)))

        return self._clean(pd.read_csv(path))

In [18]:
# demo_mode=False loads the full files (CFG.DEMO_MODE is not consulted here).
reader = data_reader(demo_mode=False)
parser = DataParser(reader)
# flag=True routes loading through `reader`, so the file_type argument is not
# used on this path.
train_series = parser.load_data("train_series.parquet", "parquet", True)
events = parser.load_data("train_events.csv", "csv", True)
# Distinct series ids found in the loaded series frame.
ids = train_series.series_id.unique()
gc.collect()

cleaning
Number of missing timestamps :  0
Percentage of removed steps : 0.0%
Memory usage of dataframe is 3416.54 MB
Memory usage after optimization is: 2059.05 MB
Decreased by 39.7%
func:'_clean' took: 1.1989 sec
func:'_transform' took: 385.9056 sec
cleaning
Number of missing timestamps :  4923
Percentage of removed steps : 33.9%
Memory usage of dataframe is 0.44 MB
Memory usage after optimization is: 0.50 MB
Decreased by -13.5%
func:'_clean' took: 0.0005 sec
func:'_transform' took: 0.0257 sec


0

In [20]:
# For every series, pair consecutive onset/wakeup events of the same night and
# record the row positions of both timestamps inside that series.
targets = []
data = []
for viz_id in tqdm(ids):
    viz_targets = []
    viz_events = events[events.series_id == viz_id]
    # Boolean-mask selection already returns a new frame; drop=True keeps the
    # positional index without adding a leftover `index` column.
    viz_series = train_series.loc[train_series.series_id == viz_id].reset_index(drop=True)
    for i in range(len(viz_events) - 1):
        onset_row = viz_events.iloc[i]
        wakeup_row = viz_events.iloc[i + 1]
        is_pair = (
            onset_row["event"] == 'onset'
            and wakeup_row["event"] == 'wakeup'
            and onset_row["night"] == wakeup_row["night"]
        )
        if not is_pair:
            continue
        start = viz_events["timestamp"].iloc[i]
        end = viz_events["timestamp"].iloc[i + 1]
        start_rows = viz_series.index[viz_series.timestamp == start].values
        end_rows = viz_series.index[viz_series.timestamp == end].values
        # An event whose timestamp is absent from the loaded series (e.g. when
        # the series was truncated in demo mode) cannot be located; skip it
        # instead of failing with IndexError.
        if len(start_rows) == 0 or len(end_rows) == 0:
            continue
        viz_targets.append((start_rows[0], end_rows[0]))
    targets.append(viz_targets)
    data.append(viz_series[['anglez_norm', 'enmo_norm', 'hour', 'step']])
joblib.dump((targets,data,ids), './train_data.pkl')
len(data)

  0%|          | 0/277 [00:00<?, ?it/s]

277

In [13]:
# Width, in original steps, of the window covered by each Gaussian target.
SIGMA = 720
# Number of consecutive steps aggregated into one downsampled sample.
SAMPLE_FREQ = 12
class SleepDataset(Dataset):
    """Dataset of downsampled per-series features and Gaussian event targets.

    Each item is ``(X, y)`` for one series:

    * ``X``: torch tensor with one row per ``SAMPLE_FREQ`` steps and the
      columns ``[anglez std, enmo mean/std/median/max/min, hour sin, hour cos]``;
    * ``y``: numpy array with one row per ``SAMPLE_FREQ`` steps and two
      columns (onset, wakeup) holding the window-averaged Gaussian targets.
    """

    def __init__(
        self,
        file
    ):
        """Load ``(targets, data, ids)`` from ``file`` and precompute every item.

        Args:
            file: path to a joblib dump holding the tuple
                ``(targets, data, ids)``.
        """
        # NOTE: joblib.load unpickles the file; only load files you trust.
        self.targets,self.data,self.ids = joblib.load(file)
        self.X = []
        self.y = []

        for index in tqdm(range(len(self.targets))):
            frame = self.data[index][["anglez_norm", "enmo_norm", "hour"]]
            values = frame.values
            # Targets are laid out on the ORIGINAL step axis, so the length
            # must be captured before the features are downsampled.
            seq_len = len(frame)

            X_anglez = self.downsample_seq_generate_features(values[:, 0], SAMPLE_FREQ, std_only=True)
            X_enmo   = self.downsample_seq_generate_features(values[:, 1], SAMPLE_FREQ)
            X_hour   = self.downsample_seq_generate_features(values[:, 2], SAMPLE_FREQ, is_hour=True)
            X = torch.from_numpy(np.concatenate([X_anglez, X_enmo, X_hour], -1))

            target_guassian = self._gaussian_targets(seq_len, self.targets[index])
            y = np.dstack([self.downsample_seq(target_guassian[:, i], SAMPLE_FREQ) for i in range(target_guassian.shape[1])])[0]

            self.X.append(X)
            self.y.append(y)
            gc.collect()

    def _gaussian_targets(self, seq_len, events):
        """Build the ``(seq_len, 2)`` target array for one series.

        A Gaussian bump from ``gauss()`` is written around every onset
        position (column 0) and every wakeup position (column 1). Windows are
        clipped to ``[0, seq_len)`` and the kernel is sliced by the same
        amount, so the peak always lands on the event position. Later events
        overwrite earlier ones where their windows overlap.

        Args:
            seq_len: number of steps in the original (not downsampled) series.
            events: iterable of ``(onset_position, wakeup_position)`` pairs.

        Returns:
            ``numpy.ndarray`` of shape ``(seq_len, 2)``.
        """
        half = SIGMA // 2
        kernel = np.asarray(self.gauss())  # computed once, reused for every event
        target = np.zeros((seq_len, 2))
        for onset, wakeup in events:
            for column, center in ((0, int(onset)), (1, int(wakeup))):
                lo = max(0, center - half)
                hi = min(seq_len, center + half + 1)
                if lo >= hi:
                    # The window lies completely outside the series.
                    continue
                offset = lo - (center - half)
                target[lo:hi, column] = kernel[offset:offset + (hi - lo)]
        return target

    @staticmethod
    def _pad_and_window(feat, downsample_factor):
        """Repeat the last value until the length is a multiple of the factor, then reshape to (-1, factor)."""
        remainder = len(feat) % downsample_factor
        if remainder != 0:
            padding = np.full(downsample_factor - remainder, feat[-1], dtype=np.float64)
            feat = np.concatenate([feat, padding])
        return np.reshape(feat, (-1, downsample_factor))

    def downsample_seq_generate_features(self, feat, downsample_factor=SAMPLE_FREQ, std_only=False, is_hour=False):
        """Aggregate a 1-D sequence into per-window features.

        Args:
            feat: 1-D array of values, one per step.
            downsample_factor: number of steps per window.
            std_only: return only the per-window standard deviation.
            is_hour: treat values as hour-of-day and return the sine/cosine
                encoding (24-hour period) of each window's maximum.

        Returns:
            2-D array with one row per window: 2 columns for ``is_hour``,
            1 column for ``std_only``, otherwise 5 columns
            (mean, std, median, max, min).
        """
        windows = self._pad_and_window(feat, downsample_factor)

        if is_hour:
            feat_hour = np.max(windows, 1)
            hour_sin = np.sin(feat_hour * (2 * np.pi / 24))
            hour_cos = np.cos(feat_hour * (2 * np.pi / 24))
            return np.dstack([hour_sin, hour_cos])[0]

        feat_std = np.std(windows, 1)
        if std_only:
            return np.dstack([feat_std])[0]

        feat_mean   = np.mean(windows, 1)
        feat_median = np.median(windows, 1)
        feat_max    = np.max(windows, 1)
        feat_min    = np.min(windows, 1)
        return np.dstack([feat_mean, feat_std, feat_median, feat_max, feat_min])[0]

    def downsample_seq(self,feat, downsample_factor = SAMPLE_FREQ):
        """Return the per-window mean of a 1-D sequence (last value repeated as padding)."""
        return np.mean(self._pad_and_window(feat, downsample_factor), 1)

    def gauss(self,n=SIGMA,sigma=SIGMA*0.15):
        """Return Gaussian density values at the integer offsets -n/2 .. n/2 as a list.

        Args:
            n: window width; the list has ``2 * int(n / 2) + 1`` entries.
            sigma: standard deviation of the Gaussian.
        """
        r = range(-int(n/2),int(n/2)+1)
        return [1 / (sigma * sqrt(2*pi)) * exp(-float(x)**2/(2*sigma**2)) for x in r]

    def __len__(self):
        """Number of series in the dataset."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the precomputed ``(X, y)`` pair for series ``index``."""
        return self.X[index], self.y[index]

In [14]:
# Build the dataset from the cached (targets, data, ids) tuple.
train_ds = SleepDataset('./train_data.pkl')
# NOTE(review): this pickles the SleepDataset instance itself; loading it back
# presumably requires the class definition to be available — confirm.
joblib.dump(train_ds, "./train_ds.pkl")

  0%|          | 0/277 [00:00<?, ?it/s]

['./train_ds.pkl']