Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APIC-273] Add Table Columns Parameter #379

Merged
merged 12 commits into from
Apr 6, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased
### Added
- Added `table_columns` parameter to `civis.io.civis_file_to_table`, `civis.io.dataframe_to_civis`, and `civis.io.csv_to_civis` (#379)

### Fixed

- Fixed/relaxed version specifications for click, jsonref, and jsonschema. (#377)
Expand Down
27 changes: 25 additions & 2 deletions civis/io/_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None,
max_errors=None, existing_table_rows="fail",
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
headers=None, credential_id=None,
primary_keys=None, last_modified_keys=None,
execution="immediate",
Expand Down Expand Up @@ -660,6 +661,11 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None,
The column to use as the sortkey for the table.
sortkey2 : str, optional
The second column in a compound sortkey for the table.
table_columns : list[Dict[str, str]], optional
A list of dictionaries corresponding to the columns in
the source file. Each dictionary should have keys
for column "name" and "sqlType". The import will only copy these
columns, regardless of whether there are more columns in the table.
headers : bool, optional [DEPRECATED]
Whether or not the first row of the file should be treated as
headers. The default, ``None``, attempts to autodetect whether
Expand Down Expand Up @@ -740,6 +746,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None,
existing_table_rows=existing_table_rows,
diststyle=diststyle, distkey=distkey,
sortkey1=sortkey1, sortkey2=sortkey2,
table_columns=table_columns,
delimiter=delimiter, headers=headers,
credential_id=credential_id,
primary_keys=primary_keys,
Expand All @@ -756,6 +763,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None,
max_errors=None, existing_table_rows="fail",
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
delimiter=",", headers=None,
primary_keys=None, last_modified_keys=None,
escaped=False, execution="immediate",
Expand Down Expand Up @@ -795,6 +803,11 @@ def csv_to_civis(filename, database, table, api_key=None, client=None,
The column to use as the sortkey for the table.
sortkey2 : str, optional
The second column in a compound sortkey for the table.
table_columns : list[Dict[str, str]], optional
A list of dictionaries corresponding to the columns in
the source file. Each dictionary should have keys
for column "name" and "sqlType". The import will only copy these
columns, regardless of whether there are more columns in the table.
delimiter : string, optional
The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``.
headers : bool, optional
Expand Down Expand Up @@ -863,6 +876,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None,
existing_table_rows=existing_table_rows,
diststyle=diststyle, distkey=distkey,
sortkey1=sortkey1, sortkey2=sortkey2,
table_columns=table_columns,
delimiter=delimiter, headers=headers,
credential_id=credential_id,
primary_keys=primary_keys,
Expand All @@ -878,6 +892,7 @@ def civis_file_to_table(file_id, database, table, client=None,
max_errors=None, existing_table_rows="fail",
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
primary_keys=None, last_modified_keys=None,
escaped=False, execution="immediate",
delimiter=None, headers=None,
Expand Down Expand Up @@ -918,6 +933,11 @@ def civis_file_to_table(file_id, database, table, client=None,
The column to use as the sortkey for the table.
sortkey2 : str, optional
The second column in a compound sortkey for the table.
table_columns : list[Dict[str, str]], optional
A list of dictionaries corresponding to the columns in
the source file. Each dictionary should have keys
for column "name" and "sqlType". The import will only copy these
columns, regardless of whether there are more columns in the table.
primary_keys: list[str], optional
A list of the primary key column(s) of the destination table that
uniquely identify a record. If existing_table_rows is "upsert", this
Expand Down Expand Up @@ -999,16 +1019,19 @@ def civis_file_to_table(file_id, database, table, client=None,

# Use Preprocess endpoint to get the table columns as needed
# and perform necessary file cleaning
need_table_columns = not table_exists or existing_table_rows == 'drop'
need_table_columns = ((not table_exists or existing_table_rows == 'drop')
                      and table_columns is None)

cleaning_futures = _run_cleaning(file_id, client, need_table_columns,
headers, delimiter, hidden)

(cleaned_file_ids, headers, compression, delimiter,
table_columns) = _process_cleaning_results(
cleaned_table_columns) = _process_cleaning_results(
cleaning_futures, client, headers, need_table_columns, delimiter
)

table_columns = table_columns or cleaned_table_columns

source = dict(file_ids=cleaned_file_ids)
destination = dict(schema=schema, table=table, remote_host_id=db_id,
credential_id=cred_id, primary_keys=primary_keys,
Expand Down
102 changes: 102 additions & 0 deletions civis/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def test_csv_to_civis(self, m_civis_file_to_table, m_file_to_civis,
existing_table_rows='truncate',
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
delimiter=",", headers=None,
primary_keys=None,
last_modified_keys=None,
Expand Down Expand Up @@ -397,6 +398,106 @@ def test_civis_file_to_table_table_doesnt_exist(
**expected_kwargs
)

@pytest.mark.civis_file_to_table
@mock.patch('civis.io._tables._process_cleaning_results')
@mock.patch('civis.io._tables._run_cleaning')
def test_civis_file_to_table_table_doesnt_exist_provide_table_columns(
        self,
        m_run_cleaning,
        m_process_cleaning_results,
        _m_get_api_spec
):
    """When the destination table does not exist and ``table_columns``
    is given, the import should skip column detection
    (``need_table_columns=False``) and forward the user-supplied
    columns to the import job unchanged.
    """
    table = "scratch.api_client_test_fixture"
    database = 'redshift-general'
    mock_file_id = 1234
    mock_cleaned_file_id = 1235
    mock_import_id = 8675309

    # The imports endpoint reports the id of the created import job.
    self.mock_client.imports.post_files_csv.return_value\
        .id = mock_import_id
    self.mock_client.get_database_id.return_value = 42
    self.mock_client.default_credential = 713

    # Simulate a missing destination table so the "table doesn't
    # exist" branch is exercised.
    self.mock_client.get_table_id.side_effect = ValueError('no table')
    table_columns = [{'name': 'foo', 'sql_type': 'INTEGER'}]
    # Cleaning detects no columns (None); the explicit table_columns
    # above should be used instead of a detected schema.
    m_process_cleaning_results.return_value = (
        [mock_cleaned_file_id],
        True,  # headers
        'gzip',  # compression
        'comma',  # delimiter
        None  # table_columns
    )
    m_run_cleaning.return_value = [mock.sentinel.cleaning_future]

    with mock.patch.object(
            civis.io._tables, 'run_job',
            spec_set=True) as m_run_job:

        run_job_future = mock.MagicMock(
            spec=civis.futures.CivisFuture,
            job_id=123,
            run_id=234
        )

        m_run_job.return_value = run_job_future

        result = civis.io.civis_file_to_table(
            mock_file_id, database, table,
            existing_table_rows='truncate',
            table_columns=table_columns,
            delimiter=',',
            headers=True,
            client=self.mock_client
        )

        assert result is run_job_future
        m_run_job.assert_called_once_with(mock_import_id,
                                          client=self.mock_client,
                                          polling_interval=None)

    # need_table_columns (3rd positional arg) must be False because
    # table_columns was provided by the caller.
    m_run_cleaning.assert_called_once_with(
        [mock_file_id], self.mock_client, False, True, 'comma', True
    )
    m_process_cleaning_results.assert_called_once_with(
        [mock.sentinel.cleaning_future],
        self.mock_client,
        True,
        False,
        'comma'
    )

    expected_name = 'CSV import to scratch.api_client_test_fixture'
    # loosen_types stays False and table_columns passes through
    # verbatim when the caller supplies the schema.
    expected_kwargs = {
        'name': expected_name,
        'max_errors': None,
        'existing_table_rows': 'truncate',
        'hidden': True,
        'column_delimiter': 'comma',
        'compression': 'gzip',
        'escaped': False,
        'execution': 'immediate',
        'loosen_types': False,
        'table_columns': table_columns,
        'redshift_destination_options': {
            'diststyle': None, 'distkey': None,
            'sortkeys': [None, None]
        }

    }
    self.mock_client.imports.post_files_csv.assert_called_once_with(
        {'file_ids': [mock_cleaned_file_id]},
        {
            'schema': 'scratch',
            'table': 'api_client_test_fixture',
            'remote_host_id': 42,
            'credential_id': 713,
            'primary_keys': None,
            'last_modified_keys': None
        },
        True,
        **expected_kwargs
    )

@pytest.mark.civis_file_to_table
@mock.patch('civis.io._tables._process_cleaning_results')
@mock.patch('civis.io._tables._run_cleaning')
Expand Down Expand Up @@ -755,6 +856,7 @@ def test_dataframe_to_civis(self, m_civis_file_to_table, m_file_to_civis,
max_errors=None, existing_table_rows="truncate",
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
delimiter=',',
primary_keys=None,
last_modified_keys=None,
Expand Down