Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 46 additions & 82 deletions tests/gcp/hooks/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,95 +178,34 @@ def test_start_java_dataflow_with_job_class(self, mock_conn, mock_dataflow, mock
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(expected_cmd))

@mock.patch('airflow.gcp.hooks.dataflow._Dataflow.log')
@mock.patch('subprocess.Popen')
@mock.patch('select.select')
def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging):
mock_logging.info = MagicMock()
mock_logging.warning = MagicMock()
mock_proc = MagicMock()
mock_proc.stderr = MagicMock()
mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n'])
mock_stderr_fd = MagicMock()
mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
mock_proc_poll = MagicMock()
mock_select.return_value = [[mock_stderr_fd]]

def poll_resp_error():
mock_proc.return_code = 1
return True

mock_proc_poll.side_effect = [None, poll_resp_error]
mock_proc.poll = mock_proc_poll
mock_popen.return_value = mock_proc
dataflow = _Dataflow(['test', 'cmd'])
mock_logging.info.assert_called_once_with('Running command: %s', 'test cmd')
self.assertRaises(Exception, dataflow.wait_for_done)

def test_valid_dataflow_job_name(self):
job_name = self.dataflow_hook._build_dataflow_job_name(
job_name=JOB_NAME, append_job_name=False
)

self.assertEqual(job_name, JOB_NAME)

def test_fix_underscore_in_job_name(self):
job_name_with_underscore = 'test_example'
fixed_job_name = job_name_with_underscore.replace(
'_', '-'
)
@parameterized.expand([
(JOB_NAME, JOB_NAME, False),
('test-example', 'test_example', False),
('test-dataflow-pipeline-12345678', JOB_NAME, True),
('test-example-12345678', 'test_example', True),
('df-job-1', 'df-job-1', False),
('df-job', 'df-job', False),
('dfjob', 'dfjob', False),
('dfjob1', 'dfjob1', False),
])
@mock.patch(DATAFLOW_STRING.format('uuid.uuid4'), return_value=MOCK_UUID)
def test_valid_dataflow_job_name(self, expected_result, job_name, append_job_name, mock_uuid4):
job_name = self.dataflow_hook._build_dataflow_job_name(
job_name=job_name_with_underscore, append_job_name=False
)

self.assertEqual(job_name, fixed_job_name)

def test_invalid_dataflow_job_name(self):
invalid_job_name = '9test_invalid_name'
fixed_name = invalid_job_name.replace(
'_', '-')

with self.assertRaises(ValueError) as e:
self.dataflow_hook._build_dataflow_job_name(
job_name=invalid_job_name, append_job_name=False
)
# Test whether the job_name is present in the Error msg
self.assertIn('Invalid job_name ({})'.format(fixed_name),
str(e.exception))

def test_dataflow_job_regex_check(self):
self.assertEqual(self.dataflow_hook._build_dataflow_job_name(
job_name='df-job-1', append_job_name=False
), 'df-job-1')

self.assertEqual(self.dataflow_hook._build_dataflow_job_name(
job_name='df-job', append_job_name=False
), 'df-job')

self.assertEqual(self.dataflow_hook._build_dataflow_job_name(
job_name='dfjob', append_job_name=False
), 'dfjob')

self.assertEqual(self.dataflow_hook._build_dataflow_job_name(
job_name='dfjob1', append_job_name=False
), 'dfjob1')

self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
job_name='1dfjob', append_job_name=False
job_name=job_name, append_job_name=append_job_name
)

self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
job_name='dfjob@', append_job_name=False
)
self.assertEqual(expected_result, job_name)

@parameterized.expand([
("1dfjob@", ),
("dfjob@", ),
("df^jo", )
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tvalentyn are there any specific limitations on the first character of a dataflow job id ? (e.g. can't be _ or -. If so, I think that the hook might cast _ -> - and think it has fixed the name but in reality this may still be invalid. we should add a test case for something like this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name must match the regular expression a-z?

I like the idea of trying to fix IDs. I enter it in the list of planned improvements. Thanks.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this question, dataflow job IDs start with a date, e.g. 2019-08-05

])
def test_build_dataflow_job_name_with_invalid_value(self, job_name):
self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
job_name='df^jo', append_job_name=False
job_name=job_name, append_job_name=False
)


Expand Down Expand Up @@ -412,3 +351,28 @@ def test_data_flow_valid_job_id(self):
def test_data_flow_missing_job_id(self):
cmd = ['echo', 'unit testing']
self.assertEqual(_Dataflow(cmd).wait_for_done(), None)

@mock.patch('airflow.gcp.hooks.dataflow._Dataflow.log')
@mock.patch('subprocess.Popen')
@mock.patch('select.select')
def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging):
mock_logging.info = MagicMock()
mock_logging.warning = MagicMock()
mock_proc = MagicMock()
mock_proc.stderr = MagicMock()
mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n'])
mock_stderr_fd = MagicMock()
mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
mock_proc_poll = MagicMock()
mock_select.return_value = [[mock_stderr_fd]]

def poll_resp_error():
mock_proc.return_code = 1
return True

mock_proc_poll.side_effect = [None, poll_resp_error]
mock_proc.poll = mock_proc_poll
mock_popen.return_value = mock_proc
dataflow = _Dataflow(['test', 'cmd'])
mock_logging.info.assert_called_once_with('Running command: %s', 'test cmd')
self.assertRaises(Exception, dataflow.wait_for_done)