-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
S3DestinationAcceptanceTest.kt
188 lines (170 loc) · 7.24 KB
/
S3DestinationAcceptanceTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination.s3
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
import io.airbyte.commons.io.IOs
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import java.nio.file.Path
import java.util.*
import java.util.stream.Collectors
import org.apache.commons.lang3.RandomStringUtils
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.mockito.Mockito.mock
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
 * Base class for S3 destination acceptance tests.
 *
 * When adding a new S3 destination acceptance test, extend this class and do the following:
 * * Implement [formatConfig] so that it returns the `format` section of the destination config as
 *   a [JsonNode]
 * * Implement [retrieveRecords] that returns the Json records for the test
 *
 * Under the hood, [setup] constructs a [S3DestinationConfig] as follows:
 * * Read the secrets from "secrets/config.json"
 * * Set a random `s3_bucket_path` of the form `<output format>_test_<5 random chars>`, using the
 *   [outputFormat] passed to the constructor
 * * Set the `format` section from [formatConfig]
 */
abstract class S3DestinationAcceptanceTest
protected constructor(protected val outputFormat: S3Format) : DestinationAcceptanceTest() {
    protected val secretFilePath: String = "secrets/config.json"

    /** Config used by the current test; assigned in [setup]. */
    protected var configJson: JsonNode? = null

    // Mock placeholders keep these properties non-null until setup() replaces them with real
    // instances.
    protected var s3DestinationConfig: S3DestinationConfig = mock()
    protected var s3Client: AmazonS3? = null
    protected var s3nameTransformer: NamingConventionTransformer = mock()
    protected var s3StorageOperations: S3StorageOperations? = null

    /** Secrets config, re-read from [secretFilePath] on every access. */
    protected val baseConfigJson: JsonNode
        get() = Jsons.deserialize(IOs.readFile(Path.of(secretFilePath)))

    /** Returns the config's `s3_bucket_path` as text, or null when the field is absent. */
    override fun getDefaultSchema(config: JsonNode): String? = config["s3_bucket_path"]?.asText()

    override fun getConfig(): JsonNode =
        checkNotNull(configJson) { "setup() must run before getConfig()" }

    /** Returns a copy of the secrets config whose S3 credentials are replaced with fake values. */
    override fun getFailCheckConfig(): JsonNode {
        val failCheckJson = Jsons.clone(baseConfigJson) as ObjectNode
        // invalid credential
        failCheckJson.put("access_key_id", "fake-key")
        failCheckJson.put("secret_access_key", "fake-secret")
        return failCheckJson
    }

    /**
     * Helper method to retrieve all synced objects inside the configured bucket path.
     *
     * Lists every page of objects under the stream's parent folder. It keeps only the keys that
     * contain the transformed stream name followed by `/`.
     *
     * @param streamName raw stream name; transformed with [s3nameTransformer]
     * @param namespace raw namespace; transformed with [s3nameTransformer]
     * @return the matching object summaries, sorted by ascending last-modified time
     */
    protected fun getAllSyncedObjects(
        streamName: String,
        namespace: String
    ): List<S3ObjectSummary> {
        val storageOperations =
            checkNotNull(s3StorageOperations) { "setup() must run before listing synced objects" }
        val client = checkNotNull(s3Client) { "setup() must run before listing synced objects" }
        val namespaceStr = s3nameTransformer.getNamespace(namespace)
        val streamNameStr = s3nameTransformer.getIdentifier(streamName)
        val outputPrefix =
            storageOperations.getBucketObjectPath(
                namespaceStr,
                streamNameStr,
                DateTime.now(DateTimeZone.UTC),
                checkNotNull(s3DestinationConfig.pathFormat) { "S3 path format is not configured" }
            )
        // The child folder contains a non-deterministic epoch timestamp, so use the parent folder
        // (everything up to and including the last '/').
        val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
        val objectSummaries =
            listAllObjectSummaries(client, s3DestinationConfig.bucketName, parentFolder)
                .filter { summary -> summary.key.contains("$streamNameStr/") }
                .sortedBy { summary -> summary.lastModified.time }
        LOGGER.info(
            "All objects: {}",
            objectSummaries.map { summary -> "${summary.bucketName}/${summary.key}" }
        )
        return objectSummaries
    }

    /**
     * Lists every object under [prefix] in [bucketName], following S3 pagination.
     *
     * A single `listObjects` call returns one page and marks the listing as truncated when more
     * objects remain. This helper keeps fetching the next batch until the listing is complete.
     */
    private fun listAllObjectSummaries(
        client: AmazonS3,
        bucketName: String?,
        prefix: String?
    ): List<S3ObjectSummary> {
        val summaries = mutableListOf<S3ObjectSummary>()
        var listing = client.listObjects(bucketName, prefix)
        summaries.addAll(listing.objectSummaries)
        while (listing.isTruncated) {
            listing = client.listNextBatchOfObjects(listing)
            summaries.addAll(listing.objectSummaries)
        }
        return summaries
    }

    protected abstract val formatConfig: JsonNode?

    /**
     * This method does the following:
     * * Construct the S3 destination config, with a random bucket path and [formatConfig].
     * * Construct the S3 client, name transformer and [S3StorageOperations].
     */
    override fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet<String>) {
        val testConfig = Jsons.clone(baseConfigJson) as ObjectNode
        // Set a random s3 bucket path for each integration test
        val testBucketPath =
            "${outputFormat.name.lowercase()}_test_${RandomStringUtils.randomAlphanumeric(5)}"
        testConfig.put("s3_bucket_path", testBucketPath).set<JsonNode>("format", formatConfig)
        configJson = testConfig
        s3DestinationConfig =
            S3DestinationConfig.getS3DestinationConfig(testConfig, storageProvider())
        LOGGER.info(
            "Test full path: {}/{}",
            s3DestinationConfig.bucketName,
            s3DestinationConfig.bucketPath
        )
        val client = s3DestinationConfig.getS3Client()
        s3Client = client
        s3nameTransformer = S3NameTransformer()
        s3StorageOperations = S3StorageOperations(s3nameTransformer, client, s3DestinationConfig)
    }

    /**
     * Remove all the S3 output from the tests.
     *
     * Lists every page of objects under the test bucket path. It then deletes them in batches of
     * at most [MAX_KEYS_PER_DELETE] keys. Cleanup is skipped with a warning when [setup] never
     * created the S3 client, so a setup failure is not masked by a teardown NPE.
     */
    override fun tearDown(testEnv: TestDestinationEnv) {
        val client = s3Client
        if (client == null) {
            LOGGER.warn("S3 client was never initialized; skipping test bucket cleanup.")
            return
        }
        val bucketName = s3DestinationConfig.bucketName
        val bucketPath = s3DestinationConfig.bucketPath
        val keysToDelete =
            listAllObjectSummaries(client, bucketName, bucketPath).map { summary ->
                DeleteObjectsRequest.KeyVersion(summary.key)
            }
        if (keysToDelete.isEmpty()) {
            return
        }
        LOGGER.info("Tearing down test bucket path: {}/{}", bucketName, bucketPath)
        // A single DeleteObjects request accepts a bounded number of keys, so delete in batches.
        val deletedCount =
            keysToDelete.chunked(MAX_KEYS_PER_DELETE).sumOf { batch ->
                client
                    .deleteObjects(DeleteObjectsRequest(bucketName).withKeys(batch))
                    .deletedObjects
                    .size
            }
        LOGGER.info("Deleted {} file(s).", deletedCount)
    }

    override fun getTestDataComparator(): TestDataComparator = AdvancedTestDataComparator()

    override fun supportBasicDataTypeTest(): Boolean = true

    override fun supportArrayDataTypeTest(): Boolean = true

    override fun supportObjectDataTypeTest(): Boolean = true

    /**
     * Storage provider used to build [s3DestinationConfig] in [setup].
     *
     * Defaults to [StorageProvider.AWS_S3]. Open so that subclasses can target another provider.
     */
    open fun storageProvider(): StorageProvider = StorageProvider.AWS_S3

    companion object {
        protected val LOGGER: Logger =
            LoggerFactory.getLogger(S3DestinationAcceptanceTest::class.java)
        @JvmStatic protected val MAPPER: ObjectMapper = MoreMappers.initMapper()

        /** Maximum number of keys accepted by a single S3 DeleteObjects request. */
        private const val MAX_KEYS_PER_DELETE = 1000
    }
}