From 30bb54b28220c321b49224945a5f49454d8474ff Mon Sep 17 00:00:00 2001
From: Esteban Laver
Date: Fri, 12 Jan 2018 00:26:29 -0500
Subject: [PATCH 1/2] Fixed deprecation warning messages - Imported
 ‘spark.implicits._’ to convert Spark RDD to Dataset - Replaced deprecated
 `json(RDD[String])` with `json(Dataset[String])`
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../apache/bahir/cloudant/DefaultSource.scala | 241 +++++++++---------
 1 file changed, 119 insertions(+), 122 deletions(-)

diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 36c2c78d..4e26e36a 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -30,57 +30,58 @@ import org.apache.bahir.cloudant.internal.ChangesReceiver
 case class CloudantReadWriteRelation (config: CloudantConfig,
                                       schema: StructType,
                                       dataFrame: DataFrame = null)
-                                     (@transient val sqlContext: SQLContext)
+                                     (@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan with InsertableRelation {
 
-  @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
+  @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
 
-  implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+  implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
-  def buildScan(requiredColumns: Array[String],
+  import sqlContext.implicits._
+
+  def buildScan(requiredColumns: Array[String],
                 filters: Array[Filter]): RDD[Row] = {
-    val colsLength = requiredColumns.length
+    val colsLength = requiredColumns.length
 
-    if (dataFrame != null) {
-      if (colsLength == 0) {
-        dataFrame.select().rdd
-      } else if (colsLength == 1) {
-        dataFrame.select(requiredColumns(0)).rdd
-      } else {
-        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
-        dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
-      }
+    if (dataFrame != null) {
+      if (colsLength == 0) {
+        dataFrame.select().rdd
+      } else if (colsLength == 1) {
+        dataFrame.select(requiredColumns(0)).rdd
       } else {
-      implicit val columns : Array[String] = requiredColumns
-      implicit val origFilters : Array[Filter] = filters
-
-      logger.info("buildScan:" + columns + "," + origFilters)
-      val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-      val df = sqlContext.read.json(cloudantRDD)
-      if (colsLength > 1) {
-        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
-        df.select(requiredColumns(0), colsExceptCol0: _*).rdd
-      } else {
-        df.rdd
-      }
+        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+        dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
+      }
+    } else {
+      implicit val columns : Array[String] = requiredColumns
+      implicit val origFilters : Array[Filter] = filters
+
+      logger.info("buildScan:" + columns + "," + origFilters)
+      val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+      val df = sqlContext.read.json(cloudantRDD.toDS())
+      if (colsLength > 1) {
+        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+        df.select(requiredColumns(0), colsExceptCol0: _*).rdd
+      } else {
+        df.rdd
       }
     }
-
+  }
 
   def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    if (config.getCreateDBonSave) {
-      dataAccess.createDB()
-    }
-    if (data.count() == 0) {
-      logger.warn("Database " + config.getDbname +
-        ": nothing was saved because the number of records was 0!")
-    } else {
-      val result = data.toJSON.foreachPartition { x =>
-        val list = x.toList // Has to pass as List, Iterator results in 0 data
-        dataAccess.saveAll(list)
-      }
+    if (config.getCreateDBonSave) {
+      dataAccess.createDB()
+    }
+    if (data.count() == 0) {
+      logger.warn("Database " + config.getDbname +
+        ": nothing was saved because the number of records was 0!")
+    } else {
+      val result = data.toJSON.foreachPartition { x =>
+        val list = x.toList // Has to pass as List, Iterator results in 0 data
+        dataAccess.saveAll(list)
      }
     }
+  }
 }
 
 class DefaultSource extends RelationProvider
@@ -91,97 +92,93 @@ class DefaultSource extends RelationProvider
   def createRelation(sqlContext: SQLContext,
                      parameters: Map[String, String]): CloudantReadWriteRelation = {
-    create(sqlContext, parameters, null)
-  }
-
-  private def create(sqlContext: SQLContext,
-                     parameters: Map[String, String],
-                     inSchema: StructType) = {
-
-    val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
-
-    var dataFrame: DataFrame = null
-
-    val schema: StructType = {
-      if (inSchema != null) {
-        inSchema
-      } else if (!config.isInstanceOf[CloudantChangesConfig]
-        || config.viewName != null || config.indexName != null) {
-        val df = if (config.getSchemaSampleSize ==
-          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
-          config.viewName == null
-          && config.indexName == null) {
-          val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          dataFrame = sqlContext.read.json(cloudantRDD)
-          dataFrame
-        } else {
-          val dataAccess = new JsonStoreDataAccess(config)
-          val aRDD = sqlContext.sparkContext.parallelize(
-            dataAccess.getMany(config.getSchemaSampleSize))
-          sqlContext.read.json(aRDD)
-        }
-        df.schema
+    create(sqlContext, parameters, null)
+  }
+
+  private def create(sqlContext: SQLContext,
+                     parameters: Map[String, String],
+                     inSchema: StructType) = {
+
+    import sqlContext.implicits._
+
+    val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
+
+    var dataFrame: DataFrame = null
+
+    val schema: StructType = {
+      if (inSchema != null) {
+        inSchema
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+        || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
+          config.viewName == null
+          && config.indexName == null) {
+          val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+          dataFrame = sqlContext.read.json(cloudantRDD.toDS())
+          dataFrame
         } else {
-        /* Create a streaming context to handle transforming docs in
-         * larger databases into Spark datasets
-         */
-        val changesConfig = config.asInstanceOf[CloudantChangesConfig]
-        val ssc = new StreamingContext(sqlContext.sparkContext,
-          Seconds(changesConfig.getBatchInterval))
-
-        val changes = ssc.receiverStream(
-          new ChangesReceiver(changesConfig))
-        changes.persist(changesConfig.getStorageLevelForStreaming)
-
-        // Global RDD that's created from union of all RDDs
-        var globalRDD = ssc.sparkContext.emptyRDD[String]
-
-        logger.info("Loading data from Cloudant using "
-          + changesConfig.getChangesReceiverUrl)
-
-        // Collect and union each RDD to convert all RDDs to a DataFrame
-        changes.foreachRDD((rdd: RDD[String]) => {
-          if (!rdd.isEmpty()) {
-            if (globalRDD != null) {
-              // Union RDDs in foreach loop
-              globalRDD = globalRDD.union(rdd)
-            } else {
-              globalRDD = rdd
-            }
+          val dataAccess = new JsonStoreDataAccess(config)
+          val aRDD = sqlContext.sparkContext.parallelize(
+            dataAccess.getMany(config.getSchemaSampleSize))
+          sqlContext.read.json(aRDD.toDS())
+        }
+        df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+         * larger databases into Spark datasets
+         */
+        val changesConfig = config.asInstanceOf[CloudantChangesConfig]
+        val ssc = new StreamingContext(sqlContext.sparkContext,
+          Seconds(changesConfig.getBatchInterval))
+
+        val changes = ssc.receiverStream(new ChangesReceiver(changesConfig))
+        changes.persist(changesConfig.getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using "
+          + changesConfig.getChangesReceiverUrl)
+
+        // Collect and union each RDD to convert all RDDs to a DataFrame
+        changes.foreachRDD((rdd: RDD[String]) => {
+          if (!rdd.isEmpty()) {
+            if (globalRDD != null) {
+              // Union RDDs in foreach loop
+              globalRDD = globalRDD.union(rdd)
             } else {
-              // Convert final global RDD[String] to DataFrame
-              dataFrame = sqlContext.sparkSession.read.json(globalRDD)
-              ssc.stop(stopSparkContext = false, stopGracefully = false)
+              globalRDD = rdd
             }
-          })
-
-          ssc.start
-          // run streaming until all docs from continuous feed are received
-          ssc.awaitTermination
-
-          if(dataFrame.schema.nonEmpty) {
-            dataFrame.schema
           } else {
-            throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+            // Convert final global RDD[String] to DataFrame
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
+            ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
-        }
-      }
-    CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
-  }
+        })
-  def createRelation(sqlContext: SQLContext,
-                     mode: SaveMode,
-                     parameters: Map[String, String],
-                     data: DataFrame): CloudantReadWriteRelation = {
-    val relation = create(sqlContext, parameters, data.schema)
-    relation.insert(data, mode==SaveMode.Overwrite)
-    relation
-  }
+        ssc.start
+        // run streaming until all docs from continuous feed are received
+        ssc.awaitTermination
-  def createRelation(sqlContext: SQLContext,
-                     parameters: Map[String, String],
-                     schema: StructType): CloudantReadWriteRelation = {
-    create(sqlContext, parameters, schema)
+        dataFrame.schema
+      }
     }
+    CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
+  }
 
   def createRelation(sqlContext: SQLContext,
+                     mode: SaveMode,
+                     parameters: Map[String, String],
+                     data: DataFrame): CloudantReadWriteRelation = {
+    val relation = create(sqlContext, parameters, data.schema)
+    relation.insert(data, mode==SaveMode.Overwrite)
+    relation
+  }
+
+  def createRelation(sqlContext: SQLContext,
+                     parameters: Map[String, String],
+                     schema: StructType): CloudantReadWriteRelation = {
+    create(sqlContext, parameters, schema)
+  }
 }

From 228d9c854b5188ea5f40ccb3abc81ce617cb4cde Mon Sep 17 00:00:00 2001
From: Esteban Laver
Date: Wed, 24 Jan 2018 23:16:06 -0500
Subject: [PATCH 2/2] Added back if/else block that was removed during rebase

---
 .../scala/org/apache/bahir/cloudant/DefaultSource.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 4e26e36a..26859935 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -161,7 +161,11 @@ class DefaultSource extends RelationProvider
         // run streaming until all docs from continuous feed are received
         ssc.awaitTermination
 
-        dataFrame.schema
+        if(dataFrame.schema.nonEmpty) {
+          dataFrame.schema
+        } else {
+          throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+        }
       }
     }
     CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
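
For reviewers who want to reproduce the deprecation warning and the fix outside the connector, the snippet below is a minimal, self-contained sketch of the pattern both commits apply. It is not part of the patch: the SparkSession settings, object name, and sample JSON documents are invented for illustration; only the implicits / `toDS()` / `json(Dataset[String])` sequence mirrors the change.

import org.apache.spark.sql.{Dataset, SparkSession}

object JsonDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-dataset-example")
      .getOrCreate()

    // An RDD[String] of JSON documents, standing in for the connector's
    // JsonStoreRDD / globalRDD of Cloudant docs.
    val jsonRDD = spark.sparkContext.parallelize(Seq(
      """{"_id": "doc1", "value": 1}""",
      """{"_id": "doc2", "value": 2}"""))

    // Deprecated since Spark 2.2 and the source of the warning:
    // val df = spark.read.json(jsonRDD)

    // Replacement: bring the implicit encoders into scope, convert the
    // RDD[String] to a Dataset[String], then read it as JSON.
    import spark.implicits._
    val jsonDS: Dataset[String] = jsonRDD.toDS()
    val df = spark.read.json(jsonDS)

    df.printSchema()
    df.show()
    spark.stop()
  }
}

In the patch itself the same conversion is done with `import sqlContext.implicits._` and `cloudantRDD.toDS()`, `aRDD.toDS()`, and `globalRDD.toDS()` before each `read.json` call.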