diff --git a/tests/gcp/hooks/test_dataflow.py b/tests/gcp/hooks/test_dataflow.py index b1a30f1efe646..8a90454756e8c 100644 --- a/tests/gcp/hooks/test_dataflow.py +++ b/tests/gcp/hooks/test_dataflow.py @@ -178,95 +178,34 @@ def test_start_java_dataflow_with_job_class(self, mock_conn, mock_dataflow, mock self.assertListEqual(sorted(mock_dataflow.call_args[0][0]), sorted(expected_cmd)) - @mock.patch('airflow.gcp.hooks.dataflow._Dataflow.log') - @mock.patch('subprocess.Popen') - @mock.patch('select.select') - def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging): - mock_logging.info = MagicMock() - mock_logging.warning = MagicMock() - mock_proc = MagicMock() - mock_proc.stderr = MagicMock() - mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n']) - mock_stderr_fd = MagicMock() - mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd) - mock_proc_poll = MagicMock() - mock_select.return_value = [[mock_stderr_fd]] - - def poll_resp_error(): - mock_proc.return_code = 1 - return True - - mock_proc_poll.side_effect = [None, poll_resp_error] - mock_proc.poll = mock_proc_poll - mock_popen.return_value = mock_proc - dataflow = _Dataflow(['test', 'cmd']) - mock_logging.info.assert_called_once_with('Running command: %s', 'test cmd') - self.assertRaises(Exception, dataflow.wait_for_done) - - def test_valid_dataflow_job_name(self): - job_name = self.dataflow_hook._build_dataflow_job_name( - job_name=JOB_NAME, append_job_name=False - ) - - self.assertEqual(job_name, JOB_NAME) - - def test_fix_underscore_in_job_name(self): - job_name_with_underscore = 'test_example' - fixed_job_name = job_name_with_underscore.replace( - '_', '-' - ) + @parameterized.expand([ + (JOB_NAME, JOB_NAME, False), + ('test-example', 'test_example', False), + ('test-dataflow-pipeline-12345678', JOB_NAME, True), + ('test-example-12345678', 'test_example', True), + ('df-job-1', 'df-job-1', False), + ('df-job', 'df-job', False), + ('dfjob', 'dfjob', False), + ('dfjob1', 'dfjob1', False), + ]) + @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'), return_value=MOCK_UUID) + def test_valid_dataflow_job_name(self, expected_result, job_name, append_job_name, mock_uuid4): job_name = self.dataflow_hook._build_dataflow_job_name( - job_name=job_name_with_underscore, append_job_name=False - ) - - self.assertEqual(job_name, fixed_job_name) - - def test_invalid_dataflow_job_name(self): - invalid_job_name = '9test_invalid_name' - fixed_name = invalid_job_name.replace( - '_', '-') - - with self.assertRaises(ValueError) as e: - self.dataflow_hook._build_dataflow_job_name( - job_name=invalid_job_name, append_job_name=False - ) - # Test whether the job_name is present in the Error msg - self.assertIn('Invalid job_name ({})'.format(fixed_name), - str(e.exception)) - - def test_dataflow_job_regex_check(self): - self.assertEqual(self.dataflow_hook._build_dataflow_job_name( - job_name='df-job-1', append_job_name=False - ), 'df-job-1') - - self.assertEqual(self.dataflow_hook._build_dataflow_job_name( - job_name='df-job', append_job_name=False - ), 'df-job') - - self.assertEqual(self.dataflow_hook._build_dataflow_job_name( - job_name='dfjob', append_job_name=False - ), 'dfjob') - - self.assertEqual(self.dataflow_hook._build_dataflow_job_name( - job_name='dfjob1', append_job_name=False - ), 'dfjob1') - - self.assertRaises( - ValueError, - self.dataflow_hook._build_dataflow_job_name, - job_name='1dfjob', append_job_name=False + job_name=job_name, append_job_name=append_job_name ) - self.assertRaises( - ValueError, - self.dataflow_hook._build_dataflow_job_name, - job_name='dfjob@', append_job_name=False - ) + self.assertEqual(expected_result, job_name) + @parameterized.expand([ + ("1dfjob@", ), + ("dfjob@", ), + ("df^jo", ) + ]) + def test_build_dataflow_job_name_with_invalid_value(self, job_name): self.assertRaises( ValueError, self.dataflow_hook._build_dataflow_job_name, - job_name='df^jo', append_job_name=False + job_name=job_name, append_job_name=False ) @@ -412,3 +351,28 @@ def test_data_flow_valid_job_id(self): def test_data_flow_missing_job_id(self): cmd = ['echo', 'unit testing'] self.assertEqual(_Dataflow(cmd).wait_for_done(), None) + + @mock.patch('airflow.gcp.hooks.dataflow._Dataflow.log') + @mock.patch('subprocess.Popen') + @mock.patch('select.select') + def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging): + mock_logging.info = MagicMock() + mock_logging.warning = MagicMock() + mock_proc = MagicMock() + mock_proc.stderr = MagicMock() + mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n']) + mock_stderr_fd = MagicMock() + mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd) + mock_proc_poll = MagicMock() + mock_select.return_value = [[mock_stderr_fd]] + + def poll_resp_error(): + mock_proc.return_code = 1 + return True + + mock_proc_poll.side_effect = [None, poll_resp_error] + mock_proc.poll = mock_proc_poll + mock_popen.return_value = mock_proc + dataflow = _Dataflow(['test', 'cmd']) + mock_logging.info.assert_called_once_with('Running command: %s', 'test cmd') + self.assertRaises(Exception, dataflow.wait_for_done)