Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
# under the License.
#
# -------------------------------------------------------------
# Import numpy and SystemDS

import logging

import numpy as np
from systemds.context import SystemDSContext
from systemds.operator.algorithm import l2svm
Expand All @@ -41,4 +43,4 @@
with SystemDSContext() as sds:
model = l2svm(sds.from_numpy(features),
sds.from_numpy(labels)).compute()
print(model)
logging.info(model)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
# under the License.
#
# -------------------------------------------------------------
# Import SystemDS
import logging

from systemds.context import SystemDSContext
from systemds.operator.algorithm import l2svm

Expand All @@ -28,7 +29,7 @@
# Add value to all cells in features
features += 1.1
# Generate labels of all ones and zeros
labels = sds.rand(10, 1, 1, 1, sparsity = 0.5)
labels = sds.rand(10, 1, 1, 1, sparsity=0.5)

model = l2svm(features, labels).compute()
print(model)
logging.info(model)
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
# under the License.
#
# -------------------------------------------------------------
# Import SystemDSContext

import logging

from systemds.context import SystemDSContext

# Create a context and if necessary (no SystemDS py4j instance running)
# it starts a subprocess which does the execution in SystemDS
with SystemDSContext() as sds:
Expand All @@ -30,5 +33,5 @@
m_res = m * 3.1
# Do the calculation in SystemDS by calling compute().
# The returned value is an numpy array that can be directly printed.
print(m_res.compute())
logging.info(m_res.compute())
# context will automatically be closed and process stopped
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# under the License.
#
# -------------------------------------------------------------
import logging

import numpy as np
from systemds.context import SystemDSContext

Expand All @@ -34,4 +36,4 @@
m_res = sds.from_numpy(m1) * sds.from_numpy(m2)
# lets do the actual computation in SystemDS! The result is an numpy array
m_res_np = m_res.compute()
print(m_res_np)
logging.info(m_res_np)
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
# under the License.
#
# -------------------------------------------------------------
import logging

from systemds.context import SystemDSContext
from systemds.operator.algorithm import multiLogReg, multiLogRegPredict
from systemds.examples.tutorials.mnist import DataManager
from systemds.operator.algorithm import multiLogReg, multiLogRegPredict

d = DataManager()

Expand All @@ -39,4 +41,4 @@
Yt_ds = sds.from_numpy(Yt) + 1.0
[m, y_pred, acc] = multiLogRegPredict(Xt_ds, bias, Yt_ds).compute()

print(acc)
logging.info(acc)
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#
# -------------------------------------------------------------
# Python
import numpy as np
import os

import numpy as np

if not os.path.isdir("temp"):
os.mkdir("temp")
a = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#
# -------------------------------------------------------------
# Python
import numpy as np
import logging

from systemds.context import SystemDSContext

# Create a federated matrix
Expand All @@ -35,5 +36,5 @@
with SystemDSContext() as sds:
fed_a = sds.federated([address], [dims])
# Sum the federated matrix and call compute to execute
print(fed_a.sum().compute())
logging.info(fed_a.sum().compute())
# Result should be 45.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#
# -------------------------------------------------------------
# Python
import logging

import numpy as np
from systemds.context import SystemDSContext

Expand All @@ -37,11 +39,11 @@
([6, 0], [9, 3])])
# local matrix to multiply with
loc = sds.from_numpy(np.array([
[1,2,3,4,5,6,7,8,9],
[1,2,3,4,5,6,7,8,9],
[1,2,3,4,5,6,7,8,9]
]))
[1, 2, 3, 4, 5, 6, 7, 8, 9],
[1, 2, 3, 4, 5, 6, 7, 8, 9],
[1, 2, 3, 4, 5, 6, 7, 8, 9]
]))
# Multiply local and federated
ret = loc @ fed
# execute the lazy script and print
print(ret.compute())
logging.info(ret.compute())
90 changes: 61 additions & 29 deletions src/main/python/systemds/context/systemds_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,30 @@
__all__ = ["SystemDSContext"]

import json
import logging
import os
import socket
import sys
from contextlib import contextmanager
from glob import glob
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep
from typing import Dict, Iterable, Sequence, Tuple, Union
from contextlib import contextmanager

import numpy as np
import pandas as pd
from py4j.java_gateway import GatewayParameters, JavaGateway
from systemds.operator import (Frame, List, Matrix, OperationNode, Scalar,
Source)
from systemds.script_building import OutputType
from systemds.script_building import DMLScript, OutputType
from systemds.utils.consts import VALID_INPUT_TYPES
from systemds.utils.helpers import get_module_dir


class SystemDSContext(object):
"""A context with a connection to a java instance with which SystemDS operations are executed.
"""A context with a connection to a java instance with which SystemDS operations are executed.
The java process is started and is running using a random tcp port for instruction parsing.

This class is used as the starting point for all SystemDS execution. It gives the ability to create
Expand All @@ -54,22 +55,35 @@ class SystemDSContext(object):
java_gateway: JavaGateway
_capture_statistics: bool
_statistics: str
_log: logging.Logger

def __init__(self, port: int = -1, capture_statistics: bool = False):
def __init__(self, port: int = -1, capture_statistics: bool = False, logging_level: int = 20):
"""Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled
Any new instance of this SystemDS Context, would start a separate new JVM.

Standard out and standard error form the JVM is also handled in this class, filling up Queues,
that can be read from to get the printed statements from the JVM.

:param port: default -1, giving a random port for communication with JVM
:param capture_statistics: If the statistics of the execution in SystemDS should be captured
:param logging_level: Specify the logging level used for informative messages, default 20 indicating INFO.
The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL.
"""
actual_port = self.__start(port)
process = self.__process
self._statistics = ""
self._capture_statistics = capture_statistics

self._log = logging.Logger(self.__class__.__name__)
self._log.setLevel(logging_level)

if process.poll() is None:
self.__start_gateway(actual_port)
else:
self.exception_and_close("Java process stopped before gateway could connect")
self.exception_and_close(
"Java process stopped before gateway could connect")

self._log.debug("Started JVM and SystemDS python context manager")

def get_stdout(self, lines: int = -1):
"""Getter for the stdout of the java subprocess
Expand Down Expand Up @@ -189,25 +203,25 @@ def __build_startup_command(self, port: int):

files = glob(os.path.join(root, "conf", "log4j*.properties"))
if len(files) > 1:
print(
"WARNING: Multiple logging files found selecting: " + files[0])
self._log.warning(
"Multiple logging files found selecting: " + files[0])
if len(files) == 0:
print("WARNING: No log4j file found at: "
+ os.path.join(root, "conf")
+ " therefore using default settings")
self._log.warning("No log4j file found at: "
+ os.path.join(root, "conf")
+ " therefore using default settings")
else:
command.append("-Dlog4j.configuration=file:" + files[0])

command.append("org.apache.sysds.api.PythonDMLScript")

files = glob(os.path.join(root, "conf", "SystemDS*.xml"))
if len(files) > 1:
print(
"WARNING: Multiple config files found selecting: " + files[0])
self._log.warning(
"Multiple config files found selecting: " + files[0])
if len(files) == 0:
print("WARNING: No log4j file found at: "
+ os.path.join(root, "conf")
+ " therefore using default settings")
self._log.warning("No log4j file found at: "
+ os.path.join(root, "conf")
+ " therefore using default settings")
else:
command.append("-config")
command.append(files[0])
Expand Down Expand Up @@ -279,7 +293,7 @@ def close(self):
self.__kill_Popen(self.java_gateway.java_process)
self.java_gateway.shutdown()
if hasattr(self, '__process'):
print("Has process variable")
logging.error("Has process variable")
self.__kill_Popen(self.__process)
if hasattr(self, '__stdout_thread') and self.__stdout_thread.is_alive():
self.__stdout_thread.join(0)
Expand Down Expand Up @@ -311,7 +325,7 @@ def __get_open_port(self):
s.close()
return port

def _execution_completed(self, script: 'DMLScript'):
def _execution_completed(self, script: DMLScript):
"""
Should/will be called after execution of a script.
Used to update statistics.
Expand Down Expand Up @@ -448,40 +462,59 @@ def rand(self, rows: int, cols: int,

return Matrix(self, 'rand', [], named_input_nodes=named_input_nodes)

def __fix_string_args(self, arg: str) -> str:
    """Normalize a user-supplied value into a quoted DML string literal.

    The value is coerced to ``str``, every embedded single and double
    quote is removed, and the cleaned text is wrapped in double quotes
    so it can be spliced directly into a generated DML script.

    :param arg: the value to normalize; coerced to ``str`` first
    :return: the cleaned value surrounded by double quotes
    """
    cleaned = str(arg)
    for quote_char in ('"', "'"):
        cleaned = cleaned.replace(quote_char, "")
    return '"' + cleaned + '"'

def read(self, path: os.PathLike, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
""" Read a file from disk. Supported types include:
CSV, Matrix Market(coordinate), Text(i,j,v), SystemDS Binary, etc.
See: http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions for more details
:return: an Operation Node, containing the read data the operationNode read can be of types, Matrix, Frame or Scalar.
"""
mdt_filepath = path + ".mtd"
# If metadata file is existing, then simply use that and force data type to mtd file
if os.path.exists(mdt_filepath):
with open(mdt_filepath) as jspec_file:
mtd = json.load(jspec_file)
kwargs["data_type"] = mtd["data_type"]
if kwargs.get("format", None):
kwargs["format"] = self.__fix_string_args(kwargs["format"])
elif kwargs.get("format", None): # If format is specified. Then use that format
kwargs["format"] = self.__fix_string_args(kwargs["format"])
else: # Otherwise guess at what format the file is based on file extension
if ".csv" in path[-4:]:
kwargs["format"] = '"csv"'
self._log.warning(
"Guessing '"+path+"' is a csv file, please add a mtd file, or specify in arguments")
if not ("header" in kwargs) and "data_type" in kwargs and kwargs["data_type"] == "frame":
kwargs["header"] = True

data_type = kwargs.get("data_type", None)
file_format = kwargs.get("format", None)

if data_type == "matrix":
kwargs["data_type"] = f'"{data_type}"'
return Matrix(self, "read", [f'"{path}"'], named_input_nodes=kwargs)
elif data_type == "frame":
kwargs["data_type"] = f'"{data_type}"'
if isinstance(file_format, str):
kwargs["format"] = f'"{kwargs["format"]}"'
return Frame(self, "read", [f'"{path}"'], named_input_nodes=kwargs)
elif data_type == "scalar":
kwargs["data_type"] = f'"{data_type}"'
output_type = OutputType.from_str(kwargs.get("value_type", None))
kwargs["value_type"] = f'"{output_type.name}"'
return Scalar(self, "read", [f'"{path}"'], named_input_nodes=kwargs, output_type=output_type)
if output_type:
kwargs["value_type"] = f'"{output_type.name}"'
return Scalar(self, "read", [f'"{path}"'], named_input_nodes=kwargs, output_type=output_type)
else:
raise ValueError(
"Invalid arguments for reading scalar, value_type must be specified")
elif data_type == "list":
# Reading a list has no extra arguments.
return List(self, "read", [f'"{path}"'])

kwargs["data_type"] = None
print("WARNING: Unknown type read please add a mtd file, or specify in arguments")
return OperationNode(self, "read", [f'"{path}"'], named_input_nodes=kwargs)
else:
kwargs["data_type"] = '"matrix"'
self._log.warning(
"Unknown type read please add a mtd file, or specify in arguments, defaulting to matrix")
return Matrix(self, "read", [f'"{path}"'], named_input_nodes=kwargs)

def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> Scalar:
""" Construct a scalar value; this can contain str, float, double, integers and booleans.
Expand Down Expand Up @@ -572,7 +605,7 @@ def federated(self, addresses: Iterable[str],
named_params.update(kwargs)
return Matrix(self, 'federated', args, named_params)

def source(self, path: str, name: str, print_imported_methods: bool = False) -> Source:
def source(self, path: str, name: str) -> Source:
"""Import methods from a given dml file.

The importing is done through the DML command source, and adds all defined methods from
Expand All @@ -587,9 +620,8 @@ def source(self, path: str, name: str, print_imported_methods: bool = False) ->

:param path: The absolute or relative path to the file to import
:param name: The name to give the imported file in the script, this name must be unique
:param print_imported_methods: boolean specifying if the imported methods should be printed.
"""
return Source(self, path, name, print_imported_methods)
return Source(self, path, name)

def list(self, *args: Sequence[VALID_INPUT_TYPES], **kwargs: Dict[str, VALID_INPUT_TYPES]) -> List:
""" Create a List object containing the given nodes.
Expand Down
Loading