Refactor Source & Job data model and Stop Duplicate Ingestion Jobs #685
Conversation
Doesn't this mean that a new record is created for every source pushed to the db, even if there was a previously applied feature set sharing the same source?
Yes it does. I thought about unifying these, but I think it would be unintuitive for one feature set to be able to modify the source of another feature set. Ideally a source would be configured by the administrator and the user should just select it, at which point sharing sources would make more sense in my opinion.
It's not possible for a feature set to modify the source of another feature set. If a feature set's source is updated, a new entry will be written to the db, distinct from the old one (which is still in use by other feature sets).
Ok. Didn't realize that. So it acts pretty similarly to how it's written here with some deduplication?
Should I change anything or are you happy with this implementation as a generalization step?
No, this implementation won't work once you're able to alter feature set sources, unfortunately. It will break job creation.
/hold

/retest
Currently, the point of contention for this PR is how to represent the Source's id. As pointed out by @zhilingc, the source Id (if we were to have an id) must stay constant, so as to ensure consistent behaviour when the user calls SpecService's ApplyFeatureSet. In this regard we have a couple of options: (1) derive the source id from the Source's contents; (2) keep sources duplicated, one per feature set; or (3) use auto-incrementing/generated source ids and look up existing sources on registration.
It seems like the problem here is
How can you do (1) without (3)? What if a new source is being registered that already exists? You have to first check whether there are existing sources with that Id and reuse one. Then, when somebody changes the source, you have to go and create a copy instead of just modifying it in place. It seems like there is a lot of complexity around managing sources that shouldn't exist.
Sources are 1:1 to feature sets, so they should not be shared or deduplicated in the database. In theory we could decouple sources from feature sets, but nobody has made that decision yet. It seems like the current design of the database model has been influenced by an assumption of how the source ids will be used by the JobCoordinatorService. Unless I am missing something, (2) is the preferred approach.
According to this relevant SO answer, Hibernate keeps an internal copy of the entity against which it performs a diff (dirty checking). It only hits the database with an insert query when it finds a difference in the object's fields (ie an id change). If we can guarantee that neither the id nor any of the fields change, Hibernate will not perform an insert.
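As a rough illustration of this behaviour — a minimal sketch only, assuming a simplified Source entity and a plain JPA EntityManager, not the PR's actual code:

    @Entity
    public class Source {
      @Id
      private String id;       // assumed here: an id derived from the source's contents
      private String type;
      private String config;   // serialized source configuration
      // getters/setters omitted from this listing
    }

    // Inside a transaction:
    Source source = entityManager.find(Source.class, sourceId);
    source.setConfig(source.getConfig());  // write the same value back: no actual change
    entityManager.flush();                 // dirty check finds no diff -> no UPDATE or INSERT issued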
If we are able to keep the Source's config isolated in a single config string, we can make sources configurable without making major changes to the Source model.
One approach that @zhilingc has suggested on this front is to move towards named sources, similar to what we currently have for stores. This would remove the problem of duplication and open up an N:M relationship between Sources and Feature Sets. However, the ability to configure sources on a Feature Set level suffers.

Generally, I think we should move towards approach (3) with auto-incrementing/generated source ids. Since Hibernate already maintains an internal copy of the Source used for constructing the diff, I would assume that Hibernate would use this internal copy when we perform a query for an existing source, and the performance hit would be negligible. Apply Feature Set is also not that performance critical in my opinion, as the no. of requests by the average user pales in comparison to, for example, get online feature requests.

(2) is not a suitable solution in my view, as it is a design choice that could have future implications on our codebase, especially as we direct our attention to developing the source part of Feast. All code that uses sources, not just JobCoordinatorService, would have to take into account the possibility of Sources being duplicates. This may confuse new developers and add unnecessary code bloat (ie unique Source checking cropping up all over the codebase).
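For concreteness, a minimal sketch of approach (3), assuming a Spring Data style repository — the query method name and the getOrCreateSource helper are hypothetical, not existing code:

    // Reuse an existing Source row with the same type and config if one exists,
    // otherwise persist the candidate and let the database generate its id.
    Source getOrCreateSource(SourceRepository repository, Source candidate) {
      return repository
          .findFirstByTypeAndConfig(candidate.getType(), candidate.getConfig())  // hypothetical derived query
          .orElseGet(() -> repository.save(candidate));
    }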
Agreed on this, as long as it doesn't affect the primary key.
Sure, but this isn't specced out. @ches and @khorshuheng have both highlighted the need to improve functionality around sources, for example having sources customizable locally (per serving deployment) even if they are registered centrally (in Core), or having differences between addresses broadcast to different consumers. The requirements aren't clear here yet.
Sources are supposed to be duplicated. An optimization that is specific to the JobCoordinatorService should be handled there, not in the data model. Basically what you are arguing for is "pre-deduplication" of sources and managing that at create and update time. But it seems like we are creating a volatile dependency with that approach (3): if I understand you correctly, every time you make a change to a source it could get a new Id. How are you going to maintain referential integrity of sources if a feature set to source relationship is not maintained?
In (3), where
I take this in the context of approach (1), which is the only approach where the source Id changes based on the contents of the Source. When Hibernate detects a change in the Id via its dirty checking, it automatically figures out that it should create a new object instead of continuing to use the existing one. Since the old source object is not touched, referential integrity is preserved for other feature sets depending on the old source. However, after reading about Feature Sets being designed as an ingestion concept, not a form of logical grouping, I think I can see now why (2) makes sense, if Feature Sets themselves are tied to data sourcing and ingestion. Moving forward with approach (2) if there are no concerns.
@woop I'm curious as to why you say that feature sets have a 1:1 mapping with sources and that sources should be duplicated - there is nothing that suggests that in the code base, nor in the usage of Feast. Sources being distinct objects with a one-to-many mapping to feature sets is the relationship that comes to mind intuitively to me, particularly if we are eventually going to move towards named sources in the future.
You can see this by looking at the feature set specification: each specification has one source. The configuration of the source is created by the author of that specification. At no point did we indicate to users that the source they define is a shared resource with other feature sets, since they are authoring/creating it, not selecting a pre-existing source. This is further illustrated by looking at exported feature sets: they all contain their sources inline, and do not reference an externally defined source.
I agree, but that isn't the current design. So we have to decide if we want to fix this problem by changing the Feast design of sources, or by fixing this implementation.
I'm not really convinced here. Users specifying their sources inline rather than defining them externally is more out of convenience to the user than to convey the intention that the sources are unique. I don't really see why Feast can't treat sources that are the same as... well... the same. The same goes for displaying the sources inline: it's so that users can get complete information about the feature set; it's not pushing any agenda there. I'd argue that the internal model of entities within Feast supports unique sources a lot better than duplicated ones, and the fact that named sources aren't specced out yet shouldn't immediately discount the option of implementing it as such.
It's not trying to convey anything. A source is a part of a feature set, just like entities, just like features, just like max_age. All attributes in the specification belong to that specification. Sources are no different.
It can, and in fact it functions like that today. But that is a storage-only implementation detail that is now being taken as the Feast design for some reason. I see it as technical debt that should be removed. It is not a design goal to have shared sources yet, otherwise our API would indicate that, which it doesn't.
Just to be clear. A key/value relationship is 1:1. No "agenda" has to be pushed. If you are arguing that the relationship is not 1:1 then it's up to you to provide rationale for that, because our API is 1:1.
Because it was built around a data model of unique sources.
Nobody has discounted that. The point I am making is that it's a big change to make, which would require a proposal and discussions. It's also different from our current design. Furthermore, in our community calls our contributors have voiced approval for the fact that sources are a part of feature sets and tracked together. This is going to become more important when we have audit logs, for example. We see value in that.
(source, setsForSource) -> {
    // Sources with same type and config in different Feature Sets are different
    // objects.
    // Make sure that we are dealing with the same source object when spawning jobs.
This would not get the same object, but it would get objects with the same type and configuration. As long as we are clear that we won't use the object Id anywhere, then that should be fine.
/test test-end-to-end-batch
On this point: we may generate a pair that actually doesn't have a connection. A Source may be connected only to some stores, not all of them, for example. But it's definitely not all stores X all sources.
Why not continue the source compilation (previous line) with pair generation?
.flatMap(
    store ->
        getFeatureSetsForStore(store).stream()
            .map(featureSet -> Pair.of(featureSet.getSource(), store)))
.distinct()
.collect(Collectors.toList())
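Note that .distinct() deduplicates via equals()/hashCode(), so collapsing repeated pairings here ultimately depends on Pair delegating equality to Source.equals() and the store's equality.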
Correction: since it currently creates a map, we overwrite stores for the same Source key and thus end up with one Store per Source. It should be a list of pairs.
Corrected. Thanks for pointing this out.
My first comment is still valid. It shouldn't be all sources X all stores. You need to create pairs that are actually connected.
To verify that's the case, it would be great to add a test that creates:
Store1 -> subscribed only to -> Source1
Store2 -> subscribed only to -> Source2
Only two pairs should be generated, whereas the current implementation will generate 4.
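Something along these lines could work — a sketch only, where the helper and method names (storeSubscribedTo, getSourceToStoreMappings, source1/source2) are assumed, not the PR's actual test API:

    @Test
    public void shouldOnlyGeneratePairsForConnectedSourcesAndStores() {
      // Two stores, each subscribed to a single distinct feature set / source.
      Store store1 = storeSubscribedTo("featureSet1");  // featureSet1 uses source1
      Store store2 = storeSubscribedTo("featureSet2");  // featureSet2 uses source2

      List<Pair<Source, Store>> pairs =
          jobCoordinatorService.getSourceToStoreMappings(Arrays.asList(store1, store2));

      // Only the two connected pairs, not the 4-element cross product.
      assertEquals(2, pairs.size());
      assertTrue(pairs.contains(Pair.of(source1, store1)));
      assertTrue(pairs.contains(Pair.of(source2, store2)));
    }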
Corrected.
This shouldn't be a map; it's not a 1:1 relation. One source can be used to populate many stores. I think it should rather be a list of Pairs.
Corrected
It seems that JobStatus shouldn't be used here, since you only need two values - this API may be misleading. I suggest creating a new Enum.
Corrected.
This can generate a pretty big SQL query by inlining a long list of Ids. I'm not sure what the length limit is, but it's definitely not best practice.
I suggest having:
Set<Job> allRunningJobs = getAllRunningJobs();
Set<Job> checkedAsNeeded = new HashSet<>();
for (Pair<Source, Store> pair : sourceToStorePairs) {
  checkedAsNeeded.add(/* the running job for this pair */);
}
Set<Job> toStop = Sets.difference(allRunningJobs, checkedAsNeeded);
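One caveat worth noting: Guava's Sets.difference returns an unmodifiable live view of the two sets, so if the underlying sets can still change before the jobs are actually stopped, it is safer to copy the result, eg with ImmutableSet.copyOf.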
Updated getExtraJobs() to do the diff in memory instead of as an SQL query.
Probably it was meant to be "%s-to-%s-%s", source
Corrected to Source.type-hash(Source.config)
If this one is supposed to be used in createJobId then it's not exactly correct: this.id won't be populated in ConsolidatedSource, and a more stable formula (type + config) should be used as the Kafka consumer group id.
And I would rather have this logic in createJobId, if that's the case.
Moved to createJobId()
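To make the suggestion concrete, a sketch of what createJobId might look like using the stable type + hash(config) formula — the signature and formatting here are assumptions, not the merged code:

    String createJobId(Source source, String storeName) {
      // Stable source identity: type plus a hash of the serialized config,
      // instead of this.id, which may not be populated on a ConsolidatedSource.
      String sourceKey = source.getType() + "-" + Math.abs(source.getConfig().hashCode());
      return String.format("%s-to-%s-%s", sourceKey, storeName, System.currentTimeMillis());
    }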
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: pyalex, woop

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Approvers can indicate their approval by writing /approve in a comment.
/lgtm

/unhold
What this PR does / why we need it:
1. Generalise Source Model.
The current Source model in Feast Core is Kafka specific. For all intents and purposes it is a hardcoded implementation of KafkaSource, containing topics/brokers as top-level fields, despite being named Source.
Not generalizing the data model at this point (prior to the release of 0.5) will cause further problems down the road when new sources are introduced.
This PR moves Source configuration into a config object and isolates Kafka-specific logic to case statements. isDefault is retained for the time being as a top-level field, since it can easily be phased out if needed. Configuration is stored in String format.
Comparison between Source objects with .equals() no longer takes the isDefault field into account. Under this model, identical Source objects (ie equal by .equals()) can be stored as duplicate Source objects; JobCoordinatorService has been updated to take this into account.
2. Make Feast stop duplicate ingestion Jobs.
Currently, JobCoordinatorService does not stop duplicate jobs (ie ingestion jobs that ingest from the same exact source to store pairing). Updated JobCoordinatorService to abort these extra ingestion Jobs when safe (ie only when JobCoordinatorService can find a running ingestion job for each Source to Store pairing).
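A condensed sketch of that abort rule — names like findRunningJobsFor are assumed; this is only meant to convey "abort extras only when a running job remains for the pairing":

    // For each (source, store) pairing, keep one RUNNING job; any additional
    // jobs for the same pairing are duplicates and can safely be aborted.
    for (Pair<Source, Store> pairing : sourceToStorePairings) {
      List<Job> running = findRunningJobsFor(pairing);  // hypothetical lookup
      if (running.size() > 1) {
        for (Job extra : running.subList(1, running.size())) {
          jobManager.abortJob(extra);  // transitions the Job from RUNNING to ABORTING
        }
      }
    }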
3. Job Model Refactors
JobManager API: startJob() is standardised as transitioning a Job from PENDING to RUNNING, and abortJob() as transitioning a Job from RUNNING to ABORTING. Changed abortJob() to return a Job and take a Job as args, to be consistent with other methods. Refactored JobUpdateTask.call() to be easier to follow. Split JobCoordinatorService.poll() into multiple methods (ie getSourceToStoreMapping(), makeJobUpdateTasks()) to make the code more readable. Changed Job to store source fields (ie type and config) as inline fields in the Job table.
Which issue(s) this PR fixes:
Fixes #632
Does this PR introduce a user-facing change?: