Spark Ryft Connector

This library lets you expose Ryft as Spark RDDs or Data Frames by consuming the Ryft REST API.

Installation

sbt clean compile

Build executable jar

sbt clean assembly

You can find the jar at: ../spark-ryft-connector/target/scala-2.10/
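
To try the assembled jar with Spark, pass it to your shell or application. A minimal sketch (the exact jar file name depends on the build and is a placeholder here):

spark-shell --jars ../spark-ryft-connector/target/scala-2.10/spark-ryft-connector-assembly-&lt;version&gt;.jar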

Ryft Query mechanism

There are two main types of RyftQuery:

  1. SimpleQuery
  2. RecordQuery

SimpleQuery represents a RAW_TEXT search for one or more search queries. This type of query uses only the CONTAINS relational operator; two or more search queries are combined with the OR logical operator. For example, to conduct a free-text search for the three words 'query0', 'query1', 'query2', use this statement:

SimpleQuery(List("query0", "query1", "query2"))

It's transformed into this command:

((RAW_TEXT CONTAINS "query0")OR(RAW_TEXT CONTAINS "query1")OR(RAW_TEXT CONTAINS "query2"))

RecordQuery is more complex: it allows you to use Ryft RECORD and RECORD.field queries to search an entire record set or specific fields within record sets. You can use method chaining to create nested queries.

Here are two examples:

  • search for all records where the field desc contains VEHICLE
RecordQuery(recordField("desc") contains "VEHICLE") -> (RECORD.desc CONTAINS "VEHICLE")
  • search for all records which contain 'VEHICLE' in any field
RecordQuery(record contains "VEHICLE") -> (RECORD CONTAINS "VEHICLE")

RecordQuery allows you to combine two or more queries by chaining the commands. If the original query is this:

RecordQuery(recordField("desc") contains "VEHICLE")
      .or(recordField("desc") contains "BIKE")  

Then RecordQuery will convert it to produce this query:

((RECORD.desc CONTAINS "VEHICLE")OR(RECORD.desc CONTAINS "BIKE"))

You can also create complex nested queries, like this:

RecordQuery(
      RecordQuery(recordField("desc") contains "VEHICLE")
      .or(recordField("desc") contains "BIKE")
      .or(recordField("desc") contains "MOTO"))

Producing the following query:

(((RECORD.desc CONTAINS "VEHICLE")OR(RECORD.desc CONTAINS "BIKE")OR(RECORD.desc CONTAINS "MOTO")))

Note that you should include import com.ryft.spark.connector._ to use the provided API.
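
For example, a minimal snippet combining the import with the query DSL described above (the search terms are illustrative):

import com.ryft.spark.connector._

val simple = SimpleQuery(List("query0", "query1", "query2"))
val byField = RecordQuery(recordField("desc") contains "VEHICLE")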

Date expression

Record queries provide a mechanism to search on dates. Different date ranges can be searched for by modifying the expression provided. There are two general supported expression types:

DATE(DateFormat operator ValueB)
DATE(ValueA operator DateFormat operator ValueB)

Here DateFormat is a file-specific date format (e.g. 'MM/DD/YYYY' for '02/04/2015') and operator is a math comparison operator (e.g. <, >=, === (equality) or =/= (inequality)). You should use the simple DSL to specify a date field:

RecordQuery(recordField("Date") contains DateValue(Format("MM/DD/YYYY") === Date("04/05/2015")))
RecordQuery(recordField("Date") contains DateValue(Date("01/05/2015") < Format("MM/DD/YYYY") < Date("04/05/2015")))

It produces the following queries:

(RECORD.Date CONTAINS DATE(MM/DD/YYYY = 04/05/2015))
(RECORD.Date CONTAINS DATE(01/05/2015 < MM/DD/YYYY < 04/05/2015))

Time expression

Different time ranges can be searched for by modifying the expression provided. There are two general supported expression types:

TIME(TimeFormat operator ValueB)
TIME(ValueA operator TimeFormat operator ValueB)

Here TimeFormat is a file-specific time format (e.g. 'HH:MM:SS' for '09:15:00') and operator is a math comparison operator (e.g. <, >=, === (equality) or =/= (inequality)). Here is an example:

RecordQuery(recordField("Date") contains TimeValue(Format("HH:MM:SS") =/= Time("12:00:00")))
RecordQuery(recordField("Date") contains TimeValue(Time("12:00:00") < Format("HH:MM:SS") <= Time("12:30:00")))

It produces the following queries:

(RECORD.Date CONTAINS TIME(HH:MM:SS != 12:00:00))
(RECORD.Date CONTAINS TIME(12:00:00 < HH:MM:SS <= 12:30:00))

Numeric expression

Different numeric ranges can be searched for by modifying the expression provided. There are two general supported expression types:

NUMBER(NUM operator1 ValueA, "subitizer", "decimal")
NUMBER(ValueA operator1 NUM operator2 ValueB, "subitizer", "decimal")

The "subitizer" is defined as the separating character to use. For example, for standard US numbers, a comma would be specified. The "decimal" is defined as the decimal specifier to use. For example, for standard US numbers, a period would be specified.

NOTE: The character specified for subitizer and decimal must be different.

Here is the code example:

SimpleQuery(NumericValue(NUM <= Number(50), ",", "."))
SimpleQuery(NumericValue(Number(35) < NUM <= Number(50), ",", "."))

It produces the following queries:

(RAW_TEXT CONTAINS NUMBER(NUM <= "50", ",", "."))
(RAW_TEXT CONTAINS NUMBER("35" < NUM <= "50", ",", "."))

Currency expression

Currency searches are a type of number search and extend the general relational expression defined previously. Different currency ranges can be searched for by modifying the expression provided. There are two general supported expression types:

CURRENCY(CUR operator1 ValueA, "currency","subitizer","decimal")
CURRENCY(ValueA operator1 CUR operator2 ValueB, "currency","subitizer", "decimal")

"Subitizer" is an optional parameter for currency expression. In the US, a comma is used as the subitizer, and a period is used for the decimal, like this: $20,450.36. In Europe and elsewhere, it is the opposite (period is subitizer and comma is decimal), like this: €20.460,36.

Here is the code example:

SimpleQuery(CurrencyValue(CUR >= Currency("$500"), "$", "."))
SimpleQuery(CurrencyValue(Currency("$300") < CUR <= Currency("$500"), "$", ",", "."))

It produces the following queries:

(RAW_TEXT CONTAINS CURRENCY(CUR >= "$500", "$", "", "."))
(RAW_TEXT CONTAINS CURRENCY("$300" < CUR <= "$500", "$", ",", "."))

Regex expression

Different regular expressions can be searched for by modifying the expression provided. The expression type is:

REGEX(expression, "PCRE_OPTION_DEFAULT")

PCRE_OPTION_DEFAULT is a mandatory parameter.

SimpleQuery(RegexValue("$[3-5]00", "PCRE_OPTION_DEFAULT"))

It produces the following query:

(RAW_TEXT CONTAINS REGEX("$[3-5]00", PCRE_OPTION_DEFAULT))

Hamming & Edit search expressions

Fuzzy Hamming and Fuzzy Edit Distance searches are expressed, respectively, as:

FHS(Value, CS, DIST, WIDTH)
FEDS(Value, CS, DIST, WIDTH)

These are the parameters:

  • Value: search expression
  • CS: denotes the case-sensitivity selection, true|false (optional, false by default)
  • DIST: search distance for the primitive
  • WIDTH: surrounding width

Here is the code example:
SimpleQuery(HammingValue("Jones", 1, 10))
SimpleQuery(EditValue("Jones", 2, 10, true))

It produces the following queries:

(RAW_TEXT CONTAINS FHS("Jones", CS=false, DIST=1, WIDTH=10))
(RAW_TEXT CONTAINS FEDS("Jones", CS=true, DIST=2, WIDTH=10))

IPv4 expression

The IPv4 Search operation can be used to search for exact IPv4 addresses or IPv4 addresses in a particular range in both structured and unstructured text using the standard “a.b.c.d” format for IPv4 addresses.

SimpleQuery(IPv4Value(IP > IPv4("10.11.12.13")))
SimpleQuery(IPv4Value(IPv4("10.10.0.0") <= IP <= IPv4("10.10.255.255")))

Producing the following queries:

(RAW_TEXT CONTAINS IPV4(IP > "10.11.12.13"))
(RAW_TEXT CONTAINS IPV4("10.10.0.0" <= IP <= "10.10.255.255"))

Expansion Of Search Types

The query builder DSL allows users to mix different types of query into one. The combined query is decomposed and processed on the Ryft REST side, like this:

RecordQuery(recordField("Description") contains HammingValue("vehycle", 1, 0))
    .and(recordField("ID") contains EditValue("10029", 3, 0))
    .and(recordField("Date") contains TimeValue(Time("11:50:00") < Format("HH:MM:SS") <= Time("11:55:00")))

It produces the following query:

((RECORD.Date CONTAINS TIME(11:50:00 < HH:MM:SS <= 11:55:00))
    AND(RECORD.ID CONTAINS FEDS("10029", CS=false, DIST=3, WIDTH=0))
    AND(RECORD.Description CONTAINS FHS("vehycle", CS=false, DIST=1, WIDTH=0)))

Query options

To provide specific settings for a Ryft query, use RyftQueryOptions (see the sketch after this list):

  • files: Files to search.
  • surrounding: Width of surrounding context when generating results. For example, a value of 2 means that 2 characters before and after a search match will be included in the result. The value 'line' means that the whole line will be included.
  • fuzziness: Specify the fuzzy search distance [0..255].
  • mode: Specify the fuzzy search mode. Use fhs for Hamming search and feds for Edit distance search.
  • fields: Specify the list of fields that should be returned (for structured data only).
  • ryftNodes: Specify the number of Ryft nodes to use for the query.
  • format: Specify the format of the file (xml or json).
  • caseSensitive: Case sensitive flag.
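
A minimal sketch of building query options and an RDD from them, assuming a SparkContext named sc; the constructor arguments mirror the files/surrounding/fuzziness order used in the examples later in this document:

import com.ryft.spark.connector._

val query = SimpleQuery("Jones")
// files = "passengers.txt", surrounding = 10 characters, fuzziness = 0
val queryOptions = RyftQueryOptions("passengers.txt", 10, 0 toByte)
val ryftRDD = sc.ryftRDD(Seq(query), queryOptions)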

DataFrame support

The connector supports registering Ryft searches as data frames in the Spark SQL context so that they can be queried using Spark SQL. You need to do the following:

  • Describe a data schema.
  • Register the list of files to be searched by the query and the format of the file structure (xml or json). You can use Date/Time Ryft expressions; they work out of the box. You only need to specify the Date/Time type in the schema and provide the date_format option while registering the data frame. You can also use the following optional parameters:
  • partitioner - Class name that implements partitioning logic for data partitioning.
  • date_format - Date/Time format.
  • subitizer - Separating character used for numeric values.
  • decimal - Decimal specifier used for numeric values.

Optional parameters should be passed wrapped in a Scala Map.
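
For instance, several optional parameters can be combined in one Map when registering a data frame. This is a sketch, assuming a schema and sqlContext as in the example below; the partitioner class name is hypothetical:

  // The partitioner class name below is illustrative only.
  sqlContext.read.ryft(schema, xml, "*.pcrime", "temp_table",
    Map("date_format" -> "MM/dd/yyyy hh:mm:ss aa",
        "subitizer" -> ",",
        "decimal" -> ".",
        "partitioner" -> "com.example.CrimePartitioner"))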

Here's an example in Scala:

  val schema = StructType(Seq(
    StructField("Arrest", BooleanType), StructField("CaseNumber", StringType),
    StructField("Date", DateType), StructField("Description", StringType), 
    ....
  ))
  
  sqlContext.read.ryft(schema, xml, "*.pcrime", "temp_table", Map("date_format" -> "MM/dd/yyyy hh:mm:ss aa"))

  val df = sqlContext.sql(
    """select Date, Description, Arrest from temp_table
       where Description LIKE '%VEHICLE%'
          AND  Date > to_date('2015-03-15')
       ORDER BY Date
    """)

Same code in Python:

schema = StructType([
    StructField("Arrest", BooleanType()), StructField("CaseNumber", IntegerType()),
    StructField("Date", DateType()), StructField("Description", StringType()),
    ....
])

df = sqlContext.read\
    .format("com.ryft.spark.connector.sql")\
    .schema(schema)\
    .option("files", "*.pcrime")\
    .option("format", "xml")\
    .option("date_format", "MM/dd/yyyy hh:mm:ss aa")\
    .load()
df.registerTempTable("temp_table")

df = sqlContext.sql("select Date, ID, Description, Arrest from temp_table\
       where Description LIKE '%VEHICLE%'\
          AND  Date > to_date('2015-03-15')\
          AND Arrest = true\
       ORDER BY Date")

NOTE: Since Data Frames use record field names for both data output and query generation, it is important that the field names in the RDF are the same as the tag names of the record. In the example below, the name field is properly defined while the job field is not.

...
record_start  = "<log>";
record_end    = "</log>";
data_type    = "XML";

fields = {
        name        = ( "<name>",                 "</name>" ),
        jobDesc     = ( "<job>",                  "</job>" )
};

For more examples in Scala, Java, PySpark, and SparkR, please look in the examples directory.

Persisting RDD into Ryft

An RDD can be saved to a Ryft box by using the saveToRyft method. You can configure it using the following parameters:

  • path - the path of the stored RDD on the Ryft box
  • url - Ryft REST mount point URL
  • format - the format of the stored RDD (e.g. json)
  • catalog - path of the catalog file to append to (optional)
  • overwrite - whether to overwrite the file/catalog or append to it; the default value is false (append mode)
  • local - the local/cluster flag (true by default)
  • lifetime - the lifetime of the uploaded file or catalog (optional)

Example in Scala:

  val query = SimpleQuery("Jones")
  val queryOptions = RyftQueryOptions("passengers.txt", 10, 0 toByte)
  
  val ryftRDD = sc.ryftRDD(Seq(query), queryOptions)
  
  ryftRDD.saveToRyft("jones.xml", "http://ryft.com", xml, "passengers.catalog")

Example in Java:

    final SparkContext sc = new SparkContext(sparkConf);
    final SparkContextJavaFunctions javaFunctions = RyftJavaUtil.javaFunctions(sc);
        
    final SimpleQuery query = RyftQueryUtil.toSimpleQuery("Jones");
    
    final RyftJavaRDD rdd = javaFunctions.ryftRDD(query,
            RyftQueryOptions.apply("passengers.txt", 0, 10),
            RyftJavaUtil.ryftQueryToEmptyList,
            RyftJavaUtil.stringToEmptySet);
            
     final Map<String, String> params = new HashMap<>();
     params.put("url", "http://ryft.com");
     params.put("format", "xml");
     params.put("catalog", "passengers.catalog");

    javaFunctions.saveToRyft(rdd, "jones.xml", params);

The connector also supports saving each entry of a PairRDD by using the saveEachEntryToRyft method. In this case the key of an entry is used as the save path and the value as the stored data.

This is a Scala example:

  val query = Seq(SimpleQuery("Jones"),SimpleQuery("Thomas"))
  val queryOptions = RyftQueryOptions("passengers.txt", 10, 0 toByte)
  
  val ryftRDD = sc.ryftPairRDD(query, queryOptions)

  val result = ryftRDD.map((_, 1)).reduceByKey(_ + _).saveEachEntryToRyft(Map("url" -> "http://ryft.com", "format" -> "xml", "catalog" -> "passengers.catalog"))

Persisting DataFrame into Ryft

Storing a DataFrame is similar to the section above. Here's an example in Scala:

  val schema = StructType(Seq(
    StructField("Arrest", BooleanType), StructField("CaseNumber", StringType),
    StructField("Date", DateType), StructField("Description", StringType), 
    ....
  ))
  
  sqlContext.read.ryft(schema, xml, "*.pcrime", "temp_table", Map("date_format" -> "MM/dd/yyyy hh:mm:ss aa"))

  val df = sqlContext.sql(
    """select Date, Description, Arrest from temp_table
       where Description LIKE '%VEHICLE%'
          AND  Date > to_date('2015-03-15')
       ORDER BY Date
    """)
    
    df.saveToRyft("2015-03-15.xml", Map("url" -> "http://ryft.com", "format" -> "xml", "overwrite" -> true))

Same code in Python:

  schema = StructType([
      StructField("Arrest", BooleanType()), StructField("CaseNumber", IntegerType()),
      StructField("Date", DateType()), StructField("Description", StringType()),
      ....
  ])
  
  df = sqlContext.read\
      .format("com.ryft.spark.connector.sql")\
      .schema(schema)\
      .option("files", "*.pcrime")\
      .option("format", xml)\
      .option("date_format", "MM/dd/yyyy hh:mm:ss aa")\
      .load()
  df.registerTempTable("temp_table")
  
  df = sqlContext.sql("select Date, ID, Description, Arrest from temp_table\
         where Description LIKE '%VEHICLE%'\
            AND  Date > to_date('2015-03-15')\
            AND Arrest = true\
         ORDER BY Date")
         
  df.write\
      .format("com.ryft.spark.connector.sql")\
      .options(url="http://ryft.com", format="xml", overwrite="true")\
      .save("2015-03-15.xml")

Spark Config Options

The following options can be set via SparkConf, command-line options, or Zeppelin Spark interpreter settings (see the sketch after this list):

  • spark.ryft.rest.url - semicolon-separated list of Ryft REST endpoints for search. For example: http://ryftone-1:8765;http://ryftone-2:8765
  • spark.ryft.consul.url - Ryft Consul address. For example: http://ryftone-1:8500
  • spark.ryft.rest.ssl.untrusted - boolean value that allows the use of self-signed SSL certificate. Set true only for testing.
  • spark.ryft.nodes - number of Ryft hardware nodes to be used by queries. Corresponds to the nodes REST API query parameter.
  • spark.ryft.partitioner - Canonical class name that implements partitioning logic for data partitioning and collocation. See examples below.
  • spark.ryft.consul.partitioning - Whether to use Consul partitioning mechanism.
  • spark.ryft.auth.username - login for password authentication (required)
  • spark.ryft.auth.password - password for password authentication (required)
  • spark.ryft.rest.save.numRetries - number of retries for saving to Ryft; the default value is 3
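
A minimal sketch of setting these options programmatically via SparkConf; the endpoint URLs and credentials are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spark-ryft-example")
  .set("spark.ryft.rest.url", "http://ryftone-1:8765;http://ryftone-2:8765")
  .set("spark.ryft.nodes", "1")
  .set("spark.ryft.auth.username", "user")     // placeholder credentials
  .set("spark.ryft.auth.password", "password")
val sc = new SparkContext(conf)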

Data Partitioning and locality support

If spark.ryft.rest.url is set to multiple endpoints, then by default search requests are sent to each of the servers and the results are combined. To override this logic, for example to round-robin requests or to route requests depending on the query value, you can specify a partitioning function or class.

The partitioning class can be used with both RDD and DataFrame examples regardless of programming language. It is applied by loading a jar file into the Spark context with a class that extends com.ryft.spark.connector.partitioner.RyftPartitioner and specifying its canonical name via the spark.ryft.partitioner configuration value. This can be done globally or on a per-query level. See the full example or this code snippet:

  sqlContext.read.ryft(schema, "*.pcrime", "temp_table", classOf[ArrestPartitioner].getCanonicalName)
  ...
  
  
  class ArrestPartitioner extends RyftPartitioner {
    override def partitions(query: RyftQuery): Set[URL] = query match {
      case rq: RecordQuery =>
        partitionsAcc(rq.entries, List.empty[String]).filter(_.nonEmpty).map(new URL(_))
      case _ =>
        throw RyftSparkException(s"Unable to find partitions for such type of query: ${query.getClass}")
    }

    @tailrec
    private def partitionsAcc(entries: Set[(String,String)], acc: List[String]): Set[String] = {
      if (entries.isEmpty) acc.toSet
      else partitionsAcc(entries.tail, byFirstLetter(entries.head) :: acc)
    }

    private def byFirstLetter(entry: (String,String)) = {
      if (entry._1.equalsIgnoreCase("arrest")) {
        if (entry._2.equalsIgnoreCase("true")) "http://ryftone-1:8765"
        else "http://ryftone-2:8765"
      } else ""
    }
  }

The partitioning function works for RDD requests in Java and Scala, and can be specified as a parameter to the RDD methods. For example:

// If country name starts with [a-n] go to ryftone-1:8765
// Otherwise go to ryftone-2:8765
def byCountry(query: RyftQuery): List[String] = {
    def byFirstLetter(country: String): List[String] = {
        if (('a' to 'n').contains(country.head.toLower)) 
            List("http://ryftone-1:8765")
        else 
            List("http://ryftone-2:8765")   
    }
    
    RyftPartitioner.byField(query, byFirstLetter, Seq("country")).toList
}
  
sc.ryftRDD(List(query), qoptions, byCountry)
    .asInstanceOf[RyftRDD[Map[String, String]]].map(r=> new Record(r))
    .toDF().cache().registerTempTable("query2")
    
println("query2: " + sqlc.table("query2").count)

If a Spark node is running on a Ryft ONE box, you can use partitioning to implement a co-location strategy. For example:

import java.net.URL

// This query uses preferred node selection
// All requests to Ryft boxes will request execution on the corresponding node (with the same IP)
// If no nodes are available, execution will fall back to other free nodes

def sameNodeSelection(url: String) = {
    val preferredNodes = Set(new URL(url).getHost)
    println("preferred nodes: " + url + "->" + preferredNodes)
    preferredNodes
  }
  
sc.ryftRDD(List(query), qoptions, nopart, sameNodeSelection)
    .asInstanceOf[RyftRDD[Map[String, String]]].map(r=> new Record(r))
    .toDF().cache().registerTempTable("query3")
    
println("query3: " + sqlc.table("query3").count)

NOTE: When multiple partitioning rules are applied, the one with the highest priority is used. Priority from low to high is: partitioning class in SparkConf, partitioning class in the query, partitioning function.

CI and Publishing

Continuous integration and deployment are managed with the Travis CI service. As soon as a commit is pushed to the repository on GitHub, a continuous integration cycle is launched:

  • automatic building for Pull Requests and commits;
  • automatic tests execution for Pull Requests and commits;
  • building and testing project against multiple Scala versions: 2.10.6 and 2.11.7;
  • notifying about build results;
  • uploading assets to Maven Central and GitHub Releases repositories (only for tagged commits on the master branch).

Code Contributions

Contribution guidelines can be found in the CONTRIBUTING file.

License

The Ryft-Customized BSD License is covered in the LICENSE file.