Skip to content

Commit

Permalink
Added support for Lang attribute to RdfSourceFactory.
Browse files Browse the repository at this point in the history
Upgraded source maven plugin version.
  • Loading branch information
Aklakan committed Jul 2, 2021
1 parent 13a3bf0 commit bfd52b2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -1722,7 +1722,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
Expand Down
Expand Up @@ -2,6 +2,7 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.jena.riot.Lang;


/**
Expand All @@ -16,14 +17,26 @@
public interface RdfSourceFactory {

    /**
     * Obtains an {@link RdfSource} for the given location string without
     * specifying a language or a file system.
     *
     * @param sourceStr location of the source; converted to a Hadoop {@code Path} downstream
     * @return the source produced by the factory
     * @throws RuntimeException wrapping any exception raised while creating the source
     */
    default RdfSource get(String sourceStr) {
        // The cast selects the (String, Lang) overload; a bare null would be
        // ambiguous between the Lang and FileSystem overloads.
        return get(sourceStr, (Lang) null);
    }

    /**
     * Obtains an {@link RdfSource} for the given location string and language.
     * No file system is passed on ({@code null}); how that is handled is up to
     * the implementation of {@link #create(Path, FileSystem, Lang)}.
     *
     * @param sourceStr location of the source
     * @param lang the RDF language of the source, or {@code null} if not known
     * @return the source produced by the factory
     * @throws RuntimeException wrapping any exception raised while creating the source
     */
    default RdfSource get(String sourceStr, Lang lang) {
        Path path = new Path(sourceStr);
        return get(path, null, lang);
    }

    /**
     * Obtains an {@link RdfSource} for the given location string on the given
     * file system. No language is passed on ({@code null}).
     *
     * @param sourceStr location of the source
     * @param fileSystem the file system to read from
     * @return the source produced by the factory
     * @throws RuntimeException wrapping any exception raised while creating the source
     */
    default RdfSource get(String sourceStr, FileSystem fileSystem) {
        Path path = new Path(sourceStr);
        return get(path, fileSystem, null);
    }

    /**
     * Unchecked variant of {@link #create(Path, FileSystem, Lang)}: delegates to it
     * and rethrows any exception wrapped in a {@link RuntimeException} (the original
     * exception is kept as the cause).
     *
     * @param path location of the source
     * @param fileSystem the file system to read from; may be {@code null} when called
     *        through the convenience overloads above
     * @param lang the RDF language; may be {@code null} when called through the
     *        convenience overloads above
     * @return the source produced by the factory
     * @throws RuntimeException wrapping any exception raised by {@code create}
     */
    default RdfSource get(Path path, FileSystem fileSystem, Lang lang) {
        try {
            // Only this overload is valid here: the method has no 'sourceStr'
            // parameter, and a second return after it would be unreachable.
            return create(path, fileSystem, lang);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates a source from a location string. Not used by the default {@code get} methods. */
    RdfSource create(String sourceStr) throws Exception;

    /** Creates a source from a location string on the given file system. Not used by the default {@code get} methods. */
    RdfSource create(String sourceStr, FileSystem fileSystem) throws Exception;

    /** Creates a source from a path on the given file system. Not used by the default {@code get} methods. */
    RdfSource create(Path path, FileSystem fileSystem) throws Exception;

    /**
     * Creates a source from a path, file system and language. All default
     * {@code get} methods of this interface end up here.
     *
     * @throws Exception if the source cannot be created
     */
    RdfSource create(Path path, FileSystem fileSystem, Lang lang) throws Exception;
}
Expand Up @@ -36,33 +36,25 @@ public static RdfSourceFactory from(SparkSession sparkSession) {
return new RdfSourceFactoryImpl(sparkSession);
}


/**
 * Creates a source for the given location string, using the file system
 * obtained from the Hadoop configuration of the Spark context.
 *
 * @param sourceStr location of the source
 * @return the source created by {@code create(String, FileSystem)}
 * @throws Exception if the file system cannot be obtained or the source cannot be created
 */
public RdfSource create(String sourceStr) throws Exception {
    FileSystem defaultFs = FileSystem.get(sparkSession.sparkContext().hadoopConfiguration());
    return create(sourceStr, defaultFs);
}


// NOTE(review): the lines below interleave the bodies of more than one
// create(...) overload, as in a diff rendered without +/- markers (several
// signatures and near-duplicate statements appear back to back). The code is
// left untouched here; the comments only describe what each visible statement does.
@Override
public RdfSource create(String sourceStr, FileSystem fileSystem) throws Exception {
// Turn the location string into a Hadoop Path.
Path path = new Path(sourceStr);
// Overload taking an explicit Lang in addition to path and file system.
public RdfSource create(Path path, FileSystem fileSystem, Lang lang) throws Exception {

return create(path, fileSystem);
}
// When no file system is supplied, fall back to the one obtained from the
// Hadoop configuration of the Spark context.
if (fileSystem == null) {
Configuration hadoopConf = sparkSession.sparkContext().hadoopConfiguration();
fileSystem = FileSystem.get(hadoopConf);
}

@Override
public RdfSource create(Path path, FileSystem fileSystem) throws Exception {
// Resolve the path against the file system; the resolved path is what gets
// opened for probing and handed to RdfSourceImpl.
Path resolvedPath = fileSystem.resolvePath(path);

// Probe the content of the resolved path; the stream is closed by try-with-resources.
EntityInfo entityInfo;
try (InputStream in = fileSystem.open(resolvedPath)) {
entityInfo = RDFDataMgrEx.probeEntityInfo(in, RDFDataMgrEx.DEFAULT_PROBE_LANGS);
}
// Probing is only done when the caller did not supply a Lang.
if (lang == null) {
EntityInfo entityInfo;
try (InputStream in = fileSystem.open(resolvedPath)) {
entityInfo = RDFDataMgrEx.probeEntityInfo(in, RDFDataMgrEx.DEFAULT_PROBE_LANGS);
}
// Map the probed content type to a Lang.
lang = RDFLanguages.contentTypeToLang(entityInfo.getContentType());

Lang lang = RDFLanguages.contentTypeToLang(entityInfo.getContentType());

// Fail with a descriptive message if the content type maps to no Lang.
Objects.requireNonNull(lang, "Could not obtain lang for " + entityInfo.getContentType() + " from " + path);
Objects.requireNonNull(lang, "Could not obtain lang for " + entityInfo.getContentType() + " from " + path);
}

// Build the source from the Spark session, the resolved path and the Lang.
return new RdfSourceImpl(sparkSession, resolvedPath, lang);
}
Expand Down

0 comments on commit bfd52b2

Please sign in to comment.