Support ingestion of CSV files (#193)
This aims to add support for ingesting CSV files.

If `column_ids` is provided, the input file is assumed to be a CSV.

Work in progress.
rth committed Jan 30, 2019
1 parent 4075dfe commit 038c872
Showing 8 changed files with 156 additions and 17 deletions.
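
A minimal usage sketch of the new engine-level API, following the `test_ingestion_csv` test added in this commit (the cache directory and the import path are illustrative assumptions, not part of the diff):

    from freediscovery.engine.vectorizer import FeatureVectorizer

    fe = FeatureVectorizer(cache_dir='/tmp/fd_cache', mode='w')  # illustrative cache dir
    fe.setup(column_ids=[1, 3])          # treat input as CSV; keep columns 1 and 3
    fe.ingest(dataset_definition=[
        {'file_path': 'freediscovery/data/ds_003/example.csv'}])
    X = fe._load_features()              # one document per CSV row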
5 changes: 5 additions & 0 deletions freediscovery/data/ds_003/example.csv
@@ -0,0 +1,5 @@
13, Shipment of gold damaged in aa fire, 32, agreed
23, Delivery of silver arrived in aa silver truck., 12, blue
44, Shipment of gold arrived in aa truck., 21, yellow
22, short sentence, ..., other values

38 changes: 38 additions & 0 deletions freediscovery/engine/tests/test_vectorizer.py
@@ -20,6 +20,7 @@

basename = os.path.dirname(__file__)
data_dir = os.path.join(basename, "..", "..", "data", "ds_001", "raw")
csv_data_dir = os.path.join(basename, "..", "..", "data", "ds_003")
n_features = 1100000


@@ -395,6 +396,7 @@ def test_ingestion_content():
assert_array_equal(X.indices, X2.indices)
assert_array_equal(X.data, X2.data)


def test_non_random_dsid():
data_dir = os.path.join(basename, "..", "..", "data", "ds_002", "raw")
cache_dir = check_cache()
@@ -416,3 +418,39 @@ def test_non_random_dsid():
with pytest.raises(WrongParameter):
fh = FeatureVectorizer(cache_dir=cache_dir, mode='fw', dsid='?+ds$$')
uuid = fh.setup()


def test_ingestion_csv():
cache_dir = check_cache()

fe = FeatureVectorizer(cache_dir=cache_dir, mode='w')
fe.setup(column_ids=[1, 3])
fe.ingest(dataset_definition=[
{'file_path': os.path.join(csv_data_dir, 'example.csv')}],
)
X = fe._load_features()
assert X.shape[0] == 4
assert len(fe.filenames_) == X.shape[0]
assert X.shape[0] == fe.n_samples_


def test_ingestion_csv_wrong_params():
cache_dir = check_cache()

fe = FeatureVectorizer(cache_dir=cache_dir, mode='w')
fe.setup(column_ids=[1, 3])
with pytest.raises(
ValueError,
match=".*can only be privided using `dataset_definition.*"
):
fe.ingest(csv_data_dir)

with pytest.raises(
ValueError,
match=".*one CSV can be provided at a time.*"
):
fe.ingest(dataset_definition=[
{'file_path': os.path.join(csv_data_dir, 'example.csv')},
{'file_path': os.path.join(csv_data_dir, 'example.csv')},
],
)
77 changes: 64 additions & 13 deletions freediscovery/engine/vectorizer.py
@@ -209,7 +209,8 @@ def setup(self, n_features=None, chunk_size=5000, analyzer='word',
use_hashing=False,
weighting='nnc', norm_alpha=0.75, min_df=0.0, max_df=1.0,
parse_email_headers=False,
preprocess=[]):
preprocess=[], column_ids=None, column_separator=','
):
"""Initalize the features extraction.
See sklearn.feature_extraction.text for a detailed description
@@ -253,6 +254,11 @@ def setup(self, n_features=None, chunk_size=5000, analyzer='word',
SMART weighting type
preprocess : list of strings, default: []
A list of pre-processing steps, including 'emails_ignore_header'
column_ids : None or List[int]
when provided, the ingested file is assumed to be a CSV
column_separator: str, default=','
delimiter used for parsing CSV files. Only used when
``column_ids`` is not None.
"""
if self.mode not in ['w', 'fw']:
raise WrongParameter('The vectorizer can be setup only with '
@@ -294,6 +300,14 @@ def setup(self, n_features=None, chunk_size=5000, analyzer='word',
else:
raise WrongParameter('stop_words = {}'.format(stop_words))

if not isinstance(column_separator, str):
raise ValueError('column_separator={} expected string'
.format(column_separator))

if not (column_ids is None or isinstance(column_ids, (list, tuple))):
raise ValueError('column_ids={} expected None or sequence'
.format(column_ids))

if n_features is None and use_hashing:
n_features = 100001 # default size of the hashing table

@@ -315,15 +329,18 @@ def setup(self, n_features=None, chunk_size=5000, analyzer='word',
'parse_email_headers': parse_email_headers,
'type': type(self).__name__,
'preprocess': preprocess,
'freediscovery_version': __version__}
'freediscovery_version': __version__,
'column_ids': column_ids,
'column_separator': column_separator}
self._pars = pars
with (dsid_dir / 'pars').open('wb') as fh:
pickle.dump(self._pars, fh)
return dsid

def ingest(self, data_dir=None, file_pattern='.*', dir_pattern='.*',
dataset_definition=None, vectorize=True,
document_id_generator='indexed_file_path'):
document_id_generator='indexed_file_path',
):
"""Perform data ingestion
Parameters
@@ -347,14 +364,38 @@ def ingest(self, data_dir=None, file_pattern='.*', dir_pattern='.*',
elif len(db_list) >= 1:
internal_id_offset = int(db_list[-1].name[3:])

if dataset_definition is not None:
db = DocumentIndex.from_list(dataset_definition, data_dir,
internal_id_offset + 1, dsid_dir,
document_id_generator=document_id_generator)
pars = self.pars_

if pars.get('column_ids', None) is not None:
if dataset_definition is None:
raise ValueError("CSV files can only be privided using "
"`dataset_definition` parameter")
else:
if len(dataset_definition) > 1:
raise ValueError(
"Only one CSV can be provided at a time"
)
file_path = dataset_definition[0]['file_path']
X = pd.read_csv(
file_path, sep=pars['column_separator'], header=None)
dataset_definition = [
{'file_path': f"{file_path}:{idx}", 'document_id': idx}
for idx in range(len(X))]

db = DocumentIndex.from_list(
dataset_definition, data_dir,
internal_id_offset + 1, dsid_dir,
document_id_generator=document_id_generator)
elif dataset_definition is not None:
db = DocumentIndex.from_list(
dataset_definition, data_dir,
internal_id_offset + 1, dsid_dir,
document_id_generator=document_id_generator)
elif data_dir is not None:
db = DocumentIndex.from_folder(data_dir, file_pattern, dir_pattern,
internal_id_offset + 1,
document_id_generator=document_id_generator)
db = DocumentIndex.from_folder(
data_dir, file_pattern, dir_pattern,
internal_id_offset + 1,
document_id_generator=document_id_generator)
else:
db = None

@@ -366,7 +407,6 @@ def ingest(self, data_dir=None, file_pattern='.*', dir_pattern='.*',
self._filenames = db.data.file_path.values.tolist()
del db.data['file_path']

pars = self.pars_

if 'file_path' in db.data.columns:
del db.data['file_path']
@@ -522,8 +562,19 @@ def transform(self):
vect = CountVectorizer(input='content',
max_features=pars['n_features'],
**opts_tfidf)
text_gen = (_preprocess_stream(_read_file(fname), pars['preprocess'])
for fname in pars['filenames_abs'])
if pars['column_ids'] is not None:
# join again in case there is more than one `:` in the file name
file_path = ':'.join(pars["filenames_abs"][0].split(':')[:-1])
X = pd.read_csv(file_path, sep=pars['column_separator'],
header=None)
X = X.iloc[:, pars['column_ids']]
# concatenate all selected columns into a single text per row
text_gen = X.apply(lambda x: ''.join(str(el) for el in x), axis=1).values
else:
text_gen = (
_preprocess_stream(_read_file(fname), pars['preprocess'])
for fname in pars['filenames_abs'])

res = vect.fit_transform(text_gen)
self._vect = vect
fname = dsid_dir / 'vectorizer'
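
A small standalone sketch of what the new CSV branch in `transform()` does, using `ds_003/example.csv` and `column_ids=[1, 3]` as in the tests of this commit (pandas defaults match the code above):

    import pandas as pd

    # read the whole CSV once, as transform() does (header=None, sep=',')
    X = pd.read_csv('freediscovery/data/ds_003/example.csv', sep=',', header=None)
    X = X.iloc[:, [1, 3]]      # keep only the selected columns
    # concatenate each row's selected cells into a single text document
    texts = X.apply(lambda row: ''.join(str(el) for el in row), axis=1).values
    # e.g. texts[0] combines the "Shipment of gold damaged in aa fire" field
    # with the "agreed" field from the first row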
2 changes: 2 additions & 0 deletions freediscovery/server/resources.py
@@ -127,6 +127,8 @@ def get(self):
- `preprocess`: a list of pre-processing steps to apply before vectorization. A subset of ['emails_ignore_header'], default: [].
- `id`: (optional) custom dataset id. Can only contain letters, numbers, "_" or "-". It must also be between 2 and 50 characters long.
- `overwrite`: if a custom dataset id was provided, and it already exists, overwrite it. Default: false
- `column_ids` : list of ints. If provided, the input dataset is expected to be a CSV file, and only the columns with the provided ids are selected. Documents can then only be provided using the `dataset_definition` parameter, which must contain a single file path
- `column_separator`: str, character used to delimit columns. Only used if `column_ids` is provided. Default: ','
"""))
@use_args(FeaturesParsSchema(strict=True, exclude=('data_dir')))
@marshal_with(IDSchema())
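
A hedged sketch of how the two new parameters above (`column_ids`, `column_separator`) might be used through the REST API, mirroring the `test_ingest_csv` server test added in this commit (the base URL, port, and use of `requests` are illustrative assumptions):

    import requests

    base = 'http://localhost:5001/api/v0'   # illustrative server address
    # create a feature-extraction dataset that expects CSV input
    r = requests.post(base + '/feature-extraction/',
                      json={'column_ids': [1, 3], 'column_separator': ','})
    dsid = r.json()['id']
    # ingest a single CSV file via dataset_definition
    requests.post(base + '/feature-extraction/' + dsid,
                  json={'dataset_definition': [
                      {'file_path': 'freediscovery/data/ds_003/example.csv'}]})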
2 changes: 2 additions & 0 deletions freediscovery/server/schemas.py
@@ -84,6 +84,8 @@ class FeaturesParsSchema(Schema):
preprocess = fields.List(fields.Str(), missing=[])
id = fields.Str()
overwrite = fields.Boolean(missing=False)
column_ids = fields.List(fields.Int(), missing=None)
column_separator = fields.Str(missing=',')


class FeaturesSchema(FeaturesParsSchema):
2 changes: 2 additions & 0 deletions freediscovery/server/tests/test_categorization.py
@@ -272,6 +272,8 @@ def test_api_categorization_sort(app, sort_by):
data = app.get_check(method)

training_set = ds_input['training_set']
print(training_set)


pars = {
'parent_id': lsi_id,
41 changes: 40 additions & 1 deletion freediscovery/server/tests/test_ingestion.py
@@ -70,7 +70,8 @@ def test_get_feature_extraction(app, hashed, weighting):
'norm_alpha': 'float', 'use_hashing': 'bool',
'filenames': ['str'], 'max_df': 'float', 'min_df': 'float',
'parse_email_headers': 'bool', 'n_samples_processed': 'int',
'preprocess': []}
'preprocess': [], 'column_ids': 'NoneType',
'column_separator': 'str'}

assert data['use_hashing'] == hashed
assert data['weighting'] == weighting
@@ -280,3 +281,41 @@ def test_document_non_random_id(app):

with pytest.raises(WrongParameter):
data = app.post_check(method, json={'id': 'dsjkdlsy8^$$$'})


def test_ingest_csv(app):
method = V01 + "/feature-extraction/"
data = app.post_check(method, json={'column_ids': [1, 3]})
dsid = data['id']
method += dsid
app.post_check(
method,
json={
'dataset_definition': [
{'file_path': os.path.join(data_dir, '..', '..', 'ds_003', 'example.csv')}
]})
data = app.get_check(method)

# check that the file_path is correctly returned by the id-mapping
data = app.post_check(method + '/id-mapping',
json={'return_file_path': True})
assert len(data.get('data', [])) == 4
assert data.get('data', [])[-1] == {
'document_id': 3, 'file_path': 'example.csv:3', 'internal_id': 3}

# check that classification works
pars = {
'parent_id': dsid,
'data': [{'document_id': 0, 'category': 'positive'},
{'document_id': 1, 'category': 'negative'}],
'method': "NearestNeighbor",
'training_scores': True}

method = V01 + "/categorization/"
data = app.post_check(method, json=pars)
mid = data['id']

method = V01 + "/categorization/{}/predict".format(mid)

data = app.get_check(method)
assert data['pagination']['total_response_count'] == 2
6 changes: 3 additions & 3 deletions requirements_engine.txt
@@ -1,7 +1,7 @@
numpy>=1.10.1 # core libraries
scipy>=0.16.1
pandas>=0.19.0
scikit-learn>=0.19.1
scipy>=0.16.1,<1.2.0
pandas>=0.19.0,<0.24.0
scikit-learn>=0.19.1,<0.21
nltk>=3.2.1
pytest>=3.0.1
flask>=0.12.3 # web libraries
