[AIRFLOW-7064] Add CloudFirestoreExportDatabaseOperator#7725
[AIRFLOW-7064] Add CloudFirestoreExportDatabaseOperator#7725mik-laj merged 6 commits intoapache:masterfrom
Conversation
6dcbab9 to
8df4534
Compare
17fa2e4 to
3582f81
Compare
3582f81 to
146dec8
Compare
potiuk
left a comment
There was a problem hiding this comment.
Just a few comment copy&paste victims to fix 👍
|
|
||
| def get_conn(self): | ||
| """ | ||
| Retrieves the connection to Cloud Build. |
There was a problem hiding this comment.
| Retrieves the connection to Cloud Build. | |
| Retrieves the connection to Cloud Firestore. |
| """ | ||
| Retrieves the connection to Cloud Build. | ||
|
|
||
| :return: Google Cloud Build services object. |
There was a problem hiding this comment.
| :return: Google Cloud Build services object. | |
| :return: Google Cloud Firestore services object. |
| # then it will get the message below: | ||
| # > Request contains an invalid argument. | ||
| # At the same time, the Non-Authorized Client has no problems. | ||
| non_authorized_conn = build("firestore", self.api_version, cache_discovery=False) |
There was a problem hiding this comment.
I created a ticket in google-api-python-client.
googleapis/google-api-python-client#845
I am not sure if this is a good place to report a bug, so I do not add a comment in the code. The error is in the service, not in the library, but I don't know a better place to report this error.
| self, body: Dict, database_id: str = "(default)", project_id: Optional[str] = None | ||
| ) -> None: | ||
| """ | ||
| Starts a build with the specified configuration. |
There was a problem hiding this comment.
| Starts a build with the specified configuration. | |
| Starts export with the specified configuration. |
| See: | ||
| https://firebase.google.com/docs/firestore/reference/rest/v1beta1/projects.databases/exportDocuments | ||
| :type body: dict | ||
| :param project_id: Optional, Google Cloud Project project_id where the function belongs. |
There was a problem hiding this comment.
| :param project_id: Optional, Google Cloud Project project_id where the function belongs. | |
| :param project_id: Optional, Google Cloud Project project_id where the database belongs. |
turbaszek
left a comment
There was a problem hiding this comment.
LGTM, some questions and additional copy-past fixes :)
| .get(name=operation_name) | ||
| .execute(num_retries=self.num_retries) | ||
| ) | ||
| if operation_response.get("done"): |
There was a problem hiding this comment.
This is ok, however, I wonder isn't if "done" in operation_response more pythonic?
There was a problem hiding this comment.
We want to check if the done value is "true"
| GCP_SPANNER_KEY = 'gcp_spanner.json' | ||
| GCP_TASKS_KEY = 'gcp_tasks.json' | ||
| GMP_KEY = 'gmp.json' | ||
| G_FIREBASE_KEY = 'g_firebase.json' |
There was a problem hiding this comment.
| G_FIREBASE_KEY = 'g_firebase.json' | |
| G_FIREBASE_KEY = 'gcp_firebase.json' |
Let's keep consistency in case of GCP :)
There was a problem hiding this comment.
It's not GCP, but Firebase service. This database is available in the Pantheon, but is most often used by Firebase and was created as part of Firebase. It also has a common name with Firebase.
| TEST_PROJECT_ID = "firestore--project-id" | ||
|
|
||
|
|
||
| class TestCloudBuildHookWithPassedProjectId(unittest.TestCase): |
There was a problem hiding this comment.
| class TestCloudBuildHookWithPassedProjectId(unittest.TestCase): | |
| class TestCloudFirestoreHookWithPassedProjectId(unittest.TestCase): |
| self.hook.export_documents(body=EXPORT_DOCUMENT_BODY, project_id=TEST_PROJECT_ID) | ||
|
|
||
|
|
||
| class TestGcpComputeHookWithDefaultProjectIdFromConnection(unittest.TestCase): |
There was a problem hiding this comment.
| class TestGcpComputeHookWithDefaultProjectIdFromConnection(unittest.TestCase): | |
| class TestCloudFirestoreHookWithDefaultProjectIdFromConnection(unittest.TestCase): |
| self.hook.export_documents(body=EXPORT_DOCUMENT_BODY) | ||
|
|
||
|
|
||
| class TestCloudBuildHookWithoutProjectId(unittest.TestCase): |
There was a problem hiding this comment.
| class TestCloudBuildHookWithoutProjectId(unittest.TestCase): | |
| class TestCloudFirestoreHookWithoutProjectId(unittest.TestCase): |
| @pytest.mark.system("google.firebase") | ||
| @pytest.mark.system("google.cloud") | ||
| @pytest.mark.credential_file(G_FIREBASE_KEY) | ||
| class CampaignManagerSystemTest(GoogleSystemTest): |
There was a problem hiding this comment.
| class CampaignManagerSystemTest(GoogleSystemTest): | |
| class CloudFirestoreSystemTest(GoogleSystemTest): |
| super().tearDown() | ||
|
|
||
| def clean_up(self): | ||
| self.execute_with_ctx(["gsutil", "rm", "-r", f"{EXPORT_DESTINATION_URL}"], G_FIREBASE_KEY) |
There was a problem hiding this comment.
Can't we use delete_gcs_bucket method of GoogleSystemTest?
There was a problem hiding this comment.
I do not want to delete the entire bucket, but only its contents, because this test has complex requirements for the location of the bucket and permissions. Information is available in example_firestore.py.
It is WIP. I have to prepare documentation, but I would like to ask for opinions. gcloud is not a standard solution in operators. A more complex example that will use BigQuery would also be helpful.
Issue link: AIRFLOW-7064
Make sure to mark the boxes below before creating PR: [x]
[AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID** For document-only changes commit message can start with
[AIRFLOW-XXXX].In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.