Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
Merging from master fixes from library dependencies (#27)
Browse files Browse the repository at this point in the history
* Upgraded kamon, disabled by default, instrumentation via agent (#9)

* Added asynchronous digest computation (#8)

* Added digest computation

* Fixed docs

* Added Accepted status code when Digest is empty

* Incorporated feedback

* Used java Cock

* Reverted to compute digest on create (#10)

* Removed unnecessary Content-Disposition header (#11)

* Added handling client errors as unexpected server errors (#13)

* Add 'nexus-fixer' (#12)

* Add Rust project and target in build.sbt

* Add call to 'nexus-fixer' binary from the service

* Update nexus-fixer README (#14)

* Bumped nexus dependencies (#15)

* Add sbt-assembly to build a fat JAR (#16)

* Add links detection to 'nexus-fixer' (#17)

* Fix assembly deduplication errors caused by Kamon (#18)

* Bumped sbt-nexus, nexus-commons and nexus-sourcing deps. (#19)

* Bumped sbt-nexus, nexus-commons and nexus-sourcing deps.

* Fixed formatting due to bump to scalfmt 2.0.0

* Fixed create empty file (#20)

* Added mediaType information on creation/move response (#21)

* Added mediaType information on creation/move response

* Reverted app.conf changes and added dir to detectMediaType

* Change dir mask to 750 (#22)

* Added javac flags for java 8 (#23)

* Bumped nexus-commons dependency (#24)

* Bumped deps. and increased SSE parsing size (#25)

* Bumped nexus-commons dependencies (#26)

* Bumped iam version
  • Loading branch information
umbreak committed Aug 24, 2019
1 parent 9a108ec commit f4520c4
Show file tree
Hide file tree
Showing 48 changed files with 2,266 additions and 256 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -11,7 +11,6 @@ native
# Maven / SBT / Ant target #
target/
.bloop
project/.bloop

# IDE
.settings
Expand All @@ -21,6 +20,9 @@ project/.bloop
**/logs/
**/nbactions.xml

# Rust
**/*.rs.bk
Cargo.lock

# Packages #
############
Expand Down
22 changes: 21 additions & 1 deletion .scalafmt.conf
@@ -1,3 +1,23 @@
style = defaultWithAlign
maxColumn = 120
rewrite.rules = [SortImports]
version = 2.0.0
align.tokens = [
{ code = "=>", owner = "Case" }
{ code = "⇒", owner = "Case" }
{ code = "extends", owner = "Defn.(Class|Trait|Object)" }
{ code = "//", owner = ".*" }
{ code = "{", owner = "Template" }
{ code = "}", owner = "Template" }
{ code = ":=", owner = "Term.ApplyInfix" }
{ code = "++=", owner = "Term.ApplyInfix" }
{ code = "+=", owner = "Term.ApplyInfix" }
{ code = "%", owner = "Term.ApplyInfix" }
{ code = "%%", owner = "Term.ApplyInfix" }
{ code = "%%%", owner = "Term.ApplyInfix" }
{ code = "->", owner = "Term.ApplyInfix" }
{ code = "→", owner = "Term.ApplyInfix" }
{ code = "<-", owner = "Enumerator.Generator" }
{ code = "←", owner = "Enumerator.Generator" }
{ code = "=", owner = "(Enumerator.Val|Defn.(Va(l|r)|Def|Type))" }
]
rewrite.rules = [SortImports]
6 changes: 6 additions & 0 deletions Jenkinsfile
Expand Up @@ -7,6 +7,11 @@ pipeline {
options {
timeout(time: 30, unit: 'MINUTES')
}
environment {
NEXUS_PATH_PREFIX = """${sh(returnStdout: true, script: 'oc get configmap storage -o jsonpath="{.data.path_prefix}"')}"""
NEXUS_USER_ID = """${sh(returnStdout: true, script: 'oc get configmap storage -o jsonpath="{.data.user_id}"')}"""
NEXUS_GROUP_ID = """${sh(returnStdout: true, script: 'oc get configmap storage -o jsonpath="{.data.group_id}"')}"""
}
stages {
stage("Review") {
when {
Expand Down Expand Up @@ -39,6 +44,7 @@ pipeline {
}
steps {
checkout scm
sh 'echo $NEXUS_PATH_PREFIX $NEXUS_USER_ID $NEXUS_GROUP_ID'
sh 'sbt releaseEarly universal:packageZipTarball'
stash name: "service", includes: "target/universal/storage-*.tgz"
}
Expand Down
12 changes: 10 additions & 2 deletions README.md
Expand Up @@ -4,11 +4,19 @@

# Nexus Storage Service

A service that is intended to abstract I/O file system operations with an API to deal with uploads and downloads of files.

A service to abstract I/O operations on a remote file system, to support Nexus' file management API.

Please visit the [parent project](https://github.com/BlueBrain/nexus) for more information about Nexus.

## Build

In the project's top-level directory run:

```shell script
./sbt assembly
```

This outputs a self-contained JAR `nexus-storage.jar`.

## Getting involved
There are several channels provided to address different issues:
Expand Down
67 changes: 52 additions & 15 deletions build.sbt
Expand Up @@ -25,16 +25,16 @@ scalafmt: {
*/

// Dependency versions
val akkaVersion = "2.5.23"
val akkaVersion = "2.5.25"
val akkaHttpVersion = "10.1.9"
val apacheCompressVersion = "1.18"
val alpakkaVersion = "1.0.2"
val alpakkaVersion = "1.1.1"
val catsVersion = "1.6.1"
val catsEffectVersion = "1.3.1"
val catsEffectVersion = "1.4.0"
val circeVersion = "0.11.1"
val commonsVersion = "0.17.0"
val iamVersion = "1.1.1"
val mockitoVersion = "1.5.11"
val commonsVersion = "0.17.6"
val iamVersion = "1.1.2"
val mockitoVersion = "1.5.14"
val monixVersion = "3.0.0-RC3"
val pureconfigVersion = "0.11.1"
val scalaTestVersion = "3.0.8"
Expand All @@ -60,13 +60,14 @@ lazy val scalaTest = "org.scalatest" %% "scalatest"

lazy val storage = project
.in(file("."))
.settings(testSettings, buildInfoSettings)
.settings(assemblySettings, testSettings, buildInfoSettings)
.enablePlugins(BuildInfoPlugin, ServicePackagingPlugin)
.aggregate(client)
.settings(
name := "storage",
moduleName := "storage",
coverageFailOnMinimum := true,
name := "storage",
moduleName := "storage",
coverageFailOnMinimum := true,
javaSpecificationVersion := "1.8",
libraryDependencies ++= Seq(
apacheCompress,
akkaHttp,
Expand All @@ -86,8 +87,12 @@ lazy val storage = project
mockito % Test,
scalaTest % Test
),
cleanFiles ++= Seq(
baseDirectory.value / "permissions-fixer" / "target" / "**",
baseDirectory.value / "nexus-storage.jar"
),
mappings in Universal := {
val universalMappings = (mappings in Universal).value
val universalMappings = (mappings in Universal).value :+ cargo.value
universalMappings.foldLeft(Vector.empty[(File, String)]) {
case (acc, (file, filename)) if filename.contains("kanela-agent") =>
acc :+ (file, "lib/instrumentation-agent.jar")
Expand All @@ -99,11 +104,13 @@ lazy val storage = project

lazy val client = project
.in(file("client"))
.disablePlugins(AssemblyPlugin)
.settings(
testSettings,
name := "storage-client",
moduleName := "storage-client",
coverageFailOnMinimum := true,
name := "storage-client",
moduleName := "storage-client",
coverageFailOnMinimum := true,
javaSpecificationVersion := "1.8",
libraryDependencies ++= Seq(
akkaHttp,
akkaStream,
Expand All @@ -114,10 +121,24 @@ lazy val client = project
akkaHttpTestKit % Test,
commonsTest % Test,
mockito % Test,
scalaTest % Test,
scalaTest % Test
)
)

lazy val assemblySettings = Seq(
test in assembly := {},
assemblyOutputPath in assembly := baseDirectory.value / "nexus-storage.jar",
assemblyMergeStrategy in assembly := {
case PathList("org", "apache", "commons", "logging", xs @ _*) => MergeStrategy.last
case PathList("akka", "remote", "kamon", xs @ _*) => MergeStrategy.last
case PathList("kamon", "instrumentation", "akka", "remote", xs @ _*) => MergeStrategy.last
case "META-INF/versions/9/module-info.class" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
)

lazy val testSettings = Seq(
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-o", "-u", "target/test-reports"),
Test / parallelExecution := false
Expand All @@ -128,6 +149,22 @@ lazy val buildInfoSettings = Seq(
buildInfoPackage := "ch.epfl.bluebrain.nexus.storage.config"
)

lazy val cargo = taskKey[(File, String)]("Run Cargo to build 'nexus-fixer'")

cargo := {
import scala.sys.process._

val log = streams.value.log
val cmd = Process(Seq("cargo", "build", "--release"), baseDirectory.value / "permissions-fixer")
if ((cmd !) == 0) {
log.success("Cargo build successful.")
(baseDirectory.value / "permissions-fixer" / "target" / "release" / "nexus-fixer") -> "bin/nexus-fixer"
} else {
log.error("Cargo build failed.")
throw new RuntimeException
}
}

inThisBuild(
List(
homepage := Some(url("https://github.com/BlueBrain/nexus-storage")),
Expand Down
Expand Up @@ -23,6 +23,7 @@ import ch.epfl.bluebrain.nexus.iam.client.types._
import ch.epfl.bluebrain.nexus.storage.client.StorageClient._
import ch.epfl.bluebrain.nexus.storage.client.StorageClientError._
import ch.epfl.bluebrain.nexus.storage.client.config.StorageClientConfig
import ch.epfl.bluebrain.nexus.storage.client.types.FileAttributes.Digest
import ch.epfl.bluebrain.nexus.storage.client.types.{FileAttributes, LinkFile, ServiceDescription}
import io.circe
import io.circe.parser.parse
Expand All @@ -31,17 +32,17 @@ import journal.Logger

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
import scala.util.control.NonFatal

class StorageClient[F[_]] private[client] (
config: StorageClientConfig,
attributes: HttpClient[F, FileAttributes],
digest: HttpClient[F, Digest],
source: HttpClient[F, AkkaSource],
serviceDesc: HttpClient[F, ServiceDescription],
emptyBody: HttpClient[F, NotUsed]
)(implicit F: Effect[F], ec: ExecutionContext) {

private val emptyChunk = "An HttpEntity.Chunk must have non-empty data"

def serviceDescription: F[ServiceDescription] =
serviceDesc(Get(config.iri.toAkkaUri))

Expand All @@ -66,15 +67,15 @@ class StorageClient[F[_]] private[client] (
* @return The file attributes wrapped on the effect type F[] containing the metadata plus the file bytes, digest and location
*/
def createFile(name: String, relativePath: Uri.Path, source: AkkaSource)(
implicit cred: Option[AuthToken]): F[FileAttributes] = {
implicit cred: Option[AuthToken]
): F[FileAttributes] = {
val endpoint = config.files(name) + slashIfNone(relativePath).toIriPath
val bodyPartEntity = HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, source)
val filename = extractName(relativePath).getOrElse("myfile")
val multipartForm = FormData(BodyPart("file", bodyPartEntity, Map("filename" -> filename))).toEntity()
attributes(Put(endpoint.toAkkaUri, multipartForm).withCredentials).recoverWith {
case ex: IllegalArgumentException if ex.getMessage != null && ex.getMessage.endsWith(emptyChunk) =>
createFile(name, relativePath, Source.empty)
case ex => F.raiseError(ex)
case EmptyChunk => createFile(name, relativePath, Source.empty)
case ex => F.raiseError(ex)
}
}

Expand All @@ -96,6 +97,17 @@ class StorageClient[F[_]] private[client] (
source(Get(endpoint.toAkkaUri).withCredentials)
}

/**
* Retrieves the file digest.
*
* @param name the storage bucket name
* @param relativePath the relative path to the file location
*/
def getDigest(name: String, relativePath: Uri.Path)(implicit cred: Option[AuthToken]): F[Digest] = {
val endpoint = config.digests(name) + slashIfNone(relativePath).toIriPath
digest(Get(endpoint.toAkkaUri).withCredentials)
}

/**
* Moves a path from the provided ''sourceRelativePath'' to ''destRelativePath'' inside the nexus folder.
*
Expand All @@ -105,7 +117,8 @@ class StorageClient[F[_]] private[client] (
* @return The file attributes wrapped on the effect type F[] containing the metadata plus the file bytes, digest and location
*/
def moveFile(name: String, sourceRelativePath: Uri.Path, destRelativePath: Uri.Path)(
implicit cred: Option[AuthToken]): F[FileAttributes] = {
implicit cred: Option[AuthToken]
): F[FileAttributes] = {
val endpoint = (config.files(name) + slashIfNone(destRelativePath).toIriPath).toAkkaUri
attributes(Put(endpoint, LinkFile(sourceRelativePath)).withCredentials)
}
Expand Down Expand Up @@ -149,7 +162,8 @@ object StorageClient {
cl: UntypedHttpClient[F],
um: FromEntityUnmarshaller[A]
): HttpClient[F, A] = new HttpClient[F, A] {
private val logger = Logger(s"IamHttpClient[${implicitly[ClassTag[A]]}]")
private val logger = Logger(s"IamHttpClient[${implicitly[ClassTag[A]]}]")
private val emptyChunk = "An HttpEntity.Chunk must have non-empty data"

private def typeAndReason(string: String): Either[circe.Error, (String, String)] =
parse(string).flatMap { json =>
Expand All @@ -158,8 +172,16 @@ object StorageClient {
}
}

private def handleError[B](req: HttpRequest): Throwable => F[B] = {
case NonFatal(ex: IllegalArgumentException) if ex.getMessage != null && ex.getMessage.endsWith(emptyChunk) =>
F.raiseError(EmptyChunk)
case NonFatal(th) =>
logger.error(s"Unexpected response for Storage call. Request: '${req.method} ${req.uri}'", th)
F.raiseError(UnknownError(StatusCodes.InternalServerError, th.getMessage))
}

override def apply(req: HttpRequest): F[A] =
cl.apply(req).flatMap { resp =>
cl(req).handleErrorWith(handleError(req)).flatMap { resp =>
resp.status match {
case StatusCodes.Unauthorized =>
cl.toString(resp.entity).flatMap { entityAsString =>
Expand All @@ -179,12 +201,12 @@ object StorageClient {
val value = L.liftIO(IO.fromFuture(IO(um(resp.entity))))
value.recoverWith {
case pf: ParsingFailure =>
logger.error(
s"Failed to parse a successful response of '${req.method.name()} ${req.getUri().toString}'.")
logger
.error(s"Failed to parse a successful response of '${req.method.name()} ${req.getUri().toString}'.")
F.raiseError[A](UnmarshallingError(pf.getMessage()))
case df: DecodingFailure =>
logger.error(
s"Failed to decode a successful response of '${req.method.name()} ${req.getUri().toString}'.")
logger
.error(s"Failed to decode a successful response of '${req.method.name()} ${req.getUri().toString}'.")
F.raiseError(UnmarshallingError(df.getMessage()))
}

Expand All @@ -197,7 +219,8 @@ object StorageClient {
case "PathAlreadyExists" => F.raiseError(InvalidPath(msg))
case _ =>
logger.error(
s"Received '${other.value}' when accessing '${req.method.name()} ${req.uri.toString()}', response entity as string: '$msg'")
s"Received '${other.value}' when accessing '${req.method.name()} ${req.uri.toString()}', response entity as string: '$msg'"
)
F.raiseError[A](UnknownError(other, msg))

}
Expand All @@ -222,11 +245,14 @@ object StorageClient {
implicit val mt: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = as.dispatcher
implicit val ucl: UntypedHttpClient[F] = HttpClient.untyped[F]
new StorageClient(config,
httpClient[F, FileAttributes],
httpClient[F, AkkaSource],
httpClient[F, ServiceDescription],
httpClient[F, NotUsed])
new StorageClient(
config,
httpClient[F, FileAttributes],
httpClient[F, Digest],
httpClient[F, AkkaSource],
httpClient[F, ServiceDescription],
httpClient[F, NotUsed]
)
}
}
// $COVERAGE-ON$
Expand Up @@ -15,11 +15,14 @@ object StorageClientError {

final case class UnmarshallingError[A: ClassTag](reason: String)
extends StorageClientError(
s"Unable to parse or decode the response from Storage to a '${implicitly[ClassTag[A]]}' due to '$reason'.")
s"Unable to parse or decode the response from Storage to a '${implicitly[ClassTag[A]]}' due to '$reason'."
)

final case class UnknownError(status: StatusCode, entityAsString: String)
extends StorageClientError("The request did not complete successfully.")

final case object EmptyChunk extends StorageClientError("Chunk with empty data")

final case class NotFound(reason: String) extends StorageClientError(reason)

final case class InvalidPath(reason: String) extends StorageClientError(reason)
Expand Down
Expand Up @@ -19,6 +19,13 @@ final case class StorageClientConfig(iri: AbsoluteIri, prefix: String) {
*/
def files(name: String): AbsoluteIri = buckets + name + "files"

/**
* The digests endpoint: /buckets/{name}/digests
*
* @param name the storage bucket name
*/
def digests(name: String): AbsoluteIri = buckets + name + "digests"

}

object StorageClientConfig {
Expand Down

0 comments on commit f4520c4

Please sign in to comment.