Skip to content

Latest commit

 

History

History
257 lines (187 loc) · 7.38 KB

pipes.md

File metadata and controls

257 lines (187 loc) · 7.38 KB

Elasticsearch Pipes

Pipes are the processing units of a pipeline for an Elasticsearch view.

See @ref:here to get more details on how pipes are applied and how the indexing process to Elasticsearch works.

Core pipes

These pipes are provided by default by Delta.

Filter deprecated

  • Allows excluding deprecated resources from being indexed
  • No config is needed
{
  "name" : "filterDeprecated"
}

Filter by type

  • Allows excluding resources which don't have at least one of the provided types
{
  "name" : "filterByType",
  "config" : {
    "types" : [
      "https://bluebrain.github.io/nexus/types/Type1",
      "https://bluebrain.github.io/nexus/types/Type2"
    ]
  }
}

Filter by schema

  • Allows excluding resources which haven't been validated against one of the provided schemas
{
  "name" : "filterBySchema",
  "config" : {
    "types" : [
      "https://bluebrain.github.io/nexus/schemas/Schema1",
      "https://bluebrain.github.io/nexus/schemas/Schema2"
    ]
  }
}

Discard metadata

  • Prevents all Nexus metadata from being indexed
  • No configuration is needed
{
  "name" : "discardMetadata"
}

Source as text

  • The original payload of the resource will be stored in the Elasticsearch document as a single escaped string value under the key _original_source.
  • No configuration is needed
{
  "name" : "sourceAsText"
}

Data construct query

  • The data graph of the resource will be transformed according to the provided SPARQL construct query
  • The resource metadata is not modified by this pipe
{
  "name" : "dataConstructQuery",
  "config": {
    "query": "{constructQuery}"
  }
}

Select predicates

  • Only the listed predicates will be kept in the data graph of the resource
  • The resource metadata is not modified by this pipe
{
  "name" : "selectPredicates",
  "config": {
    "predicates": [
      "rdfs:label",
      "schema:name"
    ]
  }
}

Default label predicates

  • Only the default label predicates (skos:prefLabel, rdf:type, rdfs:label, schema:name) will be kept in the data graph of the resource
  • No configuration is needed
{
  "name" : "defaultLabelPredicates"
}

Add custom pipes through plugins

@@@ note { .warning }

The pipe name must be a unique identifier in Delta.

Please also note that removing pipes or modifying the configuration of a pipe will prevent existing views relying on them from indexing resources, as their pipeline will be broken. They will have to be updated with a valid pipeline so that indexing can be restarted.

@@@

Besides these core pipes, it is possible to define custom pipes through plugins.

Please visit @ref:Plugins to learn how to create, package and deploy a plugin.

Inside this plugin, you can then define additional pipes:

import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.semiauto.deriveDefaultJsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Pipe
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.MyPipes.MyOtherCustomPipe.MyConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, PipeDef, PipeRef}
import io.circe.syntax.EncoderOps
import io.circe.{Json, JsonObject}
import monix.bio.Task
import shapeless.Typeable

object MyPipes {

  /**
    * A first pipe which does not need any config.
    *
    * The function to implement needs to return a `Task[Elem[Out]]`.
    */
  final class MyCustomPipe extends Pipe {
    override type In  = GraphResource
    override type Out = GraphResource

    override def ref: PipeRef = MyCustomPipe.ref

    override def inType: Typeable[GraphResource] = Typeable[GraphResource]

    override def outType: Typeable[GraphResource] = Typeable[GraphResource]

    /**
      * Transforms the resource wrapped in the given successful element.
      *
      * `evalMap` expects a function from the wrapped value to a `Task`, not a `Task` directly.
      */
    override def apply(element: SuccessElem[GraphResource]): Task[Elem[GraphResource]] =
      element.evalMap { resource =>
        // Replace with the actual transformation of `resource`
        Task.delay(resource)
      }

  }

  /**
    * Definition of [[MyCustomPipe]]: its name, its (empty) config and how to build it.
    */
  object MyCustomPipe extends PipeDef {
    override type PipeType = MyCustomPipe
    override type Config   = Unit

    override def configType: Typeable[Config] = Typeable[Unit]

    override def configDecoder: JsonLdDecoder[Config] = JsonLdDecoder[Unit]

    // Unique identifier of the pipe in Delta, used to reference it in view pipelines
    override def ref: PipeRef = PipeRef.unsafe("myCustomPipe")

    override def withConfig(config: Unit): MyCustomPipe = new MyCustomPipe

    /**
      * Returns the pipe ref and its empty config
      */
    def apply(): (PipeRef, ExpandedJsonLd) = ref -> ExpandedJsonLd.empty
  }

  /**
    * A second pipe relying on a config.
    *
    * @param config
    *   the decoded configuration, available to the transformation
    */
  final class MyOtherCustomPipe(config: MyConfig) extends Pipe {
    override type In  = GraphResource
    override type Out = GraphResource

    override def ref: PipeRef = MyOtherCustomPipe.ref

    override def inType: Typeable[GraphResource] = Typeable[GraphResource]

    override def outType: Typeable[GraphResource] = Typeable[GraphResource]

    /**
      * Transforms the resource wrapped in the given successful element, typically using `config`.
      */
    override def apply(element: SuccessElem[GraphResource]): Task[Elem[GraphResource]] =
      element.evalMap { resource =>
        // Replace with the actual transformation of `resource` relying on `config`
        Task.delay(resource)
      }

  }

  /**
    * Definition of [[MyOtherCustomPipe]]: its name, its config type, how to decode that config from JSON-LD and how
    * to build the pipe from it.
    */
  object MyOtherCustomPipe extends PipeDef {
    override type PipeType = MyOtherCustomPipe
    override type Config   = MyConfig

    override def configType: Typeable[Config] = Typeable[MyConfig]

    // Resolved through the implicit declared in the `MyConfig` companion object
    override def configDecoder: JsonLdDecoder[Config] = JsonLdDecoder[Config]

    // Unique identifier of the pipe in Delta, named consistently with `myCustomPipe`
    override def ref: PipeRef = PipeRef.unsafe("myOtherCustomPipe")

    override def withConfig(config: MyConfig): MyOtherCustomPipe = new MyOtherCustomPipe(config)

    /**
      * Configuration of [[MyOtherCustomPipe]].
      *
      * @param types
      *   the set of type IRIs handed to the pipe
      */
    final case class MyConfig(types: Set[Iri]) {

      /**
        * Encodes this config as expanded JSON-LD, with the pipe ref as `@id` and each type as an `@id` entry under
        * `nxv:types`.
        */
      def toJsonLd: ExpandedJsonLd = ExpandedJsonLd(
        Seq(
          ExpandedJsonLd.unsafe(
            nxv + ref.toString,
            JsonObject(
              (nxv + "types").toString -> Json.arr(types.toList.map(iri => Json.obj("@id" -> iri.asJson)): _*)
            )
          )
        )
      )
    }

    object MyConfig {
      implicit val myConfigJsonLdDecoder: JsonLdDecoder[MyConfig] = deriveDefaultJsonLdDecoder
    }

    /**
      * Returns the pipe ref and the JSON-LD encoding of its config for the given types.
      */
    def apply(types: Set[Iri]): (PipeRef, ExpandedJsonLd) = ref -> MyConfig(types).toJsonLd
  }
}

And then declare them in the distage module definition of the plugin to make them available:

import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PipeDef
import izumi.distage.model.definition.ModuleDef

/**
  * Plugin module registering the custom pipe definitions declared in `MyPipes`, so that they are added to the
  * `PipeDef` set binding alongside the core pipes.
  */
object MyPluginModule extends ModuleDef {

  // Explicit element type: `Set` is invariant, so let the set be typed as `Set[PipeDef]`
  many[PipeDef].addSetValue(
    Set[PipeDef](MyPipes.MyCustomPipe, MyPipes.MyOtherCustomPipe)
  )

}

The source code for the core pipes is available @link:here and the associated unit tests @link:here.