
read sequence file from HDFS #101

Open
pzhanggit opened this issue Sep 28, 2021 · 8 comments

@pzhanggit

Hi @dfdx, thank you for showing me how to add new functions to Spark.jl in issue #98. Now I am trying to add "sequenceFile" following the same steps but could not get it working. Here is the code snippet.

"Create RDD from a sequence file"
function sequence_file(sc::SparkContext, path::AbstractString)
    jrdd = jcall(sc.jsc, "sequenceFile", JJavaPairRDD, (JString,), path)
    return JavaPairRDD(jrdd)
end

I got an error message saying

Exception in thread "main" java.lang.NoSuchMethodError: sequenceFile
ERROR: LoadError: JavaCall.JavaCallError("Error calling Java: java.lang.NoSuchMethodError: sequenceFile")
Stacktrace:
[1] geterror(allow::Bool)
@ JavaCall ~/.julia/packages/JavaCall/tjlYt/src/core.jl:418
[2] jcall(obj::JavaCall.JavaObject{Symbol("org.apache.spark.api.java.JavaSparkContext")}, method::String, rettype::Type, argtypes::Tuple{DataType}, args::String)
@ JavaCall ~/.julia/packages/JavaCall/tjlYt/src/core.jl:244
[3] sequence_file(sc::SparkContext, path::String)
@ Spark ~/.julia/packages/Spark/7NlAt/src/context.jl:87
...

Any suggestions on how to make it work? Thanks.

@dfdx (Owner)

dfdx commented Sep 29, 2021

It's almost correct! The mistake is that SparkContext.sequenceFile() also requires at least keyClass and valueClass arguments:

using JavaCall

# make sure we read the signature correctly
listmethods(sc.jsc, "sequenceFile")
# 2-element Vector{JMethod}:
#  org.apache.spark.api.java.JavaPairRDD sequenceFile(java.lang.String, java.lang.Class, java.lang.Class, int)
#  org.apache.spark.api.java.JavaPairRDD sequenceFile(java.lang.String, java.lang.Class, java.lang.Class)

# create class objects - using a hack for simplicity
# though a more robust approach would be to use Java reflection API
jstr = getclass(JString(""))

# actually call sequenceFile()
path = ...
jcall(sc.jsc, "sequenceFile", JJavaPairRDD, (JString, JClass, JClass), path, jstr, jstr)
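For reference, the `sequence_file` wrapper from the first comment could be extended to carry the key and value classes through to the Java call. This is a sketch, not tested code; it assumes the `JClass`, `JJavaPairRDD`, and `JavaPairRDD` names used elsewhere in the thread:

```julia
"Create a pair RDD from a sequence file, given the Hadoop key and value classes"
function sequence_file(sc::SparkContext, path::AbstractString,
                       key_class::JClass, value_class::JClass)
    # matches the 3-argument signature reported by listmethods above
    jrdd = jcall(sc.jsc, "sequenceFile", JJavaPairRDD,
                 (JString, JClass, JClass), path, key_class, value_class)
    return JavaPairRDD(jrdd)
end
```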

@pzhanggit (Author)

Thank you, @dfdx! It works now. Are there any functions in Spark.jl that I can use to get the keys and values of a JavaPairRDD?

@pzhanggit (Author)

Update:
I found the values function to get the values of a JavaPairRDD. Here is the code.

listmethods(sc.jsc, "sequenceFile")
jstr = getclass(JString(""))
text = jcall(sc.jsc, "sequenceFile", JJavaPairRDD, (JString, JClass, JClass), filepath_input, jstr, jstr)
listmethods(text,"values")
# 1-element Vector{JMethod}:
# org.apache.spark.api.java.JavaRDD values()
text_v = JavaRDD(jcall(text,"values", JJavaRDD, ()))
collect(text_v)

But I got errors saying,

21/10/01 13:04:56 INFO DAGScheduler: ResultStage 0 (collect at JuliaRDD.scala:233) failed in 2.972 s due to Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 24) (127.0.0.1 executor 0): java.lang.ArrayStoreException: org.apache.hadoop.io.Text
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:75)
at scala.Array$.slowcopy(Array.scala:84)
at scala.Array$.copy(Array.scala:110)
at scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
at scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
...

and

ERROR: JavaCall.JavaCallError("Error calling Java: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 24) (127.0.0.1 executor 0): java.lang.ArrayStoreException: org.apache.hadoop.io.Text\n\tat scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:75)\n\tat scala.Array$.slowcopy(Array.scala:84)\n\tat scala.Array$.copy(Array.scala:110)\n\tat scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)\n\tat scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)\n\tat scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)\n\tat scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:283)\n\tat scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:282)\n\tat scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)\n\tat scala.collection.TraversableOnce.toArray(TraversableOnce.scala:291)\n\tat scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat scala.collection.AbstractTraversable.toArray(Traversable.scala:108)\n\tat scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)\n\tat scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat scala.collection.AbstractIterator.toArray(Iterator.scala:1429)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)\n\tat org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:")
Stacktrace:
[1] geterror(allow::Bool)
@ JavaCall ~/.julia/packages/JavaCall/tjlYt/src/core.jl:418
[2] geterror
@ ~/.julia/packages/JavaCall/tjlYt/src/core.jl:403 [inlined]
[3] _jcall(obj::JavaMetaClass{Symbol("org.apache.spark.api.julia.JuliaRDD")}, jmethodId::Ptr{Nothing}, callmethod::Ptr{Nothing}, rettype::Type, argtypes::Tuple{DataType}, args::JavaObject{Symbol("org.apache.spark.api.java.JavaRDD")})
@ JavaCall ~/.julia/packages/JavaCall/tjlYt/src/core.jl:373
[4] jcall(typ::Type{JavaObject{Symbol("org.apache.spark.api.julia.JuliaRDD")}}, method::String, rettype::Type, argtypes::Tuple{DataType}, args::JavaObject{Symbol("org.apache.spark.api.java.JavaRDD")})
@ JavaCall ~/.julia/packages/JavaCall/tjlYt/src/core.jl:227
[5] collect_internal(rdd::JavaRDD, static_java_class::Type, result_class::Type)
@ Spark ~/.julia/packages/Spark/iBu5i/src/rdd.jl:234
[6] collect(rdd::JavaRDD)
@ Spark ~/.julia/packages/Spark/iBu5i/src/rdd.jl:261
[7] top-level scope
@ REPL[14]:1

@dfdx (Owner)

dfdx commented Oct 1, 2021

Just to make sure: in your sequence file, both keys and values are strings, right? I used JString just as an example class :D

@pzhanggit (Author)

pzhanggit commented Oct 1, 2021

> Just to make sure: in your sequence file, both keys and values are strings, right? I used JString just as an example class :D

Aaaaa, they are actually org.apache.hadoop.io.Text. Any examples/directions I can learn to switch from string to Text? Thank you for being so patient with me. I barely know anything about Java/Scala.

@dfdx (Owner)

dfdx commented Oct 1, 2021

I had to refresh my memory about the Reflection API a bit, but it turns out we already have a convenient function to create an instance of Class: classforname. Try this (assuming both key and value are Text):

jtext = JavaCall.classforname("org.apache.hadoop.io.Text")
text = jcall(sc.jsc, "sequenceFile", JJavaPairRDD, (JString, JClass, JClass), filepath_input, jtext, jtext)
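The same pattern should apply to other Hadoop Writable types. A hedged sketch (class names are standard ones from Hadoop's `org.apache.hadoop.io` package; `filepath_input` as defined earlier in the thread; untested):

```julia
using JavaCall

# Class objects for common Hadoop Writable types
jtext = JavaCall.classforname("org.apache.hadoop.io.Text")
jintw = JavaCall.classforname("org.apache.hadoop.io.IntWritable")

# e.g. a sequence file with Text keys and IntWritable values
pairs = jcall(sc.jsc, "sequenceFile", JJavaPairRDD,
              (JString, JClass, JClass), filepath_input, jtext, jintw)
```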

@pzhanggit (Author)

Thank you so much! That part works now. But I saw an error when running collect(text_v):

Job aborted due to stage failure: task 6.0 in stage 0.0 (TID 6) had a not serializable result: org.apache.hadoop.io.Text\nSerialization stack:\n\t- object not serializable

After some searching, it seems that Text is not serializable and I found this solution.

text_v_T = jcall(text,"values", JJavaRDD, ())
text_v = jcall(text_v_T,"toString", JString, ())
collect(text_v)

and I got results like,

44-element Vector{Char}:
'M': ASCII/Unicode U+004D (category Lu: Letter, uppercase)
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'p': ASCII/Unicode U+0070 (category Ll: Letter, lowercase)
'P': ASCII/Unicode U+0050 (category Lu: Letter, uppercase)
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'r': ASCII/Unicode U+0072 (category Ll: Letter, lowercase)
't': ASCII/Unicode U+0074 (category Ll: Letter, lowercase)
'i': ASCII/Unicode U+0069 (category Ll: Letter, lowercase)
't': ASCII/Unicode U+0074 (category Ll: Letter, lowercase)
'i': ASCII/Unicode U+0069 (category Ll: Letter, lowercase)
'o': ASCII/Unicode U+006F (category Ll: Letter, lowercase)
'n': ASCII/Unicode U+006E (category Ll: Letter, lowercase)
's': ASCII/Unicode U+0073 (category Ll: Letter, lowercase)
'R': ASCII/Unicode U+0052 (category Lu: Letter, uppercase)
'D': ASCII/Unicode U+0044 (category Lu: Letter, uppercase)
'D': ASCII/Unicode U+0044 (category Lu: Letter, uppercase)
'[': ASCII/Unicode U+005B (category Ps: Punctuation, open)

's': ASCII/Unicode U+0073 (category Ll: Letter, lowercase)
' ': ASCII/Unicode U+0020 (category Zs: Separator, space)
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
't': ASCII/Unicode U+0074 (category Ll: Letter, lowercase)
' ': ASCII/Unicode U+0020 (category Zs: Separator, space)
'<': ASCII/Unicode U+003C (category Sm: Symbol, math)
'u': ASCII/Unicode U+0075 (category Ll: Letter, lowercase)
'n': ASCII/Unicode U+006E (category Ll: Letter, lowercase)
'k': ASCII/Unicode U+006B (category Ll: Letter, lowercase)
'n': ASCII/Unicode U+006E (category Ll: Letter, lowercase)
'o': ASCII/Unicode U+006F (category Ll: Letter, lowercase)
'w': ASCII/Unicode U+0077 (category Ll: Letter, lowercase)
'n': ASCII/Unicode U+006E (category Ll: Letter, lowercase)
'>': ASCII/Unicode U+003E (category Sm: Symbol, math)
':': ASCII/Unicode U+003A (category Po: Punctuation, other)
'0': ASCII/Unicode U+0030 (category Nd: Number, decimal digit)

Something is still missing in my code.

@dfdx (Owner)

dfdx commented Oct 1, 2021

I understand the error, but I don't see an immediate solution. In your code you call .toString on the RDD object itself, which produces a single string like "MapPartitionsRDD[...]". The solution you linked instead calls the .map() method, applying .toString to each element of the incoming RDD. Spark.jl also provides a map() function, but to use it, the incoming objects must first be serialized so they can be passed to Julia, and Text is not serializable :(
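As a side illustration of why the result came back as a 44-element Vector{Char}: with a JString return type, jcall hands back a Julia String, and collect over a Julia String iterates its characters. A minimal pure-Julia example (the string literal is made up to resemble the RDD's toString output):

```julia
s = "MapPartitionsRDD[1]"   # what text_v actually held: one Julia String
chars = collect(s)          # collecting a String yields its characters
# chars isa Vector{Char}; chars[1] == 'M'
```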

The most straightforward way to convert Text into String would be to implement it in Java (e.g. in JuliaRDD or any other dummy class), but perhaps writing and compiling Java isn't something you dream about. Maybe we can bypass the problem via the newer Dataset API, but I don't see the appropriate method right now. So let me think about it a little bit.

As a side note, very few people still work with sequence files and Hadoop's Text class, so I don't expect good support for them in modern versions of Spark.
