
Integration of hdt-rdf compression module #81

Merged

merged 7 commits into SANSA-Stack:develop May 10, 2019

Conversation

@abakarboubaa (Contributor) commented May 4, 2019

I have created the hdt package as discussed, added a TripleOps class, and submitted the corresponding test cases.

@GezimSejdiu self-requested a review May 6, 2019

@GezimSejdiu (Member) left a comment

Hi @abakarboubaa,

many thanks for your contribution. I went through it and left some comments. Please try to resolve them and let me know in case you need more info.
PS: The PR is failing, mostly due to Scalastyle violations. Please try to resolve those as well.

Best regards,

*/

/*
--input

@GezimSejdiu (Member) commented May 6, 2019
Please, remove these comments from the class.

 * @return Returns DataFrame [Subject,Object,Predicate]
 */
def convertRDDGraphToDF(triple: RDD[graph.Triple]) = {
  logger.info("Converting RDD[Graph] to DataFrame[Row].")

@GezimSejdiu (Member) commented May 6, 2019
No need to have this log here.

@GezimSejdiu (Member) commented May 6, 2019
And remove all the remaining logs below as well.
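For reference, a minimal sketch of what the conversion could look like with the logging removed. The triple schema, the parameter names, and the explicit SparkSession parameter are illustrative assumptions, not the PR's actual code.

import org.apache.jena.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Illustrative schema for the triples DataFrame.
val tripleSchema = StructType(Seq(
  StructField("Subject", StringType, nullable = false),
  StructField("Predicate", StringType, nullable = false),
  StructField("Object", StringType, nullable = false)))

// Conversion without any logging or printing; `spark` is an existing SparkSession.
def convertRDDGraphToDF(spark: SparkSession, triples: RDD[graph.Triple]): DataFrame =
  spark.createDataFrame(
    triples.map(t => Row(t.getSubject.toString, t.getPredicate.toString, t.getObject.toString)),
    tripleSchema)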

logger.info(s"Rdd[Graph] is created. ${rddGraph.count()}")

val triplesDF=convertRDDGraphToDF(rddGraph)
triplesDF.printSchema()

@GezimSejdiu (Member) commented May 6, 2019
No need to print.

triplesDF.cache()

objectDF = getDistinctObjectDictDF(rddGraph)
objectDF.printSchema()

@GezimSejdiu (Member) commented May 6, 2019
No need to print. Remove it.

 * @param outputDir Path to be written
 * @param mode SaveMode of the write
 */
def saveAsCSV(outputDir: String, mode: SaveMode): Unit =

@GezimSejdiu (Member) commented May 6, 2019
Um, why do we need to save them as CSV? Aren't they just indexes, which could be kept in any file format (Parquet files, Hadoop files)?

@abakarboubaa (Author, Contributor) commented May 7, 2019
That is a planned future enhancement, to support various target formats and compression types.
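As a hedged sketch of the format-agnostic alternative raised above: the method name and the format parameter below are illustrative, not part of this PR; "parquet" and "csv" are both built-in Spark data sources.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Write a dictionary/index DataFrame in a caller-chosen format instead of CSV only.
def saveDictionary(df: DataFrame, outputDir: String,
                   mode: SaveMode = SaveMode.ErrorIfExists,
                   format: String = "parquet"): Unit =
  df.write.mode(mode).format(format).save(outputDir)

// Hypothetical usage:
//   saveDictionary(subjectDF, "/data/compressed/subject_dict")            // Parquet
//   saveDictionary(subjectDF, "/data/csv/subject_dict", format = "csv")   // CSV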

val logger = LoggerFactory.getLogger(TripleOps.getClass)


def main(args: Array[String]) {

@GezimSejdiu (Member) commented May 6, 2019
Please, remove the main method from here.

@abakarboubaa reopened this May 7, 2019

@abakarboubaa (Contributor, Author) commented May 7, 2019

The code has been updated as requested. I have also updated it to comply with Scalastyle.

 * @return DataFrame Subject dictionary of [index,subject]
 */
def getDistinctSubjectDictDF(triples: RDD[graph.Triple]): DataFrame = {
  spark.createDataFrame(triples.map(_.getSubject.toString()).distinct().zipWithIndex().map(t => Row(t._1, t._2)), dictionarySchema).cache()

@GezimSejdiu (Member) commented May 8, 2019
Do we need to persist/cache them in advance? I think it would be good to remove caching as the default approach and leave it to the user to decide what is relevant for their use case.
So, remove the .cache() call from everywhere it is added; it can be applied by the user instead.
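A minimal sketch of the dictionary method with the default .cache() removed, leaving persistence to the caller. The schema definition and the explicit SparkSession parameter are assumptions made to keep the example self-contained.

import org.apache.jena.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Dictionary schema: the distinct term and its assigned index.
val dictionarySchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("index", LongType, nullable = false)))

// No .cache() here; the caller decides whether to persist, e.g.
//   val subjectDict = getDistinctSubjectDictDF(spark, triples).cache()
def getDistinctSubjectDictDF(spark: SparkSession, triples: RDD[graph.Triple]): DataFrame =
  spark.createDataFrame(
    triples.map(_.getSubject.toString).distinct().zipWithIndex().map(t => Row(t._1, t._2)),
    dictionarySchema)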

 * @param registerAsTable If true, it registers all the DataFrames as Spark tables
 * @return Returns the Tuple4 [IndexDataFrame, SubjectDictDataFrame, ObjectDictDataFrame, PredicateDictDataFrame]
 */
def createOrReadDataSet(input: String, compressedDir: String, registerAsTable: Boolean = true): (DataFrame, DataFrame, DataFrame, DataFrame) = {

@GezimSejdiu (Member) commented May 8, 2019
This method requires the compressed data to already be kept on disk (which is usually meant to be a pre-processing step). Did you also consider that sometimes there is no need to materialize the process, i.e. one could keep the data in memory and not write it back to disk?

I think we should provide another method which allows users to read the data and compress it on the fly, and of course to save it in case they want to do so.

@abakarboubaa (Author, Contributor) commented May 13, 2019
"createOrReadDataSet" method does this. If NTriple file is passed, it processes and converts it into index dictionary and if compressed Directory (i.e. already parsed) is passed, then it simply read it as indexed dataFrames. However, I will separate out the function into 2, one for reading from raw NT file and the second one from processed data.

There is already a provision of saving processed (compressed) dataset into a disk using saveAsCSV() and re-read.

@GezimSejdiu (Member) commented May 13, 2019
Hi @abakarboubaa,

indeed. There is no need to do such a separation. I already did it here:

See you
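For illustration only, a rough sketch of how the two entry points could be separated: one compressing an in-memory RDD of triples on the fly, the other reading an already materialised dataset back from disk. All method names, the directory layout, and the choice of Parquet are assumptions; the triple index DataFrame is omitted for brevity, and the actual implementation referenced above is not reproduced here.

import org.apache.jena.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

val dictSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("index", LongType, nullable = false)))

// Build one dictionary DataFrame from any term column of the triples (no caching, no I/O).
def dictOf(spark: SparkSession, terms: RDD[String]): DataFrame =
  spark.createDataFrame(terms.distinct().zipWithIndex().map(t => Row(t._1, t._2)), dictSchema)

// Entry point 1: compress on the fly from an RDD of triples, without touching the disk.
def compressTriples(spark: SparkSession, triples: RDD[graph.Triple])
    : (DataFrame, DataFrame, DataFrame) = (
  dictOf(spark, triples.map(_.getSubject.toString)),
  dictOf(spark, triples.map(_.getObject.toString)),
  dictOf(spark, triples.map(_.getPredicate.toString)))

// Entry point 2: read a previously saved (compressed) dataset back from disk;
// the sub-directory names are purely illustrative.
def readCompressedDataSet(spark: SparkSession, compressedDir: String)
    : (DataFrame, DataFrame, DataFrame) = (
  spark.read.parquet(s"$compressedDir/subject_dict"),
  spark.read.parquet(s"$compressedDir/object_dict"),
  spark.read.parquet(s"$compressedDir/predicate_dict"))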

@GezimSejdiu merged commit 61976ab into SANSA-Stack:develop May 10, 2019

1 check failed: continuous-integration/travis-ci/pr (The Travis CI build failed)
