In [2]:
from dask.distributed import Client, LocalCluster
import dask
import subprocess
import dask.dataframe as dd
import pandas as pd
from dask_ml.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from dask_ml.preprocessing import Categorizer, OrdinalEncoder, MinMaxScaler
from dask_ml.impute import SimpleImputer
import pickle
import time

# Notebook-wide pandas display settings, passed as (pattern, value) pairs:
#   - floats render with 4 decimal places
#   - no limit on the number of columns shown
#   - up to 1000 rows shown before truncation
pd.set_option(
    'display.float_format', '{:.4f}'.format,
    'display.max_columns', None,
    'display.max_rows', 1000,
)

In [3]:
# ---- configuration ---------------------------------------------------------
# Input: a headerless iris CSV plus the column names to assign to it.
DATA_PATH = './data/iris/iris.data'
NAMES = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

# Column roles for preprocessing: 'species' is the only categorical column,
# and every remaining column (in NAMES order) is treated as numeric.
CAT_COLS = ['species']
NUMERIC_COLS = [name for name in NAMES if name not in CAT_COLS]

# Fraction of rows assigned to the training split; the rest go to test.
TRAIN_PCT = .8

# Number of parquet files (one per dask partition) written for each split.
NUM_TRAINING_FILES, NUM_TEST_FILES = 3, 2

# Output directories for the train/test parquet datasets.
OUT_TRAIN_PATH, OUT_TEST_PATH = './train_parquet', './test_parquet'

In [4]:
# Start a local dask cluster bound to this host's first IP address, as
# reported by `hostname --all-ip-addresses` (a Linux-specific flag), and
# attach a client to it. The last expression displays the client summary.
cmd = "hostname --all-ip-addresses"
# check=True turns a failing command into CalledProcessError instead of
# silently continuing with whatever (possibly empty) output was produced.
process = subprocess.run(cmd.split(), stdout=subprocess.PIPE, check=True)
addresses = process.stdout.decode().split()
if not addresses:
    raise RuntimeError(
        "`{}` reported no IP addresses; cannot bind the dask cluster".format(cmd)
    )
IPADDR = addresses[0]

cluster = LocalCluster(ip=IPADDR)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://172.17.0.2:38993  Dashboard: http://172.17.0.2:8787/status,Cluster  Workers: 8  Cores: 40  Memory: 270.39 GB


In [5]:
# Build a dask DataFrame from the headerless CSV, naming columns per NAMES.
df = dd.read_csv(DATA_PATH, header=None, names=NAMES)

In [6]:
# Preview the first rows of the raw data (rich display rather than print).
df.head()

   sepal_length  sepal_width  petal_length  petal_width      species
0        5.1000       3.5000        1.4000       0.2000  Iris-setosa
1        4.9000       3.0000        1.4000       0.2000  Iris-setosa
2        4.7000       3.2000        1.3000       0.2000  Iris-setosa
3        4.6000       3.1000        1.5000       0.2000  Iris-setosa
4        5.0000       3.6000        1.4000       0.2000  Iris-setosa


In [7]:
# Shuffle row order once up front: frac=1.0 keeps every row, and the fixed
# seed makes the resulting order reproducible.
# NOTE(review): dask samples each partition independently, so rows appear to
# be reordered within partitions rather than across them — confirm this is
# acceptable if the input ever spans more than one partition.
df = df.sample(random_state=123, frac=1.0)

In [8]:
# Preview the shuffled rows (rich display rather than print).
df.head()

     sepal_length  sepal_width  petal_length  petal_width         species
17         5.1000       3.5000        1.4000       0.3000     Iris-setosa
44         5.1000       3.8000        1.9000       0.4000     Iris-setosa
7          5.0000       3.4000        1.5000       0.2000     Iris-setosa
38         4.4000       3.0000        1.3000       0.2000     Iris-setosa
105        7.6000       3.0000        6.6000       2.1000  Iris-virginica


In [9]:
# Split the shuffled rows into train/test using the configured fraction.
# random_split assigns rows pseudorandomly, so the split sizes are only
# approximately TRAIN_PCT / (1 - TRAIN_PCT). No len(df) is needed here;
# computing it would force a full evaluation of the dask graph.
train_df, test_df = df.random_split([TRAIN_PCT, 1 - TRAIN_PCT], random_state=123)

In [10]:
# Preview the untransformed training split (rich display rather than print).
train_df.head()

     sepal_length  sepal_width  petal_length  petal_width         species
17         5.1000       3.5000        1.4000       0.3000     Iris-setosa
44         5.1000       3.8000        1.9000       0.4000     Iris-setosa
7          5.0000       3.4000        1.5000       0.2000     Iris-setosa
38         4.4000       3.0000        1.3000       0.2000     Iris-setosa
105        7.6000       3.0000        6.6000       2.1000  Iris-virginica


In [11]:
# Preview the untransformed test split (rich display rather than print).
test_df.head()

     sepal_length  sepal_width  petal_length  petal_width          species
25         5.0000       3.0000        1.6000       0.2000      Iris-setosa
149        5.9000       3.0000        5.1000       1.8000   Iris-virginica
82         5.8000       2.7000        3.9000       1.2000  Iris-versicolor
56         6.3000       3.3000        4.7000       1.6000  Iris-versicolor
55         5.7000       2.8000        4.5000       1.3000  Iris-versicolor


In [12]:
# ---- preprocessing pipeline ------------------------------------------------
# Numeric columns: fill missing values with 0, then min-max scale.
zero_imputer = SimpleImputer(strategy='constant', fill_value=0)
min_max_scaler = MinMaxScaler()
numeric_transformer = Pipeline(steps=[
    ('zero_imputer', zero_imputer),
    ('min_max_scaler', min_max_scaler),
])

# Categorical columns: fill missing values with the literal 'missing',
# convert to categorical dtype, then map each category to an integer code.
missing_category_imputer = SimpleImputer(strategy='constant', fill_value='missing')
categorizer = Categorizer()
ordinal_encoder = OrdinalEncoder()
cat_transformer = Pipeline(steps=[
    ('missing_category_imputer', missing_category_imputer),
    ('categorizer', categorizer),
    ('ordinal', ordinal_encoder),
])

# Route each column group to its own transformer.
preprocessor = ColumnTransformer(transformers=[
    ('numeric', numeric_transformer, NUMERIC_COLS),
    ('categorical', cat_transformer, CAT_COLS),
])
pipe = Pipeline(steps=[('preprocessor', preprocessor)])

# Fit imputation/scaling/encoding on the training split only, then apply the
# same fitted transform to both splits.
# NOTE(review): this cell rebinds train_df/test_df to their transformed
# versions, so re-running it alone would fit on already-transformed data.
# Re-run from the train/test split cell instead.
pipe.fit(train_df)
train_df, test_df = pipe.transform(train_df), pipe.transform(test_df)

In [13]:
# Preview the transformed training split (first 5 rows).
train_df.head(n=5)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
17,0.2,0.6818,0.0678,0.0833,0
44,0.2,0.8182,0.1525,0.125,0
7,0.1714,0.6364,0.0847,0.0417,0
38,0.0,0.4545,0.0508,0.0417,0
105,0.9143,0.4545,0.9492,0.8333,1


In [14]:
# Preview the transformed test split (first 5 rows).
test_df.head(n=5)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
25,0.1714,0.4545,0.1017,0.0417,0
149,0.4286,0.4545,0.6949,0.7083,1
82,0.4,0.3182,0.4915,0.4583,2
56,0.5429,0.5909,0.6271,0.625,2
55,0.3714,0.3636,0.5932,0.5,2


In [15]:
# Each dask partition is written to a separate parquet file, so repartition
# each split to its configured file count before writing it out.
train_df = train_df.repartition(npartitions=NUM_TRAINING_FILES)
train_df.to_parquet(OUT_TRAIN_PATH)

test_df = test_df.repartition(npartitions=NUM_TEST_FILES)
test_df.to_parquet(OUT_TEST_PATH)