# Acknowledgement

Code in this notebook is based on the blog posts by the author of `tsfresh`, Nils Braun:

- tsfresh on Large Data Samples [Part 1](https://towardsdatascience.com/time-series-feature-extraction-on-really-large-data-samples-b732f805ba0e) & [Part 2](https://towardsdatascience.com/tsfresh-on-large-data-samples-part-ii-4d6843155dfc)


# How does `tsfresh` handle large data?

If the data is very large (a large number of time series, each consisting of a large number of samples), we face two types of challenges:

- Longer execution time while computing the features (compute-bound problem)
- Need for more RAM (memory-bound problem)

`tsfresh` addresses both in the following way. 

### Large Execution Time
**If the data fits into main memory (RAM)**, `tsfresh` addresses the execution-time problem by utilizing
- **multiple processors on a single machine** (using the `multiprocessing` package). This is the default: whenever `tsfresh` is used, multiprocessing is switched on. The number of processes/cores to be used can be controlled with the `n_jobs` parameter.
- **multiple processors spread across multiple machines** (cluster/distributed computing). The `tsfresh.utilities.distribution` module contains several distributors, including the Dask-based `tsfresh.utilities.distribution.ClusterDaskDistributor`, which distributes the job across the workers of a Dask cluster.
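The sketch below shows what distributing the feature calculation means. The data is split by time-series id, and each worker computes the features for some of the ids independently of the others. This is not tsfresh's implementation. The helper names (`split_by_id`, `simple_features`, `extract_parallel`) are hypothetical. A `ThreadPoolExecutor` stands in for the process pool that tsfresh uses, which keeps the sketch portable.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import statistics


def split_by_id(rows):
    """Group (id, value) rows into one list of values per time-series id (hypothetical helper)."""
    series = defaultdict(list)
    for series_id, value in rows:
        series[series_id].append(value)
    return dict(series)


def simple_features(item):
    """Compute a few features for one time series; each call is independent of the others."""
    series_id, values = item
    return series_id, {"mean": statistics.fmean(values), "maximum": max(values), "length": len(values)}


def extract_parallel(rows, n_jobs=2):
    """Fan the per-id work out to n_jobs workers and collect the results in one dict."""
    series = split_by_id(rows)
    # tsfresh uses processes; a thread pool is used here only to keep the sketch portable
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        return dict(pool.map(simple_features, series.items()))


rows = [(1, 10.0), (1, 12.0), (2, 3.0), (2, 5.0), (2, 7.0)]
features = extract_parallel(rows, n_jobs=2)
print(features)
# {1: {'mean': 11.0, 'maximum': 12.0, 'length': 2}, 2: {'mean': 5.0, 'maximum': 7.0, 'length': 3}}
```

No worker needs any other id's data, so the ids can be spread over processes on one machine or over machines in a cluster.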

### Need for larger memory
**If the data doesn't fit into main memory (RAM)**, `tsfresh` utilizes `Dask` for **out-of-core computation** and for **distributing** the data across multiple machines in a cluster. `tsfresh` provides a convenience function for this: `tsfresh.convenience.bindings.dask_feature_extraction_on_chunk`.

`tsfresh` also provides a convenience function that handles large data with **Apache Spark** (`tsfresh.convenience.bindings.spark_feature_extraction_on_chunk()`). However, this notebook focuses only on Dask.
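Plain pandas can sketch the idea behind out-of-core computation. Read a CSV chunk by chunk, keep only small running aggregates in memory, and combine them at the end. Dask automates this chunking and can also distribute it, so the sketch is only an illustration. The in-memory CSV stands in for a file too large to load at once. `chunksize=2` is deliberately tiny.

```python
import io

import pandas as pd

# An in-memory CSV standing in for a file that is too large to load at once
csv_text = "segment_id,sensor_1\n1,10\n1,12\n2,3\n2,5\n2,7\n"

partial_aggregates = []
# chunksize makes read_csv return an iterator of DataFrames instead of one big DataFrame
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    # Only per-chunk aggregates (sum and count per segment) are kept in memory
    partial_aggregates.append(chunk.groupby("segment_id")["sensor_1"].agg(["sum", "count"]))

# Combine the partial aggregates; a segment may be split across several chunks
totals = pd.concat(partial_aggregates).groupby(level=0).sum()
mean_per_segment = totals["sum"] / totals["count"]
print(mean_per_segment)
```

Only decomposable aggregates such as sums and counts can be combined this way. A median cannot. This is one reason the Dask example later in this notebook keeps each segment in a single partition.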

In [21]:
import glob
import os
import sys

import pandas as pd
import numpy as np

import dask
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

def get_segment_id_from_path(df, path):
    """
    Replace the file path stored in `segment_id` with the integer segment id,
    e.g. f"{path}1400253000.csv" -> 1400253000
    """
    df["segment_id"] = df["segment_id"].str.replace(path, "", regex=False)
    df["segment_id"] = df["segment_id"].str.replace(".csv", "", regex=False)
    df["segment_id"] = df["segment_id"].astype(np.int64)
    
    return df

def append_time_column(df):
    """
    Add a `time` column numbering the rows 0, 1, 2, ... within the (partition of the) DataFrame
    """
    df["time"] = range(0, len(df))
    
    return df

DATA_DIR = "/datadrive/arnab/vssexclude/kaggle/volcano/data/train"

# Define the datatypes for the sensor columns (float32 needs half the memory of float64)
data_types = {"sensor_1": np.float32,
              "sensor_2": np.float32,
              "sensor_3": np.float32,
              "sensor_4": np.float32,
              "sensor_5": np.float32,
              "sensor_6": np.float32,
              "sensor_7": np.float32,
              "sensor_8": np.float32,
              "sensor_9": np.float32,
              "sensor_10": np.float32}

# Data fits into memory, but the feature extraction job needs to be distributed

### When to use?

- When the data fits into memory, but we want to distribute the feature extraction job across a cluster to increase parallelism.

### Steps
- Start a Dask cluster
- Create an instance of `ClusterDaskDistributor` and connect it to the Dask scheduler created above
- Read the data (Pandas DataFrame)
- Pass the `ClusterDaskDistributor` instance created above to the `extract_features` function of `tsfresh`
- Extract the features. The extracted features are also returned as a Pandas DataFrame
- Close the Dask client and cluster

### 1. Start Dask Cluster

<img src="../images/dask_architechture_diagram.png" width="600" height="200" style="border-style: solid;">

Create a `LocalCluster` with the client, scheduler and workers all running on the same machine. The scheduler will listen on port 8786.

By specifying `n_workers=8`, we ask Dask to start `8` independent Python processes. Depending on the type of cluster, these may run on the same machine or on different machines. With a `LocalCluster`, as here, they all run on this machine.
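`memory_limit` applies to each worker, not to the cluster as a whole. That is why the client summary further down reports 16 GB in total. With `threads_per_worker=1`, the 8 workers also account for the 8 cores in that summary. The sketch below spells out the arithmetic. `parse_size` is a hypothetical helper that treats "GB" as 10**9 bytes, as `dask.utils.parse_bytes` does.

```python
# Hypothetical helper: convert strings such as "2GB" to bytes (decimal units)
UNITS = {"KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_size(text):
    text = text.strip().upper()
    for unit, factor in UNITS.items():
        if text.endswith(unit):
            return int(float(text[: -len(unit)]) * factor)
    return int(text)  # plain number of bytes


n_workers = 8
memory_limit = "2GB"  # per worker, as passed to LocalCluster

total_bytes = n_workers * parse_size(memory_limit)
print(f"{total_bytes / 10**9:.2f} GB")  # 16.00 GB
```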

In [22]:
cluster = LocalCluster(n_workers=8, 
                       threads_per_worker=1, 
                       scheduler_port=8786, 
                       memory_limit='2GB')

cluster.scheduler_address

'tcp://127.0.0.1:8786'

### 2. Connect the `ClusterDaskDistributor` to the Dask Cluster created above. 

`ClusterDaskDistributor` is essentially a thin wrapper around a `dask.distributed.Client` connected to the given scheduler address. I am using a local Dask cluster here, but ideally it would be a remote cluster. For a purely local setup, `tsfresh` also provides `LocalDaskDistributor`, which starts a local Dask cluster by itself.

In [23]:
from tsfresh.utilities.distribution import ClusterDaskDistributor

# Connect to Dask Scheduler
dask_distributor = ClusterDaskDistributor(address="127.0.0.1:8786")

dask_distributor.client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:8786, Dashboard: http://127.0.0.1:8787/status | Workers: 8, Cores: 8, Memory: 16.00 GB |


### 3. Read Data

Both the input data and the output data (extracted features) are Pandas DataFrames. Hence, the entire data must fit into memory.
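Because everything must fit into memory here, the column dtypes matter. Reading the sensor columns as `np.float32`, via the `data_types` mapping defined earlier, halves their footprint compared with pandas' default `float64`. The sketch below uses synthetic data. The random values and the row count are arbitrary.

```python
import numpy as np
import pandas as pd

n_rows = 1_000  # arbitrary size for illustration
rng = np.random.default_rng(0)
values = rng.normal(size=(n_rows, 2))

df64 = pd.DataFrame(values, columns=["sensor_1", "sensor_4"])  # default float64
df32 = df64.astype(np.float32)  # same data stored as float32

# memory_usage(index=False) reports the bytes used by each column, excluding the index
bytes64 = df64.memory_usage(index=False).sum()
bytes32 = df32.memory_usage(index=False).sum()
print(bytes64, bytes32)  # 16000 8000
```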

In [24]:
frames = []
for name in glob.glob(f"{DATA_DIR}/140*"):
    temp_df = pd.read_csv(name, 
                          dtype=data_types, 
                          usecols=["sensor_1", "sensor_4"], 
                          nrows=1000)
    
    # Extract the segment id from the file name, e.g. ".../1400253000.csv" -> 1400253000
    segment_id = int(os.path.splitext(os.path.basename(name))[0])
    temp_df["segment_id"] = segment_id
    print(f"Reading data for segment: {segment_id}")
    
    # Create a column named time
    temp_df["time"] = range(0, len(temp_df))
    
    frames.append(temp_df)

# DataFrame.append was removed in pandas 2.0, so concatenate all segments once at the end
df = pd.concat(frames)

print("\n")
print(f"Shape of the dataframe consisting of all data from above files: {df.shape}")

Reading data for segment: 1400253000
Reading data for segment: 140031872
Reading data for segment: 1400727315
Reading data for segment: 1400929225
Reading data for segment: 1402556914
Reading data for segment: 1402674973
Reading data for segment: 1402914692
Reading data for segment: 1403005697
Reading data for segment: 1403222059
Reading data for segment: 1403244730
Reading data for segment: 1403440092
Reading data for segment: 140348256
Reading data for segment: 1403947680
Reading data for segment: 1404122310
Reading data for segment: 1404179874
Reading data for segment: 1404322654
Reading data for segment: 1404502479
Reading data for segment: 1405189645
Reading data for segment: 1405443107
Reading data for segment: 1406234149
Reading data for segment: 1406456924
Reading data for segment: 1406626451
Reading data for segment: 1406938061
Reading data for segment: 1407084157
Reading data for segment: 1407094442
Reading data for segment: 1407261706
Reading data for segment: 1408285202
Rea

In [25]:
df.head()

|   | sensor_1 | sensor_4 | segment_id | time |
|---|---|---|---|---|
| 0 | -486.0 | -516.0 | 1400253000 | 0 |
| 1 | -567.0 | -591.0 | 1400253000 | 1 |
| 2 | -631.0 | -620.0 | 1400253000 | 2 |
| 3 | -744.0 | -550.0 | 1400253000 | 3 |
| 4 | -725.0 | -475.0 | 1400253000 | 4 |


### 4, 5. Extract features using `tsfresh`

Pass the `ClusterDaskDistributor` instance created above as the `distributor` argument of `extract_features`.

In [None]:
from tsfresh.feature_extraction import extract_features
from tsfresh.feature_extraction.settings import ComprehensiveFCParameters

extracted_features = extract_features(timeseries_container=df,
                     column_id='segment_id', column_sort='time',
                     default_fc_parameters=ComprehensiveFCParameters(),
                     distributor=dask_distributor)

In [None]:
extracted_features.head()

### 6. Close the Dask Client and Cluster

In [9]:
dask_distributor.close()
cluster.close()

In [10]:
del df, dask_distributor

# Data doesn't fit into memory

### When to use
This is applicable when the data is too large to fit into main memory (RAM).

### Steps
- Create a Dask cluster
- Create a `dask.distributed.Client` locally and connect it to the Dask cluster above
- Read the data using a Dask DataFrame. Dask performs out-of-core computation and loads the data into RAM chunk by chunk. This is how larger-than-memory data is handled
- Format the data into the form `tsfresh` expects
- Invoke the feature extraction by passing the formatted data to `dask_feature_extraction_on_chunk()`
- Pivot the output of the previous step. The result is also a Dask DataFrame, and the features are computed and loaded into RAM only when `compute()` is invoked
- Invoke `compute()`. The output is a Pandas DataFrame
- Close the Dask client and cluster

### 1, 2. Create a Dask Client and connect it to a Dask Cluster

In [11]:
cluster = LocalCluster(n_workers=8, 
                       threads_per_worker=1, 
                       scheduler_port=8786, 
                       memory_limit='2GB')

client = Client(cluster)

client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:8786, Dashboard: http://127.0.0.1:8787/status | Workers: 8, Cores: 8, Memory: 16.00 GB |


### 3. Read Data using a Dask DataFrame

In [12]:
ddf = dd.read_csv(
    urlpath=f"{DATA_DIR}/140*.csv", 
    blocksize=None,                    # one partition per file, i.e. per segment
    dtype=data_types,
    include_path_column='segment_id')  # store each row's source file path in `segment_id`

# Replace the file path in `segment_id` with the integer segment id
ddf = ddf.map_partitions(get_segment_id_from_path, f"{DATA_DIR}/")

# Add a column named time with ascending values starting from 0 (per partition, i.e. per segment)
ddf = ddf.map_partitions(append_time_column)

# Keep the first 1000 rows of each segment (every partition has its own 0-based index)
ddf = ddf.loc[0:999, :]


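`blocksize=None` matters here because it makes Dask create exactly one partition per CSV file. `append_time_column` numbers the rows of each partition, so with one partition per file the counter restarts at 0 for each segment. If a file were split into several partitions, the counter would restart in the middle of a segment. The sketch below imitates partitions with plain pandas slices. The two-way split is an arbitrary stand-in for Dask's partitioning.

```python
import pandas as pd


def append_time_column(df):
    # Same numbering logic as the notebook's helper; copy() avoids mutating the input slice
    df = df.copy()
    df["time"] = range(0, len(df))
    return df


segment = pd.DataFrame({"sensor_1": [5.0, 6.0, 7.0, 8.0, 9.0, 10.0]})

# One partition per file (what blocksize=None gives): time runs 0..5 as intended
one_partition = append_time_column(segment)

# The same file split into two partitions: each one restarts the counter at 0
partitions = [segment.iloc[:3], segment.iloc[3:]]
two_partitions = pd.concat([append_time_column(part) for part in partitions])

print(one_partition["time"].tolist())   # [0, 1, 2, 3, 4, 5]
print(two_partitions["time"].tolist())  # [0, 1, 2, 0, 1, 2]
```

The `ddf.loc[0:999, :]` filter also assumes one partition per file, because it uses each partition's own 0-based index.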
In [13]:
ddf.head()

|   | sensor_1 | sensor_2 | sensor_3 | sensor_4 | sensor_5 | sensor_6 | sensor_7 | sensor_8 | sensor_9 | sensor_10 | segment_id | time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | -486.0 | 34.0 | -87.0 | -516.0 | 234.0 | -785.0 | 522.0 | 473.0 | 238.0 | 2802.0 | 1400253000 | 0 |
| 1 | -567.0 | 95.0 | -92.0 | -591.0 | 231.0 | -774.0 | 589.0 | 210.0 | 252.0 | 2678.0 | 1400253000 | 1 |
| 2 | -631.0 | 261.0 | -120.0 | -620.0 | 212.0 | -787.0 | 433.0 | 120.0 | 276.0 | 2517.0 | 1400253000 | 2 |
| 3 | -744.0 | 262.0 | -215.0 | -550.0 | 174.0 | -890.0 | 322.0 | -240.0 | 334.0 | 2323.0 | 1400253000 | 3 |
| 4 | -725.0 | 318.0 | -193.0 | -475.0 | 131.0 | -806.0 | 267.0 | -14.0 | 365.0 | 2089.0 | 1400253000 | 4 |


### 4. Format data into the form `tsfresh` expects it to be.

In [14]:
ddf = ddf.melt(id_vars=["segment_id", "time"],  
               value_vars=['sensor_1', 'sensor_4'],  
               var_name="sensor_type", 
               value_name="sensor_value")

ddf_grouped = ddf.groupby(["segment_id", "sensor_type"])
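`tsfresh` accepts both a "flat" (wide) DataFrame with one column per sensor, like the Pandas DataFrame in the first part of this notebook, and a "stacked" (long) DataFrame. The stacked form has one row per (id, time, kind) and a single value column. The `melt` above produces the stacked layout, and the `groupby` then yields exactly one time series per (segment, sensor) pair. The sketch below applies the same reshaping to a toy pandas frame with made-up values.

```python
import pandas as pd

# Toy data in the flat ("wide") layout: one column per sensor
wide = pd.DataFrame({
    "segment_id": [1, 1, 2, 2],
    "time": [0, 1, 0, 1],
    "sensor_1": [10.0, 12.0, 3.0, 5.0],
    "sensor_4": [-1.0, 1.0, 4.0, 6.0],
})

# Stacked ("long") layout: one row per (segment, time, sensor)
stacked = wide.melt(id_vars=["segment_id", "time"],
                    value_vars=["sensor_1", "sensor_4"],
                    var_name="sensor_type",
                    value_name="sensor_value")

# Each (segment_id, sensor_type) group is one time series
series_lengths = stacked.groupby(["segment_id", "sensor_type"]).size()
print(stacked.shape)  # (8, 4)
print(series_lengths)
```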

### 5. Invoke feature extraction

In [15]:
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk

from tsfresh.feature_extraction.settings import MinimalFCParameters

features = dask_feature_extraction_on_chunk(ddf_grouped, 
                                            column_id="segment_id", 
                                            column_kind="sensor_type", 
                                            column_value="sensor_value", 
                                            # sort each series by time; order-dependent features need this
                                            column_sort="time",
                                            default_fc_parameters=MinimalFCParameters())
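The sketch below gives a feel for what `dask_feature_extraction_on_chunk` returns. It is a rough plain-pandas equivalent of the eight per-sensor features in the final feature table (`sum_values`, `median`, `mean`, `length`, `standard_deviation`, `variance`, `maximum`, `minimum`), emitted in the same long `segment_id` / `variable` / `value` shape. It is an approximation for illustration, not tsfresh's code. tsfresh computes the standard deviation and variance with NumPy's defaults, so `ddof=0` is passed explicitly. Newer tsfresh versions may include additional features in `MinimalFCParameters`.

```python
import pandas as pd

# Toy stacked data: one row per (segment, time, sensor), as produced by melt()
stacked = pd.DataFrame({
    "segment_id": [1, 1, 1, 1, 2, 2, 2, 2],
    "sensor_type": ["sensor_1", "sensor_1", "sensor_4", "sensor_4"] * 2,
    "time": [0, 1, 0, 1, 0, 1, 0, 1],
    "sensor_value": [10.0, 12.0, -1.0, 1.0, 3.0, 5.0, 4.0, 6.0],
})

# Rough pandas equivalents of the minimal feature set (ddof=0 matches NumPy's std/var)
minimal_features = {
    "sum_values": lambda s: s.sum(),
    "median": lambda s: s.median(),
    "mean": lambda s: s.mean(),
    "length": lambda s: float(len(s)),
    "standard_deviation": lambda s: s.std(ddof=0),
    "variance": lambda s: s.var(ddof=0),
    "maximum": lambda s: s.max(),
    "minimum": lambda s: s.min(),
}

records = []
for (segment_id, kind), group in stacked.groupby(["segment_id", "sensor_type"]):
    # Sort each series by time before computing features
    values = group.sort_values("time")["sensor_value"]
    for name, func in minimal_features.items():
        # Feature names follow the "<kind>__<feature>" pattern seen in the output
        records.append({"segment_id": segment_id,
                        "variable": f"{kind}__{name}",
                        "value": func(values)})

features_long = pd.DataFrame(records)
print(features_long.head())
```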

In [16]:
features.head()

| segment_id | sensor_type | | segment_id | variable | value |
|---|---|---|---|---|---|
| 1402674973 | sensor_1 | 0 | 1402674973 | sensor_1__sum_values | 4700.0 |
| 1402674973 | sensor_1 | 1 | 1402674973 | sensor_1__median | 0.5 |
| 1402674973 | sensor_1 | 2 | 1402674973 | sensor_1__mean | 4.7 |
| 1402674973 | sensor_1 | 3 | 1402674973 | sensor_1__length | 1000.0 |
| 1402674973 | sensor_1 | 4 | 1402674973 | sensor_1__standard_deviation | 225.85939 |


### 6. Pivot extracted features

In [17]:
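# Dask's pivot_table requires the `columns` column to be categorical with known categories
features = features.categorize(columns=["variable"])
# Drop the (segment_id, sensor_type, ...) index; segment_id is still available as a column
features = features.reset_index(drop=True)

# One row per segment, one column per feature. Each (segment_id, variable) pair
# occurs once, so aggfunc="sum" simply passes the value through
feature_table = features.pivot_table(index="segment_id",
                                     columns="variable",
                                     values="value",
                                     aggfunc="sum")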
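The pivot turns the long `segment_id` / `variable` / `value` table into one row per segment and one column per feature. In Dask, the output columns must be known before anything is computed, which is why the `variable` column is first converted with `categorize()`. pandas has no such requirement, as the toy sketch below shows. The feature values are made up.

```python
import pandas as pd

# Toy long-format features, shaped like the output of the extraction step
features_long = pd.DataFrame({
    "segment_id": [1, 1, 2, 2],
    "variable": ["sensor_1__mean", "sensor_1__maximum"] * 2,
    "value": [11.0, 12.0, 4.0, 5.0],
})

# Each (segment_id, variable) pair occurs once, so aggfunc="sum" simply passes the value through
feature_table = features_long.pivot_table(index="segment_id",
                                          columns="variable",
                                          values="value",
                                          aggfunc="sum")
print(feature_table)
```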

### 7. Do the actual computation

Up to step 6, every output is a lazy Dask DataFrame. Only when we invoke `compute()` are the features actually computed and loaded into memory. If the extracted features are very large, this step may fail with an out-of-memory error.
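A back-of-the-envelope estimate before calling `compute()` can warn you of this. The result has one row per segment and one column per (sensor, feature) pair. With `MinimalFCParameters` and the two sensors used here, that is 2 × 8 = 16 columns, which matches the table below. `ComprehensiveFCParameters` produces far more columns per sensor. The sketch assumes 8 bytes per value (float64) as a conservative choice. The segment count is a hypothetical placeholder, and `feature_table_bytes` is a hypothetical helper.

```python
def feature_table_bytes(n_segments, n_kinds, n_features_per_kind, bytes_per_value=8):
    """Rough size of the dense feature table, ignoring the index and pandas overhead."""
    return n_segments * n_kinds * n_features_per_kind * bytes_per_value


# Hypothetical segment count; 2 sensors x 8 minimal features as in this notebook
n_bytes = feature_table_bytes(n_segments=10_000, n_kinds=2, n_features_per_kind=8)
print(f"{n_bytes / 10**6:.2f} MB")  # 1.28 MB
```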

In [18]:
df_features = feature_table.compute()

In [19]:
df_features.head()

| segment_id | sensor_1__sum_values | sensor_1__median | sensor_1__mean | sensor_1__length | sensor_1__standard_deviation | sensor_1__variance | sensor_1__maximum | sensor_1__minimum | sensor_4__sum_values | sensor_4__median | sensor_4__mean | sensor_4__length | sensor_4__standard_deviation | sensor_4__variance | sensor_4__maximum | sensor_4__minimum |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 140031872 | -54490.0 | -43.0 | -54.490002 | 1000.0 | 211.92836 | 44913.63 | 477.0 | -618.0 | -27692.0 | -60.0 | -27.691999 | 1000.0 | 368.852356 | 136052.1 | 1253.0 | -1156.0 |
| 140348256 | -12608.0 | 4.5 | -12.608 | 1000.0 | 252.768646 | 63891.99 | 639.0 | -650.0 | 2447.0 | 3.0 | 2.447 | 1000.0 | 209.270218 | 43794.02 | 641.0 | -567.0 |
| 140851065 | -3568.0 | 11.0 | -3.568 | 1000.0 | 553.871704 | 306773.9 | 1362.0 | -1421.0 | 15264.0 | -11.5 | 15.264 | 1000.0 | 840.077515 | 705730.2 | 2239.0 | -2409.0 |
| 1400253000 | -5286.0 | 19.0 | -5.286 | 1000.0 | 370.556244 | 137311.9 | 1117.0 | -1364.0 | -16044.0 | -38.5 | -16.044001 | 1000.0 | 364.98819 | 133216.4 | 1109.0 | -929.0 |
| 1400727315 | 354441.0 | 976.5 | 354.44101 | 1000.0 | 11997.532227 | 143940800.0 | 32767.0 | -32767.0 | -37289.0 | 406.5 | -37.289001 | 1000.0 | 12828.428711 | 164568600.0 | 32767.0 | -32767.0 |


### 8. Close the Dask Cluster & Client

In [20]:
client.close()
cluster.close()