diff --git a/tests/test_marquez_client.py b/tests/test_marquez_client.py index 4253fdc..fd77a1a 100644 --- a/tests/test_marquez_client.py +++ b/tests/test_marquez_client.py @@ -9,7 +9,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging.config import unittest import uuid @@ -211,7 +211,7 @@ def test_create_job(self, mock_put): "07f3d2dfc8186cadae9146719e70294a4c7a8ee8" context = { - "SQL": "SELECT * FROM public.mytable;" + "sql": "SELECT * FROM public.mytable;" } mock_put.return_value = { @@ -239,7 +239,7 @@ def test_create_job(self, mock_put): "location": "https://github.com/my-jobs/blob/" "07f3d2dfc8186cadae9146719e70294a4c7a8ee8", "context": { - "SQL": "SELECT * FROM public.mytable;" + "sql": "SELECT * FROM public.mytable;" }, "description": "My first job.", "latestRun": None @@ -258,6 +258,89 @@ def test_create_job(self, mock_put): assert str(response['id']) is not None assert str(response['location']) == location + @mock.patch("marquez_client.client.MarquezClient._put") + def test_create_job_with_run_id(self, mock_put): + run_id = str(uuid.uuid4()) + job_name = "my-job" + input_dataset = [ + { + "namespace": "my-namespace", + "name": "public.mytable" + } + ] + output_dataset = { + "namespace": "my-namespace", + "name": "public.mytable" + } + + location = "https://github.com/my-jobs/blob/" \ + "07f3d2dfc8186cadae9146719e70294a4c7a8ee8" + + context = { + "sql": "SELECT * FROM public.mytable;" + } + + mock_put.return_value = { + "id": { + "namespace": "my-namespace", + "name": "my-job" + }, + "type": "BATCH", + "name": "my-job", + "createdAt": "2020-08-12T07:30:55.321059Z", + "updatedAt": "2020-08-12T07:30:55.333230Z", + "namespace": "my-namespace", + "inputs": [ + { + "namespace": "my-namespace", + "name": "public.mytable" + } + ], + "outputs": [ + { + "namespace": "my-namespace", + "name": "public.mytable" + } + ], + "location": "https://github.com/my-jobs/blob/" + "07f3d2dfc8186cadae9146719e70294a4c7a8ee8", + "context": { + "sql": "SELECT * FROM public.mytable;" + }, + "description": "My first job.", + "latestRun": { + "id": run_id, + "createdAt": "2020-10-09T19:14:07.846451Z", + "updatedAt": "2020-10-09T19:14:07.911627Z", + "nominalStartTime": None, + "nominalEndTime": None, + "state": RunState.RUNNING, + "startedAt": "2020-10-09T19:14:07.893074Z", + "endedAt": "2020-10-09T19:14:07.911627Z", + "durationMs": 18, + "args": {} + } + } + + response = self.client.create_job( + namespace_name=_NAMESPACE, + job_name=job_name, + job_type=JobType.BATCH, + location=location, + input_dataset=input_dataset, + output_dataset=output_dataset, + context=context, + run_id=run_id + ) + + mock_put.assert_called_once() + assert mock_put.call_args.kwargs.get("payload").get("runId")\ + == run_id + + assert str(response['inputs']) is not None + assert str(response['latestRun']['id']) == run_id + assert response['latestRun']['state'] == RunState.RUNNING + @mock.patch("marquez_client.client.MarquezClient._post") def test_create_job_run(self, mock_post): run_id = str(uuid.uuid4())