
[FLINK-31806] Fix public API rule for connectors #22667

Merged
merged 2 commits into apache:master from FLINK-31806-arch-tests-public-api on Jun 12, 2023

Conversation

echauchot
Contributor

No description provided.

@flinkbot
Collaborator

flinkbot commented May 26, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@echauchot
Contributor Author

echauchot commented May 26, 2023

Tested it on Cassandra connector and it reports these violations:

  1. If I put back the incorrect IOUtils use in the connector, it raises (a sketch of the flagged call follows this comment):

Method <org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer.deserialize(int, [B)> calls method <org.apache.flink.util.IOUtils.readFully(java.io.InputStream, [B, int, int)> in (CassandraEnumeratorStateSerializer.java:90)
=> This is what we wanted

  2. On the current code base, it raises:
    java.lang.AssertionError: Architecture Violation [Priority: MEDIUM] - Rule 'Connector production code must not depend on non-public API outside of connector packages' was violated (16 times):
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (CassandraSource.java:138)
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:124)
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:125)
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:126)
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (CassandraSource.java:127)
    Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> gets field <org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel.RECURSIVE> in (CassandraSource.java:138)
    Field <org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer.SERIALIZER_CACHE> has generic type <java.lang.ThreadLocal<org.apache.flink.core.memory.DataOutputSerializer>> with type argument depending on <org.apache.flink.core.memory.DataOutputSerializer> in (CassandraSplitSerializer.java:0)
    Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:145)
    Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:149)
    Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSource.java:0)
    Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
    Method <org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer.deserialize(int, [B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (CassandraSplitSerializer.java:57)
    Method <org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer.serialize(org.apache.flink.connector.cassandra.source.split.CassandraSplit)> calls method <org.apache.flink.core.memory.DataOutputSerializer.clear()> in (CassandraSplitSerializer.java:51)
    Method <org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer.serialize(org.apache.flink.connector.cassandra.source.split.CassandraSplit)> calls method <org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer()> in (CassandraSplitSerializer.java:50)
    Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)
    Static Initializer <org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer.<clinit>()> calls constructor <org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in (CassandraSplitSerializer.java:34)
    => These are calls to ClosureCleaner, checkState/checkNotNull, and VisibleForTesting, which should be added as violation exceptions.
    => The only debatable part is the dependency on org.apache.flink.core.memory.DataInputDeserializer and org.apache.flink.core.memory.DataOutputSerializer, which are internal APIs, but we could definitely use their [ByteArray | Object]Streams JDK counterparts instead in the Cassandra code.
    @zentol WDYT?
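
For illustration, a minimal sketch of the kind of call the rule flags, next to a plain-JDK alternative; the class, method and variable names here are hypothetical, only the IOUtils.readFully signature comes from the violation quoted above:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.flink.util.IOUtils; // non-public Flink utility, flagged by the rule

    class EnumeratorStateBytesReader {

        // Flagged variant: depends on org.apache.flink.util.IOUtils, which is not a public API.
        static byte[] readWithFlinkIoUtils(ByteArrayInputStream in, int size) throws IOException {
            final byte[] buf = new byte[size];
            IOUtils.readFully(in, buf, 0, size);
            return buf;
        }

        // Connector-friendly variant: the same behavior using only the JDK.
        static byte[] readWithJdk(ByteArrayInputStream in, int size) throws IOException {
            final byte[] buf = new byte[size];
            new DataInputStream(in).readFully(buf);
            return buf;
        }
    }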

@echauchot echauchot requested a review from zentol May 26, 2023 14:15
Contributor

@zentol zentol left a comment


So why didn't the previous version work? Is resideOutsideOfPackages just a filter that works on a previously provided set of classes, and since we didn't specify anything like resideInAPackage it filtered on an empty set?

@zentol zentol self-assigned this May 30, 2023
@zentol
Contributor

zentol commented May 30, 2023

=> These are calls to ClosureCleaner, checkState/checkNotNull, and VisibleForTesting, which should be added as violation exceptions.
=> The only debatable part is the dependency on org.apache.flink.core.memory.DataInputDeserializer and org.apache.flink.core.memory.DataOutputSerializer, which are internal APIs, but we could definitely use their [ByteArray | Object]Streams JDK counterparts instead in the Cassandra code.

I'd start a separate discussion on these. We likely want to make Preconditions/VisibleForTesting public.
Not sure about DataInput*; maybe we can look at how it would look in Cassandra and then make a call.
ClosureCleaner I'm also not sure about, but we could consider handling this internally in Flink instead of having each connector use it (e.g., call it in StreamExEnv#addSource).
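
To make the idea concrete, a rough sketch of framework-side cleaning; the helper class and method are hypothetical, only the ClosureCleaner.clean signature and the RECURSIVE level come from the violation list above:

    import java.io.Serializable;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.ClosureCleaner;

    // Hypothetical framework-side helper: if Flink cleaned user functions once, centrally
    // (e.g. when a source is registered), connectors would no longer need to call the
    // internal ClosureCleaner themselves.
    public final class FunctionCleaningSketch {

        private FunctionCleaningSketch() {}

        public static <F extends Serializable> F cleaned(F userFunction) {
            // Same call the Cassandra connector currently makes in its constructor
            // (see CassandraSource.java:138 in the violation list above).
            ClosureCleaner.clean(userFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            return userFunction;
        }
    }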

@zentol
Contributor

zentol commented May 30, 2023

Anyway, for now we'll add exclusions and handle this in a follow-up.

@echauchot
Contributor Author

So why didn't the previous version work? Is resideOutsideOfPackages just a filter that works on a previously provided set of classes, and since we didn't specify anything like resideInAPackage it filtered on an empty set?

No, it did not work because the rule was conceptually wrong: see the review comment I have put on line 50

@echauchot
Contributor Author

echauchot commented May 30, 2023

I'd start a separate discussion on these. We likely want to make Preconditions/VisibleForTesting public.

OK, so in the meantime we should add them to the frozen violations, right?

Not sure about DataInput*; maybe we can look at how it would look in Cassandra and then make a call.

Yes, I was about to open a dedicated Cassandra ticket for the DataInput* / DataOutput* removal. I could open a PR to replace them with their JDK counterparts and discuss there whether it is better for the Cassandra connector to use the JDK classes instead of non-public Flink ones.

ClosureCleaner I'm also not sure about, but we could consider handling this internally in Flink instead of having each connector use it (e.g., call it in StreamExEnv#addSource).

+1 on the goal; maybe in the meantime we similarly add this exception to the frozen violations.
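
For readers unfamiliar with the freeze mechanism discussed here, a minimal ArchUnit sketch; the rule itself is a hypothetical stand-in, and Flink's flink-architecture-tests modules wrap this mechanism with their own helpers and violation-store configuration:

    import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

    import com.tngtech.archunit.lang.ArchRule;
    import com.tngtech.archunit.library.freeze.FreezingArchRule;

    class FrozenRuleSketch {

        // Hypothetical rule standing in for the connector public-API rule.
        static final ArchRule CONNECTOR_RULE =
                noClasses()
                        .that().resideInAPackage("org.apache.flink.connector..")
                        .should().dependOnClassesThat().resideInAPackage("org.apache.flink.core.memory..");

        // Freezing records all current violations in the violation store; afterwards only new
        // violations fail the build, which is what "adding them to the frozen violations" refers to.
        static final ArchRule FROZEN_CONNECTOR_RULE = FreezingArchRule.freeze(CONNECTOR_RULE);
    }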

@echauchot
Contributor Author

Not sure about DataInput*; maybe we can look at how it would look in Cassandra and then make a call.

Yes, I was about to open a dedicated Cassandra ticket for the DataInput* / DataOutput* removal. I could open a PR to replace them with their JDK counterparts and discuss there whether it is better for the Cassandra connector to use the JDK classes instead of non-public Flink ones.

https://issues.apache.org/jira/browse/FLINK-32222
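
A minimal sketch of what the JDK-based replacement could look like on the connector side; the class, field layout and method names are illustrative only, not the actual FLINK-32222 change:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative replacement of org.apache.flink.core.memory.DataOutputSerializer and
    // DataInputDeserializer (internal Flink APIs) with their JDK counterparts.
    class SplitBytesCodec {

        static byte[] serialize(long splitStart, long splitEnd) throws IOException {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(splitStart);
                out.writeLong(splitEnd);
            }
            return bytes.toByteArray();
        }

        static long[] deserialize(byte[] serialized) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                return new long[] {in.readLong(), in.readLong()};
            }
        }
    }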

@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch 2 times, most recently from f8327ef to 124c40b Compare May 31, 2023 09:56
@zentol
Contributor

zentol commented May 31, 2023

No, it did not work because the rule was conceptually wrong: see the review comment I have put on line 50

I don't see any comment; did you submit the review?

.and(not(modifier(PUBLIC))))
private static DescribedPredicate<JavaClass>
areFlinkClassesThatResideOutsideOfConnectorPackagesAndAreNotPublic() {
return JavaClass.Predicates.resideInAPackage("org.apache.flink..")
Contributor Author


Added a Flink packages filter; otherwise all classes (JDK, dependencies, ...) could match the rule.
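
A sketch of the predicate under discussion with the Flink package filter applied first; the connector package pattern and the class name used for the exclusion are assumptions, not necessarily the exact ones in the PR:

    import static com.tngtech.archunit.base.DescribedPredicate.not;
    import static com.tngtech.archunit.core.domain.JavaModifier.PUBLIC;
    import static com.tngtech.archunit.core.domain.properties.HasModifiers.Predicates.modifier;

    import com.tngtech.archunit.base.DescribedPredicate;
    import com.tngtech.archunit.core.domain.JavaClass;

    class ConnectorPredicateSketch {

        // Without the leading resideInAPackage("org.apache.flink..") filter, every non-public
        // class on the classpath (JDK, third-party dependencies, ...) would satisfy the
        // predicate; with it, only Flink classes outside the connector packages are matched.
        static DescribedPredicate<JavaClass>
                areFlinkClassesThatResideOutsideOfConnectorPackagesAndAreNotPublic() {
            return JavaClass.Predicates.resideInAPackage("org.apache.flink..")
                    .and(JavaClass.Predicates.resideOutsideOfPackages("org.apache.flink.connector..")) // assumed pattern
                    .and(not(modifier(PUBLIC)));
        }
    }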

@echauchot
Contributor Author

echauchot commented May 31, 2023

No, it did not work because the rule was conceptually wrong: see the review comment I have put on line 50

I don't see any comment; did you submit the review?

My bad, I forgot to submit the review. I thought it was automatic since this PR is my own code.

@echauchot
Contributor Author

I don't get how the centralized flink-architecture-tests-production module works. The Azure CI complains about freeze.store.default.allowStoreUpdate being disabled in flink-architecture-tests-production, whereas it is enabled in flink-architecture-tests-production/src/test/resources/archunit.properties. Even if I enable freeze.refreeze in this module as stated in the README, I still get no error locally and no new violations are added.

@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch from 7e54120 to 124c40b Compare June 5, 2023 11:08
@echauchot
Contributor Author

echauchot commented Jun 5, 2023

I don't get how the centralized flink-architecture-tests-production module works. The Azure CI complains about freeze.store.default.allowStoreUpdate being disabled in flink-architecture-tests-production, whereas it is enabled in flink-architecture-tests-production/src/test/resources/archunit.properties. Even if I enable freeze.refreeze in this module as stated in the README, I still get no error locally and no new violations are added.

I had to recreate the violation store; just updating it did not work.

@echauchot
Contributor Author

echauchot commented Jun 5, 2023

Not sure about DataInput*; maybe we can look at how it would look in Cassandra and then make a call.

Yes, I was about to open a dedicated Cassandra ticket for the DataInput* / DataOutput* removal. I could open a PR to replace them with their JDK counterparts and discuss there whether it is better for the Cassandra connector to use the JDK classes instead of non-public Flink ones.

https://issues.apache.org/jira/browse/FLINK-32222

Addressed in Cassandra here: apache/flink-connector-cassandra#17

@echauchot
Contributor Author

@flinkbot run azure

@echauchot
Contributor Author

Relaunching CI because the E2E preparation script failed.

@echauchot
Contributor Author

@flinkbot run azure

@echauchot
Contributor Author

@flinkbot run azure

@@ -0,0 +1,1486 @@
Class <org.apache.flink.connector.datagen.table.DataGenVisitorBase$TimeGenerator> implements interface <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGenVisitorBase.java:0)
Contributor


good god...this is gonna be fine to go through...

@zentol
Contributor

zentol commented Jun 6, 2023

Rebasing the PR should resolve the e2e CI issue.

@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch from 7a43e59 to 694db68 Compare June 8, 2023 10:22
@echauchot
Contributor Author

@zentol:

  • I rebased on master
  • I changed the rule by reversing the assumption from "class must not depend on non-public" to "class must depend only on public", which I believe makes the rule more maintainable and readable (a sketch of the two formulations follows this comment).
  • I updated the violations: the rule seems to work and does not produce false positives on non-public classes enclosed in public classes.
    PTAL
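
For clarity, a schematic ArchUnit sketch of the two formulations being compared; the packages used are hypothetical stand-ins, and only the shape of the rule is meant to match the description above, not the exact rule merged here:

    import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes;
    import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

    import com.tngtech.archunit.lang.ArchRule;

    class RuleDirectionSketch {

        // Negative formulation: connector classes must NOT depend on non-public API.
        static final ArchRule MUST_NOT_DEPEND_ON_NON_PUBLIC =
                noClasses()
                        .that().resideInAPackage("org.apache.flink.connector..")
                        .should().dependOnClassesThat().resideInAPackage("org.apache.flink.core.memory..");

        // Positive formulation: connector classes must depend ONLY on public / whitelisted API,
        // which reads more directly as the whitelist of allowed packages grows.
        static final ArchRule MUST_DEPEND_ONLY_ON_PUBLIC =
                classes()
                        .that().resideInAPackage("org.apache.flink.connector..")
                        .should().onlyDependOnClassesThat()
                        .resideInAnyPackage("java..", "org.apache.flink.connector..");
    }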

@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch from 694db68 to ca743b3 Compare June 8, 2023 10:55
@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch from ca743b3 to 7d299be Compare June 9, 2023 08:08
Contributor

@zentol zentol left a comment


Since each of these modifies the rules, I'd assume each of them individually would also have to update the store.
So it's either that or squash them all together.
(again, the key rule is that each merged commit should individually pass CI; if they'd already pass, go ahead)

[FLINK-31806] Fix public API rule for connectors
[FLINK-31806] Add a rule to test non-public enclosing classes
[FLINK-31806] Reverse rule to positive asumption and update violation… 

@echauchot
Contributor Author

echauchot commented Jun 9, 2023

Since each of these modifies the rules, I'd assume each of them individually would also have to update the store. So it's either that or squash them all together. (again, the key rule is that each merged commit should individually pass CI; if they'd already pass, go ahead)

[FLINK-31806] Fix public API rule for connectors
[FLINK-31806] Add a rule to test non-public enclosing classes
[FLINK-31806] Reverse rule to positive asumption and update violation… 

This is what I thought, but since you mentioned only the two commits below, I merged only those:

[FLINK-31806] Reverse rule to positive asumption
[FLINK-31806] Update violations 

That being said, having every commit pass CI makes total sense. Since the first rule was missing the enclosed-class case and the second rule was missing a sub-case, there is no point in keeping these commits separate; I'll squash them.

@echauchot echauchot force-pushed the FLINK-31806-arch-tests-public-api branch from 7d299be to d27c40d Compare June 9, 2023 12:43
@echauchot echauchot merged commit 28abe81 into apache:master Jun 12, 2023
@echauchot echauchot deleted the FLINK-31806-arch-tests-public-api branch June 14, 2023 14:29