Skip to content

Commit

Permalink
add method and test for determining if a connection has an alpha or b…
Browse files Browse the repository at this point in the history
…eta connector (#22190)
  • Loading branch information
pmossman committed Feb 1, 2023
1 parent 7978862 commit a8bdbe2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,30 @@ public boolean getWorkspaceHasAlphaOrBetaConnector(final UUID workspaceId) throw
return countResult > 0;
}

/**
* Specialized query for efficiently determining a connection's eligibility for the Free Connector
* Program. If a connection has at least one Alpha or Beta connector, it will be free to use as long
* as the workspace is enrolled in the Free Connector Program. This check is used to allow free
* connections to continue running even when a workspace runs out of credits.
*
* @param connectionId ID of the connection to check connectors for
* @return boolean indicating if an alpha or beta connector is used by the connection
*/
public boolean getConnectionHasAlphaOrBetaConnector(final UUID connectionId) throws IOException {
final Condition releaseStageAlphaOrBeta = ACTOR_DEFINITION.RELEASE_STAGE.eq(ReleaseStage.alpha)
.or(ACTOR_DEFINITION.RELEASE_STAGE.eq(ReleaseStage.beta));

final Integer countResult = database.query(ctx -> ctx.selectCount()
.from(CONNECTION)
.join(ACTOR).on(ACTOR.ID.eq(CONNECTION.SOURCE_ID).or(ACTOR.ID.eq(CONNECTION.DESTINATION_ID)))
.join(ACTOR_DEFINITION).on(ACTOR_DEFINITION.ID.eq(ACTOR.ACTOR_DEFINITION_ID))
.where(CONNECTION.ID.eq(connectionId))
.and(releaseStageAlphaOrBeta))
.fetchOneInto(Integer.class);

return countResult > 0;
}

/**
* Deletes all records with given id. If it deletes anything, returns true. Otherwise, false.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -23,6 +24,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.ReleaseStage;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.NonBreakingChangesPreference;
Expand Down Expand Up @@ -59,12 +61,16 @@ record StandardSyncProtocolVersionFlag(UUID standardSyncId, boolean unsupportedP
private StandardSyncPersistence standardSyncPersistence;

private StandardSourceDefinition sourceDef1;
private StandardSourceDefinition sourceDefAlpha;
private SourceConnection source1;
private SourceConnection source2;
private SourceConnection sourceAlpha;
private StandardDestinationDefinition destDef1;
private StandardDestinationDefinition destDef2;
private StandardDestinationDefinition destDefBeta;
private DestinationConnection destination1;
private DestinationConnection destination2;
private DestinationConnection destinationBeta;

@BeforeEach
void beforeEach() throws Exception {
Expand Down Expand Up @@ -232,6 +238,23 @@ void testGetAllStreamsForConnection() throws Exception {
streamDescriptor -> "stream2".equals(streamDescriptor.getName()) && streamDescriptor.getNamespace() == null));
}

@Test
void testConnectionHasAlphaOrBetaConnector() throws JsonValidationException, IOException {
createBaseObjects();

final StandardSync syncGa = createStandardSync(source1, destination1);
standardSyncPersistence.writeStandardSync(syncGa);
assertFalse(configRepository.getConnectionHasAlphaOrBetaConnector(syncGa.getConnectionId()));

final StandardSync syncAlpha = createStandardSync(sourceAlpha, destination1);
standardSyncPersistence.writeStandardSync(syncAlpha);
assertTrue(configRepository.getConnectionHasAlphaOrBetaConnector(syncAlpha.getConnectionId()));

final StandardSync syncBeta = createStandardSync(source1, destinationBeta);
standardSyncPersistence.writeStandardSync(syncBeta);
assertTrue(configRepository.getConnectionHasAlphaOrBetaConnector(syncBeta.getConnectionId()));
}

private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(final List<StandardSync> standardSync) throws SQLException {
return database.query(ctx -> ctx
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
Expand Down Expand Up @@ -276,20 +299,27 @@ private void createBaseObjects() throws IOException, JsonValidationException {
.withDefaultGeography(Geography.AUTO);
configRepository.writeStandardWorkspaceNoSecrets(workspace);

sourceDef1 = createStandardSourceDefinition("0.2.2");
sourceDef1 = createStandardSourceDefinition("0.2.2", ReleaseStage.GENERALLY_AVAILABLE);
source1 = createSourceConnection(workspaceId, sourceDef1);

final StandardSourceDefinition sourceDef2 = createStandardSourceDefinition("1.1.0");
final StandardSourceDefinition sourceDef2 = createStandardSourceDefinition("1.1.0", ReleaseStage.GENERALLY_AVAILABLE);
source2 = createSourceConnection(workspaceId, sourceDef2);

destDef1 = createStandardDestDefinition("0.2.3");
sourceDefAlpha = createStandardSourceDefinition("1.0.0", ReleaseStage.ALPHA);
sourceAlpha = createSourceConnection(workspaceId, sourceDefAlpha);

destDef1 = createStandardDestDefinition("0.2.3", StandardDestinationDefinition.ReleaseStage.GENERALLY_AVAILABLE);
destination1 = createDestinationConnection(workspaceId, destDef1);

destDef2 = createStandardDestDefinition("1.0.0");
destDef2 = createStandardDestDefinition("1.3.0", StandardDestinationDefinition.ReleaseStage.GENERALLY_AVAILABLE);
destination2 = createDestinationConnection(workspaceId, destDef2);

destDefBeta = createStandardDestDefinition("1.3.0", StandardDestinationDefinition.ReleaseStage.BETA);
destinationBeta = createDestinationConnection(workspaceId, destDefBeta);
}

private StandardSourceDefinition createStandardSourceDefinition(final String protocolVersion) throws JsonValidationException, IOException {
private StandardSourceDefinition createStandardSourceDefinition(final String protocolVersion, final ReleaseStage releaseStage)
throws JsonValidationException, IOException {
final UUID sourceDefId = UUID.randomUUID();
final StandardSourceDefinition sourceDef = new StandardSourceDefinition()
.withSourceDefinitionId(sourceDefId)
Expand All @@ -301,6 +331,7 @@ private StandardSourceDefinition createStandardSourceDefinition(final String pro
.withIcon("icon-1")
.withSpec(new ConnectorSpecification())
.withProtocolVersion(protocolVersion)
.withReleaseStage(releaseStage)
.withTombstone(false)
.withPublic(true)
.withCustom(false)
Expand All @@ -309,7 +340,9 @@ private StandardSourceDefinition createStandardSourceDefinition(final String pro
return sourceDef;
}

private StandardDestinationDefinition createStandardDestDefinition(final String protocolVersion) throws JsonValidationException, IOException {
private StandardDestinationDefinition createStandardDestDefinition(final String protocolVersion,
final StandardDestinationDefinition.ReleaseStage releaseStage)
throws JsonValidationException, IOException {
final UUID destDefId = UUID.randomUUID();
final StandardDestinationDefinition destDef = new StandardDestinationDefinition()
.withDestinationDefinitionId(destDefId)
Expand All @@ -320,6 +353,7 @@ private StandardDestinationDefinition createStandardDestDefinition(final String
.withIcon("icon-3")
.withSpec(new ConnectorSpecification())
.withProtocolVersion(protocolVersion)
.withReleaseStage(releaseStage)
.withTombstone(false)
.withPublic(true)
.withCustom(false)
Expand Down

0 comments on commit a8bdbe2

Please sign in to comment.