Skip to content

Commit

Permalink
Support for Dask DataFrames (#248)
Browse files Browse the repository at this point in the history
* dask logical type inference

* add dask to requirements.txt

* lint fix

* refactor inference tests

* more test refactors

* update release notes

* update test fixtures

* update conftest.py

* Various updates for Dask validation and testing (#260)

* updates for dask validation

* update release notes

* comment out unused code

* update time index test for dask

* parameterize test series and fix tests

* lint fix

* add rows to sample_df and fix head

* Skip Ordinal order validation for Dask dataframes (#270)

* updates for dask validation

* update release notes

* comment out unused code

* update time index test for dask

* parameterize test series and fix tests

* lint fix

* fix ordinal test with Dask

* update release notes

* fix accidental change in conftest.py

* fix changelog

* fix changelog

* Better dask datetime support (#286)

* better dask datetime support

* update release notes

* remove test xfail

* Test numeric time index with Dask input (#288)

* parameterize numeric time index tests

* parameterize additional tests

* update numeric time index test

* update pop method to work with Dask

* Update DataTable.describe to work with Dask (#296)

* update describe to work with Dask

* update release notes

* improve describe performance with Dask

* fix release notes

* Update DataTable.get_mutual_information() to work with Dask (#300)

* update mi to work with Dask

* update release notes

* update test fixture

* Create using Woodwork with Dask guide (#304)

* add using Woodwork with Dask docs guide

* update release notes

* remove comments and empty cell

* update dataframe wording

* remove empty cell

* fix title capitalization

* update release notes

* fix merge issues

* remove unused args from to_pandas util func
  • Loading branch information
thehomebrewnerd committed Oct 27, 2020
1 parent bdcc446 commit 662c1b7
Show file tree
Hide file tree
Showing 16 changed files with 851 additions and 357 deletions.
1 change: 1 addition & 0 deletions docs/source/guides/guides_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ The guides below provide more detail on the functionality of Woodwork.
understanding_types_and_tags
setting_config_options
using_mi_and_describe
using_woodwork_with_dask
157 changes: 157 additions & 0 deletions docs/source/guides/using_woodwork_with_dask.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using Woodwork with Dask DataFrames\n",
"\n",
"Woodwork enables DataTables to be created from Dask DataFrames when working with datasets that are too large to easily fit in memory. Although creating a DataTable from a Dask DataFrame follows the same process as one would follow when creating a DataTable from a pandas DataFrame, there are a few limitations to be aware of. This guide will provide a brief overview of creating a DataTable starting with a Dask DataFrame, and will outline several key items to keep in mind when using a Dask DataFrame as input.\n",
"\n",
"First we will create a Dask DataFrame to use in our example. Normally you would create the DataFrame directly by reading in the data from saved files, but we will create it from a demo pandas DataFrame."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"import woodwork as ww\n",
"\n",
"df_pandas = ww.demo.load_retail(nrows=1000, return_dataframe=True)\n",
"df_dask = dd.from_pandas(df_pandas, npartitions=10)\n",
"df_dask"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we have a Dask DataFrame, we can use it to create a Woodwork DataTable, just as we would with a pandas DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dt = ww.DataTable(df_dask, index='order_product_id')\n",
"dt.types"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see from the output above, the DataTable was created successfully, and logical type inference was performed for all of the columns. However, this brings us to one of the key issues in working with Dask DataFrames. \n",
"\n",
"In order to perform logical type inference, Woodwork needs to bring the data into memory so it can be analyzed. Currently, Woodwork reads data from the first partition of data only, and then uses this data for type inference. Depending on the complexity of the data, this could be a time consuming operation. Additionally, if the first partition is not representative of the entire dataset, the logical types for some columns may be inferred incorrectly.\n",
"\n",
"If this process takes too much time, or if the logical types are not inferred correctly, users have the ability to manually specify the logical types for each column. If the logical type for a column is specified, type inference for that column will be skipped. If logical types are specified for all columns, logical type inference will be skipped completely and Woodwork will not need to bring any of the data into memory when creating the DataTable.\n",
"\n",
"To skip logical type inference completely, and/or to correct type inference issues, you would simply define a logical types dictionary with the correct logical type defined for each column in the dataframe. Then, pass that dictionary to the call to create the DataTable as shown below:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logical_types = {\n",
" 'order_product_id': 'WholeNumber',\n",
" 'order_id': 'Categorical',\n",
" 'product_id': 'Categorical',\n",
" 'description': 'NaturalLanguage',\n",
" 'quantity': 'WholeNumber',\n",
" 'order_date': 'Datetime',\n",
" 'unit_price': 'Double',\n",
" 'customer_name': 'FullName',\n",
" 'country': 'Categorical',\n",
" 'total': 'Double',\n",
" 'cancelled': 'Boolean',\n",
"}\n",
"\n",
"dt = ww.DataTable(df_dask, index='order_product_id', logical_types=logical_types)\n",
"dt.types"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are two DataTable methods that also require bringing the underlying Dask DataFrame into memory: `describe` and `get_mutual_information`. When called, both of these methods will call a `compute` operation on the DataFrame associated with the DataTable in order to calculate the desired information. This may be problematic for datasets that cannot fit in memory, so exercise caution when using these methods."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dt.describe(include=['numeric'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dt.get_mutual_information().head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Validation Limitations\n",
"\n",
"When creating a DataTable several validation checks are performed to confirm that the data in the underlying dataframe is appropriate for the specified parameters. Because some of these validation steps require pulling the underlying data into memory, they are skipped when creating a DataTable from a Dask DataFrame. This section provides an overview of the validation checks that are performed with pandas input but skipped with Dask input.\n",
"\n",
"### Index Uniqueness\n",
"Normally a check is performed to verify that any column specified as the index contains no duplicate values. This check is skipped for Dask, so users must manually verify that any column specified as an index column contains unique values.\n",
"\n",
"### Data Consistency with LogicalType\n",
"If users manually define the LogicalType for a column when creating the DataTable, a check is performed to verify that the data in that column is appropriate for the specified LogicalType. For example, with pandas input if the user specifies a LogicalType of `Double` for a column that contains letters such as `['a', 'b', 'c']`, an error would be raised as it is not possible to convert the letters into numeric values with the `float` dtype associated with the `Double` LogicalType.\n",
"\n",
"With Dask input, no such error would be raised at the time of DataTable creation. However, behind the scenes, Woodwork will have attempted to convert the column physical type to `float`, and this conversion would be added to the Dask task graph, without raising an error. However, an error will be raised if a `compute` operation is called on the underlying DataFrame once Dask attempts to execute the conversion step. Extra care should be taken when using Dask input to make sure any specified logical types are consistent with the data in the columns to avoid this type of error.\n",
"\n",
"Similarly, for the `Ordinal` LogicalType, a check is typically performed to make sure that the data column does not contain any values that are not present in the defined order values. This check will not be performed with Dask input. Users should manually verify that the defined order values are complete to avoid unexpected results."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Other Limitations\n",
"\n",
"Woodwork provides the ability to read data directly from a CSV file into a DataTable, and during this process Woodwork creates the underlying dataframe so the user does not have to do so. The helper function used for this, `woodwork.read_csv`, will currently only read the data into a pandas DataFrame. At some point, we hope to remove this limitation and also allow data to be read into a Dask DataFrame, but for now only pandas DataFrames can be created by this function."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
9 changes: 8 additions & 1 deletion docs/source/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@ Release Notes
* Enhancements
* Fixes
* Changes
* Support logical type inference from a Dask DataFrame (:pr:`248`)
* Fix validation checks and ``make_index`` to work with Dask DataFrames (:pr:`260`)
* Skip validation of Ordinal order values for Dask DataFrames (:pr:`270`)
* Improve support for datetimes with Dask input (:pr:`286`)
* Update ``DataTable.describe`` to work with Dask input (:pr:`296`)
* Update ``DataTable.get_mutual_information`` to work with Dask input (:pr:`300`)
* Modify ``to_pandas`` function to return DataFrame with correct index (:pr:`281`)
* Rename ``DataColumn.to_pandas`` method to ``DataColumn.to_series`` (:pr:`311`)
* Rename ``DataTable.to_pandas`` method to ``DataTable.to_dataframe`` (:pr:`319`)
* Documentation Changes
* Create a guide for using Woodwork with Dask (:pr:`304`)
* Add conda install instructions (:pr:`305`, :pr:`309`)
* Fix README.md badge with correct link (:pr:`314`)
* Testing Changes
* Parameterize numeric time index tests (:pr:`288`)

Thanks to the following people for contributing to this release:
:user:`ctduffy`, :user:`gsheni`, :user:`thehomebrewnerd`
Expand All @@ -23,7 +31,6 @@ Release Notes
* The ``DataColumn.to_pandas`` method was renamed to ``DataColumn.to_series``.
* The ``DataTable.to_pandas`` method was renamed to ``DataTable.to_dataframe``.


**v0.0.4** October 21, 2020
* Enhancements
* Add optional ``include`` parameter for ``DataTable.describe()`` to filter results (:pr:`228`)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
numpy>=1.19.1
pandas>=1.1.0
click>=7.1.2
dask[dataframe]>=2.30.0
scikit-learn>=0.21.3
10 changes: 9 additions & 1 deletion woodwork/data_column.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings

import dask.dataframe as dd
import pandas as pd
import pandas.api.types as pdtypes

Expand Down Expand Up @@ -70,7 +71,12 @@ def _update_dtype(self):
# Update the underlying series
try:
if _get_ltype_class(self.logical_type) == Datetime:
self._series = pd.to_datetime(self._series, format=self.logical_type.datetime_format)
if isinstance(self._series, dd.Series):
name = self._series.name
self._series = dd.to_datetime(self._series, format=self.logical_type.datetime_format)
self._series.name = name
else:
self._series = pd.to_datetime(self._series, format=self.logical_type.datetime_format)
else:
self._series = self._series.astype(self.logical_type.pandas_dtype)
except TypeError:
Expand Down Expand Up @@ -280,6 +286,8 @@ def infer_logical_type(series):
Args:
series (pd.Series): Input Series
"""
if isinstance(series, dd.Series):
series = series.get_partition(0).compute()
natural_language_threshold = config.get_option('natural_language_threshold')

inferred_type = NaturalLanguage
Expand Down
31 changes: 22 additions & 9 deletions woodwork/data_table.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings

import dask.dataframe as dd
import pandas as pd
from sklearn.metrics.cluster import normalized_mutual_info_score

Expand Down Expand Up @@ -69,7 +70,11 @@ def __init__(self, dataframe,
self._dataframe = dataframe

if make_index:
self._dataframe.insert(0, index, range(len(self._dataframe)))
if isinstance(self._dataframe, dd.DataFrame):
self._dataframe[index] = 1
self._dataframe[index] = self._dataframe[index].cumsum() - 1
else:
self._dataframe.insert(0, index, range(len(self._dataframe)))

self.name = name
self.use_standard_tags = use_standard_tags
Expand Down Expand Up @@ -230,7 +235,7 @@ def pop(self, column_name):
"""
col = self[column_name]
del self.columns[column_name]
self._dataframe.drop(column_name, axis=1, inplace=True)
self._dataframe = self._dataframe.drop(column_name, axis=1)
return col

def set_index(self, index):
Expand Down Expand Up @@ -393,7 +398,7 @@ def to_dataframe(self, copy=False):
DataFrame: The underlying dataframe of the DataTable. Return type will depend on the type
of dataframe used to create the DataTable.
"""
if self.index is None:
if self.index is None or not isinstance(self._dataframe, pd.DataFrame):
if copy:
return self._dataframe.copy()
else:
Expand Down Expand Up @@ -531,13 +536,18 @@ def describe(self, include=None):

results = {}

if isinstance(self._dataframe, dd.DataFrame):
df = self._dataframe.compute()
else:
df = self._dataframe

for column_name, column in cols_to_include:
if 'index' in column.semantic_tags:
continue
values = {}
logical_type = column.logical_type
semantic_tags = column.semantic_tags
series = column._series
series = df[column_name]

# Calculate Aggregation Stats
if column._is_categorical():
Expand Down Expand Up @@ -666,13 +676,15 @@ def get_mutual_information(self, num_bins=10, nrows=None):
"""
# We only want Numeric, Categorical, and Boolean columns
# And we don't want the index column
valid_columns = {col_name for col_name, column
valid_columns = [col_name for col_name, column
in self.columns.items() if (col_name != self.index and
(column._is_numeric() or
column._is_categorical() or
_get_ltype_class(column.logical_type) == Boolean)
)}
)]
data = self._dataframe[valid_columns]
if isinstance(data, dd.DataFrame):
data = data.compute()

# cut off data if necessary
if nrows is not None and nrows < data.shape[0]:
Expand Down Expand Up @@ -703,8 +715,8 @@ def get_mutual_information(self, num_bins=10, nrows=None):

def _validate_params(dataframe, name, index, time_index, logical_types, semantic_tags, make_index):
"""Check that values supplied during DataTable initialization are valid"""
if not isinstance(dataframe, pd.DataFrame):
raise TypeError('Dataframe must be a pandas.DataFrame')
if not isinstance(dataframe, (pd.DataFrame, dd.DataFrame)):
raise TypeError('Dataframe must be one of: pandas.DataFrame, dask.DataFrame')
_check_unique_column_names(dataframe)
if name and not isinstance(name, str):
raise TypeError('DataTable name must be a string')
Expand Down Expand Up @@ -737,8 +749,9 @@ def _check_index(dataframe, index, make_index=False):
if not make_index and index not in dataframe.columns:
# User specifies an index that is not in the dataframe, without setting make_index to True
raise LookupError(f'Specified index column `{index}` not found in dataframe. To create a new index column, set make_index to True.')
if index and not make_index and not dataframe[index].is_unique:
if index and not make_index and isinstance(dataframe, pd.DataFrame) and not dataframe[index].is_unique:
# User specifies an index that is in the dataframe but not unique
# Does not check for Dask as Dask does not support is_unique
raise IndexError('Index column must be unique')
if make_index and index and index in dataframe.columns:
# User sets make_index to True, but supplies an index name that matches a column already present
Expand Down
13 changes: 8 additions & 5 deletions woodwork/logical_types.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pandas as pd

from woodwork.utils import camel_to_snake


Expand Down Expand Up @@ -233,11 +235,12 @@ def __init__(self, order):
def _validate_data(self, series):
"""Confirm the supplied series does not contain any values that are not
in the specified order values"""
missing_order_vals = set(series.dropna().values).difference(self.order)
if missing_order_vals:
error_msg = f'Ordinal column {series.name} contains values that are not present ' \
f'in the order values provided: {sorted(list(missing_order_vals))}'
raise ValueError(error_msg)
if isinstance(series, pd.Series):
missing_order_vals = set(series.dropna().values).difference(self.order)
if missing_order_vals:
error_msg = f'Ordinal column {series.name} contains values that are not present ' \
f'in the order values provided: {sorted(list(missing_order_vals))}'
raise ValueError(error_msg)


class PhoneNumber(LogicalType):
Expand Down

0 comments on commit 662c1b7

Please sign in to comment.