General Analytics Overview

Overview

All buckets (*) can contain an analytic thread, which can do one of three things:

  • provide a more flexible and powerful enrichment capability for data received from the harvester
  • take selected data from existing buckets (or external data sources) and combine/transform/analyze them in various ways, persisting the output(s) in this bucket's storage
  • (a combination of the above two)

(*) Note that this isn't quite true; there are currently some technical limitations: buckets cannot currently have both enrichment and analytic threads, all analytic jobs within a bucket must have the same lock_to_nodes true/false attribute (see below), and if a harvester is present then the analytic jobs must have the default (false) lock_to_nodes attribute.

This is the top-level "analytic bucket" JSON:

{
   "analytic_thread": {
      "enabled": true|false,
      "trigger_config": { ... },
      "jobs": [
         { ... },
      ]
   },
   // other standard bucket fields
}

Each analytic thread consists of two things:

  • A set of "analytic jobs" that typically represent a single distributed job or process, with fully or partially persistent inputs and outputs (see below)
    • (note that in-memory processing pipelines will often be nested within jobs - batch enrichment (TODO link) is one example, Spark or Storm topologies are another; TODO: describe the 3 levels of nesting somewhere)
  • A set of triggers that determines when an analytic thread activates, as described below
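
For illustration, here is a minimal sketch of an analytic thread with two batch jobs, where the second job only runs once the first has completed (the job names, library path, and schedule are purely illustrative):

{
   "analytic_thread": {
      "enabled": true,
      "trigger_config": {
         "enabled": true,
         "schedule": "10 min"
      },
      "jobs": [
         {
            "name": "extract",
            "analytic_type": "batch",
            "analytic_technology_name_or_id": "/app/aleph2/library/example_tech.jar",
            "output": { "is_transient": true, "transient_type": "batch" }
         },
         {
            "name": "aggregate",
            "analytic_type": "batch",
            "analytic_technology_name_or_id": "/app/aleph2/library/example_tech.jar",
            "dependencies": [ "extract" ],
            "inputs": [ { "name": "extracted", "resource_name_or_id": "extract", "data_service": "batch" } ],
            "output": { "is_transient": false, "preserve_existing_data": false }
         }
      ]
   }
}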

Analytic buckets (ie buckets containing an analytic thread) can be in one of three states:

  • Suspended: when the containing bucket is suspended, no jobs will be active (and running jobs will be stopped as soon as possible)
  • Published: Batch jobs are ready to run but have not yet been triggered so are inactive. Note that streaming jobs are always active once the bucket is published
  • Activated: The bucket triggers have fired, so one or more of the batch jobs might be active. The actual job-level active/inactive status is a function of each job's dependencies. Once no jobs remain active, the bucket status moves back to "Published".

The remainder of this page describes batch jobs, streaming jobs, and triggers.

Simple Triggers

Triggers are important because they control when jobs run. They can be very simple or very complicated. This section focuses on simple triggers; the more complex cases are covered in a later section.

The simplest trigger schema is described below:

{
   "analytic_thread": {
      "trigger_config": {
         "enabled": true|false,
         "auto_calculate": true|false,
         "schedule": string
      },
      // other analytic thread fields ("jobs" and "enabled")
   },
   // other standard bucket fields
}

The above trigger_config schema contains the following fields:

  • enabled: this object is ignored if set to false. If there is no trigger_config object (or it is disabled) then the bucket is activated once published, but will not then re-activate unless the bucket is re-published.
  • auto_calculate: if this is set to true (the default) then the bucket will activate when there is data in all of the external inputs (see below for information on how inputs are specified). For more complex input criteria there is the trigger attribute, which is described below. If false then the bucket will always activate every schedule period (see below).
    • (Note that triggering for internal bucket inputs (eg search_index_service) is not currently wired up; create an issue if this is needed, as it is only a few lines of code. Until then such triggers will never fire.)
  • schedule: A human readable time or schedule (eg "10 min", every 10 minutes; "Wed 3pm", every Wednesday at 3pm; "2018-01-01 12:00", a specific one-off date) at which the trigger will be checked - or, if there is no trigger specified (ie auto_calculate missing or false and no trigger attribute), at which the bucket will be activated. Note that if schedule is not specified but the root-level bucket poll_frequency is specified then the poll_frequency will be used instead.
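
For example, an illustrative trigger configuration that is checked every 10 minutes and, because auto_calculate is true, activates the bucket whenever all of its external inputs contain data:

{
   "analytic_thread": {
      "trigger_config": {
         "enabled": true,
         "auto_calculate": true,
         "schedule": "10 min"
      },
      // other analytic thread fields
   },
   // other standard bucket fields
}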

Batch analytic jobs

Batch analytic jobs - overview

It is difficult to get a neat overview of analytic jobs because what they can do is so varied and flexible.

Examples of batch analytic jobs:

  • Running a single (distributed) Hadoop MR job
  • Running a (distributed) Spark (eg Machine Learning) topology that will iteratively run complicated algorithms until some completion criterion is satisfied
  • Running a standalone process or script to do anything a user wants (based on their access privileges), eg running the Gephi command line application to lay out a huge (100K+ node) graph network.

The point is that the core system acts as an orchestration engine for these varied analytic technologies and processes.

The analytic job JSON looks like:

{
   "name": string,
   "enabled": true|false,
   "analytic_type": "batch",

   // Which technology to run
   "analytic_technology_name_or_id": string,
   "entry_point": string,

   // What functionality is provided within that technology 
   "module_name_or_id": string,
   "library_names_or_ids": [ string ],

   // Configuration that drives the analytic technology behavior for this bucket
   "config": { ... },

   // Inputs:
   "dependencies": [ string ],
   "global_input_config": { ... },
   "inputs": [ { ... } ],

   // Output:
   "output": { ... },

   // Distribution control
   "node_list_rules": [ string ],
   "multi_node_enabled": true|false,
   "lock_to_nodes": true|false
}

The above schema has the following fields:

  • name: A unique name (within the bucket) for the analytic job (alphanumeric/_/. only) used by the Java API, by dependencies, and for other buckets'/jobs' inputs (to retrieve intermediate input).
  • enabled (optional): If false then the job is ignored as if it wasn't present.
  • analytic_type: "batch" for batch jobs, "streaming" for streaming jobs.
  • The next 2 fields define what the "umbrella" technology is (eg Spark/Hadoop/user process etc)
    • analytic_technology_name_or_id: Is either the path of the shared library (TODO link), or can be "BatchEnrichmentService" (see below, under "using the enrichment subsystem")
    • entry_point (optional): can normally be left blank because it is part of the shared library specification, but if the shared library specified above contains a number of different analytic technologies, then entry_point can point to the desired IAnalyticTechnologyModule class path (eg "com.ikanow.aleph2.analytics.SparkTechnologyService"), which is guaranteed to be on the system classpath.
  • The next 2 fields allow user code to be executed:
    • module_name_or_id (optional): The shared library containing the user code (if any). The JAR inside this shared library is guaranteed to be in the system classpath.
      • (note that the equivalent of the entry_point field needs to be plumbed into the config field)
    • library_names_or_ids (optional): More shared libraries needed by the job. The only difference compared to module_name_or_id is that the JVM API makes it easier to access the JSON within the shared library.
  • These are the most critical fields:
    • config (optional): An arbitrary JSON object that is passed into the technology and used to control the behavior of the technology and any user components. It is not interpreted by the core at all, ie its format is defined by the combination of analytic_technology_name_or_id and (optionally) module_name_or_id.
    • dependencies: A list of strings corresponding to names of other jobs in the same bucket - this job will only run once all of the dependent jobs are complete. If a job does not have any dependencies then it starts as soon as the bucket is activated.
    • inputs (optional): defines the inputs that the analytic technology will have access to. Its flexible format is described below.
    • global_input_config (optional): sets default parameters applied to all input objects unless the input has that field set. Format described below.
    • output: defines what happens to data emitted from the analytic job (by the Java API). Its format is described below.
  • Finally, there are some less commonly used fields that define how jobs are distributed. They are less common because typically the underlying technology is distributed (eg a YARN job like Hadoop).
    • multi_node_enabled: (like the bucket (harvest) parameter; currently not supported)
    • lock_to_nodes: (like the bucket (harvest) parameter) if true (defaults to false), then each bucket is associated permanently with the first node to run it. This is necessary if running a persistent external process. Note that all jobs in the same bucket must have the same lock_to_nodes value, and it must be false if the bucket also has a harvester.
    • node_list_rules: (like the bucket (harvest) parameter) This job will only be run on the nodes that match these rules (XXX LINK). Jobs with the same analytic technologies can have different node_list_rules, but those jobs will only run on the intersection.
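
Putting these fields together, here is a sketch of a batch job that runs user code on top of a Hadoop-based analytic technology (the library paths and the contents of config are purely illustrative - the config format is defined by whatever technology/module is actually used):

{
   "name": "word_count",
   "enabled": true,
   "analytic_type": "batch",
   "analytic_technology_name_or_id": "/app/aleph2/library/hadoop_technology.jar",
   "module_name_or_id": "/app/aleph2/library/my_user_module.jar",
   "config": { "mapper_entry_point": "com.example.WordCountMapper" },
   "dependencies": [],
   "inputs": [ { "name": "batch_in", "resource_name_or_id": "", "data_service": "batch" } ],
   "output": { "is_transient": false, "preserve_existing_data": false }
}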

Batch analytic jobs - inputs

Batch analytic inputs can cover the following:

  • Access the bucket's batch input in the storage service (ie files in the bucket's HDFS "import" directory)
  • Access one of the data services (search_index_service, storage_service, document_service; or non-default services, eg document_service.v1 is the V1 document bridge) for any bucket (for which the user has read permission)
  • Access an internal or external job's intermediate output (see under "output" section below)

The remainder of this section describes the different schemas for the above cases.

Accessing the bucket's own batch input:

{
   "enabled": true|false,
   "name": string,
   "resource_name_or_id": "",
   "data_service": "batch" 
}

With:

  • enabled: if false then it is as if the input does not exist
  • name: a unique name (within the job) - used for easy access to the input in the Java API
  • resource_name_or_id: can either be "" or the bucket path

Accessing a bucket's data service:

{
   "enabled": true|false,
   "name": string,
   "resource_name_or_id": string,
   "data_service": string,
   "filter": { ... },
   "config": {
      "new_data_only": true|false,
      "high_granularity_filter": true|false,
      "time_min": string,
      "time_max": string,
      "test_record_limit_request": integer
   } 
}

Where:

  • enabled: if false then it is as if the input does not exist
  • name: a unique name (within the job) - used for easy access to the input in the Java API
  • resource_name_or_id now points to the bucket path of the desired bucket. Currently globbing is not supported, though it will be in the future.
    • For "data_service": "storage_service" (see below), then resource_name_or_id can have one of the following three suffixes:
      • ":raw": the raw data (eg binary/CSV etc), if available, before it has been converted to JSON
      • ":json": the raw JSON, before any enrichment (if any) has occurred
      • ":processed": (the default), if available, the same JSON that is stored in the document db/search index, ie after passing through that bucket's enrichment pipeline
    • For "data_service": "document_service.V1DocumentService" (see below), then resource_name_or_id is in the format "/aleph2_external/<community-ids-separated-by-_>"
      • (the prefix "/aleph2_external/" is reserved for non-default data services like the one above; it means the user's access permissions are not checked internally but are delegated to the data service)
  • data_service: the following are always supported: "search_index_service", "document_service", "storage_service" (others will come later); other non-default services are supported with the format "<data_service>.<service_name>", eg "document_service.V1DocumentService" is a bridge to access V1 documents in MongoDB.
    • "storage_service" - reads from the bucket's HDFS "archive" (if enabled)
      • (as noted above, you can access any of the raw/json/processed versions of the data that are available)
    • "search_index_service" - a Lucene searchable JSON version of the data objects, if enabled.
    • "document_service" - a key/value version of the data objects, if enabled.
  • filter is a JSON object whose format depends on the data service being used:
    • For "storage_service", it is not used.
    • For "search_index_service": (longer term there will be a generic JSON search format based on the ICrudService Java API, in the meantime...)
      • For elasticsearch implementations (the default): use { "technology_override": string } or { "technology_override": { ... } }, where:
        • if it is a string, then it is treated like an Elasticsearch URL param eg "q=text", "q=field:text" etc.
        • if it is a JSON object, then it is treated like an Elasticsearch query DSL.
    • For "document_service", it is not currently used.
    • For "document_service.V1DocumentService", the V1 schema described here is used, except:
      • only the following extensions (fields starting with $) are supported: "$fields", "$splits", "$docsPerSplit", "$srctags", "$tmin", "$tmax"
      • instead of starting with "$", these fields start with ":", eg ":tmin" rather than "$tmin"
  • config has the following schema:
    • new_data_only: (not currently supported - on the near term roadmap) will only process data that it has not already processed, based on various timestamps declared in the data schema.
    • high_granularity_filter: if true then will attempt to enforce the filter (and potentially config) exactly, and will error on job creation if that isn't possible. If false then the follow-on analytics should not assume that the records match the filter/config (trade-offs may be made between performance and false positives). If not present then either may be applied, so follow-on analytics should assume it behaves like false.
    • time_min: A human readable string (eg "1 day") expressing the oldest data that will be processed - note that the granularity of this is (currently) the grouping_period in the storage schema (for storage_service), or in the temporal schema (for other data services)
    • time_max: A human readable string (eg "1 day") expressing the newest data that will be processed, same remarks apply as for time_min
    • test_record_limit_request: If running a test version of the bucket, then this limit is a requested limit on the amount of data to be returned. This request might only be partially satisfied (eg that many records per input split), but it can help reduce the amount of data processed during smaller tests.
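
For example, here is an illustrative input that reads up to the last day's worth of data from another bucket's search index, filtered with an Elasticsearch URL-style query (the bucket path and query are made up):

{
   "enabled": true,
   "name": "proxy_errors",
   "resource_name_or_id": "/security/logs/proxy",
   "data_service": "search_index_service",
   "filter": { "technology_override": "q=status:error" },
   "config": {
      "time_min": "1 day",
      "high_granularity_filter": false
   }
}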

Accessing semi-transient internal stages of internal and external jobs:

{
   "enabled": true|false,
   "name": string,
   "resource_name_or_id": string,
   "data_service": "batch",
   "config": {
      "new_data_only": true|false,
      "time_min": string,
      "time_max": string,
      "test_record_limit_request": integer
   } 
}

Where:

  • enabled: if false then it is as if the input does not exist
  • name: a unique name (within the job) - used for easy access to the input in the Java API
  • resource_name_or_id is one of the following:
    • "<job name>" - retrieves the data from the transient output of the specified job (see below, under output) of this bucket.
    • "<bucket path>:<job name>" - retrieves the data from the transient output of the specified job (see below, under output) of the specified bucket, if the owner of this bucket has read permission.
  • config has the following schema:
    • new_data_only: (not currently supported - on the near term roadmap) will only process data that it has not already processed, based on various timestamps declared in the data schema.
    • time_min: A human readable string (eg "1 day") expressing the oldest data that will be processed - note that the granularity of this is (currently) the grouping_period in the storage schema (for storage_service), or in the temporal schema (for other data services)
    • time_max: A human readable string (eg "1 day") expressing the newest data that will be processed, same remarks apply as for time_min
    • test_record_limit_request: If running a test version of the bucket, then this limit is a requested limit on the amount of data to be returned. This request might only be partially satisfied (eg that many records per input split), but it can help reduce the amount of data processed during smaller tests.
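
For example, here is an illustrative input that reads the transient output of a job named "extract" belonging to a different bucket (the bucket path and job name are made up):

{
   "enabled": true,
   "name": "upstream_extract",
   "resource_name_or_id": "/other/bucket/path:extract",
   "data_service": "batch",
   "config": {
      "time_min": "1 week"
   }
}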

Batch analytic jobs - output

The output format is relatively simple:

{
   "preserve_existing_data": true|false,
   "is_transient": true|false,
   "transient_type": string,
   "sub_bucket_path": string
}

Where

  • preserve_existing_data: If this is set to true, then new data will be appended to the bucket. If this is set to false, the data will be (atomically) overwritten each time the analytic job runs.
  • is_transient: If this is set to false, then the output of this job is written into the bucket's data services according to however the data schema are set (eg into Elasticsearch and HDFS in a typical implementation with search_index_service and storage_service enabled).
    • If instead set to true, then the data is instead written into a "temporary" HDFS store, which other jobs can then read (as described above under "Accessing semi-transient internal stages of internal and external jobs")
  • transient_type: Only used if is_transient is true. Currently only "batch" is supported (once implemented, batch jobs will be able to send their output into the real-time queues of streaming jobs)
  • sub_bucket_path: Not currently supported. Once implemented will allow jobs in one bucket to write directly into the output of that bucket's sub-bucket (when "is_transient": false).
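
For example, here is a minimal (illustrative) output block for an intermediate job whose results are only consumed by downstream jobs, rather than being written to the bucket's data services:

{
   "preserve_existing_data": false,
   "is_transient": true,
   "transient_type": "batch"
}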

Batch transient jobs go into the (HDFS) folder /app/aleph2/data/<bucket path>/managed_bucket/import/transient/<job name>/, where they can be accessed via the input configurations by other jobs, or scripts and other external engines. (To get to the local file system, prepend /opt/hadoop-fileshare (v2.5-) or /opt/hadoop-nfs (v2.6+)).

Note that in addition to writing data out via these output blocks, the enrichment module can call an "external emit" to any buckets that are in the root external_emit_paths list of paths/globs. Objects are written in batches to (HDFS) /app/aleph2/data/<bucket path>/managed_bucket/import/ready.
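
For illustration, a bucket that allows such external emits to two other buckets might declare the following root-level field (the paths/globs are made up):

{
   "external_emit_paths": [ "/other/bucket/one", "/other/buckets/*" ],
   // other standard bucket fields
}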

Batch analytic jobs - using the enrichment subsystem

TODO link to batch enrichment section

TODO format

TODO passthrough

Streaming analytic jobs

Streaming analytic jobs - overview

Streaming analytic jobs - inputs

Streaming analytic jobs - output

Streaming analytic jobs - using the enrichment subsystem

Complex analytic thread triggers
