Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1708 Run the Batch Profiler in Spark [Feature Branch] #1161

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions metron-analytics/metron-profiler-spark/pom.xml
Expand Up @@ -190,6 +190,21 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,60 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<assembly>
<id>archive</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${project.basedir}/src/main/config</directory>
<outputDirectory>config</outputDirectory>
<useDefaultExcludes>true</useDefaultExcludes>
<excludes>
<exclude>**/*.formatted</exclude>
<exclude>**/*.filtered</exclude>
</excludes>
<fileMode>0644</fileMode>
<lineEnding>unix</lineEnding>
<filtered>true</filtered>
</fileSet>
<fileSet>
<directory>${project.basedir}/src/main/scripts</directory>
<outputDirectory>bin</outputDirectory>
<useDefaultExcludes>true</useDefaultExcludes>
<excludes>
<exclude>**/*.formatted</exclude>
<exclude>**/*.filtered</exclude>
</excludes>
<fileMode>0755</fileMode>
<lineEnding>unix</lineEnding>
<filtered>true</filtered>
</fileSet>
<fileSet>
<directory>${project.basedir}/target</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
<outputDirectory>lib</outputDirectory>
<useDefaultExcludes>true</useDefaultExcludes>
</fileSet>
</fileSets>
</assembly>
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
spark.master=local
spark.app.name=Batch Profiler
@@ -0,0 +1,174 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.spark.cli;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.IOUtils;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.spark.BatchProfiler;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.Properties;

import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.CONFIGURATION_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;

/**
* The main entry point which launches the Batch Profiler in Spark.
*
* With this class the Batch Profiler can be submitted using the following command.
*
* <pre>{@code
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
* --properties-file spark.properties \
* metron-profiler-spark-<version>.jar \
* --config profiler.properties \
* --globals global.properties \
* --profiles profiles.json
* }</pre>
*/
public class BatchProfilerCLI implements Serializable {

protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static Properties globals;
public static Properties profilerProps;
public static ProfilerConfig profiles;

public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
// parse the command line
CommandLine commandLine = parseCommandLine(args);
profilerProps = handleProfilerProperties(commandLine);
globals = handleGlobals(commandLine);
profiles = handleProfileDefinitions(commandLine);

// the batch profiler must use 'event time'
if(!profiles.getTimestampField().isPresent()) {
throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined.");
}

// one or more profiles must be defined
if(profiles.getProfiles().size() == 0) {
throw new IllegalArgumentException("No profile definitions found.");
}

SparkSession spark = SparkSession
.builder()
.config(new SparkConf())
.getOrCreate();

BatchProfiler profiler = new BatchProfiler();
long count = profiler.run(spark, profilerProps, globals, profiles);
LOG.info("Profiler produced {} profile measurement(s)", count);
}

/**
* Load the Stellar globals from a file.
*
* @param commandLine The command line.
*/
private static Properties handleGlobals(CommandLine commandLine) throws IOException {
Properties globals = new Properties();
if(GLOBALS_FILE.has(commandLine)) {
String globalsPath = GLOBALS_FILE.get(commandLine);

LOG.info("Loading global properties from '{}'", globalsPath);
globals.load(new FileInputStream(globalsPath));

LOG.info("Globals = {}", globals);
}
return globals;
}

/**
* Load the Profiler configuration from a file.
*
* @param commandLine The command line.
*/
private static Properties handleProfilerProperties(CommandLine commandLine) throws IOException {
Properties config = new Properties();
if(CONFIGURATION_FILE.has(commandLine)) {
String propertiesPath = CONFIGURATION_FILE.get(commandLine);

LOG.info("Loading profiler properties from '{}'", propertiesPath);
config.load(new FileInputStream(propertiesPath));

LOG.info("Properties = {}", config.toString());
}
return config;
}

/**
* Load the profile definitions from a file.
*
* @param commandLine The command line.
*/
private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws IOException {
ProfilerConfig profiles;
if(PROFILE_DEFN_FILE.has(commandLine)) {
String profilePath = PROFILE_DEFN_FILE.get(commandLine);

LOG.info("Loading profiles from '{}'", profilePath);
String contents = IOUtils.toString(new FileInputStream(profilePath));

profiles = ProfilerConfig.fromJSON(contents);
LOG.info("Loaded {} profile(s)", profiles.getProfiles().size());

} else {
throw new IllegalArgumentException("No profile(s) defined");
}
return profiles;
}

/**
* Parse the command line arguments submitted by the user.
* @param args The command line arguments to parse.
* @throws org.apache.commons.cli.ParseException
*/
private static CommandLine parseCommandLine(String[] args) throws ParseException {
CommandLineParser parser = new PosixParser();
return parse(parser, args);
}

public static Properties getGlobals() {
return globals;
}

public static Properties getProfilerProps() {
return profilerProps;
}

public static ProfilerConfig getProfiles() {
return profiles;
}
}