diff --git a/CHANGELOG.md b/CHANGELOG.md index ac6c61e9..3e0fd162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased ### Added +- Added `table_columns` parameter to `civis.io.civis_file_to_table`, `civis.io.dataframe_to_civis`, and `civis.io.csv_to_civis` (#379) + ### Fixed - Fixed/relaxed version specifications for click, jsonref, and jsonschema. (#377) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 0da5895b..164f2439 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -617,6 +617,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, headers=None, credential_id=None, primary_keys=None, last_modified_keys=None, execution="immediate", @@ -660,6 +661,11 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + A list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType". The import will only copy these + columns regardless of whether there are more columns in the table. headers : bool, optional [DEPRECATED] Whether or not the first row of the file should be treated as headers. 
The default, ``None``, attempts to autodetect whether @@ -740,6 +746,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, existing_table_rows=existing_table_rows, diststyle=diststyle, distkey=distkey, sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, delimiter=delimiter, headers=headers, credential_id=credential_id, primary_keys=primary_keys, @@ -756,6 +763,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=",", headers=None, primary_keys=None, last_modified_keys=None, escaped=False, execution="immediate", @@ -795,6 +803,11 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + A list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType". The import will only copy these + columns regardless of whether there are more columns in the table. delimiter : string, optional The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``. 
headers : bool, optional @@ -863,6 +876,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, existing_table_rows=existing_table_rows, diststyle=diststyle, distkey=distkey, sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, delimiter=delimiter, headers=headers, credential_id=credential_id, primary_keys=primary_keys, @@ -878,6 +892,7 @@ def civis_file_to_table(file_id, database, table, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, primary_keys=None, last_modified_keys=None, escaped=False, execution="immediate", delimiter=None, headers=None, @@ -918,6 +933,11 @@ def civis_file_to_table(file_id, database, table, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + A list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType". The import will only copy these + columns regardless of whether there are more columns in the table. primary_keys: list[str], optional A list of the primary key column(s) of the destination table that uniquely identify a record. 
If existing_table_rows is "upsert", this @@ -999,16 +1019,19 @@ def civis_file_to_table(file_id, database, table, client=None, # Use Preprocess endpoint to get the table columns as needed # and perform necessary file cleaning - need_table_columns = not table_exists or existing_table_rows == 'drop' + need_table_columns = ((not table_exists or existing_table_rows == 'drop') + and table_columns is None) cleaning_futures = _run_cleaning(file_id, client, need_table_columns, headers, delimiter, hidden) (cleaned_file_ids, headers, compression, delimiter, - table_columns) = _process_cleaning_results( + cleaned_table_columns) = _process_cleaning_results( cleaning_futures, client, headers, need_table_columns, delimiter ) + table_columns = table_columns or cleaned_table_columns + source = dict(file_ids=cleaned_file_ids) destination = dict(schema=schema, table=table, remote_host_id=db_id, credential_id=cred_id, primary_keys=primary_keys, diff --git a/civis/tests/test_io.py b/civis/tests/test_io.py index 386b4155..0209ef7f 100644 --- a/civis/tests/test_io.py +++ b/civis/tests/test_io.py @@ -194,6 +194,7 @@ def test_csv_to_civis(self, m_civis_file_to_table, m_file_to_civis, existing_table_rows='truncate', diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=",", headers=None, primary_keys=None, last_modified_keys=None, @@ -397,6 +398,106 @@ def test_civis_file_to_table_table_doesnt_exist( **expected_kwargs ) + @pytest.mark.civis_file_to_table + @mock.patch('civis.io._tables._process_cleaning_results') + @mock.patch('civis.io._tables._run_cleaning') + def test_civis_file_to_table_table_doesnt_exist_provide_table_columns( + self, + m_run_cleaning, + m_process_cleaning_results, + _m_get_api_spec + ): + table = "scratch.api_client_test_fixture" + database = 'redshift-general' + mock_file_id = 1234 + mock_cleaned_file_id = 1235 + mock_import_id = 8675309 + + self.mock_client.imports.post_files_csv.return_value\ + .id = mock_import_id + 
self.mock_client.get_database_id.return_value = 42 + self.mock_client.default_credential = 713 + + self.mock_client.get_table_id.side_effect = ValueError('no table') + table_columns = [{'name': 'foo', 'sql_type': 'INTEGER'}] + m_process_cleaning_results.return_value = ( + [mock_cleaned_file_id], + True, # headers + 'gzip', # compression + 'comma', # delimiter + None # table_columns + ) + m_run_cleaning.return_value = [mock.sentinel.cleaning_future] + + with mock.patch.object( + civis.io._tables, 'run_job', + spec_set=True) as m_run_job: + + run_job_future = mock.MagicMock( + spec=civis.futures.CivisFuture, + job_id=123, + run_id=234 + ) + + m_run_job.return_value = run_job_future + + result = civis.io.civis_file_to_table( + mock_file_id, database, table, + existing_table_rows='truncate', + table_columns=table_columns, + delimiter=',', + headers=True, + client=self.mock_client + ) + + assert result is run_job_future + m_run_job.assert_called_once_with(mock_import_id, + client=self.mock_client, + polling_interval=None) + + m_run_cleaning.assert_called_once_with( + [mock_file_id], self.mock_client, False, True, 'comma', True + ) + m_process_cleaning_results.assert_called_once_with( + [mock.sentinel.cleaning_future], + self.mock_client, + True, + False, + 'comma' + ) + + expected_name = 'CSV import to scratch.api_client_test_fixture' + expected_kwargs = { + 'name': expected_name, + 'max_errors': None, + 'existing_table_rows': 'truncate', + 'hidden': True, + 'column_delimiter': 'comma', + 'compression': 'gzip', + 'escaped': False, + 'execution': 'immediate', + 'loosen_types': False, + 'table_columns': table_columns, + 'redshift_destination_options': { + 'diststyle': None, 'distkey': None, + 'sortkeys': [None, None] + } + + } + self.mock_client.imports.post_files_csv.assert_called_once_with( + {'file_ids': [mock_cleaned_file_id]}, + { + 'schema': 'scratch', + 'table': 'api_client_test_fixture', + 'remote_host_id': 42, + 'credential_id': 713, + 'primary_keys': None, + 
'last_modified_keys': None + }, + True, + **expected_kwargs + ) + @pytest.mark.civis_file_to_table @mock.patch('civis.io._tables._process_cleaning_results') @mock.patch('civis.io._tables._run_cleaning') @@ -755,6 +856,7 @@ def test_dataframe_to_civis(self, m_civis_file_to_table, m_file_to_civis, max_errors=None, existing_table_rows="truncate", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=',', primary_keys=None, last_modified_keys=None,