Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One-hot encode multiple columns #16

Merged
merged 9 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sparsity/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.11.1
0.12.0
77 changes: 56 additions & 21 deletions sparsity/dask/reshape.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,84 @@
import warnings
from collections import OrderedDict

import numpy as np

import sparsity as sp
from sparsity import sparse_one_hot
from sparsity.dask import SparseFrame
import numpy as np

from sparsity.io import _just_read_array


def one_hot_encode(ddf, column=None, categories=None, index_col=None,
                   order=None, prefixes=False):
    """
    Sparse one hot encoding of dask.DataFrame.

    Convert a dask.DataFrame into a series of SparseFrames by one-hot
    encoding specified columns.

    Parameters
    ----------
    ddf: dask.DataFrame
        e.g. the clickstream
    categories: dict
        Maps column name -> iterable of possible category values.
        See description of `order`.
    index_col: str | iterable
        which columns to use as index
    order: iterable
        Specify order in which one-hot encoded columns should be aligned.

        If `order = [col_name1, col_name2]`
        and `categories = {col_name1: ['A', 'B'], col_name2: ['C', 'D']}`,
        then the resulting SparseFrame will have columns
        `['A', 'B', 'C', 'D']`.

        If you don't specify order, then output columns' order depends on
        iteration over `categories` dictionary. You can pass `categories`
        as an OrderedDict instead of providing `order` explicitly.
    prefixes: bool
        If False, column names will be the same as categories,
        so that new columns will be named like:
        [cat11, cat12, cat21, cat22, ...].

        If True, original column name followed by an underscore will be added
        in front of each category name, so that new columns will be named like:
        [col1_cat11, col1_cat12, col2_cat21, col2_cat22, ...].
    column: DEPRECATED
        Kept only for backward compatibility.

    Returns
    -------
    sparse_one_hot: sparsity.dask.SparseFrame

    Raises
    ------
    ValueError
        If both `column` (deprecated) and `order` are given.
    """
    if column is not None:
        warnings.warn(
            '`column` argument of sparsity.dask.reshape.one_hot_encode '
            'function is deprecated.'
        )
        if order is not None:
            raise ValueError('`order` and `column` arguments cannot be used '
                             'together.')
        # Normalize the legacy single-column call to the new dict interface.
        categories = {column: categories}

    # Empty index of the correct type/levels for the meta frame.
    idx_meta = ddf._meta.reset_index().set_index(index_col).index[:0] \
        if index_col else ddf._meta.index

    if order is not None:
        # Rebuild the mapping in the requested column order so the
        # concatenated output columns are deterministic.
        categories = OrderedDict([(column, categories[column])
                                  for column in order])

    # Encode the (empty) meta frame once to learn the output column labels.
    columns = sparse_one_hot(ddf._meta,
                             categories=categories,
                             index_col=index_col,
                             prefixes=prefixes).columns
    meta = sp.SparseFrame(np.array([]), columns=columns,
                          index=idx_meta)

    dsf = ddf.map_partitions(sparse_one_hot,
                             categories=categories,
                             index_col=index_col,
                             prefixes=prefixes,
                             meta=object)

    return SparseFrame(dsf.dask, dsf._name, meta, dsf.divisions)
57 changes: 49 additions & 8 deletions sparsity/sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# coding=utf-8
import traceback
import uuid
import warnings
from collections import OrderedDict
from functools import partial, reduce

import numpy as np
Expand Down Expand Up @@ -220,7 +222,7 @@ def data(self):
return self._data
return self._data[:-1,:]

# backwards comptability
# backwards compatibility
def groupby(self, by=None, level=0):
return self.groupby_sum(by, level)

Expand Down Expand Up @@ -623,21 +625,60 @@ def _create_group_matrix(group_idx, dtype='f8'):
dtype=dtype).tocsr()


def sparse_one_hot(df, column, categories, dtype='f8', index_col=None):
def _parse_legacy_soh_interface(categories, order):
"""
One-hot encode a single column of a pandas.DataFrame.
Old interface was
sparse_one_hot(df, column, categories, dtype='f8', index_col=None).
"""
new_order = None
new_categories = {categories: order}
return new_categories, new_order


def sparse_one_hot(df, column=None, categories=None, dtype='f8',
                   index_col=None, order=None, prefixes=False):
    """
    One-hot encode specified columns of a pandas.DataFrame.
    Returns a SparseFrame.

    See the documentation of :func:`sparsity.dask.reshape.one_hot_encode`.

    Raises
    ------
    ValueError
        If both `column` (deprecated) and `order` are given, or if two
        encoded columns would produce duplicate output column names.
    """
    if column is not None:
        warnings.warn(
            '`column` argument of sparsity.sparse_frame.sparse_one_hot '
            'function is deprecated.'
        )
        if order is not None:
            raise ValueError('`order` and `column` arguments cannot be used '
                             'together.')
        # Normalize the legacy single-column call to the new dict interface.
        categories = {column: categories}

    if order is not None:
        # Rebuild the mapping in the requested column order so the
        # concatenated output columns are deterministic.
        categories = OrderedDict([(column, categories[column])
                                  for column in order])

    new_cols = []
    csrs = []
    for column, column_cat in categories.items():
        if isinstance(column_cat, str):
            # A string value is a path to a serialized array of categories.
            column_cat = _just_read_array(column_cat)
        cols, csr = _one_hot_series_csr(column_cat, dtype, df[column])
        if prefixes:
            cols = list(map(lambda x: '{}_{}'.format(column, x), cols))
        new_cols.extend(cols)
        csrs.append(csr)
    if len(set(new_cols)) < len(new_cols):
        # Fixed message: the parameter is named `prefixes`, not `prefix`.
        raise ValueError('Different columns have same categories. This would '
                         'result in duplicated column names. '
                         'Set `prefixes` to True to manage this situation.')
    new_data = sparse.hstack(csrs, format='csr')

    if not isinstance(index_col, list):
        new_index = df[index_col] if index_col else df.index
    else:
        # Multiple index columns -> build a MultiIndex from their values.
        df = df.reset_index()
        new_index = pd.MultiIndex.from_arrays(df[index_col].values.T)
    return SparseFrame(new_data, index=new_index, columns=new_cols)


def _one_hot_series_csr(categories, dtype, oh_col):
Expand All @@ -659,4 +700,4 @@ def _one_hot_series_csr(categories, dtype, oh_col):
data = sparse.coo_matrix((data, (row_indices, col_indices)),
shape=(n_samples, n_features),
dtype=dtype).tocsr()
return cat.categories.values, data
return cat.categories.values, data
7 changes: 3 additions & 4 deletions sparsity/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import tempfile
from contextlib import contextmanager

import pytest

import numpy as np
import pandas as pd

import pytest
import sparsity


Expand All @@ -20,6 +18,7 @@ def testdb():
def clickstream():
df = pd.DataFrame(dict(
page_id=np.random.choice(list('ABCDE'), size=100),
other_categorical=np.random.choice(list('FGHIJ'), size=100),
id=np.random.choice([1,2,3,4,5,6,7,8,9], size=100)
),
index=pd.date_range("2016-01-01", periods=100))
Expand All @@ -34,4 +33,4 @@ def tmpdir(dir=None):
yield dirname
finally:
if os.path.exists(dirname):
shutil.rmtree(dirname, ignore_errors=True)
shutil.rmtree(dirname, ignore_errors=True)
80 changes: 68 additions & 12 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import numpy as np
import pandas as pd
import pytest
from dask.async import get_sync

import sparsity as sp
import sparsity.dask as dsp
from dask.async import get_sync
from sparsity import sparse_one_hot
from sparsity.dask.reshape import one_hot_encode

Expand Down Expand Up @@ -64,8 +63,7 @@ def test_dask_loc(clickstream):
sf = dd.from_pandas(clickstream, npartitions=10) \
.map_partitions(
sparse_one_hot,
column='page_id',
categories=list('ABCDE'),
categories={'page_id': list('ABCDE')},
meta=list
)

Expand All @@ -79,9 +77,8 @@ def test_dask_multi_index_loc(clickstream):
sf = dd.from_pandas(clickstream, npartitions=10) \
.map_partitions(
sparse_one_hot,
column='page_id',
index_col=['index', 'id'],
categories=list('ABCDE'),
categories={'page_id': list('ABCDE')},
meta=list
)
res = sf.loc['2016-01-15':'2016-02-15']
Expand All @@ -99,24 +96,83 @@ def test_repr():
assert isinstance(dsf.__repr__(), str)


def test_one_hot_legacy(clickstream):
    """Deprecated positional interface (column, categories, index_col)
    must still produce the same encoding as before."""
    ddf = dd.from_pandas(clickstream, npartitions=10)
    dsf = one_hot_encode(ddf, 'page_id', list('ABCDE'), ['index', 'id'])
    assert dsf._meta.empty
    sf = dsf.compute()
    assert sf.shape == (100, 5)
    assert isinstance(sf.index, pd.MultiIndex)


def test_one_hot_no_order(clickstream):
    """Without `order`, all categories of both columns must appear,
    in whatever order the categories dict yields them."""
    ddf = dd.from_pandas(clickstream, npartitions=10)
    cats = {'page_id': list('ABCDE'),
            'other_categorical': list('FGHIJ')}
    dsf = one_hot_encode(ddf, categories=cats, index_col=['index', 'id'])
    assert dsf._meta.empty
    assert sorted(dsf.columns) == list('ABCDEFGHIJ')
    sf = dsf.compute()
    assert sf.shape == (100, 10)
    assert isinstance(sf.index, pd.MultiIndex)
    assert sorted(sf.columns) == list('ABCDEFGHIJ')


def test_one_hot_prefixes(clickstream):
    """With prefixes=True each output column is '<source column>_<category>'."""
    ddf = dd.from_pandas(clickstream, npartitions=10)
    cats = {'page_id': list('ABCDE'),
            'other_categorical': list('FGHIJ')}
    dsf = one_hot_encode(ddf, categories=cats,
                         index_col=['index', 'id'], prefixes=True)
    correct_columns = ['page_id_{}'.format(c) for c in 'ABCDE']
    correct_columns += ['other_categorical_{}'.format(c) for c in 'FGHIJ']
    assert dsf._meta.empty
    assert sorted(dsf.columns) == sorted(correct_columns)
    sf = dsf.compute()
    assert sf.shape == (100, 10)
    assert isinstance(sf.index, pd.MultiIndex)
    assert sorted(sf.columns) == sorted(correct_columns)


def test_one_hot_order1(clickstream):
    """`order` aligns output columns: page_id categories first."""
    ddf = dd.from_pandas(clickstream, npartitions=10)
    cats = {'page_id': list('ABCDE'),
            'other_categorical': list('FGHIJ')}
    dsf = one_hot_encode(ddf, categories=cats,
                         order=['page_id', 'other_categorical'],
                         index_col=['index', 'id'])
    expected = list('ABCDEFGHIJ')
    assert dsf._meta.empty
    assert all(dsf.columns == expected)
    sf = dsf.compute()
    assert sf.shape == (100, 10)
    assert isinstance(sf.index, pd.MultiIndex)
    assert all(sf.columns == expected)


def test_one_hot_order2(clickstream):
    """Reversing `order` must reverse the column groups accordingly."""
    ddf = dd.from_pandas(clickstream, npartitions=10)
    cats = {'page_id': list('ABCDE'),
            'other_categorical': list('FGHIJ')}
    dsf = one_hot_encode(ddf, categories=cats,
                         order=['other_categorical', 'page_id'],
                         index_col=['index', 'id'])
    expected = list('FGHIJABCDE')
    assert dsf._meta.empty
    assert all(dsf.columns == expected)
    sf = dsf.compute()
    assert sf.shape == (100, 10)
    assert isinstance(sf.index, pd.MultiIndex)
    assert all(sf.columns == expected)


def test_one_hot_disk_categories(clickstream):
with tmpdir() as tmp:
cat_path = os.path.join(tmp, 'cat.pickle')
pd.Series(list('ABCDE')).to_pickle(cat_path)
ddf = dd.from_pandas(clickstream, npartitions=10)
dsf = one_hot_encode(ddf, column='page_id',
categories=cat_path,
dsf = one_hot_encode(ddf,
categories={'page_id': cat_path},
index_col=['index', 'id'])
assert dsf._meta.empty
sf = dsf.compute()
Expand Down
Loading