Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add create job method #81

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024 # 5 * 1024 * 1024 B = 5 MB
# NOTE(review): presumably the default retry count for upload requests --
# confirm against the call sites, which are outside this view.
_DEFAULT_NUM_RETRIES = 6
Expand Down Expand Up @@ -1127,6 +1126,82 @@ def job_from_resource(self, resource):
return job.QueryJob.from_api_repr(resource, self)
return job.UnknownJob.from_api_repr(resource, self)

def create_job(
    self, job_config, source=None, destination=None, query=None, retry=DEFAULT_RETRY
):
    """Create a new job.

    The job type is selected by the first of the top-level keys ``load``,
    ``copy``, ``extract`` or ``query`` found in ``job_config`` (checked in
    that order); the call is then forwarded to the matching client method.

    Arguments:
        job_config (dict): configuration job representation returned from the API.

    Keyword Arguments:
        source (Union[ \
            google.cloud.bigquery.table.Table, \
            google.cloud.bigquery.table.TableReference, \
            str, \
            Sequence[str] \
        ]):
            (Optional) Forwarded as the first argument of the load, copy or
            extract call. For load jobs, URIs of data files to be loaded; in
            format ``gs://<bucket_name>/<object_name_or_glob>``. For copy and
            extract jobs, the table to read from. Not used for query jobs.

        destination (Union[ \
            google.cloud.bigquery.table.Table, \
            google.cloud.bigquery.table.TableReference, \
            str, \
        ]):
            (Optional) Forwarded as the second argument of the load, copy or
            extract call. For load and copy jobs, table into which data is to
            be loaded. If a string is passed in, this method attempts to
            create a table reference from a string using
            :func:`google.cloud.bigquery.table.TableReference.from_string`.
            For extract jobs, URIs of Cloud Storage file(s) into which table
            data is to be extracted; in format
            ``gs://<bucket_name>/<object_name_or_glob>``. Not used for query
            jobs.

        query (str):
            (Optional) SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
            Only used for query jobs.

        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.

    Raises:
        TypeError: if ``job_config`` contains none of the recognised
            job-type keys.
    """

    if "load" in job_config:
        load_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        return self.load_table_from_uri(
            source, destination, job_config=load_config, retry=retry
        )
    elif "copy" in job_config:
        copy_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        return self.copy_table(
            source, destination, job_config=copy_config, retry=retry
        )
    elif "extract" in job_config:
        extract_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        return self.extract_table(
            source, destination, job_config=extract_config, retry=retry
        )
    elif "query" in job_config:
        # ``destinationTable`` is dropped before the configuration is parsed.
        # Work on copies so the caller's dictionary is left untouched, and
        # use ``pop`` with a default so that a query configuration without a
        # destination table does not raise ``KeyError``.
        query_section = dict(job_config["query"])
        query_section.pop("destinationTable", None)
        sanitized_config = dict(job_config)
        sanitized_config["query"] = query_section
        query_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            sanitized_config
        )
        return self.query(query, job_config=query_config, retry=retry)
    else:
        raise TypeError("Invalid job configuration received.")

def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY):
"""Fetch a job for the project associated with this client.

Expand Down
89 changes: 89 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from google.cloud import bigquery_v2
from google.cloud.bigquery.dataset import DatasetReference
from tests.unit.helpers import make_connection
from google.cloud.bigquery.retry import DEFAULT_RETRY


def _make_credentials():
Expand Down Expand Up @@ -2584,6 +2585,94 @@ def test_delete_table_w_not_found_ok_true(self):

conn.api_request.assert_called_with(method="DELETE", path=path)

def _create_job_helper(
    self, job_config, client_method, query=None, source=None, destination=None
):
    # Shared driver for the ``create_job`` dispatch tests: patches the client
    # method named by ``client_method`` (a dotted path) together with
    # ``_JobConfig.from_api_repr``, calls ``create_job`` and verifies that
    # the patched method received the parsed configuration.
    credentials = _make_credentials()
    fake_http = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=fake_http
    )
    client._connection = make_connection()

    parsed_config = mock.Mock()
    config_patcher = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=parsed_config,
    )
    method_patcher = mock.patch(client_method, autospec=True)

    with method_patcher as patched_method, config_patcher:
        client.create_job(
            job_config=job_config,
            source=source,
            destination=destination,
            query=query,
        )

    # Query jobs forward only the query text; every other job type forwards
    # the source/destination pair.
    if query:
        expected_positional = (client, query)
    else:
        expected_positional = (client, source, destination)

    patched_method.assert_called_once_with(
        *expected_positional, job_config=parsed_config, retry=DEFAULT_RETRY
    )

def test_create_job_load_config(self):
    # Runs the shared helper with a ``load`` configuration and
    # ``Client.load_table_from_uri`` as the method to patch.
    source_uri = "http://example.com/source.csv"
    load_configuration = {"load": {"sourceUris": source_uri}}
    target_method = "google.cloud.bigquery.client.Client.load_table_from_uri"

    self._create_job_helper(
        load_configuration,
        target_method,
        source=source_uri,
        destination="dataset_id",
    )

def test_create_job_copy_config(self):
    # Runs the shared helper with a ``copy`` configuration and
    # ``Client.copy_table`` as the method to patch.
    copy_section = {
        "sourceTables": "sourceTable",
        "destinationTable": "destinationTable",
    }
    copy_configuration = {"copy": copy_section}
    source_table = mock.Mock()
    destination_table = mock.Mock()

    self._create_job_helper(
        copy_configuration,
        "google.cloud.bigquery.client.Client.copy_table",
        source=source_table,
        destination=destination_table,
    )

def test_create_job_extract_config(self):
    # Runs the shared helper with an ``extract`` configuration and
    # ``Client.extract_table`` as the method to patch.
    extract_section = {
        "sourceTable": {"projectId": "project"},
        "destinationUris": ["destination"],
    }
    extract_configuration = {"extract": extract_section}
    source_table = mock.Mock()
    destination_uris = mock.Mock()

    self._create_job_helper(
        extract_configuration,
        "google.cloud.bigquery.client.Client.extract_table",
        source=source_table,
        destination=destination_uris,
    )

def test_create_job_query_config(self):
    # Runs the shared helper with a ``query`` configuration (including a
    # ``destinationTable`` entry) and ``Client.query`` as the method to patch.
    query_section = {
        "query": "query",
        "destinationTable": {"tableId": "table_id"},
    }
    query_configuration = {"query": query_section}
    sql = mock.Mock()

    self._create_job_helper(
        query_configuration,
        "google.cloud.bigquery.client.Client.query",
        query=sql,
    )

def test_create_job_w_invalid_job_config(self):
    # A configuration with no recognised job-type key must raise TypeError
    # carrying the "Invalid job configuration" message.
    credentials = _make_credentials()
    fake_http = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=fake_http
    )
    unsupported_configuration = {"unknown": {}}

    with self.assertRaises(TypeError) as caught:
        client.create_job(job_config=unsupported_configuration)

    message = caught.exception.args[0]
    self.assertIn("Invalid job configuration", message)

def test_job_from_resource_unknown_type(self):
from google.cloud.bigquery.job import UnknownJob

Expand Down