Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions .github/workflows/componentTests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,5 @@ jobs:
run: mvn clean compile test-compile

- name: Component Tests
run: |
log="/tmp/sysdstest.log"
echo "Starting Tests"
mvn surefire:test -DskipTests=false -Dtest=org.apache.sysds.test.component.*.** 2>&1 > $log
grep_args="SUCCESS"
grepvals="$( tail -n 100 $log | grep $grep_args)"
if [[ $grepvals == *"SUCCESS"* ]]; then
echo "--------------------- last 100 lines from test ------------------------"
tail -n 100 $log
echo "------------------ last 100 lines from test end -----------------------"
exit 0
else
echo "\n $(cat $log)"
exit 1
fi

run: mvn surefire:test -DskipTests=false -Dtest=org.apache.sysds.test.component.**

5 changes: 2 additions & 3 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ SystemDS's distinguishing characteristics are:
2. **Multiple execution modes**, including Spark MLContext, Spark Batch, Standalone, and JMLC.
3. **Automatic optimization** based on data and cluster characteristics to ensure both efficiency and scalability.

This version of SystemDS supports: Java 8+, Python 3.5+, Hadoop 2.6+ (Not 3.X), and Spark 2.1+ (Not 3.X).
This version of SystemDS supports: Java 8+, Python 3.5+, Hadoop 2.10+ (Not 3.X), and Spark 2.4.6+ (Not 3.X).

# Links
## Links

Various forms of documentation for SystemDS are available.

Expand All @@ -41,4 +41,3 @@ Various forms of documentation for SystemDS are available.
- The [javadoc API](./api/java/index) contains internal documentation of the system source code.
- [Install from Source](./site/install) guides through setup from git download to running system.
- If you want to contribute take a look at [Contributing](https://github.com/apache/systemds/blob/master/CONTRIBUTING.md)

26 changes: 21 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
</licenses>

<properties>
<hadoop.version>2.6.0</hadoop.version>
<hadoop.version>2.10.0</hadoop.version>
<antlr.version>4.5.3</antlr.version>
<spark.version>2.1.0</spark.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.6</spark.version>
<!-- <scala.version>2.11.8</scala.version> -->
<scala.binary.version>2.11</scala.binary.version>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss z</maven.build.timestamp.format>
<enableGPU>false</enableGPU>
Expand Down Expand Up @@ -989,7 +989,7 @@
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
<version>3.0.16</version>
<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -1022,7 +1022,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.42.Final</version>
<version>4.1.47.Final</version>
<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -1058,5 +1058,21 @@
<artifactId>protobuf-java-util</artifactId>
<version>3.12.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
</project>
8 changes: 7 additions & 1 deletion src/main/java/org/apache/sysds/api/PythonDMLScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ public class PythonDMLScript {
* @param args Command line arguments.
*/
public static void main(String[] args) {
start(Integer.parseInt(args[0]));
if(args.length != 1) {
throw new IllegalArgumentException("Python DML Script should be initialized with a singe number argument");
}
else {
int port = Integer.parseInt(args[0]);
start(port);
}
}

private static void start(int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

import org.apache.spark.SparkConf;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.paramserv.LocalParamServer;
import org.apache.sysds.runtime.controlprogram.paramserv.SparkPSProxy;

Expand Down Expand Up @@ -59,6 +61,11 @@ public static SparkPSProxy createSparkPSProxy(SparkConf conf, int port, LongAccu
conf.getTimeAsMs("spark.network.timeout", "120s");
String host = conf.get("spark.driver.host");
TransportContext context = createTransportContext(conf, new LocalParamServer());
return new SparkPSProxy(context.createClientFactory().createClient(host, port), rpcTimeout, aRPC);
try{
TransportClient tc = context.createClientFactory().createClient(host, port);
return new SparkPSProxy(tc, rpcTimeout, aRPC);
}catch(InterruptedException e){
throw new DMLRuntimeException("Spark client threw Interrupted Exception",e);
}
}
}
28 changes: 0 additions & 28 deletions src/main/python/pre_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,6 @@
from zipfile import ZipFile

this_path = os.path.dirname(os.path.realpath(__file__))
python_dir = 'systemds'
java_dir = 'systemds-java'
java_dir_full_path = os.path.join(this_path, python_dir, java_dir)
if os.path.exists(java_dir_full_path):
shutil.rmtree(java_dir_full_path, True)
root_dir = os.path.dirname(os.path.dirname(os.path.dirname(this_path)))

# temporary directory for unzipping of bin zip
TMP_DIR = os.path.join(this_path, 'tmp')
if os.path.exists(TMP_DIR):
shutil.rmtree(TMP_DIR, True)
os.mkdir(TMP_DIR)

SYSTEMDS_BIN = 'systemds-*-SNAPSHOT-bin.zip'
for file in os.listdir(os.path.join(root_dir, 'target')):
if fnmatch.fnmatch(file, SYSTEMDS_BIN):
new_path = os.path.join(TMP_DIR, file)
shutil.copyfile(os.path.join(root_dir, 'target', file), new_path)
extract_dir = os.path.join(TMP_DIR)
with ZipFile(new_path, 'r') as zip:
for f in zip.namelist():
split_path = os.path.split(os.path.dirname(f))
if split_path[1] == 'lib':
zip.extract(f, TMP_DIR)
unzipped_dir_name = file.rsplit('.', 1)[0]
shutil.copytree(os.path.join(TMP_DIR, unzipped_dir_name), java_dir_full_path)
if os.path.exists(TMP_DIR):
shutil.rmtree(TMP_DIR, True)

root_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
shutil.copyfile(os.path.join(root_dir, 'LICENSE'), 'LICENSE')
Expand Down
63 changes: 43 additions & 20 deletions src/main/python/systemds/context/systemds_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,32 @@ def __init__(self):
that can be read from to get the printed statements from the JVM.
"""

systemds_java_path = os.path.join(get_module_dir(), "systemds-java")
# nt means its Windows
sys_root = os.environ.get("SYSTEMDS_ROOT")
if sys_root == None:
# Python API now require SystemDS root environment variable to be set.
raise Exception("SYSTEMDS_ROOT not set please consult the install guide")

cp_separator = ";" if os.name == "nt" else ":"
lib_cp = os.path.join(systemds_java_path, "lib", "*")
systemds_cp = os.path.join(systemds_java_path, "*")
classpath = cp_separator.join([lib_cp, systemds_cp])
lib_cp = os.path.join(sys_root, "target","lib", "*")
systemds_cp = os.path.join(sys_root,"target","SystemDS.jar")

classpath = cp_separator.join([lib_cp , systemds_cp])

# TODO make use of JavaHome env-variable if set to find java, instead of just using any java available.
command = ["java", "-cp", classpath]

sys_root = os.environ.get("SYSTEMDS_ROOT")
if sys_root != None:
files = glob(os.path.join(sys_root, "conf", "log4j*.properties"))
if len(files) > 1:
print("WARNING: Multiple logging files")
if len(files) == 0:
print("WARNING: No log4j file found at: "
+ os.path.join(sys_root, "conf")
+ " therefore using default settings")
else:
# print("Using Log4J file at " + files[0])
command.append("-Dlog4j.configuration=file:" + files[0])

files = glob(os.path.join(sys_root, "conf", "log4j*.properties"))
if len(files) > 1:
print("WARNING: Multiple logging files")
if len(files) == 0:
print("WARNING: No log4j file found at: "
+ os.path.join(sys_root, "conf")
+ " therefore using default settings")
else:
print("Default Log4J used, since environment $SYSTEMDS_ROOT not set")
# print("Using Log4J file at " + files[0])
command.append("-Dlog4j.configuration=file:" + files[0])


command.append("org.apache.sysds.api.PythonDMLScript")

Expand All @@ -89,8 +91,29 @@ def __init__(self):

process = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE)
first_stdout = process.stdout.readline()
assert (b"Server Started" in first_stdout), "Error JMLC Server not Started"

if(b"GatewayServer Started" in first_stdout):
print("Startup success")
else:
stderr = process.stderr.readline().decode("utf-8")
if(len(stderr) > 1):
raise Exception("Exception in startup of GatewayServer: " + stderr)
outputs = []
outputs.append(first_stdout.decode("utf-8"))
max_tries = 10
for i in range(max_tries):
next_line = process.stdout.readline()
if(b"GatewayServer Started" in next_line):
print("WARNING: Stdout corrupted by prints: " + str(outputs))
print("Startup success")
break
else:
outputs.append(next_line)

if (i == max_tries-1):
raise Exception("Error in startup of systemDS gateway process: \n gateway StdOut: " + str(outputs) + " \n gateway StdErr" + process.stderr.readline().decode("utf-8") )

assert (b"GatewayServer Started" in first_stdout), "Error JMLC Server not Started first message was: " + first_stdout.decode("utf-8")

# Handle Std out from the subprocess.
self.__stdout = Queue()
self.__stderr = Queue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtilsExt;
import org.apache.sysds.test.AutomatedTestBase;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtilsExt;
import org.apache.sysds.test.AutomatedTestBase;

public class RDDConverterUtilsExtTest extends AutomatedTestBase {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.test.component.pythonapi;

import org.apache.sysds.api.PythonDMLScript;
import org.junit.Test;

/** Simple tests to verify startup of Python Gateway server happens without crashes */
public class StartupTest {

@Test(expected = IllegalArgumentException.class)
public void testStartupIncorrect_1(){
PythonDMLScript.main(new String[]{});
}

@Test(expected = IllegalArgumentException.class)
public void testStartupIncorrect_2(){
PythonDMLScript.main(new String[]{""});
}

@Test(expected = IllegalArgumentException.class)
public void testStartupIncorrect_3(){
PythonDMLScript.main(new String[]{"131","131"});
}

@Test(expected = NumberFormatException.class)
public void testStartupIncorrect_4(){
PythonDMLScript.main(new String[]{"Hello"});
}

@Test(expected = IllegalArgumentException.class)
public void testStartupIncorrect_5(){
// Number out of range
PythonDMLScript.main(new String[]{"918757"});
}

@Test
public void testStartup(){
// Number out of range
PythonDMLScript.main(new String[]{"49100"});
}
}