
Correct errors during save operation #61

Merged

mayya-sharipova merged 1 commit into master from 67396_correct_save on Jun 7, 2016

Conversation

mayya-sharipova (Contributor)

Don't raise an error if any partition has 0 records

Update an example with save operation reflecting new API

BugzID: 67396
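The gist of the change, sketched here in Python for illustration only (the actual fix lives in the connector's Scala code; bulk_post below is a hypothetical stand-in for its bulk-save request, and df is assumed to be a DataFrame as in the examples further down): a partition with zero records is now skipped with a warning instead of raising an error.

import logging

log = logging.getLogger("cloudant-save")

def bulk_post(docs):
    """Hypothetical stand-in for the connector's bulk-save request."""

def save_partition(rows):
    docs = list(rows)
    if not docs:
        # Previously this case raised an error; now the empty partition is skipped.
        log.warning("nothing was saved because the number of records was 0")
        return
    bulk_post(docs)

# Applied per partition, so partitions with 0 records no longer fail the save:
df.rdd.foreachPartition(save_partition)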

@@ -45,7 +45,7 @@
 df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
 df.filter(df._id >= 'CAA').select("_id",'airportName').show()
-df.filter(df._id >= 'CAA').select("_id",'airportName').save("airportcodemapping_df",
+df.filter(df._id >= 'CAA').select("_id",'airportName').write.save("airportcodemapping_df",
 "com.cloudant.spark", bulkSize = "100")
HolgerKache (Contributor)

This statement did not work for me to demonstrate the new behavior. I had to modify it to:

df.filter(df._id >= 'ZZZ').select("_id",'airportName').write.save("airportcodemapping_df", "com.cloudant.spark", bulkSize = "100")

in order to get
[WARN] [06/02/2016 17:32:50.954] [Thread-3] [CloudantReadWriteRelation(akka://CloudantSpark-a3f15ff1-f340-43eb-a7c5-891fb21f45bb)] Database airportcodemapping_df: nothing was saved because the number of records was 0!

The original query with >= 'CAA' would still give me the good INFO message:

[INFO] [06/02/2016 17:27:43.275] [Executor task launch worker-0] [JsonStoreDataAccess(akka://CloudantSpark-4a0b1607-c66d-491f-b8d8-e6d4cad61cf1)] Save total 13 with bulkSize 100 in 0s

Which behavior did you want to demonstrate here: a pass, or a failure with the warning?

mayya-sharipova (Contributor, Author) commented Jun 2, 2016

@HolgerKache Holger, this patch is intended for the case when the number of partitions > the number of documents to be saved. In your case the database is very small and uses just one partition, so the save function works fine.
An example that demonstrates the patch:

conf.set("jsonstore.rdd.partitions",20)  #using 20 partitions
df = sqlContext.load("n_flight", "com.cloudant.spark") # big enough database to use all 20 partitions
df2 = df.filter(df.flightSegmentId=='AA106').select("flightSegmentId", "economyClassBaseCost") #df2 contains only 5 docs

#this will throw an error without the patch as not every partition will have at least 1 doc to save
df2.write.save("n_flight111",  "com.cloudant.spark", 
     bulkSize = "100", createDBOnSave="true")  
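A quick way to see the mismatch that produces the empty partitions, assuming the same df2 as above:

print(df2.rdd.getNumPartitions())  # 20 partitions configured
print(df2.count())                 # but only 5 matching documents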

HolgerKache (Contributor)

Ok, I understand. My tests use a load factor of only 10, so there are too few airport codes to work with: python -m helpers.dataload -load 10

I like the example you provided above based on flights (mostly because it has more comments). Want to deliver that instead of the airportcodemapping one?

HolgerKache (Contributor) commented Jun 2, 2016

@mayya-sharipova The new code works well and looks good. Only the samples may need some improvements. E.g., examples/python/CloudantDF.py depends on a database I don't have. Where is that database supposed to come from?

df = sqlContext.load(source="com.cloudant.spark", path="movies-glynn", view="_design/view1/_view/titleyear2", schemaSampleSize="20")
df.printSchema()

results in

py4j.protocol.Py4JJavaError: An error occurred while calling o102.load.
: java.lang.RuntimeException: Database movies-glynn request error: {"error":"not_found","reason":"Database does not exist."}

    at com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:176)
    at com.cloudant.spark.common.JsonStoreDataAccess.getMany(JsonStoreDataAccess.scala:85)
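As an aside, a setup sketch for the missing database: something like the following (plain CouchDB HTTP API via requests; the endpoint, credentials, and map function are all assumptions) could create movies-glynn with the _design/view1/_view/titleyear2 view the example expects.

import requests

base = "https://ACCOUNT.cloudant.com"  # assumed endpoint
auth = ("USERNAME", "PASSWORD")        # assumed credentials

requests.put(base + "/movies-glynn", auth=auth)  # create the database
requests.put(
    base + "/movies-glynn/_design/view1",
    auth=auth,
    json={"views": {"titleyear2": {
        "map": "function(doc) { emit(doc.title, doc.year); }"  # assumed fields
    }}},
)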

HolgerKache (Contributor) commented Jun 2, 2016

Another problem I have is with: examples/scala/src/main/scala/mytest/spark/CloudantDF.scala

Here we set
conf.set("createDBOnSave","true")

When a database airportcodemapping_df already exists, the example will:

a) throw an expected exception

Use connectorVersion=1.6.3, dbName=airportcodemapping_df, indexName=null, viewName=null,jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1,jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=900000,bulkSize=20, schemaSampleSize=-1
Exception in thread "main" java.lang.RuntimeException: Database airportcodemapping_df create error: {"error":"file_exists","reason":"The database could not be created, the file already exists."}

    at com.cloudant.spark.common.JsonStoreDataAccess.createDB(JsonStoreDataAccess.scala:199)

b) execution hangs and does not return - unexpected

This ^ is probably unrelated to the code changes in this ticket but a problem nonetheless.
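One possible workaround sketch for the duplicate-create error (not part of this PR; the endpoint, credentials, and df are assumptions carried over from the example): check whether the database exists first, and only then ask the connector to create it.

import requests

base = "https://ACCOUNT.cloudant.com"  # assumed endpoint
auth = ("USERNAME", "PASSWORD")        # assumed credentials

exists = requests.head(base + "/airportcodemapping_df", auth=auth).status_code == 200

df.write.save("airportcodemapping_df", "com.cloudant.spark",
              bulkSize="20", createDBOnSave=str(not exists).lower())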

mayya-sharipova (Contributor, Author)
@HolgerKache About your last two comments:

  1. I will change the examples of views to use a db that is part of the test environment.
  2. Good catch on the freezing behaviour of Spark in Scala; strange that this doesn't happen in Python. I will investigate this further.

Don't raise an error if any partition has 0 records

Update an example with save operation reflecting new API

Solve an issue with an application freezing on error

BugzID: 67396
@mayya-sharipova mayya-sharipova merged commit e2b5159 into master Jun 7, 2016
@mayya-sharipova mayya-sharipova deleted the 67396_correct_save branch June 7, 2016 21:28