
Simplify job hierarchy for Spark #1672

Closed
wslulciuc opened this issue Feb 28, 2023 · 13 comments

@wslulciuc
Collaborator

We currently name Spark jobs using the following naming pattern:

  • {spark.app.name}: name of the Spark app
  • {spark.app.name}.{action.name}: name of a Spark action

Although collecting lineage events for a parent job (Spark app) and children (actions) can be very useful, this level of integration might be too verbose. That is, input/output datasets are associated with the Spark actions, but in some cases, associating them with the parent job would be preferred.

I propose we allow users to toggle the verbosity of the integration level. For example, let's say we have the Spark app MySparkApp with the following actions (using the naming pattern above):

  • MySparkApp.action1
  • MySparkApp.action2
  • MySparkApp.action3

where each Spark action has input/output datasets. This is the current default behavior. But when verbosity is disabled, the input/output datasets associated with the Spark actions would instead be associated with only MySparkApp, thereby simplifying the lineage graph and "flattening" the job hierarchy.
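A minimal sketch of how such a toggle might be configured; the spark.openlineage.jobHierarchy option is hypothetical and does not exist in the integration, while the listener class name is the real one:

```python
from pyspark.sql import SparkSession

# Hypothetical sketch of the proposed verbosity toggle; the
# "spark.openlineage.jobHierarchy" option is illustrative, not implemented.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .config("spark.extraListeners",
            "io.openlineage.spark.agent.OpenLineageSparkListener")
    # default ("nested"): one job per action, e.g. MySparkApp.action1
    # "flat": attach all input/output datasets to the MySparkApp job itself
    .config("spark.openlineage.jobHierarchy", "flat")
    .getOrCreate()
)
```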

@pawel-big-lebowski
Contributor

First, the current version (11.2023) of the integration creates job names based on {spark.app.name}.{action.name}.{output-dataset-name}. Reducing the verbosity would require keeping the application stateful, which is something we have avoided for a long time and would love to keep avoiding. There are also other good reasons for that. If a Spark application runs multiple spark.sql('create table as select') statements and fails with some NullPointerException between them on the driver side, the user would not get any lineage about the output written before the failure. I doubt this is something people want.

In my opinion, there is nothing wrong with the verbosity, although it requires extra work on the backend side to merge multiple OL events into a single overview of a Spark job.

However, I think there are other issues that I would consider a problem:

  • A Spark application may emit multiple OL events with different runIds, and there is no way to group them on the backend.
  • For a Spark application, we never know whether the whole app succeeded.

I think the solution to this should be some SparkApplicationRunFacet or parentRun facet such that:

  • a start event is emitted on application start (a new event),
  • for each task within the app, the OL event contains the application id, which allows everything done by the application to be grouped together,
  • an extra event is sent on application end (onApplicationEnd).
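A rough sketch (as Python dicts) of how a parent run facet could tie action-level events back to an application-level run; all names, namespaces, and runIds below are illustrative:

```python
import uuid

app_run_id = str(uuid.uuid4())  # one runId for the whole application

# Emitted once, when the Spark application starts (the proposed new event).
application_start = {
    "eventType": "START",
    "job": {"namespace": "spark", "name": "my_spark_app"},
    "run": {"runId": app_run_id},
}

# Each action event carries a parent facet pointing at the application run,
# so the backend can group everything the application did.
action_complete = {
    "eventType": "COMPLETE",
    "job": {"namespace": "spark", "name": "my_spark_app.action1"},
    "run": {
        "runId": str(uuid.uuid4()),  # the action's own runId
        "facets": {
            "parent": {
                "job": {"namespace": "spark", "name": "my_spark_app"},
                "run": {"runId": app_run_id},
            }
        },
    },
}

# Emitted once more onApplicationEnd, with the same application runId.
application_complete = {
    "eventType": "COMPLETE",
    "job": {"namespace": "spark", "name": "my_spark_app"},
    "run": {"runId": app_run_id},
}
```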

@mobuchowski
Member

From a discussion with @harels and @pawel-big-lebowski: there are two different options we can talk about here:

  • the first option is whether the jobs (for a single action) will emit all the data collected for the run up to that point - the information in each subsequent event - START, COMPLETE, maybe a couple of RUNNING events in the middle - would be a superset of the information in the previous event. FYI @jenspfaug
  • the second option is whether we should (or can?) emit a single complete event containing all of that data, without emitting intermediary events.

We should think about whether we can make sure that introducing those won't cause a memory leak.
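To make the first option concrete, a sketch of two events for the same run where COMPLETE carries a superset of what START carried; the dataset names and runId are made up:

```python
start = {
    "eventType": "START",
    "run": {"runId": "8d5f3e0a-6c1b-4f7a-9b2e-1a2b3c4d5e6f"},
    "inputs": [{"namespace": "warehouse", "name": "raw_orders"}],
    "outputs": [],
}

complete = {
    "eventType": "COMPLETE",
    "run": {"runId": "8d5f3e0a-6c1b-4f7a-9b2e-1a2b3c4d5e6f"},
    # Superset semantics: everything from START plus what was learned since.
    "inputs": [
        {"namespace": "warehouse", "name": "raw_orders"},
        {"namespace": "warehouse", "name": "customers"},
    ],
    "outputs": [{"namespace": "warehouse", "name": "enriched_orders"}],
}
```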

@suraj5077

suraj5077 commented Dec 6, 2023

Associating the input/output datasets with the parent job (Spark application) makes more sense for data-catalog-based backends.
There are a lot of problems for the backend when Spark sends events on Spark actions. These jobs may write to temporary data assets (like temp files) which are not meaningful from a lineage point of view. The user consuming the lineage always wants to view the data flow in terms of the permanent data assets that exist outside the lifecycle of the Spark application (like database tables), and the lineage becomes very verbose when Spark actions trigger a lot of such intermediate events.

Sending events at the application level (instead of the action level) can help improve the experience, as all the intermediate data assets would be skipped and we would just have the input and output datasets for a job (entire Spark app) run. It can be simpler.

@jenspfaug

There are a few topics here.

1. How to map events for Spark actions to the Spark application?
AFAIU the parent run facet is the generic and agreed-upon way to do this (as mentioned by @pawel-big-lebowski). This means that OL events with a separate job id and run id need to be emitted for the Spark application itself, and each action needs to have its own job id and run id as well as a parent facet.
This will help with grouping for standard lineage use cases and also with observability use cases, e.g. where the user is interested in run times of the whole Spark application.

2. How to present the lineage information to the user?
@wslulciuc proposed the verbosity toggle. I think that instead of increasing the complexity of the Spark integration, we should leave the aggregation of events to the OpenLineage consumer. For some, verbose, action-level lineage might actually be required. Others only need aggregated, application-level lineage. If a toggle is introduced in the Spark OpenLineage producer, any consumer that is interested in verbose events will need to build a fallback for dealing with producers that send aggregated information only.

It's important to recognise also that aggregation is not a lossless transformation.

3. Whether inputs and outputs accumulate between events
This one probably needs an online discussion.

@mobuchowski
Member

mobuchowski commented Jan 9, 2024

@mgorsk1 replying to OpenLineage/docs#268

it's not totally OK that Spark lineage events don't have a default run.parent.job.name - this actually creates confusion and a very wide area of interpretation as to what it should be (while there is an obvious candidate for this - see the next point)

Agreed.

if spark job.name contains spark.appName, we should default run.parent.job.name to this value - then we wouldn't need to provide spark.openlineage.parentJobName at all and it would be as you intend it - as close to the actual execution as possible.

I would describe what we need to do differently, since the use of spark.openlineage.parentJobName was conceived somewhat differently:

  1. Actually emitting events describing that parent job - on Spark application start and end, with the name Spark.appName - let's call it the application job.
  2. Making individual action jobs point to the application job as their parent job.
  3. If spark.openlineage.parentJobName is provided, then it should be the parent job of the application job.

This allows us to create a deeper hierarchy - for example, when we know that the Spark job is scheduled by Airflow or some other scheduler.
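A sketch of how a scheduler-level parent might be wired in via the spark.openlineage.parentJobName option mentioned above (plus the namespace variant discussed later in this thread); the job and namespace values are illustrative:

```python
from pyspark.sql import SparkSession

# Illustrative values; per point 3 above, this named job becomes the parent
# of the application job, which in turn is the parent of each action job.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .config("spark.openlineage.parentJobName", "my_dag.my_spark_task")
    .config("spark.openlineage.parentJobNamespace", "airflow")
    .getOrCreate()
)
```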

the OpenLineage event from Airflow should have job.name inheriting from task_id, while run.parent.job.name should be dag_id. It's a very accurate description of the actual state and it's already implemented like that.

Ideally, our "unique identifier" of a job would be a composite aggregation of the names of all parent jobs - but for practical reasons - like making it easier to process OpenLineage data in relational databases - some duplication, like including Spark.appName in the action job name, or including the DAG name in the Airflow task job name, makes life easier for OpenLineage consumers. If that's not true then let me know - I think you're one of them.

@jenspfaug @pawel-big-lebowski @wslulciuc

I think we have consensus on Jens's 1st point as described above. I will work on adding application events to the Spark integration.

For the consumer-side aggregation, I'd skip it for now unless we have additional input, since it introduces a possible loss of information.
For example, consider one app with two actions:
action 1 reads from dataset A and writes to dataset B; action 2 reads from dataset B and writes to dataset C. The aggregate would have inputs A, B and outputs B, C, which is pretty wrong.
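A small sketch of that pitfall: naively unioning action-level inputs and outputs keeps the intermediate dataset B on both sides, whereas an application-level view arguably should not:

```python
actions = [
    {"inputs": {"A"}, "outputs": {"B"}},  # action 1: A -> B
    {"inputs": {"B"}, "outputs": {"C"}},  # action 2: B -> C
]

# Naive aggregation: inputs {A, B}, outputs {B, C} - "pretty wrong".
all_inputs = set().union(*(a["inputs"] for a in actions))
all_outputs = set().union(*(a["outputs"] for a in actions))

# One possible (still lossy) fix: drop datasets that are both written and
# read within the same application, leaving inputs {A} and outputs {C}.
intermediates = all_inputs & all_outputs
app_inputs = all_inputs - intermediates    # {'A'}
app_outputs = all_outputs - intermediates  # {'C'}
```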

@jenspfaug

@mobuchowski would you mind describing a whole example where a DAG spawns a task, which spawns a Spark application, which spawns a Spark action? What are the parent run id, parent namespace, and parent name going to be at each of the levels?
It might be useful to add that here.

Btw, in addition to spark.openlineage.parentJobName, don't we also need spark.openlineage.parentJobNamespace as Spark OL parameters?

@mobuchowski
Member

mobuchowski commented Jan 23, 2024

@jenspfaug I've created a diagram for that case:

This describes the parent hierarchy of an Airflow DAG that has two operators, the first one spawning a Spark job. The job has two actions:
[diagram: events.drawio - parent hierarchy]

This is the order of events that would come from executing the above:

[diagram: events.drawio(2) - order of emitted events]

It might be useful to add that here.

I'd rather move to using https://openlineage.io/docs/ fully for all documentation rather than scattered .md files. I'll add the doc there when the PR regarding this topic gets accepted.

Btw, in addition to spark.openlineage.parentJobName, don't we also need spark.openlineage.parentJobNamespace as Spark OL parameters?

Yes - added this in the PR.

@mgorsk1

mgorsk1 commented Jan 25, 2024

Somehow I still fail to understand this hop where the parent of the Spark application is not the DAG but the SparkOperator. This overcomplicates things and brings little value, in my opinion. Consider a comparison between SparkOperator and, for example, BashOperator (which can also be a source of lineage information, since it accepts inlets and outlets kwargs):

  • BashOperator can also be a source of Airflow lineage information. It executes a process (a Linux process, for example) that accomplishes something. So for this Linux process the parent is the DAG; the details of the Airflow BashOperator task are not captured.
  • SparkOperator executes a process as well, same as BashOperator, except it's not a Linux process but a JVM process. For this process, according to your design, the parent is the Airflow task, for which we capture additional details.

So your design assumes that for just one operator a different behavior is desired than for any other operator in Airflow, and I don't see how that is a good thing.

@jenspfaug

@mobuchowski thank you very much for putting this together. From the discussion above I understood that there should be separate OnApplicationStart and OnApplicationEnd events with a job namespace and name representing the Spark application itself. You only seem to account for events coming from the actions themselves.

@mgorsk1 I see the following reasons for having the Spark application as the parent of the Spark actions.

  1. Consider only a Spark application (that is not spawned from Airflow): some users need to be able to view lineage at different aggregation levels. @wslulciuc's original proposal reflects that. The cleanest way to model this is by explicitly representing the Spark application as a job, as a parent to the individual actions.
  2. Given (1), it is preferable that a Spark application spawned by Airflow is represented in the same way and not differently.
  3. There is benefit in explicitly representing all the operators in a DAG because (a) they carry additional metadata that is not necessarily available at the level of the Spark application and (b) that allows a consumer to present the full DAG to the user in a homogeneous way (i.e. not with some operators presented as operators and some as the child application they spawned).

@JDarDagran
Contributor

@mgorsk1

BashOperator can also be a source of Airflow lineage information. It executes a process (a Linux process, for example) that accomplishes something. So for this Linux process the parent is the DAG; the details of the Airflow BashOperator task are not captured.

Indeed, if BashOperator spawns another application that produces lineage, that spawned process should be responsible for emitting lineage with the Airflow task as the parent run.

I used the term application above because I believe that if BashOperator just uses Bash to do Bash-specific processing, then there should be no child for the BashOperator. An example of that would be:

sort input.txt > sorted.txt

The same goes for PythonOperator - if no application external to Airflow's execution environment is executed, then the PythonOperator is the primary job that produces inputs and outputs.

@mobuchowski
Member

@mgorsk1 to add to what @JDarDagran and @jenspfaug wrote: imagine multiple operators spawning different jobs, not only Spark - having granular information allows you to determine what actually spawned the job. Similarly, in your example, the DAG alone does not do anything, but spawns the actual jobs - the operators - that do the work.

@jenspfaug - you are obviously right, I had not included those events in the previous graph. The fixed one is here, with the Spark action START/RUNNING/COMPLETE events grouped for clarity.
[diagram: events.drawio(2) - fixed event hierarchy, including application-level events]
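Since the attached diagram is not reproduced here, a rough reconstruction of the intended event ordering based on the surrounding discussion; the job names and the exact interleaving are assumptions:

```python
# (eventType, job namespace, job name, parent job name or None)
event_order = [
    ("START",    "airflow", "my_dag",               None),
    ("START",    "airflow", "my_dag.spark_task",    "my_dag"),
    ("START",    "spark",   "my_spark_app",         "my_dag.spark_task"),
    ("START",    "spark",   "my_spark_app.action1", "my_spark_app"),
    ("COMPLETE", "spark",   "my_spark_app.action1", "my_spark_app"),
    ("START",    "spark",   "my_spark_app.action2", "my_spark_app"),
    ("COMPLETE", "spark",   "my_spark_app.action2", "my_spark_app"),
    ("COMPLETE", "spark",   "my_spark_app",         "my_dag.spark_task"),
    ("COMPLETE", "airflow", "my_dag.spark_task",    "my_dag"),
    ("COMPLETE", "airflow", "my_dag",               None),
]
```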

@michalmodras

From Airflow's perspective, I believe the parent of the Spark application should be the specific operator/task that spawned it. An Airflow DAG is not granular enough - it can have a thousand tasks, so just pointing from the Spark job to the DAG does not seem sufficient. Having said that, DAG metadata can also be present in a lineage event facet, as a sort of additional context.

On Spark, I think events should be aggregated at the metastore level, and not decided on by the consumer (at least not by default), given the lossy character of aggregation and the potentially consumer-dependent lineage context.

@mobuchowski
Member

Merged in #2371 - I will close the issue, but please reopen or create a new one if you have further comments. We're going with the solution above for now, but that does not mean we can't revisit it later.
