From 624617a7bceccaa0c60340a131e32d0e37f90ccf Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 11:54:31 -0400 Subject: [PATCH 01/12] updated io methods to take table_columns parameter --- civis/io/_tables.py | 21 ++++++++- civis/tests/test_io.py | 102 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 0da5895b..42b4a049 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -617,6 +617,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, headers=None, credential_id=None, primary_keys=None, last_modified_keys=None, execution="immediate", @@ -660,6 +661,9 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + An list of dictionaries corresponding to the columns in the source file. + Each dictionary should have keys for column "name" and "sqlType" headers : bool, optional [DEPRECATED] Whether or not the first row of the file should be treated as headers. The default, ``None``, attempts to autodetect whether @@ -740,6 +744,7 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, existing_table_rows=existing_table_rows, diststyle=diststyle, distkey=distkey, sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, delimiter=delimiter, headers=headers, credential_id=credential_id, primary_keys=primary_keys, @@ -756,6 +761,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=",", headers=None, primary_keys=None, last_modified_keys=None, escaped=False, execution="immediate", @@ -795,6 +801,9 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + An list of dictionaries corresponding to the columns in the source file. + Each dictionary should have keys for column "name" and "sqlType" delimiter : string, optional The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``. headers : bool, optional @@ -863,6 +872,7 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, existing_table_rows=existing_table_rows, diststyle=diststyle, distkey=distkey, sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, delimiter=delimiter, headers=headers, credential_id=credential_id, primary_keys=primary_keys, @@ -878,6 +888,7 @@ def civis_file_to_table(file_id, database, table, client=None, max_errors=None, existing_table_rows="fail", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, primary_keys=None, last_modified_keys=None, escaped=False, execution="immediate", delimiter=None, headers=None, @@ -918,6 +929,9 @@ def civis_file_to_table(file_id, database, table, client=None, The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + An list of dictionaries corresponding to the columns in the source file. + Each dictionary should have keys for column "name" and "sqlType" primary_keys: list[str], optional A list of the primary key column(s) of the destination table that uniquely identify a record. If existing_table_rows is "upsert", this @@ -999,16 +1013,19 @@ def civis_file_to_table(file_id, database, table, client=None, # Use Preprocess endpoint to get the table columns as needed # and perform necessary file cleaning - need_table_columns = not table_exists or existing_table_rows == 'drop' + need_table_columns = (not table_exists or existing_table_rows == 'drop') and not table_columns cleaning_futures = _run_cleaning(file_id, client, need_table_columns, headers, delimiter, hidden) (cleaned_file_ids, headers, compression, delimiter, - table_columns) = _process_cleaning_results( + cleaning_table_columns) = _process_cleaning_results( cleaning_futures, client, headers, need_table_columns, delimiter ) + if not table_columns: + table_columns = cleaning_table_columns + source = dict(file_ids=cleaned_file_ids) destination = dict(schema=schema, table=table, remote_host_id=db_id, credential_id=cred_id, primary_keys=primary_keys, diff --git a/civis/tests/test_io.py b/civis/tests/test_io.py index 386b4155..6ac75431 100644 --- a/civis/tests/test_io.py +++ b/civis/tests/test_io.py @@ -194,6 +194,7 @@ def test_csv_to_civis(self, m_civis_file_to_table, m_file_to_civis, existing_table_rows='truncate', diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=",", headers=None, primary_keys=None, last_modified_keys=None, @@ -397,6 +398,106 @@ def test_civis_file_to_table_table_doesnt_exist( **expected_kwargs ) + @pytest.mark.civis_file_to_table + @mock.patch('civis.io._tables._process_cleaning_results') + @mock.patch('civis.io._tables._run_cleaning') + def test_civis_file_to_table_table_doesnt_exist_provide_table_columns( + self, + m_run_cleaning, + m_process_cleaning_results, + _m_get_api_spec + ): + table = "scratch.api_client_test_fixture" + database = 'redshift-general' + mock_file_id = 1234 + mock_cleaned_file_id = 1235 + mock_import_id = 8675309 + + self.mock_client.imports.post_files_csv.return_value\ + .id = mock_import_id + self.mock_client.get_database_id.return_value = 42 + self.mock_client.default_credential = 713 + + self.mock_client.get_table_id.side_effect = ValueError('no table') + table_columns = [{'name': 'foo', 'sql_type': 'INTEGER'}] + m_process_cleaning_results.return_value = ( + [mock_cleaned_file_id], + True, # headers + 'gzip', # compression + 'comma', # delimiter + None # table_columns + ) + m_run_cleaning.return_value = [mock.sentinel.cleaning_future] + + with mock.patch.object( + civis.io._tables, 'run_job', + spec_set=True) as m_run_job: + + run_job_future = mock.MagicMock( + spec=civis.futures.CivisFuture, + job_id=123, + run_id=234 + ) + + m_run_job.return_value = run_job_future + + result = civis.io.civis_file_to_table( + mock_file_id, database, table, + existing_table_rows='truncate', + table_columns=table_columns, + delimiter=',', + headers=True, + client=self.mock_client + ) + + assert result is run_job_future + m_run_job.assert_called_once_with(mock_import_id, + client=self.mock_client, + polling_interval=None) + + m_run_cleaning.assert_called_once_with( + [mock_file_id], self.mock_client, True, False, 'comma', True + ) + m_process_cleaning_results.assert_called_once_with( + [mock.sentinel.cleaning_future], + self.mock_client, + True, + False, + 'comma' + ) + + expected_name = 'CSV import to scratch.api_client_test_fixture' + expected_kwargs = { + 'name': expected_name, + 'max_errors': None, + 'existing_table_rows': 'truncate', + 'hidden': True, + 'column_delimiter': 'comma', + 'compression': 'gzip', + 'escaped': False, + 'execution': 'immediate', + 'loosen_types': False, + 'table_columns': table_columns, + 'redshift_destination_options': { + 'diststyle': None, 'distkey': None, + 'sortkeys': [None, None] + } + + } + self.mock_client.imports.post_files_csv.assert_called_once_with( + {'file_ids': [mock_cleaned_file_id]}, + { + 'schema': 'scratch', + 'table': 'api_client_test_fixture', + 'remote_host_id': 42, + 'credential_id': 713, + 'primary_keys': None, + 'last_modified_keys': None + }, + True, + **expected_kwargs + ) + @pytest.mark.civis_file_to_table @mock.patch('civis.io._tables._process_cleaning_results') @mock.patch('civis.io._tables._run_cleaning') @@ -755,6 +856,7 @@ def test_dataframe_to_civis(self, m_civis_file_to_table, m_file_to_civis, max_errors=None, existing_table_rows="truncate", diststyle=None, distkey=None, sortkey1=None, sortkey2=None, + table_columns=None, delimiter=',', primary_keys=None, last_modified_keys=None, From 4899c2a3029ca1b8e43a70b949c04905276e8c73 Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 12:13:15 -0400 Subject: [PATCH 02/12] fixed lint errors --- civis/io/_tables.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 42b4a049..e2434f51 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -662,8 +662,9 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in the source file. - Each dictionary should have keys for column "name" and "sqlType" + An list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType" headers : bool, optional [DEPRECATED] Whether or not the first row of the file should be treated as headers. The default, ``None``, attempts to autodetect whether @@ -802,8 +803,9 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in the source file. - Each dictionary should have keys for column "name" and "sqlType" + An list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType" delimiter : string, optional The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``. headers : bool, optional @@ -930,8 +932,9 @@ def civis_file_to_table(file_id, database, table, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in the source file. - Each dictionary should have keys for column "name" and "sqlType" + An list of dictionaries corresponding to the columns in + the source file. Each dictionary should have keys + for column "name" and "sqlType" primary_keys: list[str], optional A list of the primary key column(s) of the destination table that uniquely identify a record. If existing_table_rows is "upsert", this @@ -1013,7 +1016,10 @@ def civis_file_to_table(file_id, database, table, client=None, # Use Preprocess endpoint to get the table columns as needed # and perform necessary file cleaning - need_table_columns = (not table_exists or existing_table_rows == 'drop') and not table_columns + if not table_columns: + need_table_columns = not table_exists or existing_table_rows == 'drop' + else: + need_table_columns = False cleaning_futures = _run_cleaning(file_id, client, need_table_columns, headers, delimiter, hidden) From a4feb8ed4d0c7467467ff030daa92cc5037f5a67 Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 12:21:49 -0400 Subject: [PATCH 03/12] fixed test_io expected param order --- civis/tests/test_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/civis/tests/test_io.py b/civis/tests/test_io.py index 6ac75431..d78fb924 100644 --- a/civis/tests/test_io.py +++ b/civis/tests/test_io.py @@ -456,13 +456,13 @@ def test_civis_file_to_table_table_doesnt_exist_provide_table_columns( polling_interval=None) m_run_cleaning.assert_called_once_with( - [mock_file_id], self.mock_client, True, False, 'comma', True + [mock_file_id], self.mock_client, False, True, 'comma', True ) m_process_cleaning_results.assert_called_once_with( [mock.sentinel.cleaning_future], self.mock_client, - True, False, + True, 'comma' ) From 993424cd3f854add71b78243e3bb7bc097805f3b Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 12:26:45 -0400 Subject: [PATCH 04/12] fixed test param order --- civis/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/civis/tests/test_io.py b/civis/tests/test_io.py index d78fb924..0209ef7f 100644 --- a/civis/tests/test_io.py +++ b/civis/tests/test_io.py @@ -461,8 +461,8 @@ def test_civis_file_to_table_table_doesnt_exist_provide_table_columns( m_process_cleaning_results.assert_called_once_with( [mock.sentinel.cleaning_future], self.mock_client, - False, True, + False, 'comma' ) From ed6cb75402a279fbea0b4c12adf686f5b2cecb67 Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 15:32:59 -0400 Subject: [PATCH 05/12] cleaned up setting variables in civis/io/tables --- civis/io/_tables.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index e2434f51..fa0a3d0c 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1016,10 +1016,9 @@ def civis_file_to_table(file_id, database, table, client=None, # Use Preprocess endpoint to get the table columns as needed # and perform necessary file cleaning - if not table_columns: - need_table_columns = not table_exists or existing_table_rows == 'drop' - else: - need_table_columns = False + need_table_columns = (table_columns is None + or not table_exists + or existing_table_rows == 'drop') cleaning_futures = _run_cleaning(file_id, client, need_table_columns, headers, delimiter, hidden) @@ -1029,8 +1028,7 @@ def civis_file_to_table(file_id, database, table, client=None, cleaning_futures, client, headers, need_table_columns, delimiter ) - if not table_columns: - table_columns = cleaning_table_columns + table_columns = table_columns or cleaning_table_columns source = dict(file_ids=cleaned_file_ids) destination = dict(schema=schema, table=table, remote_host_id=db_id, From dc731f0e80f2dbe1e0da17d8508a9f4d03b5c42b Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 15:35:06 -0400 Subject: [PATCH 06/12] Update variable name to be less confusing --- civis/io/_tables.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index fa0a3d0c..3ac346e1 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1024,11 +1024,11 @@ def civis_file_to_table(file_id, database, table, client=None, headers, delimiter, hidden) (cleaned_file_ids, headers, compression, delimiter, - cleaning_table_columns) = _process_cleaning_results( + cleaned_table_columns) = _process_cleaning_results( cleaning_futures, client, headers, need_table_columns, delimiter ) - table_columns = table_columns or cleaning_table_columns + table_columns = table_columns or cleaned_table_columns source = dict(file_ids=cleaned_file_ids) destination = dict(schema=schema, table=table, remote_host_id=db_id, From f74e2be48f1d96b607ac67c4947860465122267f Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Thu, 2 Apr 2020 15:40:47 -0400 Subject: [PATCH 07/12] fixed need_table_columns logic --- civis/io/_tables.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 3ac346e1..0f5730c6 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1016,9 +1016,8 @@ def civis_file_to_table(file_id, database, table, client=None, # Use Preprocess endpoint to get the table columns as needed # and perform necessary file cleaning - need_table_columns = (table_columns is None - or not table_exists - or existing_table_rows == 'drop') + need_table_columns = ((not table_exists or existing_table_rows == 'drop') + and table_columns is None) cleaning_futures = _run_cleaning(file_id, client, need_table_columns, headers, delimiter, hidden) From 21303a45033e2d51a4ae0d68605d4aab0a81d419 Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Fri, 3 Apr 2020 11:48:51 -0400 Subject: [PATCH 08/12] updated table_columns descriptions --- civis/io/_tables.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 0f5730c6..164f2439 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -662,9 +662,10 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in + A list of dictionaries corresponding to the columns in the source file. Each dictionary should have keys - for column "name" and "sqlType" + for column "name" and "sqlType". The import will only copy these + columns regardless if there are more columns in the table. headers : bool, optional [DEPRECATED] Whether or not the first row of the file should be treated as headers. The default, ``None``, attempts to autodetect whether @@ -803,9 +804,10 @@ def csv_to_civis(filename, database, table, api_key=None, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in + A list of dictionaries corresponding to the columns in the source file. Each dictionary should have keys - for column "name" and "sqlType" + for column "name" and "sqlType". The import will only copy these + columns regardless if there are more columns in the table. delimiter : string, optional The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``. headers : bool, optional @@ -932,9 +934,10 @@ def civis_file_to_table(file_id, database, table, client=None, sortkey2 : str, optional The second column in a compound sortkey for the table. table_columns : list[Dict[str, str]], optional - An list of dictionaries corresponding to the columns in + A list of dictionaries corresponding to the columns in the source file. Each dictionary should have keys - for column "name" and "sqlType" + for column "name" and "sqlType". The import will only copy these + columns regardless if there are more columns in the table. primary_keys: list[str], optional A list of the primary key column(s) of the destination table that uniquely identify a record. If existing_table_rows is "upsert", this From e39bd42f418d28159dc5a55455e6580869d56dee Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Fri, 3 Apr 2020 12:41:15 -0400 Subject: [PATCH 09/12] added validation on table_columns list --- civis/io/_tables.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 164f2439..873dc800 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1001,6 +1001,8 @@ def civis_file_to_table(file_id, database, table, client=None, file_id = [file_id] if schema is None: raise ValueError("Provide a schema as part of the `table` input.") + if table_columns is not None and not isinstance(table_columns, list): + raise ValueError("Provide table_columns as a list of dictionaries with keys of `name` and `sqlType`.") db_id = client.get_database_id(database) cred_id = credential_id or client.default_credential if delimiter is not None: # i.e. it was provided as an argument From df0f454ce12638d390d260b1046395f3acfbc7fe Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Fri, 3 Apr 2020 12:43:54 -0400 Subject: [PATCH 10/12] fixed flake8 error --- civis/io/_tables.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 873dc800..76e3efe8 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1002,7 +1002,8 @@ def civis_file_to_table(file_id, database, table, client=None, if schema is None: raise ValueError("Provide a schema as part of the `table` input.") if table_columns is not None and not isinstance(table_columns, list): - raise ValueError("Provide table_columns as a list of dictionaries with keys of `name` and `sqlType`.") + raise ValueError("Provide table_columns as a list of dictionaries" + " with keys of `name` and `sqlType`.") db_id = client.get_database_id(database) cred_id = credential_id or client.default_credential if delimiter is not None: # i.e. it was provided as an argument From 3159a232007ee49b97b86a3dcce0bc5e67b0df12 Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Fri, 3 Apr 2020 13:46:34 -0400 Subject: [PATCH 11/12] removed added error check --- civis/io/_tables.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 76e3efe8..164f2439 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -1001,9 +1001,6 @@ def civis_file_to_table(file_id, database, table, client=None, file_id = [file_id] if schema is None: raise ValueError("Provide a schema as part of the `table` input.") - if table_columns is not None and not isinstance(table_columns, list): - raise ValueError("Provide table_columns as a list of dictionaries" - " with keys of `name` and `sqlType`.") db_id = client.get_database_id(database) cred_id = credential_id or client.default_credential if delimiter is not None: # i.e. it was provided as an argument From 97d0fa4dd2dab50686e4b9af6166171e2161773e Mon Sep 17 00:00:00 2001 From: Carrie Utter Date: Fri, 3 Apr 2020 15:05:03 -0400 Subject: [PATCH 12/12] updated changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac6c61e9..3e0fd162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased ### Added +- Added `table_columns` parameter to `civis.io.civis_file_to_table`, `civis.io.dataframe_to_civis`, and `civis.io.csv_to_civis` (#379) + ### Fixed - Fixed/relaxed version specifications for click, jsonref, and jsonschema. (#377)