Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8446] Adding a test checking the wait for BQ jobs #9858

Merged
merged 6 commits into from Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 85 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Expand Up @@ -23,6 +23,7 @@
import logging
import os
import random
import sys
import time
import unittest

Expand Down Expand Up @@ -424,6 +425,90 @@ def test_records_traverse_transform_with_mocks(self):
assert_that(jobs,
equal_to([job_reference]), label='CheckJobs')

@unittest.skipIf(sys.version_info[0] == 2,
                 'Mock pickling problems in Py 2')
@mock.patch('time.sleep')
def test_wait_for_job_completion(self, sleep_mock):
  """Checks that WaitForBQJobs emits its input once all jobs are DONE.

  The mocked BigQuery client answers successive ``jobs.Get`` calls with:
  job 1 RUNNING, job 2 DONE, job 1 DONE, job 2 DONE. The pipeline output
  must equal the (index, job reference) pairs given as side input, and
  ``time.sleep`` (patched) must have been called exactly once.

  Args:
    sleep_mock: mock injected by ``mock.patch('time.sleep')``.
  """
  # Two job references in the same project, differing only by job id.
  job_references = []
  for job_id in ('jobId1', 'jobId2'):
    reference = bigquery_api.JobReference()
    reference.projectId = 'project1'
    reference.jobId = job_id
    job_references.append(reference)

  # Job 1 as seen on the first lookup: still running.
  running_job_1 = mock.Mock()
  running_job_1.status.state = 'RUNNING'

  # Job 1 as seen on the second lookup: finished without error.
  finished_job_1 = mock.Mock()
  finished_job_1.status.state = 'DONE'
  finished_job_1.status.errorResult = None

  # Job 2 is finished without error on every lookup.
  finished_job_2 = mock.Mock()
  finished_job_2.status.state = 'DONE'
  finished_job_2.status.errorResult = None

  # Responses are consumed in order, one per jobs.Get call.
  bq_client = mock.Mock()
  bq_client.jobs.Get.side_effect = [
      running_job_1,
      finished_job_2,
      finished_job_1,
      finished_job_2,
  ]

  waiting_dofn = bqfl.WaitForBQJobs(bq_client)
  dest_list = list(enumerate(job_references))

  with TestPipeline('DirectRunner') as p:
    references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
    outputs = p | beam.Create(['']) | beam.ParDo(waiting_dofn, references)
    assert_that(outputs, equal_to(dest_list))

  sleep_mock.assert_called_once()

@unittest.skipIf(sys.version_info[0] == 2,
                 'Mock pickling problems in Py 2')
@mock.patch('time.sleep')
def test_one_job_failed_after_waiting(self, sleep_mock):
  """Checks that WaitForBQJobs fails the pipeline when a job ends in error.

  The mocked BigQuery client answers successive ``jobs.Get`` calls with:
  job 1 RUNNING, job 2 DONE, job 1 DONE with an error result, job 2 DONE.
  Running the pipeline must raise, and ``time.sleep`` (patched) must have
  been called exactly once.

  Args:
    sleep_mock: mock injected by ``mock.patch('time.sleep')``.
  """
  # Two job references in the same project, differing only by job id.
  job_references = []
  for job_id in ('jobId1', 'jobId2'):
    reference = bigquery_api.JobReference()
    reference.projectId = 'project1'
    reference.jobId = job_id
    job_references.append(reference)

  # Job 1 as seen on the first lookup: still running.
  running_job_1 = mock.Mock()
  running_job_1.status.state = 'RUNNING'

  # Job 2 is finished without error on every lookup.
  finished_job_2 = mock.Mock()
  finished_job_2.status.state = 'DONE'
  finished_job_2.status.errorResult = None

  # Review note: a failed job still reports state 'DONE'; the failure is
  # carried in the error result rather than in a separate state.
  failed_job_1 = mock.Mock()
  failed_job_1.status.state = 'DONE'
  failed_job_1.status.errorResult = 'Some problems happened'

  # Responses are consumed in order, one per jobs.Get call.
  bq_client = mock.Mock()
  bq_client.jobs.Get.side_effect = [
      running_job_1,
      finished_job_2,
      failed_job_1,
      finished_job_2,
  ]

  waiting_dofn = bqfl.WaitForBQJobs(bq_client)
  dest_list = list(enumerate(job_references))

  with self.assertRaises(Exception):
    with TestPipeline('DirectRunner') as p:
      references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
      _ = p | beam.Create(['']) | beam.ParDo(waiting_dofn, references)

  sleep_mock.assert_called_once()

def test_multiple_partition_files(self):
destination = 'project1:dataset1.table1'

Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/testing/util.py
Expand Up @@ -209,7 +209,10 @@ def assert_that(actual, matcher, label='assert_that',
Returns:
Ignored.
"""
assert isinstance(actual, pvalue.PCollection)
assert isinstance(
actual,
pvalue.PCollection), ('%s is not a supported type for Beam assert'
% type(actual))

class ReifyTimestampWindow(DoFn):
def process(self, element, timestamp=DoFn.TimestampParam,
Expand Down