Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: 修复StorageManager创建Node超时后误删存储 #2133 #2141

Merged
merged 20 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ enum class ArtifactMessageCode(private val key: String) : MessageCode {
ARTIFACT_SIZE_TOO_LARGE("artifact.size.too-large"),
ARTIFACT_TYPE_UNSUPPORTED("artifact.type.unsupported"),
ARTIFACT_FORBIDDEN("artifact.forbidden"),
NODE_CREATE_TIMEOUT("artifact.node.create.timeout"),
SIZE_CHECK_FAILED("artifact.size.check-failed"),
cnlkl marked this conversation as resolved.
Show resolved Hide resolved
NODE_LINK_FOLDER_UNSUPPORTED("artifact.node.link-folder-unsupported"),
ARTIFACT_PRELOAD_STRATEGY_NOT_FOUND("artifact.preload-strategy.not-found"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ artifact.node.not-found=Node [{0}] not found
artifact.node.path.invalid=Invalid node path [{0}]
artifact.node.existed=Node [{0}] existed
artifact.node.conflict=Node [{0}] conflict
artifact.node.create.timeout=Node [{0}] create timeout
artifact.node.list.too-large=Node list count too large
artifact.node.link-folder-unsupported=Link folder[{0}] was unsupported
artifact.stage.upgrade.error=Upgrade artifact stage error: [{0}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ artifact.node.not-found=节点[{0}]不存在
artifact.node.path.invalid=节点路径[{0}]无效
artifact.node.existed=节点[{0}]已存在
artifact.node.conflict=已存在同名文件,且不允许覆盖
artifact.node.create.timeout=节点[{0}]创建超时
artifact.node.list.too-large=节点列表数量过大
artifact.node.link-folder-unsupported=无法链接到目录[{0}]
artifact.stage.upgrade.error=制品晋级失败: {0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ artifact.node.not-found=節點[{0}]不存在
artifact.node.path.invalid=節點路徑[{0}]無效
artifact.node.existed=節點[{0}]已存在
artifact.node.conflict=已存在同名文件,且不允許覆蓋
artifact.node.create.timeout=節點[{0}]創建超時
artifact.node.list.too-large=節點列表數量過大
artifact.node.link-folder-unsupported=無法連結到目錄[{0}]
artifact.stage.upgrade.error=制品晉級失敗: {0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.innercos.http.HttpMethod
import com.tencent.bkrepo.repository.api.NodeClient
import com.tencent.bkrepo.repository.api.StoreRecordClient
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
import com.tencent.bkrepo.repository.pojo.node.NodeInfo
import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
import com.tencent.devops.plugin.api.PluginManager
import com.tencent.devops.plugin.api.applyExtension
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicBoolean

/**
* 存储管理类
Expand All @@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean
class StorageManager(
private val storageService: StorageService,
private val nodeClient: NodeClient,
private val storeRecordClient: StoreRecordClient,
private val nodeResourceFactoryImpl: NodeResourceFactoryImpl,
private val pluginManager: PluginManager,
) {
Expand All @@ -79,23 +80,12 @@ class StorageManager(
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
): NodeDetail {
val cancel = AtomicBoolean(false)
val affectedCount = storageService.store(request.sha256!!, artifactFile, storageCredentials, cancel)
try {
return nodeClient.createNode(request).data!!
} catch (exception: Exception) {
// 当文件有创建,则删除文件
if (affectedCount == 1) {
try {
cancel.set(true)
storageService.delete(request.sha256!!, storageCredentials)
} catch (exception: Exception) {
logger.error("Failed to delete new created file[${request.sha256}]", exception)
}
}
// 异常往上抛
throw exception
}
// 存储失败后会根据存储记录通过定时任务StorageRollbackJob进行回滚清理冗余存储
val storingRecord = storeRecordClient.recordStoring(request.sha256!!, storageCredentials?.key).data!!
storageService.store(request.sha256!!, artifactFile, storageCredentials)
val node = nodeClient.createNode(request).data!!
storeRecordClient.storeFinished(storingRecord.id)
cnlkl marked this conversation as resolved.
Show resolved Hide resolved
cnlkl marked this conversation as resolved.
Show resolved Hide resolved
return node
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ dependencies {
testImplementation("it.ozimov:embedded-redis:${Versions.EmbeddedRedis}") {
exclude("org.slf4j", "slf4j-simple")
}
testImplementation("org.mockito.kotlin:mockito-kotlin")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import com.tencent.bkrepo.common.storage.filesystem.check.SynchronizeResult
import com.tencent.bkrepo.common.storage.message.StorageErrorException
import com.tencent.bkrepo.common.storage.message.StorageMessageCode
import com.tencent.bkrepo.common.storage.monitor.Throughput
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.LoggerFactory
import kotlin.system.measureNanoTime

Expand All @@ -53,7 +52,6 @@ abstract class AbstractStorageService : CompressSupport() {
digest: String,
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
cancel: AtomicBoolean?,
storageClass: String?,
): Int {
val path = fileLocator.locate(digest)
Expand All @@ -65,7 +63,7 @@ abstract class AbstractStorageService : CompressSupport() {
} else {
val size = artifactFile.getSize()
val nanoTime = measureNanoTime {
doStore(path, digest, artifactFile, credentials, cancel, storageClass)
doStore(path, digest, artifactFile, credentials, storageClass)
}
val throughput = Throughput(size, nanoTime)
logger.info("Success to store artifact file [$digest], $throughput.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationEventPublisher
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean

/**
* 抽象存储服务辅助类
Expand Down Expand Up @@ -105,7 +104,6 @@ abstract class AbstractStorageSupport : StorageService {
filename: String,
artifactFile: ArtifactFile,
credentials: StorageCredentials,
cancel: AtomicBoolean? = null,
storageClass: String? = null,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import com.tencent.bkrepo.common.storage.core.operation.OverlayOperation
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.filesystem.check.SynchronizeResult
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean

/**
* 存储服务接口
Expand All @@ -61,7 +60,6 @@ interface StorageService :
digest: String,
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
cancel: AtomicBoolean? = null,
storageClass: String? = null,
): Int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import java.io.File
import java.io.FileNotFoundException
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean

/**
* 支持缓存的存储服务
Expand All @@ -69,7 +68,6 @@ class CacheStorageService(
filename: String,
artifactFile: ArtifactFile,
credentials: StorageCredentials,
cancel: AtomicBoolean?,
storageClass: String?,
) {
when {
Expand All @@ -90,13 +88,12 @@ class CacheStorageService(
else -> {
val cacheFile = getCacheClient(credentials).move(path, filename, artifactFile.flushToFile())
cacheFileEventPublisher.publishCacheFileLoadedEvent(credentials, cacheFile)
async2Store(cancel, filename, credentials, path, cacheFile, storageClass)
async2Store(filename, credentials, path, cacheFile, storageClass)
}
}
}

private fun async2Store(
cancel: AtomicBoolean?,
filename: String,
credentials: StorageCredentials,
path: String,
Expand All @@ -105,16 +102,8 @@ class CacheStorageService(
) {
threadPoolTaskExecutor.execute {
try {
if (cancel?.get() == true) {
logger.info("Cancel store fle [$filename] on [${credentials.key}]")
return@execute
}
fileStorage.store(path, filename, cacheFile, credentials, storageClass)
} catch (ignored: Exception) {
if (cancel?.get() == true) {
logger.info("Cancel store fle [$filename] on [${credentials.key}]")
return@execute
}
// 此处为异步上传,失败后异常不会被外层捕获,所以单独捕获打印error日志
logger.error("Failed to async store file [$filename] on [${credentials.key}]", ignored)
// 失败时把文件放入暂存区,后台任务会进行补偿。
Expand Down Expand Up @@ -158,6 +147,7 @@ class CacheStorageService(
val size = File(cacheFilePath).length()
fileStorage.delete(path, filename, credentials)
getCacheClient(credentials).delete(path, filename)
getStagingClient(credentials).delete(path, filename)
cnlkl marked this conversation as resolved.
Show resolved Hide resolved
cacheFileEventPublisher.publishCacheFileDeletedEvent(path, filename, size, credentials)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.artifact.stream.artifactStream
import com.tencent.bkrepo.common.storage.core.AbstractStorageService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import java.util.concurrent.atomic.AtomicBoolean

/**
* 存储服务简单实现
Expand All @@ -49,7 +48,6 @@ class SimpleStorageService : AbstractStorageService() {
filename: String,
artifactFile: ArtifactFile,
credentials: StorageCredentials,
cancel: AtomicBoolean?,
storageClass: String?,
) {
when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,38 @@ import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile
import com.tencent.bkrepo.common.artifact.hash.sha256
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.storage.StorageAutoConfiguration
import com.tencent.bkrepo.common.storage.core.AbstractStorageSupport
import com.tencent.bkrepo.common.storage.core.FileStorage
import com.tencent.bkrepo.common.storage.core.StorageProperties
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.core.locator.FileLocator
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.filesystem.FileSystemClient
import com.tencent.bkrepo.common.storage.filesystem.FileSystemStorage
import org.apache.commons.io.FileUtils
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.anyString
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.ImportAutoConfiguration
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.springframework.util.ReflectionUtils
import org.springframework.util.StreamUtils
import java.io.File
import java.nio.charset.Charset
import java.nio.file.Paths
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
import kotlin.random.Random
import org.springframework.util.StreamUtils

@ExtendWith(SpringExtension::class)
@ImportAutoConfiguration(StorageAutoConfiguration::class, TaskExecutionAutoConfiguration::class)
Expand Down Expand Up @@ -226,16 +233,6 @@ internal class CacheStorageServiceTest {
Assertions.assertFalse(cacheClient.exist(path, sha256))
}

@Test
fun cancelStoreTest() {
val size = 1024L
val artifactFile = createTempArtifactFile(size)
val sha256 = artifactFile.getFileSha256()
val cancel = AtomicBoolean(false)
cancel.set(true)
assertDoesNotThrow { storageService.store(sha256, artifactFile, null, cancel) }
}

@Test
fun deltaStoreTest() {
val data1 = Random.nextBytes(Random.nextInt(1024, 1 shl 20))
Expand Down Expand Up @@ -383,6 +380,46 @@ internal class CacheStorageServiceTest {
Thread.sleep(2000)
}

@Test
fun `should delete relative file`() {
// mock
val mockFileStorage = mock<FileStorage>()
whenever(
mockFileStorage.store(anyString(), anyString(), any<File>(), any<StorageCredentials>(), anyOrNull())
).thenThrow(RuntimeException())
val field = ReflectionUtils.findField(AbstractStorageSupport::class.java, "fileStorage")!!
field.isAccessible = true
field.set(storageService, mockFileStorage)
val stagingClient = FileSystemClient(Paths.get(storageProperties.filesystem.cache.path, "staging"))

// prepare
val artifactFile = createTempArtifactFile(10240L)
val sha256 = artifactFile.getFileSha256()
val path = fileLocator.locate(sha256)

// store file
storageService.store(sha256, artifactFile, null)

// wait to async store
Thread.sleep(500)

// check cache
Assertions.assertTrue(cacheClient.exist(path, sha256))
// check staging
Assertions.assertTrue(stagingClient.exist(path, sha256))
// check persist
Assertions.assertFalse(storageService.exist(sha256, null))

// cache and staging file not exists after delete
storageService.delete(sha256, null)
Assertions.assertFalse(cacheClient.exist(path, sha256))
Assertions.assertFalse(stagingClient.exist(path, sha256))
Assertions.assertFalse(storageService.exist(sha256, null))

// reset mock
field.set(storageService, fileStorage)
}

private fun createTempArtifactFile(size: Long): ArtifactFile {
val tempFile = createTempFile()
val content = StringPool.randomString(size.toInt())
Expand Down
Loading
Loading