
scala docs for DaSimEstimator
carstendraschner committed Oct 13, 2021
1 parent 574e749 commit ba40080
Showing 1 changed file with 122 additions and 6 deletions.
@@ -37,51 +37,117 @@ class DaSimEstimator {
// general
var _parameterVerboseProcess = false


/**
* candidate filtering via SPARQL
* with this parameter you can reduce the list of candidates by means of a SPARQL query
* @param sparqlFilter SPARQL filter applied on top of the input KG
* @return adjusted transformer
*/
def setSparqlFilter(sparqlFilter: String): this.type = {
_pInitialFilterBySPARQL = sparqlFilter
this
}

/**
* Filter initial KG by SPO object
* Filter the KG by the object of the SPO structure; an alternative to SPARQL that is faster
* @param objectFilter string representing the object for the SPO filter
* @return adjusted transformer
*/
def setObjectFilter(objectFilter: String): this.type = {
_pInitialFilterByObject = objectFilter
this
}
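
For illustration, a minimal usage sketch of the two seed-filtering options; the object URI and the SPARQL query are placeholders, not values from this commit:

// hypothetical seed filtering, either via a SPARQL query ...
val bySparql = new DaSimEstimator()
  .setSparqlFilter("SELECT ?seed WHERE { ?seed ?p <http://example.org/SomeClass> }")
// ... or, faster, via the object of the SPO triple structure:
val byObject = new DaSimEstimator()
  .setObjectFilter("http://example.org/SomeClass")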

/**
* DistSim feature extraction method
* feature extraction method for first guesses via DistSim
* @param distSimFeatureExtractionMethod DistSim feature extraction method
* @return adjusted transformer
*/
def setDistSimFeatureExtractionMethod(distSimFeatureExtractionMethod: String): this.type = {
_pDistSimFeatureExtractionMethod = distSimFeatureExtractionMethod
this
}

/**
* DistSim threshold for minimum similarity
* This is the threshold for the minimal similarity score used within DistSim to prefilter promising candidates
* @param distSimThreshold minimum similarity score DistSim uses to prefilter candidate pairs
* @return adjusted transformer
*/
def setDistSimThreshold(distSimThreshold: Double): this.type = {
_pDistSimThreshold = distSimThreshold
this
}

/**
* Execution order of similarity scores
* here you can specify the order in which the similarity values are computed
* @param similarityCalculationExecutionOrder array of feature names in the order their similarities are calculated
* @return adjusted transformer
*/
def setSimilarityCalculationExecutionOrder(similarityCalculationExecutionOrder: Array[String]): this.type = {
pSimilarityCalculationExecutionOrder = similarityCalculationExecutionOrder
this
}

/**
* Normalize similarity scores per feature
* this parameter causes the feature-specific similarity scores to be stretched/normalized s.t. they all range from zero to one
* @param valueStreching whether to stretch the similarity values to [0, 1]
* @return adjusted transformer
*/
def setSimilarityValueStreching(valueStreching: Boolean): this.type = {
pValueStreching = valueStreching
this
}

/**
* specify manually the availability of each feature
* this parameter weights the relevance of a feature's similarity based on its availability
* it is possible that the availability is known
* if the value is not given, it is assumed to be equally distributed
* @param availability map from feature name to availability weight
* @return adjusted transformer
*/
def setAvailability(availability: Map[String, Double]): this.type = {
pAvailability = availability
this
}

/**
* specify manually the reliability of each feature
* this parameter weights the relevance of a feature's similarity based on its reliability
* it is possible that the reliability is known, e.g. certain data might be influenced by fake news or is rarely updated
* if the value is not given, it is assumed to be equally distributed
* @param reliability map from feature name to reliability weight
* @return adjusted transformer
*/
def setReliability(reliability: Map[String, Double]): this.type = {
pReliability = reliability
this
}

/**
* specify manually the importance of each feature
* this parameter weights the relevance of a feature's similarity based on its importance
* this value allows users to influence the weighting according to personal preference
* @param importance map from feature name to importance weight
* @return adjusted transformer
*/
def setImportance(importance: Map[String, Double]): this.type = {
pImportance = importance
this
}
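
Taken together, the setters follow the usual chainable-transformer pattern; a sketch with made-up feature names and weights ("os" is the documented default extraction method, everything else is illustrative):

// hypothetical configuration; feature names and weight values are placeholders
val estimator = new DaSimEstimator()
  .setDistSimFeatureExtractionMethod("os")
  .setDistSimThreshold(0.5)
  .setSimilarityCalculationExecutionOrder(Array("name", "genre"))
  .setSimilarityValueStreching(true)
  .setAvailability(Map("name" -> 1.0, "genre" -> 0.8))
  .setReliability(Map("name" -> 0.9, "genre" -> 0.7))
  .setImportance(Map("name" -> 0.6, "genre" -> 0.4))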

/**
* internal method that collects seeds by either a SPARQL or an object filter
* @param ds dataset of triples representing the input KG
* @param sparqlFilter SPARQL filter for the initial KG
* @param objectFilter filter for the initial KG by SPO object
* @return dataframe with one column containing string representations of the seed URIs
*/
def gatherSeeds(ds: Dataset[Triple], sparqlFilter: String = null, objectFilter: String = null): DataFrame = {

val spark = SparkSession.builder.getOrCreate()
@@ -124,6 +190,14 @@ class DaSimEstimator {
seeds
}
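
A possible call, assuming ds is a Dataset[Triple] of the input KG; the object URI is a placeholder:

// seeds ends up as a single-column DataFrame of seed URIs
val seeds = estimator.gatherSeeds(ds, objectFilter = "http://example.org/SomeClass")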

/**
* we use DistSim to gather promising candidates
* @param dataset prefiltered KG for gathering candidates
* @param seeds the seeds used for calculating promising candidates via DistSim
* @param _pDistSimFeatureExtractionMethod feature extraction method for DistSim
* @param _pDistSimThreshold minimum similarity threshold DistSim uses to postfilter pairs
* @return dataframe with candidate pairs resulting from DistSim
*/
def gatherCandidatePairs(dataset: Dataset[Triple], seeds: DataFrame, _pDistSimFeatureExtractionMethod: String = "os", _pDistSimThreshold: Double = 0): DataFrame = {
val spark = SparkSession.builder().getOrCreate()
implicit val rdfTripleEncoder: Encoder[Triple] = org.apache.spark.sql.Encoders.kryo[Triple]
@@ -190,6 +264,17 @@ class DaSimEstimator {
candidatePairsForSimEst
}
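
Correspondingly, a hedged call sketch; "os" matches the default parameter above and the threshold value is illustrative:

val candidatePairs = estimator.gatherCandidatePairs(ds, seeds, _pDistSimFeatureExtractionMethod = "os", _pDistSimThreshold = 0.5)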

/**
* feature extraction for the extensive similarity scores
* creates a dataframe with all features
* two options for feature gathering:
* either SparqlFrame
* or the SmartFeatureExtractor, which operates pivot-based
* @param ds dataset of the KG
* @param candidates candidate pairs from DistSim
* @param sparqlFeatureExtractionQuery optional; if set, SparqlFrame is used instead of the SmartFeatureExtractor
* @return dataframe with columns corresponding to the features and the URI identifier
*/
*/
def gatherFeatures(ds: Dataset[Triple], candidates: DataFrame, sparqlFeatureExtractionQuery: String = null): DataFrame = {
val featureDf = {
if (sparqlFeatureExtractionQuery != null) {
@@ -222,6 +307,11 @@ class DaSimEstimator {
featureDf
}
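
Both extraction paths in one sketch; the SPARQL query is a placeholder, not from this commit:

// default path: pivot-based SmartFeatureExtractor
val featureDf = estimator.gatherFeatures(ds, candidatePairs)
// alternative path: SparqlFrame with a user-provided query (placeholder shown)
val featureDfSparql = estimator.gatherFeatures(ds, candidatePairs,
  "SELECT ?uri ?name WHERE { ?uri <http://example.org/name> ?name }")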

/**
* list all elements which occur within the URIs resulting from DistSim
* @param candidatePairs candidate pairs in a dataframe coming from DistSim
* @return dataframe with one column holding the relevant URIs as strings
*/
def listDistinctCandidates(candidatePairs: DataFrame): DataFrame = {

candidatePairs
@@ -234,14 +324,21 @@ class DaSimEstimator {
).distinct()
}
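
The idea behind this method, as a self-contained sketch; the pair column names uriA/uriB are assumptions, since the actual names are not visible in this diff:

import org.apache.spark.sql.DataFrame

// union the URIs from both sides of each candidate pair, then deduplicate
def distinctUris(candidatePairs: DataFrame): DataFrame =
  candidatePairs.select("uriA")
    .union(candidatePairs.select("uriB"))
    .toDF("uri")
    .distinct()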

- def calculateDasinSimilarities(
/**
* calculate the weighted, feature-specific similarity scores with the new approach
*
* @param candidatePairsDataFrame candidate pairs which span the combinations to be calculated
* @param extractedFeatureDataframe extracted feature dataframe
* @return dataframe with the pairwise similarity score for each feature
*/
+ def calculateDaSimSimilarities(
candidatePairsDataFrame: DataFrame,
extractedFeatureDataframe: DataFrame,
): DataFrame = {

var similarityEstimations: DataFrame = candidatePairsDataFrame

- if (pSimilarityCalculationExecutionOrder == 0) pSimilarityCalculationExecutionOrder = extractedFeatureDataframe.columns.drop(1)
+ if (pSimilarityCalculationExecutionOrder == null) pSimilarityCalculationExecutionOrder = extractedFeatureDataframe.columns.drop(1)

pSimilarityCalculationExecutionOrder.foreach(
featureName => {
@@ -417,6 +514,11 @@ class DaSimEstimator {

}
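
One iteration of the per-feature loop could look roughly like the sketch below; the uri/uriA/uriB column names and the exact-match scoring are assumptions, since the commit's type-specific similarity measures are elided in this view:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, when}

// join each pair with the feature value of both of its URIs and score the pair;
// the real implementation applies feature-type-specific measures instead of equality
def similarityForFeature(pairs: DataFrame, features: DataFrame, featureName: String): DataFrame = {
  val a = features.select(col("uri").as("uriA"), col(featureName).as("fA"))
  val b = features.select(col("uri").as("uriB"), col(featureName).as("fB"))
  pairs.join(a, "uriA").join(b, "uriB")
    .withColumn(featureName + "_similarity", when(col("fA") === col("fB"), 1.0).otherwise(0.0))
    .drop("fA", "fB")
}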

/**
* optional method to normalize the similarity columns
* @param df similarity-scored dataframe to be normalized
* @return normalized dataframe
*/
def normSimColumns(df: DataFrame): DataFrame = {
var norm_sim_df: DataFrame = df.cache()

@@ -438,6 +540,15 @@ class DaSimEstimator {
norm_sim_df
}
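
Per-column min-max scaling is one way to implement this normalization; a sketch under that assumption:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, max, min}

// stretch every given similarity column to [0, 1]; assumes Double-typed columns,
// and leaves constant columns untouched to avoid division by zero
def minMaxScale(df: DataFrame, simColumns: Seq[String]): DataFrame =
  simColumns.foldLeft(df) { (tmp, c) =>
    val Row(lo: Double, hi: Double) = tmp.agg(min(col(c)), max(col(c))).head
    if (hi == lo) tmp else tmp.withColumn(c, (col(c) - lo) / (hi - lo))
  }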

/**
* aggregate the similarity scores and weight them
* @param simDf similarity dataframe with the feature-specific similarity scores
* @param valueStreching optional parameter to stretch features; set by default
* @param availability weighting by availability
* @param importance user-specific weighting via importance
* @param reliability optional way to influence the weighting via reliability
* @return similarity dataframe with the aggregated and weighted final similarity score
*/
def aggregateSimilarityScore(
simDf: DataFrame,
valueStreching: Boolean = true,
@@ -496,6 +607,12 @@ class DaSimEstimator {
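
The body of aggregateSimilarityScore is elided in this view; one plausible reading of its doc comment, with the product-of-weights combination purely an assumption:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

// hypothetical aggregation: weight each feature's similarity column by
// availability * importance * reliability, normalized so the weights sum to one;
// assumes all three maps share the same feature keys
def weightedScore(simDf: DataFrame,
                  availability: Map[String, Double],
                  importance: Map[String, Double],
                  reliability: Map[String, Double]): DataFrame = {
  val features = availability.keys.toSeq
  val weights = features.map(f => f -> availability(f) * importance(f) * reliability(f)).toMap
  val total = weights.values.sum
  val overall: Column = features.map(f => col(f) * lit(weights(f) / total)).reduce(_ + _)
  simDf.withColumn("overall_similarity", overall)
}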



/**
* transforms the KG into a similarity-score dataframe based on the set parameters
* overall method encapsulating the other methods; intended to be used from outside
* @param dataset knowledge graph
* @return dataframe with the similarity-score results as a metagraph
*/
def transform(dataset: Dataset[Triple]): DataFrame = {
// gather seeds
println("gather seeds")
@@ -519,10 +636,9 @@ class DaSimEstimator {
featureDf
// dasim similarity estimation calculation
println("column wise similarity calculation")
- val similarityEstimations: DataFrame = calculateDasinSimilarities(
+ val similarityEstimations: DataFrame = calculateDaSimSimilarities(
candidatePairs,
- featureDf,
- pSimilarityCalculationExecutionOrder,
+ featureDf
).cache()
similarityEstimations.show(false)

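Finally, an end-to-end usage sketch of the transformer; dataset is assumed to be a Dataset[Triple] already loaded via Spark, and the filter value is a placeholder:

// configure the estimator and produce the similarity metagraph in one chain
val result = new DaSimEstimator()
  .setObjectFilter("http://example.org/SomeClass")
  .setDistSimThreshold(0.5)
  .transform(dataset)
result.show(false)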
