Fixup system test for DataprocSubmitJobOperator (PySpark job) (#32740)
moiseenkov committed Jul 21, 2023
1 parent 4992176 commit 440c9eb
Showing 2 changed files with 56 additions and 15 deletions.
6 changes: 5 additions & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -1504,6 +1504,10 @@ def execute(self, context: Context):
class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
    """Start a PySpark Job on a Cloud DataProc cluster.

+    .. seealso::
+        This operator is deprecated, please use
+        :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
    :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
        Python file to use as the driver. Must be a .py file. (templated)
    :param arguments: Arguments for the job. (templated)
@@ -1940,7 +1944,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
    :param job: Required. The job resource.
        If a dict is provided, it must be of the same form as the protobuf message
        :class:`~google.cloud.dataproc_v1.types.Job`.
-        For the complete list of supported job types please take a look here
+        For the complete list of supported job types and their configurations please take a look here
        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
    :param request_id: Optional. A unique id used to identify the request. If the server receives two
        ``SubmitJobRequest`` requests with the same id, then the second request will be ignored and the first
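The two hunks above touch the operator docstrings: the first adds a pointer from the deprecated DataprocSubmitPySparkJobOperator to DataprocSubmitJobOperator, which takes the whole job as a dict in the `dataproc_v1.types.Job` form instead of per-field arguments such as `main`; the second rewords the note about supported job types. A minimal sketch of that migration (not part of this commit; the project, cluster, region, bucket and task ids below are made up):

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocSubmitJobOperator,
    DataprocSubmitPySparkJobOperator,
)

# Deprecated form: the PySpark-specific operator with per-field arguments.
old_task = DataprocSubmitPySparkJobOperator(
    task_id="pyspark_old",
    main="gs://my-bucket/job.py",  # hypothetical GCS URI
    cluster_name="my-cluster",     # hypothetical cluster
    region="europe-west1",
)

# Suggested replacement: describe the whole job as a Job-shaped dict.
new_task = DataprocSubmitJobOperator(
    task_id="pyspark_new",
    job={
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-cluster"},
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/job.py"},
    },
    region="europe-west1",
    project_id="my-project",
)

The second changed file is the PySpark system-test DAG for this operator, which accounts for the remaining 51 additions and 14 deletions: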
@@ -22,15 +22,18 @@

import os
from datetime import datetime
-from pathlib import Path

from airflow import models
+from airflow.decorators import task
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

@@ -43,11 +46,7 @@
REGION = "europe-west1"
ZONE = "europe-west1-b"

-PYSPARK_SRC = str(Path(__file__).parent / "resources" / "hello_world.py")
-PYSPARK_FILE = "hello_world.py"

# Cluster definition

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
@@ -61,14 +60,35 @@
    },
}

TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
+JOB_FILE_NAME = "dataproc-pyspark-job.py"
+JOB_FILE_LOCAL_PATH = f"/tmp/{JOB_FILE_NAME}"
+JOB_FILE_CONTENT = """from operator import add
+from random import random
+
+from pyspark.sql import SparkSession
+
+
+def f(_: int) -> float:
+    x = random() * 2 - 1
+    y = random() * 2 - 1
+    return 1 if x**2 + y**2 <= 1 else 0
+
+
+spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+partitions = 2
+n = 100000 * partitions
+count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+print(f"Pi is roughly {4.0 * count / n:f}")
+spark.stop()
+"""

# Jobs definitions
# [START how_to_cloud_dataproc_pyspark_config]
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
-    "pyspark_job": {"main_python_file_uri": f"gs://{BUCKET_NAME}/{PYSPARK_FILE}"},
+    "pyspark_job": {"main_python_file_uri": f"gs://{BUCKET_NAME}/{JOB_FILE_NAME}"},
}
# [END how_to_cloud_dataproc_pyspark_config]
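The PYSPARK_JOB dict between the [START]/[END] markers is the snippet pulled into the provider documentation. The DataprocSubmitJobOperator task that consumes it (the `pyspark_task` referenced in the dependency chain further down) lives in an unchanged part of the file and is therefore not shown in this diff; by the usual pattern in these system tests it looks roughly like the following sketch (the exact task definition is an assumption):

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task",  # assumed task_id; the real definition sits in the unchanged lines
    job=PYSPARK_JOB,
    region=REGION,
    project_id=PROJECT_ID,
)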

@@ -78,15 +98,23 @@
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "pyspark"],
) as dag:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
    )

+    @task
+    def create_job_file():
+        with open(JOB_FILE_LOCAL_PATH, "w") as job_file:
+            job_file.write(JOB_FILE_CONTENT)
+
+    create_job_file_task = create_job_file()

    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
-        src=PYSPARK_SRC,
-        dst=PYSPARK_FILE,
+        src=JOB_FILE_LOCAL_PATH,
+        dst=JOB_FILE_NAME,
        bucket=BUCKET_NAME,
    )

@@ -116,14 +144,23 @@
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def delete_job_file():
+        try:
+            os.remove(JOB_FILE_LOCAL_PATH)
+        except FileNotFoundError:
+            pass
+        return 0
+
+    delete_job_file_task = delete_job_file()

    (
        # TEST SETUP
-        create_bucket
-        >> [upload_file, create_cluster]
+        [[create_job_file_task, create_bucket] >> upload_file, create_cluster]
        # TEST BODY
        >> pyspark_task
        # TEST TEARDOWN
-        >> [delete_cluster, delete_bucket]
+        >> [delete_cluster, delete_bucket, delete_job_file_task]
    )

    from tests.system.utils.watcher import watcher
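For readers unfamiliar with the list-based `>>` syntax used in the dependency chain above: a list on either side of `>>` fans the dependency out to every task in it, and `[a, b] >> c` evaluates to `c`, so that expression can itself sit inside another list. A standalone sketch with hypothetical DAG and task names, mirroring the shape of the chain in this test:

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator

with models.DAG("deps_sketch", schedule=None, start_date=datetime(2021, 1, 1)) as dag:
    a, b, c, d, e, f = [EmptyOperator(task_id=t) for t in "abcdef"]

    # a and b both run before c; then c and d both run before e; e runs before f.
    [[a, b] >> c, d] >> e >> f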
