Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
added comments and parameters, pipeline is working
Browse files Browse the repository at this point in the history
  • Loading branch information
carstendraschner committed Oct 1, 2020
1 parent f5958f2 commit f328796
Showing 1 changed file with 22 additions and 76 deletions.
Expand Up @@ -19,72 +19,15 @@ import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object SimilarityPipeline {
def main(args: Array[String]): Unit = {

  // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
  // when the pipeline is launched without any argument.
  require(args.nonEmpty, "You have to provide either a nt triple file or a conf specifying more parameters")

  // start spark session
  val spark = SparkSession.builder
    .appName(s"JaccardSimilarityEvaluation")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  // Two invocation modes: a bare .nt triples file runs the pipeline with
  // default parameters, while a .conf file supplies every parameter explicitly.
  val in = args(0)
  try {
    if (in.endsWith(".nt")) {
      run(
        spark = spark,
        inputPath = in,
        similarityEstimator = "Jaccard",
        parametersFeatureExtractorMode = "at",
        parameterCountVectorizerMinDf = 1,
        parameterCountVectorizerMaxVocabSize = 100000,
        parameterSimilarityAlpha = 1.0,
        parameterSimilarityBeta = 1.0,
        parameterNumHashTables = 1,
        parameterSimilarityAllPairThreshold = 0.5,
        parameterSimilarityNearestNeighborsK = 20,
        parameterThresholdMinSimilarity = 0.5
      )
    } else if (in.endsWith(".conf")) {
      // All pipeline parameters are read from the Typesafe Config file.
      val config = ConfigFactory.parseFile(new File(in))
      run(
        spark = spark,
        inputPath = config.getString("inputPath"),
        similarityEstimator = config.getString("similarityEstimator"),
        parametersFeatureExtractorMode = config.getString("parametersFeatureExtractorMode"),
        parameterCountVectorizerMinDf = config.getInt("parameterCountVectorizerMinDf"),
        parameterCountVectorizerMaxVocabSize = config.getInt("parameterCountVectorizerMaxVocabSize"),
        parameterSimilarityAlpha = config.getDouble("parameterSimilarityAlpha"),
        parameterSimilarityBeta = config.getDouble("parameterSimilarityBeta"),
        parameterNumHashTables = config.getInt("parameterNumHashTables"),
        parameterSimilarityAllPairThreshold = config.getDouble("parameterSimilarityAllPairThreshold"),
        parameterSimilarityNearestNeighborsK = config.getInt("parameterSimilarityNearestNeighborsK"),
        parameterThresholdMinSimilarity = config.getDouble("parameterThresholdMinSimilarity")
      )
    } else {
      throw new Exception("You have to provide either a nt triple file or a conf specifying more parameters")
    }
  } finally {
    // Previously spark.stop() was duplicated in both success branches and
    // skipped entirely on the error path, leaking the session; the finally
    // block guarantees the session is released in every case.
    spark.stop()
  }
}

//noinspection ScalaStyle
def run(
spark: SparkSession,
inputPath: String,
similarityEstimator: String,
parametersFeatureExtractorMode: String,
parameterCountVectorizerMinDf: Int,
parameterCountVectorizerMaxVocabSize: Int,
parameterSimilarityAlpha: Double,
parameterSimilarityBeta: Double,
parameterNumHashTables: Int,
parameterSimilarityAllPairThreshold: Double,
parameterSimilarityNearestNeighborsK: Int,
parameterThresholdMinSimilarity: Double
): Unit = {
val inputFilePath: String = args(0)
val outputFilePath: String = args(1)
val parameterSimilarityEstimator: String = args(2)
val paramterterFeatureExtractorMode: String = "at"
val paramterCountVectorizerMinTf: Int = 1
val paramterCountVectorizerMinDf: Int = 1
val paramterCountVectorizerVocabSize: Int = 100000
val paramterSimilarityEstimatorThreshold: Double = 0.5

// setup spark session
val spark = SparkSession.builder
Expand All @@ -93,29 +36,31 @@ object SimilarityPipeline {
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()

// define inputpath if it is not parameter
val inputPath = "/Users/carstendraschner/GitHub/SANSA-ML/sansa-ml-spark/src/main/resources/movie.nt"

// read in data as DataFrame
val triplesDf: DataFrame = spark.read.rdf(Lang.NTRIPLES)(inputPath)
println("ReadIn file:" + inputFilePath)
val triplesDf: DataFrame = spark.read.rdf(Lang.NTRIPLES)(inputFilePath)
// triplesDf.show()

// feature extraction
println("Extract features with mode:" + paramterterFeatureExtractorMode)
val featureExtractorModel = new FeatureExtractorModel()
.setMode("an")
.setMode(paramterterFeatureExtractorMode)
val extractedFeaturesDataFrame = featureExtractorModel
.transform(triplesDf)
.filter(t => t.getAs[String]("uri").startsWith("m"))
// extractedFeaturesDataFrame.show()

// filter for relevant URIs e.g. only movies
val filteredFeaturesDataFrame = extractedFeaturesDataFrame.filter(t => t.getAs[String]("uri").startsWith("m"))
val filteredFeaturesDataFrame = extractedFeaturesDataFrame // .filter(t => t.getAs[String]("uri").startsWith("m"))
// filteredFeaturesDataFrame.show()

// count Vectorization
println("Count Vectorizer")
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("extractedFeatures")
.setOutputCol("vectorizedFeatures")
.setMinTF(paramterCountVectorizerMinTf)
.setMinDF(paramterCountVectorizerMinDf)
.setVocabSize(paramterCountVectorizerVocabSize)
.fit(filteredFeaturesDataFrame)
val tmpCvDf: DataFrame = cvModel.transform(filteredFeaturesDataFrame)
val isNoneZeroVector = udf({ v: Vector => v.numNonzeros > 0 }, DataTypes.BooleanType)
Expand All @@ -126,7 +71,8 @@ object SimilarityPipeline {
// for nearestNeighbors we need one key which is a Vector to search for NN
// val sample_key: Vector = countVectorizedFeaturesDataFrame.take(1)(0).getAs[Vector]("vectorizedFeatures")

val model: GenericSimilarityEstimatorModel = similarityEstimator match {
println("EstimatorModel setup")
val model: GenericSimilarityEstimatorModel = parameterSimilarityEstimator match {
case "Batet" => new BatetModel()
.setInputCol("vectorizedFeatures")
case "BraunBlanquet" => new BraunBlanquetModel()
Expand All @@ -148,15 +94,15 @@ object SimilarityPipeline {
}

val outputDf1: Dataset[_] = model
.similarityJoin(countVectorizedFeaturesDataFrame, countVectorizedFeaturesDataFrame, 0.5, "distCol")
.similarityJoin(countVectorizedFeaturesDataFrame, countVectorizedFeaturesDataFrame, paramterSimilarityEstimatorThreshold, "distCol")

val metaGraphFactory = new SimilarityExperimentMetaGraphFactory()
val metagraph: RDD[graph.Triple] = metaGraphFactory.createRdfOutput(
outputDataset = outputDf1)(
modelInformationEstimatorName = model.estimatorName, modelInformationEstimatorType = model.modelType, modelInformationMeasurementType = model.estimatorMeasureType)(
inputDatasetNumbertOfTriples = triplesDf.count(), dataSetInformationFilePath = inputPath)
inputDatasetNumbertOfTriples = triplesDf.count(), dataSetInformationFilePath = inputFilePath)

metagraph.coalesce(1, shuffle = true).saveAsNTriplesFile("rdfMetagraphSimilarityEstimation.nt")
metagraph.coalesce(1, shuffle = true).saveAsNTriplesFile(outputFilePath)

}
}

0 comments on commit f328796

Please sign in to comment.