Skip to content

Commit

Permalink
feat(cron): get and persist latest declarative manifest docker image …
Browse files Browse the repository at this point in the history
…versions (#12499)

## What
<!--
* Describe what the change is solving.
* It helps to add screenshots if it affects the frontend.
-->
Allow updating of the CDK version used on the fly.

Close airbytehq/airbyte-internal-issues#7505

## How
<!--
* Describe how code changes achieve the solution.
-->
* Add a `RemoteDeclarativeManifestImageVersionsProvider` which queries dockerhub tags for `airbyte/source-declarative-manifest` and returns the latest (via semver) version for each major. 
  * Use this provider in the cloud cron, so that we get the most up-to-date versions on boot 
  * Create a cron which runs a remoteDeclarativeSourcesUpdater every 10 minutes, using this new version provider

## Recommended reading order
1. [RemoteDeclarativeManifestImageVersionsProvider.kt](https://github.com/airbytehq/airbyte-platform-internal/pull/12499/files#diff-dc2c4a3ceb4056cbab79d8e6c3f8aa215d26d61653ac6ce0900abf2224790bc4) / [SeedBeanFactory.kt](https://github.com/airbytehq/airbyte-platform-internal/pull/12499/files#diff-b87092e77cdce0ec47ecb08b39f981528cf2f174c09b557bc9180f275b4e75b5)
2. [CloudPostLoadExecutor.java](https://github.com/airbytehq/airbyte-platform-internal/pull/12499/files#diff-be7b66ae16abbabc6a60bc5f6641112998818d375e6813d38477e1cf9cfd0233)
3. [DeclarativeSourcesUpdater.java](https://github.com/airbytehq/airbyte-platform-internal/pull/12499/files#diff-934375da2ac12f000fde28addddaf93e9987463899cb2dca75cf2ecad63e1c4c)

## Can this PR be safely reverted and rolled back?
<!--
* If you know that your be safely rolled back, check YES.*
* If that is not the case (e.g. a database migration), check NO.
* If unsure, leave it blank.*
-->
- [x] YES 💚
- [ ] NO ❌
  • Loading branch information
erohmensing committed May 23, 2024
1 parent 9479dbe commit a61b18c
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 1 deletion.
3 changes: 3 additions & 0 deletions airbyte-config/init/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ dependencies {
implementation(libs.airbyte.protocol)
implementation(project(":airbyte-json-validation"))
implementation(libs.guava)
implementation(libs.okhttp)
implementation(libs.bundles.jackson)

testImplementation(project(":airbyte-test-utils"))
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.bundles.junit)
testImplementation(libs.assertj.core)
testImplementation(libs.junit.pioneer)
testImplementation(libs.mockk)

}

airbyte {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.airbyte.config.init

import io.airbyte.commons.json.Jsons
import jakarta.inject.Named
import jakarta.inject.Singleton
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.LoggerFactory
import java.io.IOException

@Singleton
@Named("remoteDeclarativeManifestImageVersionsProvider")
class RemoteDeclarativeManifestImageVersionsProvider(
@Named("dockerHubOkHttpClient") val okHttpClient: OkHttpClient,
) : DeclarativeManifestImageVersionsProvider {
companion object {
private val log = LoggerFactory.getLogger(RemoteDeclarativeManifestImageVersionsProvider::class.java)
}

override fun getLatestDeclarativeManifestImageVersions(): Map<Int, String> {
val repository = "airbyte/source-declarative-manifest"
val tags = getTagsForRepository(repository)

val semverStandardVersionTags = tags.filter { it.matches(Regex("""^\d+\.\d+\.\d+$""")) }.toList()
val latestVersionsByMajor =
semverStandardVersionTags
.groupBy { it.split(".")[0].toInt() }
.mapValues { (_, versionsByMajor) -> versionsByMajor.maxBy { it } }

log.info("Latest versions for $repository: $latestVersionsByMajor")
return latestVersionsByMajor
}

private fun getTagsForRepository(repository: String): List<String> {
val tags = mutableListOf<String>()

// 100 is max allowed page size for DockerHub
var nextUrl: String? = "https://hub.docker.com/v2/repositories/$repository/tags?page_size=100"

log.info("Fetching image tags for $repository...")
while (nextUrl != null) {
val request = Request.Builder().url(nextUrl).build()
okHttpClient.newCall(request).execute().use { response ->
if (!response.isSuccessful || response.body == null) {
throw IOException(
"Unexpected response from DockerHub API: ${response.code} ${response.message}",
)
}
val body = Jsons.deserialize(response.body!!.string())
tags.addAll(body.get("results").elements().asSequence().mapNotNull { it.get("name").asText() })
nextUrl = if (!body.get("next").isNull) body.get("next").asText() else null
}
}
log.info("DockerHub tags for $repository: $tags")
return tags
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.micronaut.context.annotation.Value
import io.micronaut.core.util.StringUtils
import jakarta.inject.Named
import jakarta.inject.Singleton
import okhttp3.OkHttpClient
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -44,6 +45,16 @@ class SeedBeanFactory {
throw IllegalArgumentException("Invalid seed provider: $seedProvider")
}

@Singleton
@Named("remoteDeclarativeSourceUpdater")
fun remoteDeclarativeSourceUpdater(
@Named("remoteDeclarativeManifestImageVersionsProvider") declarativeManifestImageVersionsProvider: DeclarativeManifestImageVersionsProvider,
declarativeManifestImageVersionService: DeclarativeManifestImageVersionService,
actorDefinitionService: ActorDefinitionService,
): DeclarativeSourceUpdater {
return DeclarativeSourceUpdater(declarativeManifestImageVersionsProvider, declarativeManifestImageVersionService, actorDefinitionService)
}

@Singleton
@Named("localDeclarativeSourceUpdater")
fun localDeclarativeSourceUpdater(
Expand All @@ -54,6 +65,12 @@ class SeedBeanFactory {
return DeclarativeSourceUpdater(declarativeManifestImageVersionsProvider, declarativeManifestImageVersionService, actorDefinitionService)
}

@Singleton
@Named("dockerHubOkHttpClient")
fun okHttpClient(): OkHttpClient {
return OkHttpClient()
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(SeedBeanFactory::class.java)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

internal class DeclarativeSourceUpdaterTest {
private var mDeclarativeManifestImageVersionsProvider: DeclarativeManifestImageVersionsProvider = mockk()
private var mDeclarativeManifestImageVersionsProvider: RemoteDeclarativeManifestImageVersionsProvider = mockk()
private var mDeclarativeManifestImageVersionService: DeclarativeManifestImageVersionService = mockk()
private var mActorDefinitionService: ActorDefinitionService = mockk()
private lateinit var declarativeSourceUpdater: DeclarativeSourceUpdater
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.init
import io.mockk.confirmVerified
import io.mockk.every
import io.mockk.justRun
import io.mockk.mockk
import io.mockk.verify
import okhttp3.OkHttpClient
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.io.IOException

internal class RemoteDeclarativeManifestImageVersionsProviderTest {
private val okHttpClient: OkHttpClient = mockk(relaxed = true)
private lateinit var declarativeManifestImageVersionsProvider: RemoteDeclarativeManifestImageVersionsProvider

@BeforeEach
fun setup() {
declarativeManifestImageVersionsProvider = RemoteDeclarativeManifestImageVersionsProvider(okHttpClient)
}

@Test
fun `test getLatestDeclarativeManifestImageVersions`() {
every { okHttpClient.newCall(any()).execute() } returns
successfulResponse(
"""
{
"count": 8,
"next": null,
"previous": null,
"results": [
{"name": "0.90.0"},
{"name": "1.0.0"},
{"name": "1.0.1"},
{"name": "2.0.0"},
{"name": "2.0.1-dev123456"},
{"name": "3.0.0"}
]
}
""",
)

val latestMajorVersions = declarativeManifestImageVersionsProvider.getLatestDeclarativeManifestImageVersions()
val expectedLatestVersions =
mapOf(
0 to "0.90.0",
1 to "1.0.1",
2 to "2.0.0",
3 to "3.0.0",
)
assertEquals(expectedLatestVersions, latestMajorVersions)

verify(exactly = 1) { okHttpClient.newCall(any()).execute() }
confirmVerified(okHttpClient)
}

@Test
fun `test pagination`() {
every { okHttpClient.newCall(any()).execute() } returns
successfulResponse(
"""
{
"count": 3,
"next": "https://hub.docker.com/v2/repositories/airbyte/source-declarative-manifest/tags?page_size=1&page=2",
"previous": null,
"results": [
{"name": "1.0.1"}
]
}
""",
) andThen
successfulResponse(
"""
{
"count": 3,
"next": "https://hub.docker.com/v2/repositories/airbyte/source-declarative-manifest/tags?page_size=1&page=3",
"previous": "https://hub.docker.com/v2/repositories/airbyte/source-declarative-manifest/tags?page_size=1",
"results": [
{"name": "1.0.0"}
]
}
""",
) andThen
successfulResponse(
"""
{
"count": 3,
"next": null,
"previous": "https://hub.docker.com/v2/repositories/airbyte/source-declarative-manifest/tags?page_size=1&page=2",
"results": [
{"name": "0.90.0"}
]
}
""",
)

val latestMajorVersions = declarativeManifestImageVersionsProvider.getLatestDeclarativeManifestImageVersions()
val expectedLatestVersions =
mapOf(
0 to "0.90.0",
1 to "1.0.1",
)
assertEquals(expectedLatestVersions, latestMajorVersions)

verify(exactly = 3) { okHttpClient.newCall(any()).execute() }
confirmVerified(okHttpClient)
}

@Test
fun `test no tags available`() {
every { okHttpClient.newCall(any()).execute() } returns
successfulResponse(
"""
{
"count": 0,
"next": null,
"previous": null,
"results": []
}
""",
)

val latestMajorVersions = declarativeManifestImageVersionsProvider.getLatestDeclarativeManifestImageVersions()
assertEquals(emptyMap<Int, String>(), latestMajorVersions)

verify(exactly = 1) { okHttpClient.newCall(any()).execute() }
confirmVerified(okHttpClient)
}

@Test
fun `test error handling`() {
every { okHttpClient.newCall(any()).execute() } returns
mockk {
every { isSuccessful } returns false
every { code } returns 500
every { message } returns "Internal Server Error"
justRun { close() }
}

val exception =
assertThrows(IOException::class.java) {
declarativeManifestImageVersionsProvider.getLatestDeclarativeManifestImageVersions()
}
assertEquals("Unexpected response from DockerHub API: 500 Internal Server Error", exception.message)

verify(exactly = 1) { okHttpClient.newCall(any()).execute() }
confirmVerified(okHttpClient)
}

fun successfulResponse(responseBody: String): okhttp3.Response {
return mockk {
every { isSuccessful } returns true
every { body } returns
mockk {
every { string() } returns responseBody.trimIndent()
}
justRun { close() }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cron.jobs;

import static io.airbyte.cron.MicronautCronRunner.SCHEDULED_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.config.init.DeclarativeSourceUpdater;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

/**
* Declarative Sources Updater.
*
* Calls the DeclarativeSourceUpdater to update the declarative sources at an interval (10m).
*/
@Singleton
@Slf4j
public class DeclarativeSourcesUpdater {

private final DeclarativeSourceUpdater declarativeSourceUpdater;
private final MetricClient metricClient;

public DeclarativeSourcesUpdater(@Named("remoteDeclarativeSourceUpdater") final DeclarativeSourceUpdater declarativeSourceUpdater,
final MetricClient metricClient) {
log.info("Creating declarative source updater");
this.declarativeSourceUpdater = declarativeSourceUpdater;
this.metricClient = metricClient;
}

@Trace(operationName = SCHEDULED_TRACE_OPERATION_NAME)
@Scheduled(fixedRate = "10m")
void updateDefinitions() {
log.info("Getting latest CDK versions and updating declarative sources...");
metricClient.count(OssMetricsRegistry.CRON_JOB_RUN_BY_CRON_TYPE, 1, new MetricAttribute(MetricTags.CRON_TYPE, "declarative_sources_updater"));
declarativeSourceUpdater.apply();
log.info("Done updating declarative sources.");
}

}

0 comments on commit a61b18c

Please sign in to comment.