Skip to content

Commit

Permalink
storage auto adaptation(HDFS|LFS) (#264)
Browse files Browse the repository at this point in the history
* [feature] flink k8s native mode support

* [feature] flink k8s native mode support

* [issue#220] refactoring SubmitRequest, SubmitResponse to adapt k8s submit operations

* [issue#220] refactoring SubmitRequest, SubmitResponse to adapt k8s submit operations

* [issue#220] New dto object for flink stop action parameter transfer process

* [issue#220] refactor: move the parameters of the flink stop method to a dedicated dto object

* modify configuration constants of workspace(#251)

* typo(#251)

* add isAnyBank method(#251)

* add unified fs operator defined(#251)

* register FsOperator to SpringBoot Bean(#251)

* remove unnecessary import(#251)

* extend the signature of method upload, copy, copyDir(#251)

* Separate workspace storage type into configuration(#251)

* Separate workspace storage type into configuration(#251)

* add fileMd5 method(#251)

* replace the code reference of HdfsUtils to FsOperator(#251)

* change the bean injection behavior of FsOperator(#251)

* change the config key of streamx.workspace(#251)

* fix stack overflow bug

* LfsOperator.upload support dir source

* Update ConfigConst.scala

* Update HdfsOperator.scala

* Update LfsOperator.scala

* Update UnifiledFsOperator.scala

* Update Utils.scala

* Workspace storage compatibility for HDFS and LFS (#253)

* modify configuration constants of workspace(#251)

* typo(#251)

* add isAnyBank method(#251)

* add unified fs operator defined(#251)

* register FsOperator to SpringBoot Bean(#251)

* remove unnecessary import(#251)

* extend the signature of method upload, copy, copyDir(#251)

* Separate workspace storage type into configuration(#251)

* Separate workspace storage type into configuration(#251)

* add fileMd5 method(#251)

* replace the code reference of HdfsUtils to FsOperator(#251)

* change the bean injection behavior of FsOperator(#251)

* change the config key of streamx.workspace(#251)

* fix stack overflow bug

* LfsOperator.upload support dir source

* Update ConfigConst.scala

* Update HdfsOperator.scala

* Update LfsOperator.scala

* Update UnifiledFsOperator.scala

* Update Utils.scala

Co-authored-by: benjobs <benjobs@qq.com>

* Workspace storage compatibility for HDFS and LFS (#253) (#258)

Workspace storage compatibility for HDFS and LFS (#253)

* [feature] storage auto adaptation(HDFS|LFS) (#260)

* Storage auto adaptation #259

* compatible with flink k8s submit

* compatible with flink k8s submit

* add unit test

* rename

* add code build module

* rename

* add maven tool

* resolve conflicts

* 1. adapt storageType when appBackup delete
2. adapt storageType when savepoint delete
3. code optimization

Co-authored-by: yulinying <yulinying_1994@outlook.com>
Co-authored-by: Al-assad <yulin.ying@outlook.com>
  • Loading branch information
3 people committed Jul 27, 2021
1 parent 87022d2 commit 3ed1799
Show file tree
Hide file tree
Showing 53 changed files with 2,536 additions and 1,407 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<module>streamx-spark</module>
<module>streamx-plugin</module>
<module>streamx-console</module>
<module>streamx-codebuild</module>
</modules>

<url>https://github.com/streamxhub/streamx</url>
Expand Down
64 changes: 64 additions & 0 deletions streamx-codebuild/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx</artifactId>
<version>1.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>streamx-codebuild</artifactId>
<name>StreamX : Code Build</name>

<properties>
<eclipse.aether.version>1.1.0</eclipse.aether.version>
<maven.aether.version>3.3.9</maven.aether.version>
<maven.shade.version>3.0.0</maven.shade.version>
</properties>

<dependencies>
<dependency>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- maven build tools -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-impl</artifactId>
<version>${eclipse.aether.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-connector-basic</artifactId>
<version>${eclipse.aether.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-transport-file</artifactId>
<version>${eclipse.aether.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-transport-http</artifactId>
<version>${eclipse.aether.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-aether-provider</artifactId>
<version>${maven.aether.version}</version>
</dependency>
</dependencies>

</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.codebuild

import java.util.regex.Pattern

case class MavenArtifact(groupId: String, artifactId: String, version: String) {
}

object MavenArtifact {
private val p = Pattern.compile("([^: ]+):([^: ]+):([^: ]+)")

/**
* build from coords
*/
def of(coords: String): MavenArtifact = {
val m = p.matcher(coords)
if (!m.matches) throw new IllegalArgumentException("Bad artifact coordinates " + coords + ", expected format is <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>")
val groupId = m.group(1)
val artifactId = m.group(2)
val version = m.group(3)
new MavenArtifact(groupId, artifactId, version)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2021 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.codebuild

import com.google.common.collect.Lists
import com.streamxhub.streamx.common.conf.ConfigConst.{DEFAULT_MAVEN_REMOTE_URL, MAVEN_LOCAL_DIR}
import org.apache.maven.repository.internal.MavenRepositorySystemUtils
import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory
import org.eclipse.aether.repository.{LocalRepository, RemoteRepository}
import org.eclipse.aether.spi.connector.RepositoryConnectorFactory
import org.eclipse.aether.spi.connector.transport.TransporterFactory
import org.eclipse.aether.transport.file.FileTransporterFactory
import org.eclipse.aether.transport.http.HttpTransporterFactory
import org.eclipse.aether.{RepositorySystem, RepositorySystemSession}

import java.util

/**
* author: Al-assad
*/
object MavenRetriever {

/**
* default maven remote center repository
*/
lazy val remoteCenterRepo: RemoteRepository =
new RemoteRepository.Builder("central", "default", DEFAULT_MAVEN_REMOTE_URL).build()

/**
* maven remote repository lists
*/
lazy val remoteRepos: util.ArrayList[RemoteRepository] = Lists.newArrayList(remoteCenterRepo)

/**
* default maven local repository
*/
lazy val localRepo = new LocalRepository(MAVEN_LOCAL_DIR)

private lazy val locator = MavenRepositorySystemUtils.newServiceLocator

/**
* create maven repository endpoint
*/
def newRepoSystem(): RepositorySystem = {
locator.addService(classOf[RepositoryConnectorFactory], classOf[BasicRepositoryConnectorFactory])
locator.addService(classOf[TransporterFactory], classOf[FileTransporterFactory])
locator.addService(classOf[TransporterFactory], classOf[HttpTransporterFactory])
locator.getService(classOf[RepositorySystem])
}

/**
* create maven repository session endpoint
*/
def newSession(system: RepositorySystem): RepositorySystemSession = {
val session = MavenRepositorySystemUtils.newSession
session.setLocalRepositoryManager(system.newLocalRepositoryManager(session, localRepo))
session
}

/**
* create composite maven endpoint
*/
def retrieve(): (RepositorySystem, RepositorySystemSession) = {
val repoSystem = newRepoSystem()
val session = newSession(repoSystem)
(repoSystem, session)
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2021 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.codebuild

import com.google.common.collect.Lists
import com.streamxhub.streamx.common.util.Logger
import org.apache.maven.plugins.shade.{DefaultShader, ShadeRequest}
import org.codehaus.plexus.logging.console.ConsoleLogger
import org.codehaus.plexus.logging.{Logger => PlexusLogger}
import org.eclipse.aether.artifact.DefaultArtifact
import org.eclipse.aether.resolution.{ArtifactDescriptorRequest, ArtifactRequest}

import java.io.File
import java.util
import javax.annotation.{Nonnull, Nullable}
import scala.collection.JavaConverters._


/**
* author: Al-assad
*/
object MavenTool extends Logger {

val plexusLog = new ConsoleLogger(PlexusLogger.LEVEL_INFO, "streamx-maven")

private val isJarFile = (file: File) => file.isFile && file.getName.endsWith(".jar")


/**
* Build fat-jar with custom jar libs
*
* @param jarLibs list of jar lib paths for building fat-jar
* @param outfatJarPath output paths of fat-jar, like "/streamx/workspace/233/my-fat.jar"
* @return File Object of output fat-jar
*/
@Nonnull
def buildFatJar(@Nonnull jarLibs: Array[String], @Nonnull outFatJarPath: String): File = {
// check userJarPath
val uberJar = new File(outFatJarPath)
if (uberJar.isDirectory) {
throw new Exception(s"[Streamx-Maven] outFatJarPath($outFatJarPath) should be a file.")
}
// resolve all jarLibs
val jarSet = new util.HashSet[File]
jarLibs.map(new File(_))
.filter(_.exists())
.distinct
.foreach(lib =>
if (isJarFile(lib)) {
jarSet.add(lib)
} else if (lib.isDirectory) {
lib.listFiles.filter(isJarFile).foreach(jarSet.add)
}
)
logInfo(s"start shaded fat-jar: ${jarLibs.mkString}")
// shade jars
val shadeRequest = {
val req = new ShadeRequest
req.setJars(jarSet)
req.setUberJar(uberJar)
req.setFilters(Lists.newArrayList())
req.setResourceTransformers(Lists.newArrayList())
req.setRelocators(Lists.newArrayList())
req
}
val shader = new DefaultShader()
shader.enableLogging(plexusLog)
shader.shade(shadeRequest)
logInfo(s"finish build fat-jar: ${uberJar.getAbsolutePath}")
uberJar
}

/**
* Build fat-jar with custom jar libs and maven artifacts
*
* @param jarLibs list of jar lib paths
* @param mavenArtifacts collection of maven artifacts
* @param outfatJarPath output paths of fat-jar
* @return File Object of output fat-jar
*/
@Nonnull
def buildFatJar(@Nullable jarLibs: Array[String], @Nullable mavenArtifacts: Array[MavenArtifact],
@Nonnull outFatJarPath: String): File = {
val libs = if (jarLibs == null) Array[String]() else jarLibs
val arts = if (mavenArtifacts == null) Array[MavenArtifact]() else mavenArtifacts
if (libs.isEmpty && arts.isEmpty) {
throw new Exception(s"[Streamx-Maven] empty artifacts.")
}
val artFilePaths = resolveArtifacts(arts).map(_.getAbsolutePath)
buildFatJar(libs ++ artFilePaths, outFatJarPath)
}


/**
* Resolve the collectoin of artifacts, Artifacts will be download to
* ConfigConst.MAVEN_LOCAL_DIR if necessary. notes: Only compile scope
* dependencies will be resolved.
*
* @param mavenArtifacts collection of maven artifacts
* @return jar File Object of resolved artifacts
*/
@Nonnull
def resolveArtifacts(mavenArtifacts: Array[MavenArtifact]): Array[File] = {
if (mavenArtifacts == null) {
return Array[File]()
}
val (repoSystem, session) = MavenRetriever.retrieve()
val artifacts = mavenArtifacts.map(e => new DefaultArtifact(e.groupId, e.artifactId, "jar", e.version)).toSet
logInfo(s"start resolving dependencies: ${artifacts.mkString}")

// read relevant artifact descriptor info
val resolvedArtifacts = artifacts
.map(art => new ArtifactDescriptorRequest(art, MavenRetriever.remoteRepos, null))
.map(artReq => repoSystem.readArtifactDescriptor(session, artReq))
.flatMap(artRes => artRes.getDependencies.asScala)
.filter(dependency => dependency.isOptional)
.filter(dependency => "compile".equals(dependency.getScope))
.map(dependency => dependency.getArtifact)
logInfo(s"resolved dependencies: ${resolvedArtifacts.mkString}")

// download artifacts
val artReqs = resolvedArtifacts.map(art => new ArtifactRequest(art, MavenRetriever.remoteRepos, null)).asJava
repoSystem
.resolveArtifacts(session, artReqs)
.asScala
.map(artRes => artRes.getArtifact.getFile)
.toArray
}


}
Empty file.
Empty file.

0 comments on commit 3ed1799

Please sign in to comment.