Integration of hdt-rdf compression module #81
Conversation
Hi @abakarboubaa,
many thanks for your contribution. I went through it and provided some comments. Please try to resolve them and let me know in case you need more info.
PS: The PR is failing, mostly due to Scalastyle violations. Try to resolve those as well.
Best regards,
*/

/*
--input
Please, remove these comments from the class.
* @return Returns DataFrame [Subject,Object,Predicate]
*/
def convertRDDGraphToDF(triple: RDD[graph.Triple]) = {
  logger.info("Converting RDD[Graph] to DataFrame[Row].")
No need to have this log here.
And remove all the remaining logs below as well.
logger.info(s"Rdd[Graph] is created. ${rddGraph.count()}") | ||
|
||
val triplesDF=convertRDDGraphToDF(rddGraph) | ||
triplesDF.printSchema() |
No need to print.
triplesDF.cache()

objectDF = getDistinctObjectDictDF(rddGraph)
objectDF.printSchema()
No need to print. Remove it.
* @param outputDir Path to be written
* @param mode SaveMode of Write
*/
def saveAsCSV(outputDir: String, mode: SaveMode): Unit =
Hm, why do we need to save them as CSV? Aren't they just indexes, which could be kept in any file format (Parquet files, Hadoop files)?
That is a future enhancement: supporting various target formats and compression types.
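For reference, a minimal sketch of what a Parquet-backed variant could look like; saveAsParquet is a hypothetical name, not part of this PR:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical alternative to saveAsCSV: Parquet keeps the schema of the
// [index, value] dictionary tables and compresses the index columns well.
def saveAsParquet(df: DataFrame, outputDir: String, mode: SaveMode): Unit = {
  df.write.mode(mode).parquet(outputDir)
}

// Re-reading a dictionary later (spark is the existing SparkSession):
// val subjectDict = spark.read.parquet(outputDir)
```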
val logger = LoggerFactory.getLogger(TripleOps.getClass)

def main(args: Array[String]) {
Please, remove the main method from here.
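For illustration, one way to keep TripleOps free of an entry point is to move it into a separate example object; the object name below is hypothetical:

```scala
// Hypothetical example object; TripleOps itself stays a pure utility.
object TripleOpsExample {
  def main(args: Array[String]): Unit = {
    val input = args(0) // path to an N-Triples file
    val triples = TripleOps.readRDFFromFile(input)
    println(s"Read ${triples.count()} triples from $input")
  }
}
```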
The code has been updated as mentioned. I have also updated it to comply with Scalastyle.
* @return DataFrame Subject dictionary of [index,subject]
*/
def getDistinctSubjectDictDF(triples: RDD[graph.Triple]): DataFrame = {
  spark.createDataFrame(triples.map(_.getSubject.toString()).distinct().zipWithIndex().map(t => Row(t._1, t._2)), dictionarySchema).cache()
Do we need to persist/cache them in advance? I think it would be good to remove the caching as a default approach and leave it to the users to decide what is relevant for their use case.
So, remove the .cache() call everywhere it is added; the user can apply it themselves when needed.
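A minimal sketch of the dictionary builder without the built-in cache(), reusing the PR's spark session, Row, and dictionarySchema; whether to persist then becomes the caller's decision:

```scala
// Same as the PR's getDistinctSubjectDictDF, but without a default cache().
def getDistinctSubjectDictDF(triples: RDD[graph.Triple]): DataFrame = {
  spark.createDataFrame(
    triples.map(_.getSubject.toString()).distinct().zipWithIndex().map(t => Row(t._1, t._2)),
    dictionarySchema)
}

// Caching becomes an explicit, per-use-case choice:
// val subjectDict = getDistinctSubjectDictDF(rddGraph).cache()
```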
* @param registerAsTable If true, it register all the DF as Spark table
* @return Returns the Tuple4 [IndexDataFrame,SubjectDictDataFrame,ObjectDictDataFrame,PredicateDictDataFrame]
*/
def createOrReadDataSet(input: String, compressedDir: String, registerAsTable: Boolean = true): (DataFrame, DataFrame, DataFrame, DataFrame) = {
This method requires the compressed data to already be on disk (which is usually meant as a pre-processing step). Did you also consider that sometimes there is no need to materialize the result, i.e. keep it in memory and do not write it back to disk?
I think we should provide another method which allows users to read the data and compress it on the fly, and of course to save it in case they want to do so.
"createOrReadDataSet" method does this. If NTriple file is passed, it processes and converts it into index dictionary and if compressed Directory (i.e. already parsed) is passed, then it simply read it as indexed dataFrames. However, I will separate out the function into 2, one for reading from raw NT file and the second one from processed data.
There is already a provision of saving processed (compressed) dataset into a disk using saveAsCSV() and re-read.
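A rough sketch of that split; the method names and the predicate-dictionary helper are assumptions for illustration only, not the final API:

```scala
// 1) Read a raw N-Triples file and build the compressed representation in memory,
//    without writing anything back to disk.
def compressFromFile(input: String): (DataFrame, DataFrame, DataFrame, DataFrame) = {
  val triples = readRDFFromFile(input)
  val triplesDF = convertRDDGraphToDF(triples)            // [Subject, Object, Predicate], per the PR
  val subjectDict = getDistinctSubjectDictDF(triples)
  val objectDict = getDistinctObjectDictDF(triples)
  val predicateDict = getDistinctPredicateDictDF(triples) // assumed analogous helper
  (triplesDF, subjectDict, objectDict, predicateDict)
}

// 2) Read an already compressed directory back from disk (e.g. written via saveAsCSV):
// def readCompressedDataSet(compressedDir: String): (DataFrame, DataFrame, DataFrame, DataFrame) = ...
```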
Hi @Gezim,
Okay noted. Thank you for letting me know.
See you.
Abakar
…On Mon, 13 May 2019 at 09:33, Gezim Sejdiu wrote:
In sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/model/hdt/TripleOps.scala (#81 (comment)):
+ * @return returns RDD[Triple]
+ */
+ def readRDFFromFile( input : String) : RDD[graph.Triple] = {
+ val triples : RDD[graph.Triple] = spark.rdf(Lang.NTRIPLES)(input)
+ triples
+ }
+
+
+ /**
+ * This is key function of TripleOps that read RDF file and create Dictionaries and Index Table and register them as Spark In memory Table
+ * @param input Input RDF File Path [Either One of the input is require]
+ * @param compressedDir Input compressed-directory Path to read compressed data directly [Either One of the input is require]
+ * @param registerAsTable If true, it register all the DF as Spark table
+ * @return Returns the Tuple4 [IndexDataFrame,SubjectDictDataFrame,ObjectDictDataFrame,PredicateDictDataFrame]
+ */
+ def createOrReadDataSet(input : String, compressedDir : String, registerAsTable : Boolean = true) : (DataFrame, DataFrame, DataFrame, DataFrame) = {
Hi @abakarboubaa <https://github.com/abakarboubaa>,
indeed. There is no need to make such a separation. I already did it here:
- getHDT
<https://github.com/SANSA-Stack/SANSA-RDF/blob/develop/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/model/hdt/TripleOps.scala#L80>
- readHDTFromDisk
<https://github.com/SANSA-Stack/SANSA-RDF/blob/develop/sansa-rdf-spark/src/main/scala/net/sansa_stack/rdf/spark/model/hdt/TripleOps.scala#L110>
See you
I have created the hdt package as mentioned, added the TripleOps class, and submitted the corresponding test cases.
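For illustration, a minimal ScalaTest-style sketch of what such a test could verify; the suite name, test resource, and use of spark-testing-base are assumptions, not the actual submitted tests:

```scala
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

// Hypothetical test: the subject dictionary should have one row per distinct subject.
class TripleOpsTests extends FunSuite with DataFrameSuiteBase {

  test("subject dictionary contains one entry per distinct subject") {
    val input = getClass.getResource("/sample.nt").getPath // hypothetical test resource
    val triples = TripleOps.readRDFFromFile(input)
    val subjectDict = TripleOps.getDistinctSubjectDictDF(triples)

    assert(subjectDict.count() == triples.map(_.getSubject.toString()).distinct().count())
  }
}
```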