Add support for XCom to Armada operator for airflow #2279

Closed
michelsumbul opened this issue Mar 16, 2023 · 4 comments
Labels
third-party/airflow Related to Airflow or the Airflow Operator type/code Code-related changes

Comments

@michelsumbul

michelsumbul commented Mar 16, 2023

Hi,

We would like to add support for XCom to the Armada operator for Airflow.
For reference, here is the documentation on what XCom is: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html

Here is the documentation explaining how XCom works with the KubernetesPodOperator: https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html

Thanks,
Michel
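For reference, the KubernetesPodOperator documentation linked above describes the mechanism: with `do_xcom_push=True`, the main container writes JSON to `/airflow/xcom/return.json`, a sidecar container reads that file, and the operator pushes the parsed value as the task's `return_value` XCom. The sketch below shows both ends of that file contract in plain Python. Reusing the same path for Armada jobs is an assumption, and `write_xcom_result` / `read_xcom_result` are hypothetical helper names, not part of Airflow or Armada.

```python
import json
import os
from typing import Any

# Path used by the KubernetesPodOperator XCom convention.
# Assumption: an Armada implementation would reuse the same path.
XCOM_RETURN_PATH = "/airflow/xcom/return.json"


def write_xcom_result(result: Any, path: str = XCOM_RETURN_PATH) -> None:
    """Container side (hypothetical helper): serialize `result` as JSON."""
    directory = os.path.dirname(path)
    if directory:
        # Inside a KPO pod this directory is a shared emptyDir volume.
        os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f)


def read_xcom_result(path: str = XCOM_RETURN_PATH) -> Any:
    """Reader side (hypothetical helper): what the sidecar/operator does."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)
```

Inside the job container, the workload would call `write_xcom_result({...})` as its final step; only JSON-serializable values survive the round trip.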

@Sharpz7 added third-party/airflow Related to Airflow or the Airflow Operator type/code Code-related changes labels Aug 24, 2023
@itsaviral2609
Contributor

Hey @Sharpz7
I would like to take a shot at solving this issue!
Is there anything else to keep in mind beyond the resources above?

@itsaviral2609
Contributor

itsaviral2609 commented Aug 28, 2023

    def execute(self, context: Context) -> None:
        # ... (client setup and job submission omitted from this excerpt;
        # `job`, `job_set_id` and `job_service_client` are defined there)
        try:
            job_id = job.job_response_items[0].job_id
        except Exception as e:
            raise AirflowException("Armada has issues submitting job") from e

        armada_logger.info("Running Armada job %s with id %s", self.name, job_id)

        lookout_url = self._get_lookout_url(job_id)
        if len(lookout_url) > 0:
            armada_logger.info("Lookout URL: %s", lookout_url)

        job_state, job_message = search_for_job_complete(
            job_service_client=job_service_client,
            armada_queue=self.armada_queue,
            job_set_id=job_set_id,
            airflow_task_name=self.name,
            job_id=job_id,
            poll_interval=self.poll_interval,
        )
        # Push the job state and message to XCom so downstream tasks can pull them.
        # Note: if job_state is an Enum, pushing job_state.name keeps it JSON-serializable.
        context["ti"].xcom_push(key="armada_job_state", value=job_state)
        context["ti"].xcom_push(key="armada_job_message", value=job_message)

        armada_logger.info(
            "Armada Job finished with %s and message: %s", job_state, job_message
        )
        airflow_error(job_state, self.name, job_id)

    def _get_lookout_url(self, job_id: str) -> str:
        if self.lookout_url_template is None:
            return ""
        return self.lookout_url_template.replace("<job_id>", job_id)

    def render_template_fields(
        self,
        context: Context,
        jinja_env: Optional[jinja2.Environment] = None,
    ) -> None:
        self.job_request_items = [
            MessageToDict(x, preserving_proto_field_name=True)
            for x in self.job_request_items
        ]
        super().render_template_fields(context, jinja_env)
        self.job_request_items = [
            ParseDict(x, JobSubmitRequestItem()) for x in self.job_request_items
        ]
        
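# ---- DAG file, adapted from third_party/airflow/examples/hello_armada.py ----
# Imports needed by the DAG code below.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import (
    generated_pb2 as api_resource,
)


# Callable for the PythonOperator that pulls the XCom values pushed by the
# "armada" task. It belongs in the DAG file as a plain function, not as a
# method of ArmadaOperator.
def pull_xcom_values_from_armada(**kwargs):
    ti = kwargs["ti"]
    job_state = ti.xcom_pull(task_ids="armada", key="armada_job_state")
    job_message = ti.xcom_pull(task_ids="armada", key="armada_job_message")
    print(f"Job State: {job_state}")
    print(f"Job Message: {job_message}")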


def submit_sleep_job():
    """
    This is a PodSpec definition that allows you to run sleep.
    This returns an array of JobSubmitRequestItems that allows you
    to submit to Armada.
    """
    pod = core_v1.PodSpec(
        containers=[
            core_v1.Container(
                name="sleep",
                image="busybox",
                args=["sleep", "10s"],
                securityContext=core_v1.SecurityContext(runAsUser=1000),
                resources=core_v1.ResourceRequirements(
                    requests={
                        "cpu": api_resource.Quantity(string="120m"),
                        "memory": api_resource.Quantity(string="510Mi"),
                    },
                    limits={
                        "cpu": api_resource.Quantity(string="120m"),
                        "memory": api_resource.Quantity(string="510Mi"),
                    },
                ),
            )
        ],
    )

    return [
        submit_pb2.JobSubmitRequestItem(
            priority=1,
            pod_spec=pod,
            namespace="personal-anonymous",
            annotations={"armadaproject.io/hello": "world"},
        )
    ]

		
# Example Airflow DAG that uses an ArmadaOperator and a PythonOperator.
with DAG(
    dag_id="hello_armada",
    start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
    default_args={"retries": 2},
) as dag:
    armada_channel_args = {"target": "127.0.0.1:50051"}
    job_service_channel_args = {"target": "127.0.0.1:60003"}

    armada = ArmadaOperator(
        task_id="armada",
        do_xcom_push=True,
        name="armada",
        armada_queue="test",
        job_service_channel_args=job_service_channel_args,
        armada_channel_args=armada_channel_args,
        job_request_items=submit_sleep_job(),
        lookout_url_template="http://127.0.0.1:8089/jobs?job_id=<job_id>",
    )

    # In Airflow 2 the context is passed to the callable automatically,
    # so the old provide_context=True argument is no longer needed.
    pull_values_task = PythonOperator(
        task_id="pull_xcom_values",
        python_callable=pull_xcom_values_from_armada,
    )

    armada >> pull_values_task

I tried implementing something like this in the armada.py file. Is this the right approach to the problem? The idea is to push data to XCom from one operator (the ArmadaOperator) and pull it in another (a PythonOperator).

Within the DAG, there are two tasks:
The ArmadaOperator, which submits a job and pushes its state and message to XCom.
The PythonOperator, which pulls these values from XCom and prints them.
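The push/pull flow between the two tasks can be modelled with a toy in-memory store. This is not Airflow's implementation (real XComs are stored in the metadata database and are also scoped by DAG id, run id and map index); `ToyXComStore` is a hypothetical class that only illustrates how `xcom_push(key=..., value=...)` in the `armada` task and `xcom_pull(task_ids=..., key=...)` in the downstream task address the same value. The state string `"SUCCEEDED"` is illustrative.

```python
from typing import Any, Dict, Optional, Tuple


class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom table (one DAG run)."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        # A later push with the same (task_id, key) overwrites the earlier value.
        self._values[(task_id, key)] = value

    def pull(self, task_id: str, key: str = "return_value") -> Optional[Any]:
        # Missing entries yield None, like xcom_pull for a single task id.
        return self._values.get((task_id, key))


store = ToyXComStore()
# The upstream "armada" task pushes under explicit keys...
store.push("armada", "armada_job_state", "SUCCEEDED")
store.push("armada", "armada_job_message", "")
# ...and the downstream task pulls them back by the same task id and key.
print(store.pull("armada", "armada_job_state"))
```

The default key `"return_value"` mirrors where Airflow stores an operator's return value when `do_xcom_push=True`.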

I took part of the code from hello_armada.py to implement it:
https://github.com/armadaproject/armada/blob/master/third_party/airflow/examples/hello_armada.py

Is this the right way to move forward with testing? I have no prior experience with Airflow, so I would need a little help building and testing it!
@Sharpz7 @Mo-Fatah @kannon92
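One way to unit-test the XCom part without a running Airflow scheduler or Armada cluster is to pass a mocked task instance in the context. This sketch assumes the two `xcom_push` calls are factored into a small helper; `push_job_result` is a hypothetical name, not part of the Armada operator.

```python
from typing import Any, Mapping
from unittest import mock


def push_job_result(context: Mapping[str, Any], job_state: Any, job_message: str) -> None:
    """Hypothetical helper: the two xcom_push calls from the proposed execute()."""
    ti = context["ti"]
    ti.xcom_push(key="armada_job_state", value=job_state)
    ti.xcom_push(key="armada_job_message", value=job_message)


def test_push_job_result_pushes_both_keys() -> None:
    # MagicMock records every call made on ti.xcom_push.
    ti = mock.MagicMock()
    push_job_result({"ti": ti}, "SUCCEEDED", "done")
    # Both values must be pushed under the keys the downstream task pulls.
    ti.xcom_push.assert_any_call(key="armada_job_state", value="SUCCEEDED")
    ti.xcom_push.assert_any_call(key="armada_job_message", value="done")
    assert ti.xcom_push.call_count == 2
```

Saved as a `test_*.py` file, this runs under pytest; the same pattern applies to the real `execute()` if the Armada and job-service clients are mocked too.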

@dave-gantenbein
Member

@itsaviral2609 I think this would be a lot easier to review as a pull request.

@dave-gantenbein
Member

Won't do

Projects
None yet
Development

No branches or pull requests

4 participants