From 914c55cbe2732dc34e549082a4d67832c5c502c5 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Wed, 29 Mar 2017 17:32:51 -0400 Subject: [PATCH 1/7] [BAHIR-101] Initial code of SparkSQL for Cloudant --- pom.xml | 17 +- sql-cloudant/pom.xml | 110 +++++++ .../src/main/resources/application.conf | 14 + .../src/main/resources/reference.conf | 0 .../bahir/cloudant/CloudantConfig.scala | 273 ++++++++++++++++++ .../bahir/cloudant/CloudantReceiver.scala | 90 ++++++ .../apache/bahir/cloudant/DefaultSource.scala | 159 ++++++++++ .../bahir/cloudant/common/FilterUtil.scala | 149 ++++++++++ .../common/JsonStoreConfigManager.scala | 213 ++++++++++++++ .../cloudant/common/JsonStoreDataAccess.scala | 272 +++++++++++++++++ .../bahir/cloudant/common/JsonStoreRDD.scala | 106 +++++++ .../bahir/cloudant/common/JsonUtil.scala | 42 +++ 12 files changed, 1444 insertions(+), 1 deletion(-) create mode 100644 sql-cloudant/pom.xml create mode 100644 sql-cloudant/src/main/resources/application.conf create mode 100644 sql-cloudant/src/main/resources/reference.conf create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala create mode 100644 sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala diff --git a/pom.xml b/pom.xml index f9ee4a09..73cac1f5 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ + sql-cloudant streaming-akka streaming-mqtt sql-streaming-mqtt @@ -162,7 +163,7 @@ - + org.scala-lang @@ -450,6 +463,8 @@ **/README.md **/examples/data/*.txt **/*.iml + **/src/main/resources/application.conf + **/src/main/resources/reference.conf diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml new file mode 100644 index 00000000..e4900d5a --- /dev/null +++ b/sql-cloudant/pom.xml @@ -0,0 +1,110 @@ + + + + + 4.0.0 + + org.apache.bahir + bahir-parent_2.11 + 2.2.0-SNAPSHOT + ../pom.xml + + + org.apache.bahir + spark-sql-cloudant_2.11 + + spark-sql-cloudant + + jar + Apache Bahir - Spark SQL Cloudant DataSource + http://bahir.apache.org/ + + + + com.typesafe.play + play-json_${scala.binary.version} + + + + + org.scalaj + scalaj-http_${scala.binary.version} + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + test-jar + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + test-jar + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf new file mode 100644 index 00000000..2d8b2368 --- /dev/null +++ b/sql-cloudant/src/main/resources/application.conf @@ -0,0 +1,14 @@ 
+spark-sql { + bulkSize = 200 + schemaSampleSize = -1 + createDBOnSave = false + jsonstore.rdd = { + partitions = 10 + maxInPartition = -1 + minInPartition = 10 + requestTimeout = 900000 + } + cloudant = { + protocol = https + } +} diff --git a/sql-cloudant/src/main/resources/reference.conf b/sql-cloudant/src/main/resources/reference.conf new file mode 100644 index 00000000..e69de29b diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala new file mode 100644 index 00000000..ac14f4ba --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant + +import java.net.URLEncoder + +import play.api.libs.json.JsArray +import play.api.libs.json.Json +import play.api.libs.json.JsValue + +import org.apache.bahir.cloudant.common._ + +/* +* Only allow one field pushdown now +* as the filter today does not tell how to link the filters out And v.s. 
Or +*/ + +class CloudantConfig(val protocol: String, val host: String, + val dbName: String, val indexName: String = null, val viewName: String = null) + (implicit val username: String, val password: String, + val partitions: Int, val maxInPartition: Int, val minInPartition: Int, + val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, + val createDBOnSave: Boolean, val selector: String) + extends Serializable{ + + private val SCHEMA_FOR_ALL_DOCS_NUM = -1 + private lazy val dbUrl = {protocol + "://" + host + "/" + dbName} + + val pkField = "_id" + val defaultIndex = "_all_docs" // "_changes" does not work for partition + val default_filter: String = "*:*" + + def getChangesUrl(): String = { + dbUrl + "/_changes?include_docs=true&feed=normal" + } + + def getContinuousChangesUrl(): String = { + var url = dbUrl + "/_changes?include_docs=true&feed=continuous&heartbeat=3000" + if (selector != null) { + url = url + "&filter=_selector" + } + url + } + + def getSelector() : String = { + selector + } + + def getDbUrl(): String = { + dbUrl + } + + def getLastUrl(skip: Int): String = { + if (skip ==0 ) null + else s"$dbUrl/$defaultIndex?limit=$skip" + } + + def getSchemaSampleSize(): Int = { + schemaSampleSize + } + + def getCreateDBonSave(): Boolean = { + createDBOnSave + } + + def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get + + def getTotalUrl(url: String): String = { + if (url.contains('?')) { + url + "&limit=1" + } else { + url + "?limit=1" + } + } + + def getDbname(): String = { + dbName + } + + def allowPartition(): Boolean = {indexName==null} + + def getOneUrl(): String = { + dbUrl + "/_all_docs?limit=1&include_docs=true" + } + + def getOneUrlExcludeDDoc1(): String = { + dbUrl + "/_all_docs?endkey=%22_design/%22&limit=1&include_docs=true" + } + + def getOneUrlExcludeDDoc2(): String = { + dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=1&include_docs=true" + } + + def getAllDocsUrlExcludeDDoc(limit: Int): String = { + if (viewName == null) { + dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=" + limit + "&include_docs=true" + } else { + dbUrl + "/" + viewName + "?limit=1" + } + } + + def getAllDocsUrl(limit: Int): String = { + if (viewName == null) { + if (limit == SCHEMA_FOR_ALL_DOCS_NUM) { + dbUrl + "/_all_docs?include_docs=true" + } else { + dbUrl + "/_all_docs?limit=" + limit + "&include_docs=true" + } + } else { + if (limit == JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM) { + dbUrl + "/" + viewName + } else { + dbUrl + "/" + viewName + "?limit=" + limit + } + } + } + + def getRangeUrl(field: String = null, start: Any = null, + startInclusive: Boolean = false, end: Any = null, + endInclusive: Boolean = false, + includeDoc: Boolean = true): (String, Boolean) = { + val (url: String, pusheddown: Boolean) = + calculate(field, start, startInclusive, end, endInclusive) + if (includeDoc) { + if (url.indexOf('?') > 0) { + (url + "&include_docs=true", pusheddown) + } else { + (url + "?include_docs=true", pusheddown) + } + } else { + (url, pusheddown) + } + } + + private def calculate(field: String, start: Any, startInclusive: Boolean, + end: Any, endInclusive: Boolean): (String, Boolean) = { + if (field != null && field.equals(pkField)) { + var condition = "" + if (start != null && end != null && start.equals(end)) { + condition += "?key=%22" + URLEncoder.encode(start.toString(), "UTF-8") + "%22" + } else { + if (start != null) { + condition += "?startkey=%22" + URLEncoder.encode( + start.toString(), "UTF-8") + "%22" + } + if (end != null) { + if (start 
!= null) { + condition += "&" + } else { + condition += "?" + } + condition += "endkey=%22" + URLEncoder.encode(end.toString(), "UTF-8") + "%22" + } + } + (dbUrl + "/_all_docs" + condition, true) + } else if (indexName!=null) { + // push down to indexName + val condition = calculateCondition(field, start, startInclusive, + end, endInclusive) + (dbUrl + "/" + indexName + "?q=" + condition, true) + } else if (viewName != null) { + (dbUrl + "/" + viewName, true) + } else { + (s"$dbUrl/$defaultIndex", false) + } + + } + + def calculateCondition(field: String, min: Any, minInclusive: Boolean = false, + max: Any, maxInclusive: Boolean = false) : String = { + if (field != null && (min != null || max!= null)) { + var condition = field + ":" + if (min!=null && max!=null && min.equals(max)) { + condition += min + } else { + if (minInclusive) { + condition+="[" + } else { + condition +="{" + } + if (min!=null) { + condition += min + } else { + condition+="*" + } + condition+=" TO " + if (max !=null) { + condition += max + } else { + condition += "*" + } + if (maxInclusive) { + condition+="]" + } else { + condition +="}" + } + } + URLEncoder.encode(condition, "UTF-8") + } else { + default_filter + } + } + + def getSubSetUrl (url: String, skip: Int, limit: Int) + (implicit convertSkip: (Int) => String): String = { + val suffix = { + if (url.indexOf("_all_docs")>0) "include_docs=true&limit=" + + limit + "&skip=" + skip + else if (url.indexOf("_changes")>0) "include_docs=true&limit=" + + limit + "&since=" + convertSkip(skip) + else if (viewName != null) { + "limit=" + limit + "&skip=" + skip + } else { + "include_docs=true&limit=" + limit + } // TODO Index query does not support subset query. Should disable Partitioned loading? + } + if (url.indexOf('?') > 0) { + url + "&" + suffix + } + else { + url + "?" + suffix + } + } + + def getTotalRows(result: JsValue): Int = { + val tr = (result \ "total_rows").asOpt[Int] + tr match { + case None => + (result \ "pending").as[Int] + 1 + case Some(tr2) => + tr2 + } + } + + def getRows(result: JsValue): Seq[JsValue] = { + if (viewName == null) { + ((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get) + } else { + ((result \ "rows").as[JsArray]).value.map(row => row) + } + } + + def getBulkPostUrl(): String = { + dbUrl + "/_bulk_docs" + } + + def getBulkRows(rows: List[String]): String = { + val docs = rows.map { x => Json.parse(x) } + Json.stringify(Json.obj("docs" -> Json.toJson(docs.toSeq))) + } + + def getConflictErrStr(): String = { + """"error":"conflict"""" + } + + def getForbiddenErrStr(): String = { + """"error":"forbidden"""" + } +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala new file mode 100644 index 00000000..0fcd4849 --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant + +// scalastyle:off +import scalaj.http._ + +import play.api.libs.json.Json + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +import org.apache.bahir.cloudant.common._ +// scalastyle:on + +class CloudantReceiver(cloudantParams: Map[String, String]) + extends Receiver[String](StorageLevel.MEMORY_AND_DISK) { + lazy val config: CloudantConfig = { + JsonStoreConfigManager.getConfig(cloudantParams: Map[String, String]) + .asInstanceOf[CloudantConfig] + } + + def onStart() { + // Start the thread that receives data over a connection + new Thread("Cloudant Receiver") { + override def run() { receive() } + }.start() + } + + private def receive(): Unit = { + val url = config.getContinuousChangesUrl() + val selector: String = if (config.getSelector() != null) { + "{\"selector\":" + config.getSelector() + "}" + } else { + "{}" + } + + val clRequest: HttpRequest = config.username match { + case null => + Http(url) + .postData(selector) + .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + case _ => + Http(url) + .postData(selector) + .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + .auth(config.username, config.password) + } + + clRequest.exec((code, headers, is) => { + if (code == 200) { + scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => { + if (line.length() > 0) { + val json = Json.parse(line) + val jsonDoc = (json \ "doc").get + val doc = Json.stringify(jsonDoc) + store(doc) + } + }) + } else { + val status = headers.getOrElse("Status", IndexedSeq.empty) + val errorMsg = "Error retrieving changes feed for a database %s : %s " + (config.getDbname(), status(0)) + reportError(errorMsg, new RuntimeException(errorMsg)) + } + }) + } + + def onStop(): Unit = { + + } +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala new file mode 100644 index 00000000..4c973f73 --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bahir.cloudant + +import org.slf4j.LoggerFactory + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import org.apache.bahir.cloudant.common.{FilterInterpreter, JsonStoreDataAccess, JsonStoreRDD, _} + +case class CloudantReadWriteRelation (config: CloudantConfig, + schema: StructType, + allDocsDF: DataFrame = null) + (@transient val sqlContext: SQLContext) + extends BaseRelation with PrunedFilteredScan with InsertableRelation { + + @transient lazy val dataAccess = {new JsonStoreDataAccess(config)} + + implicit lazy val logger = LoggerFactory.getLogger(getClass) + + def buildScan(requiredColumns: Array[String], + filters: Array[Filter]): RDD[Row] = { + val colsLength = requiredColumns.length + + if (allDocsDF != null) { + if (colsLength == 0) { + allDocsDF.select().rdd + } else if (colsLength == 1) { + allDocsDF.select(requiredColumns(0)).rdd + } else { + val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) + allDocsDF.select(requiredColumns(0), colsExceptCol0: _*).rdd + } + } else { + val filterInterpreter = new FilterInterpreter(filters) + var searchField: String = { + if (filterInterpreter.containsFiltersFor(config.pkField)) { + config.pkField + } else { + filterInterpreter.firstField + } + } + + val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField) + implicit val columns = requiredColumns + val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField, + min, minInclusive, max, maxInclusive, false) + if (!pusheddown) searchField = null + implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField) + + val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url) + val df = sqlContext.read.json(cloudantRDD) + if (colsLength > 1) { + val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) + df.select(requiredColumns(0), colsExceptCol0: _*).rdd + } else { + df.rdd + } + } + } + + + def insert(data: DataFrame, overwrite: Boolean): Unit = { + if (config.getCreateDBonSave()) { + dataAccess.createDB() + } + if (data.count() == 0) { + logger.warn(("Database " + config.getDbname() + + ": nothing was saved because the number of records was 0!")) + } else { + val result = data.toJSON.foreachPartition { x => + val list = x.toList // Has to pass as List, Iterator results in 0 data + dataAccess.saveAll(list) + } + } + } +} + +class DefaultSource extends RelationProvider + with CreatableRelationProvider + with SchemaRelationProvider { + + val logger = LoggerFactory.getLogger(getClass) + + def createRelation(sqlContext: SQLContext, + parameters: Map[String, String]): CloudantReadWriteRelation = { + create(sqlContext, parameters, null) + } + + private def create(sqlContext: SQLContext, + parameters: Map[String, String], + inSchema: StructType) = { + + val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters) + + var allDocsDF: DataFrame = null + + val schema: StructType = { + if (inSchema != null) { + inSchema + } else { + val df = if (config.getSchemaSampleSize() == + JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM && + config.viewName == null + && config.indexName == null) { + val filterInterpreter = new FilterInterpreter(null) + var searchField = null + val (min, minInclusive, max, maxInclusive) = + filterInterpreter.getInfo(searchField) + val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField, + min, minInclusive, max, 
maxInclusive, false) + val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url) + allDocsDF = sqlContext.read.json(cloudantRDD) + allDocsDF + } else { + val dataAccess = new JsonStoreDataAccess(config) + val aRDD = sqlContext.sparkContext.parallelize( + dataAccess.getMany(config.getSchemaSampleSize())) + sqlContext.read.json(aRDD) + } + df.schema + } + } + CloudantReadWriteRelation(config, schema, allDocsDF)(sqlContext) + } + + def createRelation(sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): CloudantReadWriteRelation = { + val relation = create(sqlContext, parameters, data.schema) + relation.insert(data, mode==SaveMode.Overwrite) + relation + } + + def createRelation(sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): CloudantReadWriteRelation = { + create(sqlContext, parameters, schema) + } + +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala new file mode 100644 index 00000000..12cd81c4 --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant.common + +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsString, JsValue} + +import org.apache.spark.sql.sources._ + + +/** + * Only handles the following filter condition + * 1. EqualTo,GreaterThan,LessThan,GreaterThanOrEqual,LessThanOrEqual,In + * 2. recursive AND of (filters in 1 and AND). 
Issue: Spark 1.3.0 does not return + * AND filter instead returned 2 filters + */ +class FilterInterpreter(origFilters: Array[Filter]) { + + private val logger = LoggerFactory.getLogger(getClass) + + lazy val firstField = { + if (origFilters.length > 0) getFilterAttribute(origFilters(0)) + else null + } + + private lazy val filtersByAttr = { + origFilters + .filter(f => getFilterAttribute(f) != null) + .map(f => (getFilterAttribute(f), f)) + .groupBy(attrFilter => attrFilter._1) + .mapValues(a => a.map(p => p._2)) + } + + private def getFilterAttribute(f: Filter): String = { + val result = f match { + case EqualTo(attr, v) => attr + case GreaterThan(attr, v) => attr + case LessThan(attr, v) => attr + case GreaterThanOrEqual(attr, v) => attr + case LessThanOrEqual(attr, v) => attr + case In(attr, v) => attr + case IsNotNull(attr) => attr + case IsNull(attr) => attr + case _ => null + } + result + } + + def containsFiltersFor(key: String): Boolean = { + filtersByAttr.contains(key) + } + + private lazy val analyzedFilters = { + filtersByAttr.map(m => m._1 -> analyze(m._2)) + } + + private def analyze(filters: Array[Filter]): (Any, Boolean, Any, Boolean, Array[Filter]) = { + + var min: Any = null + var minInclusive: Boolean = false + var max: Any = null + var maxInclusive: Boolean = false + var others: Array[Filter] = Array[Filter]() + + def evaluate(filter: Filter) { + filter match { + case GreaterThanOrEqual(attr, v) => min = v; minInclusive = true + case LessThanOrEqual(attr, v) => max = v; maxInclusive = true + case EqualTo(attr, v) => min = v; max = v + case GreaterThan(attr, v) => min = v + case LessThan(attr, v) => max = v + case _ => others = others :+ filter + } + } + + filters.map(f => evaluate(f)) + + logger.info(s"Calculated range info: min=$min," + + s" minInclusive=$minInclusive," + + s"max=$max," + + s"maxInclusive=$maxInclusive," + + s"others=$others") + (min, minInclusive, max, maxInclusive, others) + } + + def getInfo(field: String): (Any, Boolean, Any, Boolean) = { + if (field == null) (null, false, null, false) + else { + val data = analyzedFilters.getOrElse(field, (null, false, null, false, null)) + (data._1, data._2, data._3, data._4) + } + } + + def getFiltersForPostProcess(pushdownField: String): Map[String, Array[Filter]] = { + filtersByAttr.map(f => { + if (f._1.equals(pushdownField)) f._1 -> analyzedFilters.get(pushdownField).get._5 + else f._1 -> f._2 + }) + } +} + +/** + * + */ +class FilterUtil(filters: Map[String, Array[Filter]]) { + private val logger = LoggerFactory.getLogger(getClass) + def apply(implicit r: JsValue = null): Boolean = { + if (r == null) return true + val satisfied = filters.forall({ + case (attr, filters) => + val field = JsonUtil.getField(r, attr).getOrElse(null) + if (field == null) { + logger.debug(s"field $attr not exisit:$r") + false + } else { + true + } + }) + satisfied + } +} + + +object FilterDDocs { + def filter(row: JsValue): Boolean = { + if (row == null) return true + val id : String = JsonUtil.getField(row, "_id"). 
+ getOrElse(null).as[JsString].value + if (id.startsWith("_design")) { + false + } else { + true + } + } +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala new file mode 100644 index 00000000..8df7b63c --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant.common + +import com.typesafe.config.ConfigFactory + +import org.apache.spark.sql.SQLContext +import org.apache.spark.SparkConf + +import org.apache.bahir.cloudant.CloudantConfig + + object JsonStoreConfigManager { + val CLOUDANT_CONNECTOR_VERSION = "2.0.0" + val SCHEMA_FOR_ALL_DOCS_NUM = -1 + + private val CLOUDANT_HOST_CONFIG = "cloudant.host" + private val CLOUDANT_USERNAME_CONFIG = "cloudant.username" + private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password" + private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol" + + + private val PARTITION_CONFIG = "jsonstore.rdd.partitions" + private val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition" + private val MIN_IN_PARTITION_CONFIG = "jsonstore.rdd.minInPartition" + private val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout" + private val BULK_SIZE_CONFIG = "bulkSize" + private val SCHEMA_SAMPLE_SIZE_CONFIG = "schemaSampleSize" + private val CREATE_DB_ON_SAVE = "createDBOnSave" + + + private val configFactory = ConfigFactory.load() + + private val ROOT_CONFIG_NAME = "spark-sql" + private val rootConfig = configFactory.getConfig(ROOT_CONFIG_NAME) + + + /** + * The sequence of getting configuration + * 1. "spark."+key in the SparkConf + * (as they are treated as the one passed in through spark-submit) + * 2. key in the parameters, which is set in DF option + * 3. key in the SparkConf, which is set in SparkConf + * 4. 
default in the Config, which is set in the application.conf + */ + + + private def getInt(sparkConf: SparkConf, parameters: Map[String, String], + key: String) : Int = { + val valueS = parameters.getOrElse(key, null) + if (sparkConf != null) { + val default = { + if (valueS == null) { + sparkConf.getInt(key, rootConfig.getInt(key)) + } else { + valueS.toInt + } + } + sparkConf.getInt(s"spark.$key", default) + } else { + if (valueS == null) { + rootConfig.getInt(key) + } else { + valueS.toInt + } + } + } + + private def getLong(sparkConf: SparkConf, parameters: Map[String, String], + key: String) : Long = { + val valueS = parameters.getOrElse(key, null) + if (sparkConf != null) { + val default = { + if (valueS == null) { + sparkConf.getLong(key, rootConfig.getLong(key)) + } else { + valueS.toLong + } + } + sparkConf.getLong(s"spark.$key", default) + } else { + if (valueS == null) rootConfig.getLong(key) else valueS.toLong + } + } + + private def getString(sparkConf: SparkConf, parameters: Map[String, String], + key: String) : String = { + val defaultInConfig = if (rootConfig.hasPath(key)) rootConfig.getString(key) else null + val valueS = parameters.getOrElse(key, null) + if (sparkConf != null) { + val default = { + if (valueS == null) { + sparkConf.get(key, defaultInConfig) + } else { + valueS + } + } + sparkConf.get(s"spark.$key", default) + } else { + if (valueS == null) defaultInConfig else valueS + } + } + + private def getBool(sparkConf: SparkConf, parameters: Map[String, String], + key: String) : Boolean = { + val valueS = parameters.getOrElse(key, null) + if (sparkConf != null) { + val default = { + if (valueS == null) { + sparkConf.getBoolean(key, rootConfig.getBoolean(key)) + } else { + valueS.toBoolean + } + } + sparkConf.getBoolean(s"spark.$key", default) + } else + if (valueS == null) { + rootConfig.getBoolean(key) + } else { + valueS.toBoolean + } + } + + + + def getConfig(context: SQLContext, parameters: Map[String, String]): CloudantConfig = { + + val sparkConf = context.sparkContext.getConf + + implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG) + implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG) + implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG) + implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG) + implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG) + implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG) + implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE) + + val dbName = parameters.getOrElse("database", parameters.getOrElse("path", null)) + val indexName = parameters.getOrElse("index", null) + val viewName = parameters.getOrElse("view", null) + + // FIXME: Add logger + // scalastyle:off println + println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " + + s"indexName=$indexName, viewName=$viewName," + + s"$PARTITION_CONFIG=$total, $MAX_IN_PARTITION_CONFIG=$max," + + s"$MIN_IN_PARTITION_CONFIG=$min, $REQUEST_TIMEOUT_CONFIG=$requestTimeout," + + s"$BULK_SIZE_CONFIG=$bulkSize, $SCHEMA_SAMPLE_SIZE_CONFIG=$schemaSampleSize") + // scalastyle:on println + + val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG) + val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG) + val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG) + val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG) + val selector = 
getString(sparkConf, parameters, "selector") + + if (host != null) { + new CloudantConfig(protocol, host, dbName, indexName, + viewName) (user, passwd, total, max, min, requestTimeout, bulkSize, + schemaSampleSize, createDBOnSave, selector) + } else { + throw new RuntimeException("Spark configuration is invalid! " + + "Please make sure to supply required values for cloudant.host.") + } + } + + def getConfig(parameters: Map[String, String]): CloudantConfig = { + val sparkConf = null + + implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG) + implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG) + implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG) + implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG) + implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG) + implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG) + implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE) + + val dbName = parameters.getOrElse("database", null) + + // scalastyle:off println + println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " + + s"$REQUEST_TIMEOUT_CONFIG=$requestTimeout") + // scalastyle:on println + + val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG) + val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG) + val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG) + val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG) + val selector = getString(sparkConf, parameters, "selector") + + if (host != null) { + new CloudantConfig(protocol, host, dbName)(user, passwd, + total, max, min, requestTimeout, bulkSize, + schemaSampleSize, createDBOnSave, selector) + } else { + throw new RuntimeException("Cloudant parameters are invalid!" + + "Please make sure to supply required values for cloudant.host.") + } + } +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala new file mode 100644 index 00000000..e84a44c0 --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bahir.cloudant.common + +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.HashMap +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.util.{Failure, Success} + +import scalaj.http.{Http, HttpRequest, HttpResponse} +import ExecutionContext.Implicits.global +import org.slf4j.LoggerFactory +import play.api.libs.json._ + +import org.apache.spark.sql.sources._ + +import org.apache.bahir.cloudant.CloudantConfig +import org.apache.bahir.cloudant.common._ + + +class JsonStoreDataAccess (config: CloudantConfig) { + lazy val logger = LoggerFactory.getLogger(getClass) + implicit lazy val timeout = config.requestTimeout + + def getOne()( implicit columns: Array[String] = null): Seq[String] = { + var r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc1(), processAll) + if (r.size == 0 ) { + r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc2(), processAll) + } + if (r.size == 0) { + throw new RuntimeException("Database " + config.getDbname() + + " doesn't have any non-design documents!") + } else { + r + } + } + + def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = { + if (limit == 0) { + throw new RuntimeException("Database " + config.getDbname() + + " schema sample size is 0!") + } + if (limit < -1) { + throw new RuntimeException("Database " + config.getDbname() + + " schema sample size is " + limit + "!") + } + var r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit), processAll) + if (r.size == 0) { + r = this.getQueryResult[Seq[String]](config.getAllDocsUrlExcludeDDoc(limit), processAll) + } + if (r.size == 0) { + throw new RuntimeException("Database " + config.getDbname() + + " doesn't have any non-design documents!") + } else { + r + } + } + + def getAll[T](url: String) + (implicit columns: Array[String] = null, + attrToFilters: Map[String, Array[Filter]] = null): Seq[String] = { + this.getQueryResult[Seq[String]](url, processAll) + } + + def getIterator(skip: Int, limit: Int, url: String) + (implicit columns: Array[String] = null, + attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = { + implicit def convertSkip(skip: Int): String = { + val url = config.getLastUrl(skip) + if (url == null) { + skip.toString() + } else { + this.getQueryResult[String](url, + { result => config.getLastNum(Json.parse(result)).as[JsString].value}) + } + } + val newUrl = config.getSubSetUrl(url, skip, limit) + this.getQueryResult[Iterator[String]](newUrl, processIterator) + } + + def getTotalRows(url: String): Int = { + val totalUrl = config.getTotalUrl(url) + this.getQueryResult[Int](totalUrl, + { result => config.getTotalRows(Json.parse(result))}) + } + + private def processAll (result: String) + (implicit columns: Array[String], + attrToFilters: Map[String, Array[Filter]] = null) = { + logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters") + val jsonResult: JsValue = Json.parse(result) + var rows = config.getRows(jsonResult) + if (config.viewName == null) { + // filter design docs + rows = rows.filter(r => FilterDDocs.filter(r)) + } + rows.map(r => convert(r)) + } + + private def processIterator (result: String) + (implicit columns: Array[String], + attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = { + processAll(result).iterator + } + + private def convert(rec: JsValue)(implicit columns: Array[String]): String = { + if (columns == null) return 
Json.stringify(Json.toJson(rec)) + val m = new HashMap[String, JsValue]() + for ( x <- columns) { + val field = JsonUtil.getField(rec, x).getOrElse(JsNull) + m.put(x, field) + } + val result = Json.stringify(Json.toJson(m.toMap)) + logger.debug(s"converted: $result") + result + } + + + def getChanges(url: String, processResults: (String) => String): String = { + getQueryResult(url, processResults) + } + + + private def getQueryResult[T] + (url: String, postProcessor: (String) => T) + (implicit columns: Array[String] = null, + attrToFilters: Map[String, Array[Filter]] = null) : T = { + logger.warn("Loading data from Cloudant using query: " + url) + val requestTimeout = config.requestTimeout.toInt + val clRequest: HttpRequest = config.username match { + case null => + Http(url) + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("User-Agent", "spark-cloudant") + case _ => + Http(url) + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("User-Agent", "spark-cloudant") + .auth(config.username, config.password) + } + + val clResponse: HttpResponse[String] = clRequest.execute() + if (! clResponse.isSuccess) { + throw new RuntimeException("Database " + config.getDbname() + + " request error: " + clResponse.body) + } + val data = postProcessor(clResponse.body) + logger.debug(s"got result:$data") + data + } + + + def createDB(): Unit = { + val url = config.getDbUrl() + val requestTimeout = config.requestTimeout.toInt + val clRequest: HttpRequest = config.username match { + case null => + Http(url) + .method("put") + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("User-Agent", "spark-cloudant") + case _ => + Http(url) + .method("put") + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("User-Agent", "spark-cloudant") + .auth(config.username, config.password) + } + + val clResponse: HttpResponse[String] = clRequest.execute() + if (! 
clResponse.isSuccess) { + throw new RuntimeException("Database " + config.getDbname() + + " create error: " + clResponse.body) + } else { + logger.warn(s"Database ${config.getDbname()} was created.") + } + } + + + def getClPostRequest(data: String): HttpRequest = { + val url = config.getBulkPostUrl() + val requestTimeout = config.requestTimeout.toInt + config.username match { + case null => + Http(url) + .postData(data) + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + case _ => + Http(url) + .postData(data) + .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + .auth(config.username, config.password) + } + } + + + def saveAll(rows: List[String]): Unit = { + if (rows.size == 0) return + val bulkSize = config.bulkSize + val bulks = rows.grouped(bulkSize).toList + val totalBulks = bulks.size + logger.debug(s"total records:${rows.size}=bulkSize:$bulkSize * totalBulks:$totalBulks") + + val futures = bulks.map( bulk => { + val data = config.getBulkRows(bulk) + val clRequest: HttpRequest = getClPostRequest(data) + Future { + clRequest.execute() + } + } + ) + // remaining - number of requests remained to succeed + val remaining = new AtomicInteger(futures.length) + val p = Promise[HttpResponse[String]] + futures foreach { + _ onComplete { + case Success(clResponse: HttpResponse[String]) => + // find if there was error in saving at least one of docs + val resBody: String = clResponse.body + val isErr = (resBody contains config.getConflictErrStr()) || + (resBody contains config.getForbiddenErrStr()) + if (!(clResponse.isSuccess) || isErr) { + val e = new RuntimeException("Save to database:" + config.getDbname() + + " failed with reason: " + clResponse.body) + p.tryFailure(e) + } else if (remaining.decrementAndGet() == 0) { + // succeed the whole save operation if all requests success + p.trySuccess(clResponse) + } + // if a least one save request fails - fail the whole save operation + case Failure(e) => + p.tryFailure(e) + } + } + + val mainFtr = p.future + mainFtr onSuccess { + case clResponsesList => + logger.warn(s"Saved total ${rows.length} " + + s"with bulkSize $bulkSize " + + s"for database: ${config.getDbname()}") + } + mainFtr onFailure { + case e => + throw new RuntimeException("Save to database:" + config.getDbname() + + " failed with reason: " + e.getMessage) + } + Await.result(mainFtr, (config.requestTimeout * totalBulks).millis) // scalastyle:ignore + } + +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala new file mode 100644 index 00000000..46774f54 --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant.common + +import org.slf4j.LoggerFactory + +import org.apache.spark.Partition +import org.apache.spark.SparkContext +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources.Filter + +import org.apache.bahir.cloudant.CloudantConfig + +/** + * JsonStoreRDDPartition defines each partition as a subset of a query result: + * the limit rows returns and the skipped rows. + */ + +private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int, + val idx: Int, val config: CloudantConfig, + val attrToFilters: Map[String, Array[Filter]]) + extends Partition with Serializable{ + val index = idx +} + +/** + * The main purpose of JsonStoreRDD is to be able to create parallel read + * by partition for dataaccess getAll (by condition) scenarios + * defaultPartitions : how many partition intent, + * will be re-calculate based on the value based on total rows + * and minInPartition / maxInPartition ) + * maxRowsInPartition: -1 means unlimited + */ +class JsonStoreRDD(sc: SparkContext, config: CloudantConfig, + url: String)(implicit requiredcolumns: Array[String] = null, + attrToFilters: Map[String, Array[Filter]] = null) + extends RDD[String](sc, Nil) { + + lazy val totalRows = { + new JsonStoreDataAccess(config).getTotalRows(url) + } + lazy val totalPartition = { + if (totalRows == 0 || ! config.allowPartition() ) 1 + else if (totalRows < config.partitions * config.minInPartition) { + val total = totalRows / config.minInPartition + if (total == 0 ) { + total + 1 + } else { + total + } + } + else if (config.maxInPartition <=0) config.partitions + else { + val total = totalRows / config.maxInPartition + if ( totalRows % config.maxInPartition != 0) { + total + 1 + } + else { + total + } + } + } + + lazy val limitPerPartition = { + val limit = totalRows/totalPartition + if (totalRows % totalPartition != 0) { + limit + 1 + } else { + limit + } + } + + override def getPartitions: Array[Partition] = { + val logger = LoggerFactory.getLogger(getClass) + logger.info(s"Partition config - total=$totalPartition, " + + s"limit=$limitPerPartition for totalRows of $totalRows") + + (0 until totalPartition).map(i => { + val skip = i * limitPerPartition + new JsonStoreRDDPartition(skip, limitPerPartition, i, config, + attrToFilters).asInstanceOf[Partition] + }).toArray + } + + override def compute(splitIn: Partition, context: TaskContext): + Iterator[String] = { + val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition] + new JsonStoreDataAccess(myPartition.config).getIterator(myPartition.skip, + myPartition.limit, url) + } +} diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala new file mode 100644 index 00000000..cd46b16f --- /dev/null +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bahir.cloudant.common + +import play.api.libs.json.{JsUndefined, JsValue} +import scala.util.control.Breaks._ + +object JsonUtil { + def getField(row: JsValue, field: String) : Option[JsValue] = { + var path = field.split('.') + var currentValue = row + var finalValue: Option[JsValue] = None + breakable { + for (i <- path.indices) { + val f: Option[JsValue] = (currentValue \ path(i)).toOption + f match { + case Some(f2) => currentValue = f2 + case None => break + } + if (i == path.length -1) { + // The leaf node + finalValue = Some(currentValue) + } + } + } + finalValue + } +} From 42dd883db06b490ad2c81c2d49c23c590652f550 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 30 Mar 2017 22:46:44 -0400 Subject: [PATCH 2/7] add examples, README and fix pom.xml for the missing application.conf --- sql-cloudant/README.md | 329 ++++++++++++++++++ sql-cloudant/examples/python/CloudantApp.py | 45 +++ sql-cloudant/examples/python/CloudantDF.py | 75 ++++ .../examples/python/CloudantDFOption.py | 72 ++++ .../examples/sql/cloudant/CloudantApp.scala | 73 ++++ .../examples/sql/cloudant/CloudantDF.scala | 64 ++++ .../sql/cloudant/CloudantDFOption.scala | 71 ++++ .../sql/cloudant/CloudantStreaming.scala | 99 ++++++ .../cloudant/CloudantStreamingSelector.scala | 64 ++++ sql-cloudant/pom.xml | 5 + .../bahir/cloudant/CloudantReceiver.scala | 3 +- 11 files changed, 898 insertions(+), 2 deletions(-) create mode 100644 sql-cloudant/README.md create mode 100644 sql-cloudant/examples/python/CloudantApp.py create mode 100644 sql-cloudant/examples/python/CloudantDF.py create mode 100644 sql-cloudant/examples/python/CloudantDFOption.py create mode 100644 sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala create mode 100644 sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala create mode 100644 sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala create mode 100644 sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala create mode 100644 sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md new file mode 100644 index 00000000..36be297a --- /dev/null +++ b/sql-cloudant/README.md @@ -0,0 +1,329 @@ +Spark Cloudant Connector +================ + +Cloudant integration with Spark as Spark SQL external datasource, and Spark Streaming as a custom receiver. + + +## Contents: +0. [Linking](#Linking) +1. [Implementation of RelationProvider](#implementation-of-relationProvider) +2. [Implementation of Receiver](#implementation-of-Receiver) +3. [Sample applications](#Sample-application) + 1. [Using SQL In Python](#Using-SQL-In-Python) + 2. 
[Using SQL In Scala](#Using-SQL-In-Scala) + 3. [Using DataFrame In Python](#Using-DataFrame-In-Python) + 4. [Using DataFrame In Scala](#Using-DataFrame-In-Scala) + 5. [Using Streams In Scala](#Using-Streams-In-Scala) +4. [Configuration Overview](#Configuration-Overview) +5. [Known limitations and areas for improvement](#Known-limitations) + + +
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-cloudant_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+
+
+
+### Implementation of RelationProvider
+
+[DefaultSource.scala](src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala) is a RelationProvider for loading data from Cloudant into Spark and for saving it back from Spark to Cloudant. It supports the following functionality:
+
+Functionality | Enablement
+--- | ---
+Table Option | database or path, search index, view
+Scan Type | PrunedFilteredScan
+Column Pruning | yes
+Predicates Push Down | _id or first predicate
+Parallel Loading | yes, except with search index
+Insert-able | yes
+
+The table options are illustrated in the short sketch below.
+
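+For illustration only, here is a minimal sketch of how the three table options might be passed through the DataFrame reader, assuming a SparkSession already configured with the Cloudant connection settings (see the samples below). The database, design document, search index, and view names are placeholders, not objects shipped with this connector:
+
+```scala
+// Load a whole database (parallel loading, predicate push down on _id):
+val byDatabase = spark.read.format("org.apache.bahir.cloudant")
+  .option("database", "n_flight")
+  .load()
+
+// Load through a Cloudant search index (no parallel loading, at most 200 results):
+val byIndex = spark.read.format("org.apache.bahir.cloudant")
+  .option("database", "n_flight")
+  .option("index", "_design/view/_search/n_flights")
+  .load()
+
+// Load through a view; the derived schema is the view's {id, key, value} rows:
+val byView = spark.read.format("org.apache.bahir.cloudant")
+  .option("database", "n_flight")
+  .option("view", "_design/view/_view/AA0")
+  .load()
+```
+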
+
+### Implementation of Receiver
+
+The Spark Cloudant connector creates a discretized stream (Spark input DStream) out of Cloudant data sources. [CloudantReceiver.scala](src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala) is a custom Receiver that converts the `_changes` feed of a Cloudant database into a DStream in Spark. This allows all sorts of processing on the streamed data, including [using DataFrames and SQL operations on it](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala).
+
+**NOTE:** Since CloudantReceiver for Spark Streaming is based on the `_changes` API, there are some limitations that application developers should be aware of. First, results returned from `_changes` are only partially ordered and may not be presented in the order in which documents were updated. Second, in case of shard unavailability you may see duplicates, i.e. changes that have already been delivered. It is therefore up to applications using Spark Streaming with CloudantReceiver to keep track of the changes they have processed and to detect duplicates; a minimal sketch follows.
+
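+As a minimal per-batch sketch of duplicate handling, assuming each streamed document carries `_id` and `_rev` fields, and reusing the `changes` stream and the `SparkSessionSingleton` helper from the streaming example further below. It only removes duplicates within a single batch; tracking across batches (for example by persisting processed `_id`/`_rev` pairs) remains the application's responsibility:
+
+```scala
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  // Get the singleton SparkSession, as in the streaming example below
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+  val batch = spark.read.json(rdd)
+  if (!batch.schema.isEmpty) {
+    // Drop re-delivered changes inside this batch only; cross-batch
+    // deduplication is left to the application.
+    val deduped = batch.dropDuplicates("_id", "_rev")
+    deduped.show()
+  }
+})
+```
+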
+ +## Sample applications + +
+ +### Using SQL In Python + +[CloudantApp.py](examples/python/CloudantApp.py) + +```python +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using temp tables")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .getOrCreate() + + +#### Loading temp table from Cloudant db +spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')") +airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id") +airportData.printSchema() +print 'Total # of rows in airportData: ' + str(airportData.count()) +for code in airportData.collect(): + print code._id +``` + +
+
+### Using SQL In Scala
+
+[CloudantApp.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala)
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .getOrCreate()
+
+// For implicit conversions of DataFrames to RDDs
+import spark.implicits._
+
+// create a temp table from a Cloudant db and query it using SQL syntax
+spark.sql(
+    s"""
+    |CREATE TEMPORARY TABLE airportTable
+    |USING org.apache.bahir.cloudant
+    |OPTIONS ( database 'n_airportcodemapping')
+    """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert the dataframe to an array of Rows and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
+```
+
+
+ +### Using DataFrame In Python + +[CloudantDF.py](examples/python/CloudantDF.py). + +```python +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using dataframes")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .config("jsonstore.rdd.partitions", 8)\ + .getOrCreate() + +#### Loading dataframe from Cloudant db +df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant") +df.cache() +df.printSchema() +df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show() +df.filter(df._id >= 'CAA').select("_id",'airportName').show() +``` + +In case of doing multiple operations on a dataframe (select, filter etc.), +you should persist a dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again. +Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively for large dbs to persist in memory & disk, use: + +```python +from pyspark import StorageLevel +df.persist(storageLevel = StorageLevel(True, True, False, True, 1)) +``` + +[Sample code on using DataFrame option to define cloudant configuration](examples/python/CloudantDFOption.py) + +
+ +### Using DataFrame In Scala + +[CloudantDF.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala) + +``` scala +val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example with Dataframe") + .config("cloudant.host","ACCOUNT.cloudant.com") + .config("cloudant.username", "USERNAME") + .config("cloudant.password","PASSWORD") + .config("createDBOnSave","true") // to create a db on save + .config("jsonstore.rdd.partitions", "20") // using 20 partitions + .getOrCreate() + +// 1. Loading data from Cloudant db +val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") +// Caching df in memory to speed computations +// and not to retrieve data from cloudant again +df.cache() +df.printSchema() + +// 2. Saving dataframe to Cloudant db +val df2 = df.filter(df("flightSegmentId") === "AA106") + .select("flightSegmentId","economyClassBaseCost") +df2.show() +df2.write.format("org.apache.bahir.cloudant").save("n_flight2") +``` + + [Sample code on using DataFrame option to define cloudant configuration](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala) + + +
+ +### Using Streams In Scala +[CloudantStreaming.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala) + +```scala +val ssc = new StreamingContext(sparkConf, Seconds(10)) +val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "n_airportcodemapping"))) + +changes.foreachRDD((rdd: RDD[String], time: Time) => { + // Get the singleton instance of SparkSession + val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + + println(s"========= $time =========") + // Convert RDD[String] to DataFrame + val changesDataFrame = spark.read.json(rdd) + if (!changesDataFrame.schema.isEmpty) { + changesDataFrame.printSchema() + changesDataFrame.select("*").show() + .... + } +}) +ssc.start() +// run streaming for 120 secs +Thread.sleep(120000L) +ssc.stop(true) + +``` + +By default, Spark Streaming will load all documents from a database. If you want to limit the loading to specific documents, use `selector` option of `CloudantReceiver` and specify your conditions ([CloudantStreamingSelector.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala)): + +```scala +val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "sales", + "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}"))) +``` + + +
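+The selector stream can then be processed like any other receiver stream. Below is a
+minimal sketch of a per-batch aggregation, in the spirit of the bundled
+CloudantStreamingSelector example (it assumes the `sales` documents carry a numeric
+`amount` field and reuses the example's `SparkSessionSingleton` helper):
+
+```scala
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+  val sales = spark.read.json(rdd)
+  if (!sales.schema.isEmpty) {
+    // total amount of the sales documents received in this batch
+    val batchAmount = sales.groupBy().sum("amount").collect()(0).getLong(0)
+    println(s"========= $time: batch amount = $batchAmount =========")
+  }
+})
+```
+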
+
+## Configuration Overview
+
+The configuration is obtained in the following sequence:
+
+1. default in the Config, which is set in the application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in a dataframe or temporary table options
+4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option)
+
+Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has been set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code.
+
+
+### Configuration in application.conf
+Default values are defined in [here](src/main/resources/application.conf)
+
+### Configuration on SparkConf
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+jsonstore.rdd.partitions|10|the number of partitions intended to be used when the JsonStoreRDD loads query results in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
+schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means only the first document is used for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+
+
+### Configuration on Spark SQL Temporary Table or DataFrame
+
+Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations on a temporary table or dataframe using OPTIONS:
+
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view w/o the database name. only used for load.
+index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
+path||cloudant: used as the database name if database is not present
+schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents
+bulkSize|200| the bulk save size
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+
+For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
+
+```
+
+### Configuration on Cloudant Receiver for Spark Streaming
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+### Configuration in spark-submit using --conf option
+
+The configuration keys stated above can also be set using the `spark-submit --conf` option. When passing configuration in spark-submit, make sure to add the "spark." prefix to the keys.
+
+
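+To illustrate the precedence rules above, the sketch below (with placeholder values)
+sets `jsonstore.rdd.partitions` at both the SparkConf and the DataFrame level; the
+DataFrame option wins, and a `spark.`-prefixed key passed via `spark-submit --conf`
+would override both:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// e.g. spark-submit ... --conf spark.jsonstore.rdd.partitions=40  (highest precedence)
+val spark = SparkSession
+  .builder()
+  .appName("Cloudant configuration precedence")
+  .config("jsonstore.rdd.partitions", "20")   // SparkConf level
+  .getOrCreate()
+
+val df = spark.read.format("org.apache.bahir.cloudant")
+  .option("cloudant.host", "ACCOUNT.cloudant.com")
+  .option("cloudant.username", "USERNAME")
+  .option("cloudant.password", "PASSWORD")
+  .option("jsonstore.rdd.partitions", "30")   // overrides the SparkConf value
+  .load("n_airportcodemapping")
+```
+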
+ +## Known limitations and areas for improvement + +* Loading data from Cloudant search index will work only for up to 200 results. + +* Need to improve how number of partitions is determined for parallel loading diff --git a/sql-cloudant/examples/python/CloudantApp.py b/sql-cloudant/examples/python/CloudantApp.py new file mode 100644 index 00000000..029f39b0 --- /dev/null +++ b/sql-cloudant/examples/python/CloudantApp.py @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyspark.sql import SparkSession + +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using temp tables")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .getOrCreate() + + +# ***1. Loading temp table from Cloudant db +spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')") +airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id") +airportData.printSchema() +print 'Total # of rows in airportData: ' + str(airportData.count()) +for code in airportData.collect(): + print code._id + + +# ***2. Loading temp table from Cloudant search index +print 'About to test org.apache.bahir.cloudant for flight with index' +spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')") +flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime FROM flightTable1 WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95'") +flightData.printSchema() +for code in flightData.collect(): + print 'Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime) + diff --git a/sql-cloudant/examples/python/CloudantDF.py b/sql-cloudant/examples/python/CloudantDF.py new file mode 100644 index 00000000..c009e982 --- /dev/null +++ b/sql-cloudant/examples/python/CloudantDF.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyspark.sql import SparkSession + +# define cloudant related configuration +# set protocol to http if needed, default value=https +# config("cloudant.protocol","http") +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using dataframes")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .config("jsonstore.rdd.partitions", 8)\ + .getOrCreate() + + +# ***1. Loading dataframe from Cloudant db +df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant") +# In case of doing multiple operations on a dataframe (select, filter etc.) +# you should persist the dataframe. +# Othewise, every operation on the dataframe will load the same data from Cloudant again. +# Persisting will also speed up computation. +df.cache() # persisting in memory +# alternatively for large dbs to persist in memory & disk: +# from pyspark import StorageLevel +# df.persist(storageLevel = StorageLevel(True, True, False, True, 1)) +df.printSchema() +df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show() +df.filter(df._id >= 'CAA').select("_id",'airportName').show() + + +# ***2. Saving a datafram to Cloudant db +df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight") +df.printSchema() +df2 = df.filter(df.flightSegmentId=='AA106')\ + .select("flightSegmentId", "economyClassBaseCost") +df2.write.save("n_flight2", "org.apache.bahir.cloudant", + bulkSize = "100", createDBOnSave="true") +total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", + "scheduledDepartureTime").orderBy(df.flightSegmentId).count() +print "Total", total, "flights from table" + + +# ***3. Loading dataframe from a Cloudant search index +df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight", + index="_design/view/_search/n_flights") +df.printSchema() +total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", + "scheduledDepartureTime").orderBy(df.flightSegmentId).count() +print "Total", total, "flights from index" + + +# ***4. Loading dataframe from a Cloudant view +df = spark.read.load(format="org.apache.bahir.cloudant", path="n_flight", + view="_design/view/_view/AA0", schemaSampleSize="20") +# schema for view will always be: _id, key, value +# where value can be a complex field +df.printSchema() +df.show() diff --git a/sql-cloudant/examples/python/CloudantDFOption.py b/sql-cloudant/examples/python/CloudantDFOption.py new file mode 100644 index 00000000..c0455322 --- /dev/null +++ b/sql-cloudant/examples/python/CloudantDFOption.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyspark.sql import SparkSession + +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using dataframes with options")\ + .getOrCreate() + +cloudant_host = "ACCOUNT.cloudant.com" +cloudant_username = "USERNAME" +cloudant_password = "PASSWORD" + +# ***1. Loading dataframe from Cloudant db +df = spark.read.format("org.apache.bahir.cloudant") \ + .option("cloudant.host", cloudant_host) \ + .option("cloudant.username", cloudant_username) \ + .option("cloudant.password", cloudant_password) \ + .load("n_airportcodemapping") +df.cache() # persisting in memory +df.printSchema() +df.filter(df._id >= 'CAA').select("_id",'airportName').show() + + +# ***2.Saving dataframe to Cloudant db +df.filter(df._id >= 'CAA').select("_id",'airportName') \ + .write.format("org.apache.bahir.cloudant") \ + .option("cloudant.host", cloudant_host) \ + .option("cloudant.username", cloudant_username) \ + .option("cloudant.password",cloudant_password) \ + .option("bulkSize","100") \ + .option("createDBOnSave", "true") \ + .save("airportcodemapping_df") +df = spark.read.format("org.apache.bahir.cloudant") \ + .option("cloudant.host", cloudant_host) \ + .option("cloudant.username", cloudant_username) \ + .option("cloudant.password", cloudant_password) \ + .load("n_flight") +df.printSchema() +total = df.filter(df.flightSegmentId >'AA9') \ + .select("flightSegmentId", "scheduledDepartureTime") \ + .orderBy(df.flightSegmentId).count() +print "Total", total, "flights from table" + + +# ***3. Loading dataframe from Cloudant search index +df = spark.read.format("org.apache.bahir.cloudant") \ + .option("cloudant.host",cloudant_host) \ + .option("cloudant.username",cloudant_username) \ + .option("cloudant.password",cloudant_password) \ + .option("index","_design/view/_search/n_flights").load("n_flight") +df.printSchema() + +total = df.filter(df.flightSegmentId >'AA9') \ + .select("flightSegmentId", "scheduledDepartureTime") \ + .orderBy(df.flightSegmentId).count() +print "Total", total, "flights from index" diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala new file mode 100644 index 00000000..d3e5ecc1 --- /dev/null +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.sql.cloudant + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.SQLContext + +object CloudantApp { + def main(args: Array[String]) { + val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example") + .config("cloudant.host", "ACCOUNT.cloudant.com") + .config("cloudant.username", "USERNAME") + .config("cloudant.password", "PASSWORD") + .getOrCreate() + + // For implicit conversions of Dataframe to RDDs + import spark.implicits._ + + // create a temp table from Cloudant db and query it using sql syntax + spark.sql( + s""" + |CREATE TEMPORARY TABLE airportTable + |USING org.apache.bahir.cloudant + |OPTIONS ( database 'n_airportcodemapping') + """.stripMargin) + // create a dataframe + val airportData = spark.sql( + s""" + |SELECT _id, airportName + |FROM airportTable + |WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id + """.stripMargin) + airportData.printSchema() + println(s"Total # of rows in airportData: " + airportData.count()) // scalastyle:ignore + // convert dataframe to array of Rows, and process each row + airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println) // scalastyle:ignore + + // create a temp table from Cloudant index and query it using sql syntax + spark.sql( + s""" + |CREATE TEMPORARY TABLE flightTable + |USING org.apache.bahir.cloudant + |OPTIONS (database 'n_flight', index '_design/view/_search/n_flights') + """.stripMargin) + val flightData = spark.sql( + s""" + |SELECT flightSegmentId, scheduledDepartureTime + |FROM flightTable + |WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95' + """.stripMargin) + flightData.printSchema() + flightData.map(t => "flightSegmentId: " + t(0) + ", scheduledDepartureTime: " + t(1)) + .collect().foreach(println) // scalastyle:ignore + } +} diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala new file mode 100644 index 00000000..d97b6884 --- /dev/null +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.sql.cloudant + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.SQLContext +import org.apache.spark.storage.StorageLevel + +object CloudantDF{ + def main(args: Array[String]) { + val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example with Dataframe") + .config("cloudant.host", "ACCOUNT.cloudant.com") + .config("cloudant.username", "USERNAME") + .config("cloudant.password", "PASSWORD") + .config("createDBOnSave", "true") // to create a db on save + .config("jsonstore.rdd.partitions", "20") // using 20 partitions + .getOrCreate() + + // 1. Loading data from Cloudant db + val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") + // Caching df in memory to speed computations + // and not to retrieve data from cloudant again + df.cache() + df.printSchema() + + // 2. Saving dataframe to Cloudant db + val df2 = df.filter(df("flightSegmentId") === "AA106") + .select("flightSegmentId", "economyClassBaseCost") + df2.show() + df2.write.format("org.apache.bahir.cloudant").save("n_flight2") + + // 3. Loading data from Cloudant search index + val df3 = spark.read.format("org.apache.bahir.cloudant") + .option("index", "_design/view/_search/n_flights").load("n_flight") + val total = df3.filter(df3("flightSegmentId") >"AA9") + .select("flightSegmentId", "scheduledDepartureTime") + .orderBy(df3("flightSegmentId")).count() + println(s"Total $total flights from index") // scalastyle:ignore + + // 4. Loading data from view + val df4 = spark.read.format("org.apache.bahir.cloudant") + .option("view", "_design/view/_view/AA0").load("n_flight") + df4.printSchema() + df4.show() + } +} diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala new file mode 100644 index 00000000..164ca214 --- /dev/null +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql.cloudant + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.SQLContext +import org.apache.spark.storage.StorageLevel + +object CloudantDFOption{ + def main(args: Array[String]) { + val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example with Dataframe using Option") + .getOrCreate() + + val cloudantHost = "ACCOUNT.cloudant.com" + val cloudantUser = "USERNAME" + val cloudantPassword = "PASSWORD" + + // 1. 
Loading data from Cloudant db + val df = spark.read.format("org.apache.bahir.cloudant") + .option("cloudant.host", cloudantHost) + .option("cloudant.username", cloudantUser) + .option("cloudant.password", cloudantPassword) + .load("n_airportcodemapping") + + df.cache() + df.printSchema() + df.filter(df("_id") >= "CAA").select("_id", "airportName").show() + + // 2. Saving dataframe to Cloudant db + // To create a Cloudant db during save set the option createDBOnSave=true + df.filter(df("_id") >= "CAA") + .select("_id", "airportName") + .write.format("org.apache.bahir.cloudant") + .option("cloudant.host", cloudantHost) + .option("cloudant.username", cloudantUser) + .option("cloudant.password", cloudantPassword) + .option("createDBOnSave", "true") + .save("airportcodemapping_df") + + // 3. Loading data from Cloudant search index + val df2 = spark.read.format("org.apache.bahir.cloudant") + .option("index", "_design/view/_search/n_flights") + .option("cloudant.host", cloudantHost) + .option("cloudant.username", cloudantUser) + .option("cloudant.password", cloudantPassword) + .load("n_flight") + val total2 = df2.filter(df2("flightSegmentId") >"AA9") + .select("flightSegmentId", "scheduledDepartureTime") + .orderBy(df2("flightSegmentId")) + .count() + println(s"Total $total2 flights from index")// scalastyle:ignore + } +} diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala new file mode 100644 index 00000000..85350b3a --- /dev/null +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.sql.cloudant + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{ Seconds, StreamingContext, Time } +import org.apache.spark.streaming.scheduler.{ StreamingListener, StreamingListenerReceiverError} + +import org.apache.bahir.cloudant.CloudantReceiver + +object CloudantStreaming { + def main(args: Array[String]) { + val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala") + // Create the context with a 10 seconds batch size + val ssc = new StreamingContext(sparkConf, Seconds(10)) + + val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "n_airportcodemapping"))) + + changes.foreachRDD((rdd: RDD[String], time: Time) => { + // Get the singleton instance of SparkSession + val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + + println(s"========= $time =========")// scalastyle:ignore + // Convert RDD[String] to DataFrame + val changesDataFrame = spark.read.json(rdd) + if (!changesDataFrame.schema.isEmpty) { + changesDataFrame.printSchema() + changesDataFrame.select("*").show() + + var hasDelRecord = false + var hasAirportNameField = false + for (field <- changesDataFrame.schema.fieldNames) { + if ("_deleted".equals(field)) { + hasDelRecord = true + } + if ("airportName".equals(field)) { + hasAirportNameField = true + } + } + if (hasDelRecord) { + changesDataFrame.filter(changesDataFrame("_deleted")).select("*").show() + } + + if (hasAirportNameField) { + changesDataFrame.filter(changesDataFrame("airportName") >= "Paris").select("*").show() + changesDataFrame.registerTempTable("airportcodemapping") + val airportCountsDataFrame = + spark.sql( + s""" + |select airportName, count(*) as total + |from airportcodemapping + |group by airportName") + """.stripMargin) + airportCountsDataFrame.show() + } + } + + }) + ssc.start() + // run streaming for 120 secs + Thread.sleep(120000L) + ssc.stop(true) + } +} + +/** Lazily instantiated singleton instance of SparkSession */ +object SparkSessionSingleton { + @transient private var instance: SparkSession = _ + def getInstance(sparkConf: SparkConf): SparkSession = { + if (instance == null) { + instance = SparkSession + .builder + .config(sparkConf) + .getOrCreate() + } + instance + } +} diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala new file mode 100644 index 00000000..5e82a39b --- /dev/null +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql.cloudant + +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{ Seconds, StreamingContext, Time } + +import org.apache.bahir.cloudant.CloudantReceiver + +object CloudantStreamingSelector { + def main(args: Array[String]) { + val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala") + + // Create the context with a 10 seconds batch size + val ssc = new StreamingContext(sparkConf, Seconds(10)) + val curTotalAmount = new AtomicLong(0) + val curSalesCount = new AtomicLong(0) + var batchAmount = 0L + + val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "sales", + "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}"))) + + changes.foreachRDD((rdd: RDD[String], time: Time) => { + // Get the singleton instance of SQLContext + val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + println(s"========= $time =========") // scalastyle:ignore + val changesDataFrame = spark.read.json(rdd) + if (!changesDataFrame.schema.isEmpty) { + changesDataFrame.select("*").show() + batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0) + curSalesCount.getAndAdd(changesDataFrame.count()) + curTotalAmount.getAndAdd(batchAmount) + println("Current sales count:" + curSalesCount)// scalastyle:ignore + println("Current total amount:" + curTotalAmount)// scalastyle:ignore + } + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml index e4900d5a..58600333 100644 --- a/sql-cloudant/pom.xml +++ b/sql-cloudant/pom.xml @@ -104,6 +104,11 @@ + + + src/main/resources + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala index 0fcd4849..c3376f77 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala @@ -77,8 +77,7 @@ class CloudantReceiver(cloudantParams: Map[String, String]) }) } else { val status = headers.getOrElse("Status", IndexedSeq.empty) - val errorMsg = "Error retrieving changes feed for a database %s : %s " - (config.getDbname(), status(0)) + val errorMsg = "Error retrieving _changes feed " + config.getDbname() + ": " + status(0) reportError(errorMsg, new RuntimeException(errorMsg)) } }) From 6784391b1bf4afe960ad13446afca226d5ae9b70 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 30 Mar 2017 22:53:02 -0400 Subject: [PATCH 3/7] remove duplicated configuration --- sql-cloudant/README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 36be297a..0e7eecc1 100644 --- 
a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -293,9 +293,6 @@ database||cloudant database name view||cloudant view w/o the database name. only used for load. index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results. path||cloudant: as database name if database is not present -schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents -bulkSize|200| the bulk save size -createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compound field. An example of loading data from a view: From 8998b0579c433dc6ee5feca34ed952fff1f6fe01 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 30 Mar 2017 23:04:01 -0400 Subject: [PATCH 4/7] remove duplicate configuration --- sql-cloudant/README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 0e7eecc1..95da1b39 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -257,13 +257,14 @@ The configuration is obtained in the following sequence: 1. default in the Config, which is set in the application.conf 2. key in the SparkConf, which is set in SparkConf -3. key in the parameters, which is set in a dataframe or temporaty table options +3. key in the parameters, which is set in a dataframe or temporaty table options, or StreamReceiver 4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option) Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. ### Configuration in application.conf + Default values are defined in [here](src/main/resources/application.conf) ### Configuration on SparkConf @@ -285,7 +286,7 @@ createDBOnSave|"false"| whether to create a new database during save operation. ### Configuration on Spark SQL Temporary Table or DataFrame -Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS: +You can set the following configurations at temporary table or dataframe level, besides overriding any SparkConf configuration. Name | Default | Meaning --- |:---:| --- @@ -303,11 +304,11 @@ spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant ### Configuration on Cloudant Receiver for Spark Streaming +You can set the following configurations at stream Receiver level, besides overriding any SparkConf configuration. +: + Name | Default | Meaning --- |:---:| --- -cloudant.host||cloudant host url -cloudant.username||cloudant userid -cloudant.password||cloudant passwor database||cloudant database name selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark. 
From 5d5a67d51c6b6aaf34774c1d9df217d975082eba Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 30 Mar 2017 23:06:38 -0400 Subject: [PATCH 5/7] improve README --- sql-cloudant/README.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 95da1b39..4a560d90 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -260,7 +260,7 @@ The configuration is obtained in the following sequence: 3. key in the parameters, which is set in a dataframe or temporaty table options, or StreamReceiver 4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option) -Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. +Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. When passing configuration in spark-submit, make sure adding "spark." as prefix to the keys. ### Configuration in application.conf @@ -313,11 +313,6 @@ database||cloudant database name selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark. -### Configuration in spark-submit using --conf option - -The above stated configuration keys can also be set using `spark-submit --conf` option. When passing configuration in spark-submit, make sure adding "spark." as prefix to the keys. - -
## Known limitations and areas for improvement From 863e94832c6f50f80417865cf1c9e2cbbd8fe983 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 30 Mar 2017 23:07:44 -0400 Subject: [PATCH 6/7] improve README --- sql-cloudant/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 4a560d90..f976ba02 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -305,7 +305,6 @@ spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant ### Configuration on Cloudant Receiver for Spark Streaming You can set the following configurations at stream Receiver level, besides overriding any SparkConf configuration. -: Name | Default | Meaning --- |:---:| --- From 5ccdae38741c33e72d49697e919e5230c5cb1722 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Fri, 31 Mar 2017 15:09:03 -0400 Subject: [PATCH 7/7] enable streaming receiver to honor --conf and add cloudant document link to the main README --- README.md | 1 + sql-cloudant/README.md | 11 +++++++---- .../examples/sql/cloudant/CloudantStreaming.scala | 4 ++-- .../sql/cloudant/CloudantStreamingSelector.scala | 2 +- .../org/apache/bahir/cloudant/CloudantReceiver.scala | 5 +++-- .../cloudant/common/JsonStoreConfigManager.scala | 3 +-- 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index ff8599b2..ebbaea7d 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Currently, each submodule has its own README.md, with information on example usa * [Streaming Mqtt](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md) * [Streaming Zeromq](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md) * [Streaming Twitter](https://github.com/apache/bahir/blob/master/streaming-twitter/README.md) +* [SQL Cloudant](sql-cloudant/README.md) Furthermore, to generate scaladocs for each module: diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index f976ba02..98a1c856 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -286,16 +286,19 @@ createDBOnSave|"false"| whether to create a new database during save operation. ### Configuration on Spark SQL Temporary Table or DataFrame -You can set the following configurations at temporary table or dataframe level, besides overriding any SparkConf configuration. +Besides overriding any SparkConf configuration, you can also set the following configurations at temporary table or dataframe level. Name | Default | Meaning --- |:---:| --- database||cloudant database name -view||cloudant view w/o the database name. only used for load. +view||cloudant view w/o the database name. only used for load. index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results. path||cloudant: as database name if database is not present -For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compound field. An example of loading data from a view: + +#### View Specific + +For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compound field. 
An example of loading data from a view: ```python spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')") @@ -304,7 +307,7 @@ spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant ### Configuration on Cloudant Receiver for Spark Streaming -You can set the following configurations at stream Receiver level, besides overriding any SparkConf configuration. +Besides overriding any SparkConf configuration, you can also set the following configurations at stream Receiver level Name | Default | Meaning --- |:---:| --- diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala index 85350b3a..a1de6965 100644 --- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala @@ -31,7 +31,7 @@ object CloudantStreaming { // Create the context with a 10 seconds batch size val ssc = new StreamingContext(sparkConf, Seconds(10)) - val changes = ssc.receiverStream(new CloudantReceiver(Map( + val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map( "cloudant.host" -> "ACCOUNT.cloudant.com", "cloudant.username" -> "USERNAME", "cloudant.password" -> "PASSWORD", @@ -70,7 +70,7 @@ object CloudantStreaming { s""" |select airportName, count(*) as total |from airportcodemapping - |group by airportName") + |group by airportName """.stripMargin) airportCountsDataFrame.show() } diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala index 5e82a39b..51d939a2 100644 --- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala @@ -36,7 +36,7 @@ object CloudantStreamingSelector { val curSalesCount = new AtomicLong(0) var batchAmount = 0L - val changes = ssc.receiverStream(new CloudantReceiver(Map( + val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map( "cloudant.host" -> "ACCOUNT.cloudant.com", "cloudant.username" -> "USERNAME", "cloudant.password" -> "PASSWORD", diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala index c3376f77..0446660f 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala @@ -23,14 +23,15 @@ import play.api.libs.json.Json import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.SparkConf import org.apache.bahir.cloudant.common._ // scalastyle:on -class CloudantReceiver(cloudantParams: Map[String, String]) +class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String]) extends Receiver[String](StorageLevel.MEMORY_AND_DISK) { lazy val config: CloudantConfig = { - JsonStoreConfigManager.getConfig(cloudantParams: Map[String, String]) + JsonStoreConfigManager.getConfig(sparkConf, cloudantParams) 
.asInstanceOf[CloudantConfig] } diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala index 8df7b63c..92192bba 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala @@ -177,8 +177,7 @@ import org.apache.bahir.cloudant.CloudantConfig } } - def getConfig(parameters: Map[String, String]): CloudantConfig = { - val sparkConf = null + def getConfig(sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = { implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG) implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)