# Feature Engineering

### Main flavors of data and feature engineering
* Tabular: Dataframe model
    * "Typical" business data tables
* Batch/Tensor/Vector: Array model
    * Numeric data, timeseries, scientific data, audio, images, video, geodata, etc.
* Natural language
    * Batches of strings
    * Transformed into array data through NLP-specific techniques
    
<img src='images/flow-transform.png' width=800>

### "Must-haves" for feature engineering on large data

* Some data representation for the large dataset
    * Likely distributed, out-of-core, lazy, streaming, etc.
* Mechanism to load data from standard formats and locations into the representation
    * E.g., loading HDF5 in S3 or Parquet in HDFS
* APIs to apply feature engineering transformations
    * Mathematical operations
    * String, date, etc.
    * Custom ("user-defined")
* Integration with a modeling framework and/or the ability to write to standard formats
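The must-haves above can be shown in miniature with a standard-library-only sketch. It has a lazy representation (a generator of small chunks), a loader for a standard format (CSV), a user-defined transformation applied chunk by chunk, and a writer that emits standard CSV. The names `iter_chunks` and `add_price_per_carat` and the in-memory "file" are hypothetical, chosen for illustration. This is not any library's API.

```python
import csv
import io
from itertools import islice

def iter_chunks(lines, chunk_size):
    """Lazily yield lists of row dicts; rows are read only as chunks are consumed."""
    reader = csv.DictReader(lines)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk

def add_price_per_carat(chunk):
    """Hypothetical user-defined transformation applied to one chunk."""
    return [{**row, "price_per_carat": float(row["price"]) / float(row["carat"])}
            for row in chunk]

# Small in-memory stand-in for a large file in S3, HDFS, etc.
source = io.StringIO("carat,price\n0.23,326\n0.21,326\n0.29,334\n")

# Build the pipeline lazily: nothing has been read or computed yet.
transformed = (add_price_per_carat(c) for c in iter_chunks(source, chunk_size=2))

# Write out chunk by chunk, so only one chunk needs to be in memory at a time.
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=["carat", "price", "price_per_carat"])
writer.writeheader()
for chunk in transformed:
    writer.writerows(chunk)
```

Real tools add distribution, scheduling, and many more formats on top. The core idea is the same: describe the work lazily, then stream partitions through it.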

### "Nice-to-haves"

* Intuitive data representation: similar to "small data" tooling
* APIs that resemble those of the most common industry-standard libraries
* Both modeling integration *and* ability to write out transformed data


## Rise of Python

Python has become the *lingua franca* or dominant cross-cutting language for data science.

>
> __Note__: this is not to imply that Python is the best or only language, or that other languages might not be intrinsically better or even, in the future, more successful.
>
> There are wonderful things to be said for languages from Rust to R to Julia and many others, but for baseline data science capability and versatility in commercial enterprises today, it's Python.
>

So we can turn to Python and look at the dominant libraries and tools within that ecosystem:
* Tabular data: Pandas
* Array data: NumPy and derivatives like CuPy, JAX (`jax.numpy`), etc.
* Basic modeling: scikit-learn, XGBoost, etc.
* Deep learning: PyTorch, TensorFlow
* NLP: spaCy, NLTK, Hugging Face, etc.

As we move further into the workflow, to things like hyperparameter tuning or reinforcement learning, there are more choices.

For time reasons, we're going to stick to the core workflow of extraction through modeling and tuning. We won't continue into MLOps and deployment architectures, or into meta-modeling platforms for experimentation, feature and provenance tracking, etc. That would be a bit too much to take on!

__Bottom line__: We want a data representation and APIs that are fairly close to the Pandas / NumPy / scikit-learn (SciPy) workflow. And we want elegant bridges into things like PyTorch, XGBoost, NLP tools, and tuning tools.

## Dask: SciPy at Scale

Luckily, Dask is well placed to solve this problem. 

While enterprises were still wrestling with JVM-based tools over the past 5 years, scientists, researchers, and others in the PyData and SciPy communities were building Dask, a pure-Python distributed compute platform that integrates deeply with all of the standard SciPy tools.

__What does this mean?__

We can take many of our local workflows to large-scale data via Dask with fairly minimal effort -- because under the hood, Dask is designed to use those "small data" structures in federation to create arbitrarily large ones.
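Here is a pure-Python sketch that makes the "federation" idea concrete. It is not Dask's actual implementation. One logical column is stored as several small partitions. A global question, such as the mean, is answered by computing partial results per partition and then combining them. The partition contents are toy values.

```python
# Toy values: one logical "price" column stored as three small partitions.
partitions = [[326, 326, 327], [334, 335], [336, 336, 337, 337]]

def partition_stats(part):
    """Map step: runs on each partition independently (e.g. on different workers)."""
    return sum(part), len(part)

def combine(stats):
    """Reduce step: merge the partial results into the global answer."""
    total = sum(s for s, _ in stats)
    count = sum(n for _, n in stats)
    return total / count

global_mean = combine([partition_stats(p) for p in partitions])

# Averaging the per-partition means would be wrong, because partition sizes differ.
naive_mean = sum(sum(p) / len(p) for p in partitions) / len(partitions)
```

The design point is that each partition only ever handles "small data." Correctness comes from choosing partial results (here, a sum and a count) that combine exactly.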

As an added bonus, Dask's architecture lets it use GPU-enabled versions of the underlying libraries:
* GPU + NumPy => CuPy
* GPU + Pandas => cuDF (RAPIDS CUDA DataFrame)
* GPU + scikit-learn => cuML (RAPIDS CUDA machine learning algorithms)
* etc.

### Using Dask for Feature Transformation

* We need to be able to load data in a standard format
* Manipulate it using dataframe or array APIs
* Write it and/or pass it efficiently to a modeling framework

In [1]:
from dask import dataframe as ddf
from dask import array as da
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=1, memory_limit='1GB')

client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:58492 | Workers: 2 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 2 |
| | Memory: 2.00 GB |


In [2]:
df = ddf.read_csv('data/diamonds.csv')
df

| npartitions=1 | Unnamed: 0 | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|---|
| | int64 | float64 | object | object | object | float64 | float64 | int64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
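The structure above shows `npartitions=1`, presumably because this file is small. For larger files, Dask roughly splits the file into byte-range blocks and uses one partition per block. The simplified sketch below shows the core idea: each block boundary moves forward to the next newline, so every block holds whole rows. `split_blocks` is a hypothetical helper. Dask's real reader also handles headers, compression, and quoted newlines.

```python
def split_blocks(data: bytes, blocksize: int):
    """Cut raw CSV bytes into roughly blocksize-sized pieces made of whole lines.

    Each cut is moved forward to just after the next newline, so no row is
    split across blocks and every block can be parsed independently.
    """
    if blocksize < 1:
        raise ValueError("blocksize must be positive")
    blocks, start = [], 0
    while start < len(data):
        end = min(start + blocksize, len(data))
        if end < len(data):
            newline = data.find(b"\n", end - 1)
            end = len(data) if newline == -1 else newline + 1
        blocks.append(data[start:end])
        start = end
    return blocks

raw = b"carat,price\n0.23,326\n0.21,326\n0.23,327\n0.29,334\n"
header, _, body = raw.partition(b"\n")   # the header would be reused for every block
blocks = split_blocks(body, blocksize=12)
```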


In [3]:
df.head()

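| | Unnamed: 0 | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 |
| 1 | 2 | 0.21 | Premium | E | SI1 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 |
| 2 | 3 | 0.23 | Good | E | VS1 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 |
| 3 | 4 | 0.29 | Premium | I | VS2 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 |
| 4 | 5 | 0.31 | Good | J | SI2 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 |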


In [4]:
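# Drop the redundant row-number column that was written out with the CSV
df = df.drop(columns=['Unnamed: 0'])
# Scan the data for every category, turning the string columns into known categoricals
df = df.categorize()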

df

| npartitions=1 | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| | float64 | category[known] | category[known] | category[known] | float64 | float64 | int64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
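`categorize()` needs a pass over the data so that every categorical column ends up with a *known* set of categories (the `category[known]` dtypes above). Below is a rough pure-Python sketch of that idea. It assumes partitions are lists of row dicts: it collects the distinct values seen in each partition, then unions them. The helper names are hypothetical, and the first-seen ordering is a choice made for this sketch, not a claim about Dask's ordering.

```python
def partition_categories(rows, column):
    """Distinct values of `column` in one partition, in first-seen order."""
    return list(dict.fromkeys(row[column] for row in rows))

def known_categories(partitions, column):
    """Union the per-partition results into one global, ordered category list."""
    merged = {}
    for part in partitions:
        for value in partition_categories(part, column):
            merged.setdefault(value, None)
    return list(merged)

# Two toy partitions; neither one alone contains every category.
partitions = [
    [{"cut": "Ideal"}, {"cut": "Premium"}],
    [{"cut": "Good"}, {"cut": "Premium"}],
]
cut_categories = known_categories(partitions, "cut")
```

This is why `categorize()` costs a full read of the data: no single partition can know the complete category list on its own.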


In [5]:
# One-hot encode the categorical columns (this requires known categories)
prepared = ddf.get_dummies(df)

prepared.head()

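| | carat | depth | table | price | x | y | z | cut_Ideal | cut_Premium | cut_Good | ... | color_G | color_D | clarity_SI2 | clarity_SI1 | clarity_VS1 | clarity_VS2 | clarity_VVS2 | clarity_VVS1 | clarity_I1 | clarity_IF |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.23 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 | 1 | 0 | 0 | ... | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 0.21 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 | 0 | 1 | 0 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | 0.23 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 | 0 | 0 | 1 | ... | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 |
| 3 | 0.29 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 | 0 | 1 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 |
| 4 | 0.31 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 | 0 | 0 | 1 | ... | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |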
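The `get_dummies` output above depends on the categories being known. With a fixed, global category list, each partition can be encoded independently and still produce identical columns. Below is a minimal pure-Python sketch of that one-hot step. The function and data are illustrative, not Dask's code.

```python
def one_hot(rows, column, categories):
    """Replace `column` with one 0/1 indicator column per known category.

    Because `categories` is the global list, every partition gets the same
    output columns, even if a partition never sees some category.
    """
    out = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for cat in categories:
            new_row[f"{column}_{cat}"] = int(row[column] == cat)
        out.append(new_row)
    return out

categories = ["Ideal", "Premium", "Good"]
part_a = one_hot([{"carat": 0.23, "cut": "Ideal"}], "cut", categories)
part_b = one_hot([{"carat": 0.23, "cut": "Good"}], "cut", categories)
```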


# Modeling

Dask is an easy choice for feature engineering and preprocessing, but for modeling we're back in the deep end, with many choices to make.

Why?

Simply put, different kinds of modeling are handled best by different tools, so we have a lot of choices to make.

* "Classic" ML
    * Dask
    * Dask-ML
    * XGBoost (with or without Dask)
* Unsupervised learning and dimensionality reduction
    * Dask supports some algorithms
    * For others, we may want to scale a deep-learning tool (PyTorch/TensorFlow)
        * Horovod
        * RaySGD
* Deep learning (scaling PyTorch/TensorFlow easily)
    * Horovod
    * RaySGD
    * Ray RLlib for deep reinforcement learning
* Simulations and agent-based models
    * Ray for stateful-agent simulations
    * Dask is also an option

## Example: Linear Model with Dask

In [6]:
# lengths=True computes each partition's length, so the arrays get known chunk sizes
y = prepared.price.to_dask_array(lengths=True)
arr = prepared.drop('price', axis=1).to_dask_array(lengths=True)

arr[:4]

| | Array | Chunk |
|---|---|---|
| Bytes | 832 B | 832 B |
| Shape | (4, 26) | (4, 26) |
| Count | 7 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |
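Because `lengths=True` makes each partition's row count known up front, the arrays have known chunk sizes. Many array operations, such as slicing with `arr[:4]`, rely on that. Conceptually, known lengths become chunk offsets that map a global row index to a chunk. The sketch below illustrates this with made-up partition lengths. The helpers `chunk_offsets` and `locate` are hypothetical, not Dask functions.

```python
from bisect import bisect_right
from itertools import accumulate

def chunk_offsets(lengths):
    """Starting row of each chunk, followed by the total row count."""
    return [0, *accumulate(lengths)]

def locate(row, offsets):
    """Map a global row index to (chunk number, row within that chunk)."""
    if not 0 <= row < offsets[-1]:
        raise IndexError(row)
    chunk = bisect_right(offsets, row) - 1
    return chunk, row - offsets[chunk]

lengths = (3, 5, 2)            # made-up per-partition row counts
offsets = chunk_offsets(lengths)
```

Without the lengths, the offsets are unknown, so a request like "rows 0 to 3" can't be routed to the right chunks without computing first.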


In [7]:
arr[:4].compute()

array([[ 0.23, 61.5 , 55.  ,  3.95,  3.98,  2.43,  1.  ,  0.  ,  0.  ,
         0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,
         1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ],
       [ 0.21, 59.8 , 61.  ,  3.89,  3.84,  2.31,  0.  ,  1.  ,  0.  ,
         0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,
         0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ],
       [ 0.23, 56.9 , 65.  ,  4.05,  4.07,  2.31,  0.  ,  0.  ,  1.  ,
         0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,
         0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ],
       [ 0.29, 62.4 , 58.  ,  4.2 ,  4.23,  2.63,  0.  ,  1.  ,  0.  ,
         0.  ,  0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ,  0.  ,
         0.  ,  0.  ,  0.  ,  1.  ,  0.  ,  0.  ,  0.  ,  0.  ]])

In [8]:
from dask_ml.model_selection import train_test_split

# Hold out 10% of the rows for testing
X_train, X_test, y_train, y_test = train_test_split(arr, y, test_size=0.1)

X_train

| | Array | Chunk |
|---|---|---|
| Bytes | 10.10 MB | 10.10 MB |
| Shape | (48546, 26) | (48546, 26) |
| Count | 7 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |
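The 53,940 rows form a single chunk here, so holding out 10% gives 5,394 test rows and 48,546 training rows. That matches the `X_train` shape above and the prediction shape below. As far as we understand dask-ml, for Dask arrays `train_test_split` splits each chunk independently by default (its `blockwise` option), instead of shuffling across the whole dataset. The plain-Python sketch below imitates that blockwise idea with a seeded RNG. It is an illustration, not dask-ml's implementation.

```python
import random

def blockwise_split(blocks, test_size, seed=0):
    """Hold out about `test_size` of each block, chosen at random within that block."""
    rng = random.Random(seed)
    train, test = [], []
    for block in blocks:
        idx = list(range(len(block)))
        rng.shuffle(idx)                       # shuffling never crosses block boundaries
        n_test = int(round(test_size * len(block)))
        test.append([block[i] for i in idx[:n_test]])
        train.append([block[i] for i in idx[n_test:]])
    return train, test

# Two toy blocks of row ids, with 20 and 10 rows.
blocks = [list(range(0, 20)), list(range(20, 30))]
train_blocks, test_blocks = blockwise_split(blocks, test_size=0.1)
```

The trade-off is that no data moves between workers, but any ordering across blocks (for example, a sorted file) is not mixed.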


In [9]:
from dask_ml.linear_model import LinearRegression

# max_iter=10 keeps this demo fast; allowing more iterations may improve the fit
lr = LinearRegression(solver='lbfgs', max_iter=10)
lr_model = lr.fit(X_train, y_train)
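Iterative solvers such as L-BFGS need the loss gradient at every step. For least squares, that gradient is a sum over rows. Each chunk can compute its share independently, and the pieces are then added, which is what lets a fit like this spread across partitions. The sketch below deliberately uses plain gradient descent, not L-BFGS, on toy data to show the chunked-gradient idea. The learning rate and step count are arbitrary choices for this example.

```python
def chunk_gradient(X, y, w):
    """Sum over one chunk's rows of the gradient of 0.5 * (x.w - y)^2."""
    grad = [0.0] * len(w)
    for xi, yi in zip(X, y):
        err = sum(a * b for a, b in zip(xi, w)) - yi
        for j, xij in enumerate(xi):
            grad[j] += err * xij
    return grad

def fit(chunks, n_features, lr=0.1, steps=500):
    """Gradient descent on mean squared error, with gradients computed per chunk."""
    w = [0.0] * n_features
    n = sum(len(y) for _, y in chunks)
    for _ in range(steps):
        partials = [chunk_gradient(X, y, w) for X, y in chunks]          # map
        total = [sum(g[j] for g in partials) for j in range(n_features)]  # reduce
        w = [wj - lr * gj / n for wj, gj in zip(w, total)]
    return w

# Toy data from y = 1 + 2*x; the constant 1.0 column plays the role of the intercept.
chunks = [
    ([[1.0, 0.0], [1.0, 1.0]], [1.0, 3.0]),
    ([[1.0, 2.0], [1.0, 3.0]], [5.0, 7.0]),
]
w = fit(chunks, n_features=2)
```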



In [10]:
y_predicted = lr_model.predict(X_test)

y_predicted

| | Array | Chunk |
|---|---|---|
| Bytes | 43.15 kB | 43.15 kB |
| Shape | (5394,) | (5394,) |
| Count | 15 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [16]:
from dask_ml.metrics import mean_squared_error
from math import sqrt

# Root mean squared error, in the same units as price
sqrt(mean_squared_error(y_test, y_predicted))

1111.5791557883683
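The printed value is the root mean squared error (RMSE): the square root of the average squared difference between actual and predicted `price`. It splits naturally into per-chunk partial sums (a squared-error total and a row count) that are combined at the end. Below is a small pure-Python sketch with made-up numbers.

```python
import math

def chunk_sse(y_true, y_pred):
    """Sum of squared errors and row count for one chunk."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)), len(y_true)

def rmse(chunks):
    """Combine per-chunk partial sums into a global RMSE."""
    partials = [chunk_sse(t, p) for t, p in chunks]
    sse = sum(s for s, _ in partials)
    n = sum(c for _, c in partials)
    return math.sqrt(sse / n)

# Made-up (actual, predicted) chunks; the errors are 0, 0, 2 and 2.
chunks = [([326, 334], [326, 334]), ([335, 336], [337, 338])]
score = rmse(chunks)
```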

In [17]:
client.close()

## What is Ray?

Ray (https://ray.io/) is a scale-out computing system designed for high-throughput, resilient, stateful-actor algorithms. Ray was designed at UC Berkeley's RISELab under the supervision of some of the same team that created Apache Spark.

Ray supports a number of languages at the API layer (Python and Java today), while most of the engine is written in C++. Ray's support for stateful actors makes it strong in a number of key areas, like distributed SGD and reinforcement learning.
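A stateful actor is a long-lived worker that keeps its own state between calls and processes calls one at a time. The standard-library sketch below shows only that concept. It is not Ray's API: in Ray you would decorate the class with `@ray.remote` and call its methods via `.remote()`. Here, a single-thread executor stands in for the actor's process.

```python
from concurrent.futures import ThreadPoolExecutor

class CounterActor:
    """Toy actor: private state, updated by one worker thread, one message at a time."""

    def __init__(self):
        # A single worker thread processes submitted calls one by one, in FIFO order.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._total = 0  # state that lives as long as the actor does

    def _add(self, amount):
        self._total += amount
        return self._total

    def add(self, amount):
        """Returns immediately with a Future; the update runs on the actor's thread."""
        return self._executor.submit(self._add, amount)

    def shutdown(self):
        self._executor.shutdown(wait=True)

actor = CounterActor()
futures = [actor.add(n) for n in (1, 2, 3)]   # three "messages"
results = [f.result() for f in futures]       # each call sees the state left by the previous one
actor.shutdown()
```

Because only the actor's own thread touches `_total`, no locks are needed. The same property makes actors a natural fit for RL workers that each hold an environment and a policy copy.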

Let's try a reinforcement learning example!

> __Reinforcement Learning__ is a family of techniques that train *agents* to act in an *environment* to maximize *reward*. Famous examples include agents that can play chess, Go, or Atari games, but the field is hot because those agents can also be robots learning to do work, autonomous vehicles learning to drive, or even virtual salespeople learning to get the best price possible from a customer.

Ray treats deep reinforcement learning (RL + deep learning) as a top-level use case and includes libraries that encapsulate many of the most popular algorithms.

Here, to create a simple example, we'll use __Deep Q-Learning__ (DQN, a foundational deep RL algorithm) to learn the OpenAI Gym "CartPole" environment, which you can visualize like this:

<video src='images/cpv1.mp4' controls="true"></video>
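DQN builds on the Q-learning update Q(s, a) ← Q(s, a) + α [r + γ · max over a′ of Q(s′, a′) − Q(s, a)]. The "deep" part replaces the lookup table of Q-values with a neural network. Below is a self-contained sketch of the tabular version on a hypothetical five-cell corridor, where the agent earns reward 1 for reaching the right end. It is neither CartPole nor RLlib's implementation, and α, γ, ε, and the episode count are arbitrary choices.

```python
import random

N_STATES = 5            # corridor cells 0..4; reaching cell 4 ends an episode
ACTIONS = (-1, 1)       # step left or step right
ALPHA, GAMMA, EPSILON = 0.5, 0.9, 0.1

def step(state, action):
    """Hypothetical environment: returns (next_state, reward, done)."""
    nxt = min(max(state + action, 0), N_STATES - 1)
    done = nxt == N_STATES - 1
    return nxt, (1.0 if done else 0.0), done

def choose(q, state, rng):
    """Epsilon-greedy action choice, breaking ties between equal Q-values at random."""
    if rng.random() < EPSILON:
        return rng.choice(ACTIONS)
    best = max(q[(state, a)] for a in ACTIONS)
    return rng.choice([a for a in ACTIONS if q[(state, a)] == best])

def train(episodes=200, seed=0):
    rng = random.Random(seed)
    q = {(s, a): 0.0 for s in range(N_STATES) for a in ACTIONS}
    for _ in range(episodes):
        state, done = 0, False
        while not done:
            action = choose(q, state, rng)
            nxt, reward, done = step(state, action)
            # Q-learning target: reward plus discounted best value of the next state
            target = reward if done else reward + GAMMA * max(q[(nxt, a)] for a in ACTIONS)
            q[(state, action)] += ALPHA * (target - q[(state, action)])
            state = nxt
    return q

q = train()
# Greedy policy for the non-terminal cells after training
policy = [max(ACTIONS, key=lambda a: q[(s, a)]) for s in range(N_STATES - 1)]
```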

In [12]:
import ray
import ray.rllib.agents.dqn as dqn

ray.shutdown()
ray.init()

Instructions for updating:
non-resource variables are not supported in the long term


2020-10-28 09:56:21,131	INFO services.py:1088 -- View the Ray dashboard at http://127.0.0.1:8265


{'node_ip_address': '192.168.1.3',
 'raylet_ip_address': '192.168.1.3',
 'redis_address': '192.168.1.3:6379',
 'object_store_address': '/tmp/ray/session_2020-10-28_09-56-20_700672_11013/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-10-28_09-56-20_700672_11013/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2020-10-28_09-56-20_700672_11013',
 'metrics_export_port': 63265,
 'node_id': 'f05217739e4231826129d8367459b059bc6b2419'}

In [13]:
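import copy

# Specifies the OpenAI Gym environment for CartPole, V1.
SELECT_ENV = "CartPole-v1"

# Number of training iterations (calls to trainer.train()).
N_ITER = 50

# Start from a deep copy of the default DQN configuration, so that changing
# the nested 'model' settings below doesn't modify the shared defaults.
config = copy.deepcopy(dqn.DEFAULT_CONFIG)

# Suppress too many messages.
config["log_level"] = "WARN"

# Use > 1 for more CPU cores, e.g., over a cluster.
config['num_workers'] = 8

# Hidden layer sizes of the fully connected network.
config['model']['fcnet_hiddens'] = [40,20]

# Don't pin a CPU core to each worker (allows more workers).
config['num_cpus_per_worker'] = 0

# Directory where trainer.save() writes checkpoints.
checkpoint_dir = 'checkpoints'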
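`fcnet_hiddens = [40,20]` asks for two fully connected hidden layers with 40 and 20 units. CartPole observations are 4 numbers and there are 2 actions. A plain multilayer perceptron with those hidden sizes would therefore have the layer shapes computed below. This is only a back-of-the-envelope sketch with hypothetical helpers: RLlib's DQN model may add layers of its own (for example, a separate Q-value head), so its real parameter count will differ.

```python
def mlp_layer_shapes(n_inputs, hidden, n_outputs):
    """(fan_in, fan_out) for each dense layer of a plain MLP."""
    sizes = [n_inputs, *hidden, n_outputs]
    return list(zip(sizes[:-1], sizes[1:]))

def param_count(shapes):
    """Weights plus biases across all dense layers."""
    return sum(fan_in * fan_out + fan_out for fan_in, fan_out in shapes)

shapes = mlp_layer_shapes(n_inputs=4, hidden=[40, 20], n_outputs=2)
total = param_count(shapes)
```

A network of about a thousand parameters is tiny, which helps explain why `num_cpus_per_worker = 0` and many light workers are reasonable here.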

In [14]:
trainer = dqn.DQNTrainer(config, SELECT_ENV)

2020-10-28 09:56:46,225	INFO trainer.py:580 -- Tip: set framework=tfe or the --eager flag to enable TensorFlow eager execution
2020-10-28 09:56:46,225	INFO trainer.py:605 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
[2m[36m(pid=11047)[0m Instructions for updating:
[2m[36m(pid=11047)[0m non-resource variables are not supported in the long term
[2m[36m(pid=11048)[0m Instructions for updating:
[2m[36m(pid=11048)[0m non-resource variables are not supported in the long term
[2m[36m(pid=11050)[0m Instructions for updating:
[2m[36m(pid=11050)[0m non-resource variables are not supported in the long term
[2m[36m(pid=11051)[0m Instructions for updating:
[2m[36m(pid=11051)[0m non-resource variables are not supported in the long term
[2m[36m(pid=11052)[0m Instructions for updating:
[2m[36m(pid=11052)[0m non-resource variables are not supported in the long term
[2m[36m(pid=11053)[0m Instructions f

In [15]:
fmt = '{:3d},{:8.4f},{:8.4f},{:8.4f}'
last_checkpoint = ''
for n in range(N_ITER):
    result = trainer.train()  # one training iteration
    # Named to avoid shadowing the built-in min() and max()
    reward_min  = result['episode_reward_min']
    reward_mean = result['episode_reward_mean']
    reward_max  = result['episode_reward_max']
    last_checkpoint = trainer.save(checkpoint_dir)
    print(fmt.format(n, reward_min, reward_mean, reward_max))
print(f'last checkpoint file: {last_checkpoint}')

Instructions for updating:
Prefer Variable.assign which has equivalent behavior in 2.X.


[2m[36m(pid=11051)[0m Instructions for updating:
[2m[36m(pid=11051)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11048)[0m Instructions for updating:
[2m[36m(pid=11048)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11050)[0m Instructions for updating:
[2m[36m(pid=11050)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11047)[0m Instructions for updating:
[2m[36m(pid=11047)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11054)[0m Instructions for updating:
[2m[36m(pid=11054)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11052)[0m Instructions for updating:
[2m[36m(pid=11052)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11049)[0m Instructions for updating:
[2m[36m(pid=11049)[0m Prefer Variable.assign which has equivalent behavior in 2.X.
[2m[36m(pid=11053)[0m Instructi

  0,  9.0000, 18.8913, 48.0000
  1,  8.0000, 17.1200, 63.0000
  2,  8.0000, 13.8300, 33.0000
  3,  8.0000, 12.0774, 25.0000
  4,  8.0000, 11.4300, 20.0000
  5,  8.0000, 10.2950, 17.0000
  6,  8.0000,  9.4206, 13.0000
  7,  8.0000,  9.5000, 13.0000
  8,  8.0000,  9.4860, 11.0000
  9,  8.0000, 11.0215, 17.0000
 10,  8.0000, 11.8200, 17.0000
 11,  8.0000, 12.3300, 21.0000
 12,  8.0000, 14.2138, 24.0000
 13,  8.0000, 15.1300, 24.0000
 14,  8.0000, 16.8200, 28.0000
 15,  9.0000, 18.2566, 29.0000
 16,  8.0000, 16.0400, 29.0000
 17,  8.0000, 18.0500, 33.0000
 18,  8.0000, 20.6800, 33.0000
 19,  8.0000, 20.6400, 33.0000
 20,  8.0000, 18.5900, 33.0000
 21,  8.0000, 16.4400, 31.0000
 22,  8.0000, 19.3700, 33.0000
 23,  8.0000, 17.4800, 33.0000
 24,  8.0000, 18.3100, 39.0000
 25,  8.0000, 21.3000, 39.0000
 26,  8.0000, 19.3800, 33.0000
 27,  8.0000, 18.6300, 37.0000
 28,  8.0000, 19.7700, 37.0000
 29,  9.0000, 22.1900, 35.0000
 30,  8.0000, 21.4900, 34.0000
 31,  8.0000, 19.2200, 40.0000
 32,  8.

__Note__: If you've worked with RL and OpenAI Gym before, you may realize these are not particularly impressive numbers. A CartPole-v1 episode can last up to 500 steps at a reward of 1 per step. DQN is also not a particularly impressive algorithm.

Don't worry: __Ray RLlib__ includes a variety of much more powerful algorithms that achieve better results. We'll try one of them, Proximal Policy Optimization (PPO), in the lab exercise.

In [18]:
ray.shutdown()