Optimization of the I/O path of the Python interface #2032

Closed · wants to merge 6 commits
src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -21,11 +21,17 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;


import org.apache.sysds.runtime.frame.data.columns.Array;

/**
* Utils for converting python data to java.
*/
@@ -114,6 +120,82 @@
return mb;
}

public static Array<?> convert(byte[] data, int numElements, Types.ValueType valueType) {
if (data == null || valueType == null) {
throw new DMLRuntimeException("Invalid input data or value type.");
}

ByteBuffer buffer = ByteBuffer.wrap(data);
buffer.order(ByteOrder.LITTLE_ENDIAN);

Array<?> array = ArrayFactory.allocate(valueType, numElements);

// Process the data based on the value type
switch (valueType) {
case UINT4:
for (int i = 0; i < numElements; i++) {
array.set(i, (int) (buffer.get() & 0xFF));
}
break;
case UINT8:
for (int i = 0; i < numElements; i++) {
array.set(i, (int) (buffer.get() & 0xFF));
}
break;
case INT32:
for (int i = 0; i < numElements; i++) {
array.set(i, buffer.getInt());
}
break;
case INT64:
for (int i = 0; i < numElements; i++) {
array.set(i, buffer.getLong());
}
break;
case FP32:
for (int i = 0; i < numElements; i++) {
array.set(i, buffer.getFloat());
}
break;
case FP64:
for (int i = 0; i < numElements; i++) {
array.set(i, buffer.getDouble());
}
break;
case BOOLEAN:
for (int i = 0; i < numElements; i++) {
((BooleanArray) array).set(i, buffer.get() != 0);
}
break;
case STRING:
for (int i = 0; i < numElements; i++) {
buffer.order(ByteOrder.BIG_ENDIAN);
int strLen = buffer.getInt();
buffer.order(ByteOrder.LITTLE_ENDIAN);
byte[] strBytes = new byte[strLen];
buffer.get(strBytes);
array.set(i, new String(strBytes, StandardCharsets.UTF_8));
}
break;
case CHARACTER:
for (int i = 0; i < numElements; i++) {
array.set(i, (char) buffer.get());
}
break;
case HASH64:
for (int i = 0; i < numElements; i++) {
array.set(i, buffer.getLong());
}
break;
default:
throw new DMLRuntimeException("Unsupported value type: " + valueType);
}

return array;
}



public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
byte[] ret = null;
if(mb.isInSparseFormat()) {
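For context, the new `convert` method above consumes one flat byte array per column: fixed-width value types are read back to back in little-endian order, while each string is preceded by a big-endian 4-byte length and decoded as UTF-8. A minimal Python sketch of that packing, mirroring the sender side further down (the helper names are illustrative, not part of the PR):

import struct
import numpy as np

def pack_int32_column(values):
    # Consecutive little-endian int32 values, matching buffer.getInt()
    # after buffer.order(ByteOrder.LITTLE_ENDIAN) on the Java side.
    return np.asarray(values, dtype="<i4").tobytes()

def pack_string_column(values):
    # Per entry: a big-endian 4-byte length prefix followed by UTF-8 bytes,
    # matching the temporary switch to ByteOrder.BIG_ENDIAN around
    # buffer.getInt() in the STRING case above.
    out = bytearray()
    for v in values:
        encoded = str(v).encode("utf-8")
        out.extend(struct.pack(">I", len(encoded)))
        out.extend(encoded)
    return bytes(out)
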
66 changes: 49 additions & 17 deletions src/main/python/systemds/utils/converters.py
@@ -19,6 +19,7 @@
#
# -------------------------------------------------------------

import struct

import numpy as np
import pandas as pd
@@ -81,10 +82,10 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):


def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
"""Converts a given numpy array, to internal matrix block representation.
"""Converts a given pandas DataFrame to an internal FrameBlock representation.

:param sds: The current systemds context.
:param np_arr: the numpy array to convert to matrixblock.
:param sds: The current SystemDS context.
:param pd_df: The pandas DataFrame to convert to FrameBlock.
"""
assert pd_df.ndim <= 2, "pd_df invalid, because it has more than 2 dimensions"
rows = pd_df.shape[0]
@@ -100,6 +101,10 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
np.dtype(np.bool_): jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
np.dtype("<M8[ns]"): jvm.org.apache.sysds.common.Types.ValueType.STRING,
np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
np.dtype(np.character): jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
}
schema = []
col_names = []
@@ -116,20 +121,47 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
j_colNameArray = java_gate.new_array(jc_String, len(col_names))
j_dataArray = java_gate.new_array(jc_String, rows, cols)
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])
j = 0
for j, col_name in enumerate(col_names):
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
for i in range(col_data.shape[0]):
if col_data[i]:
j_dataArray[i][j] = col_data[i]
fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)

return fb

# execution speed increases with optimized code when the number of rows exceeds 4
if rows > 4:
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])

fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows)

# convert and set data for each column
for j, col_name in enumerate(col_names):
col_type = schema[j]
if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
byte_data = bytearray()
for value in pd_df[col_name].astype(str):
encoded_value = value.encode('utf-8')
byte_data.extend(struct.pack('>I', len(encoded_value)))
byte_data.extend(encoded_value)
else:
col_data = pd_df[col_name].fillna("").to_numpy()
byte_data = bytearray(col_data.tobytes())

converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(byte_data, rows, col_type)
fb.setColumn(j, converted_array)
return fb
else:
j_dataArray = java_gate.new_array(jc_String, rows, cols)
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])
j = 0
for j, col_name in enumerate(col_names):
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
for i in range(col_data.shape[0]):
if col_data[i]:
j_dataArray[i][j] = col_data[i]
fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
return fb

except Exception as e:
sds.exception_and_close(e)

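Taken together, the fast path above is selected automatically whenever the DataFrame has more than 4 rows (the threshold noted in the comment); smaller frames keep the original per-cell string path. A minimal usage sketch, modeled on the tests below:

import pandas as pd
from systemds.context import SystemDSContext

df = pd.DataFrame({
    "C1": [f"col1_string_{i}" for i in range(10)],  # STRING column
    "C2": list(range(10)),                          # integer column
})

sds = SystemDSContext()
try:
    frame = sds.from_pandas(df)  # 10 rows > 4, so the column-wise byte transfer is used
    result = frame.replace("xyz", "yzx").compute()  # round-trips back to a pandas DataFrame
finally:
    sds.close()
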
89 changes: 89 additions & 0 deletions src/main/python/tests/iotests/test_io_pandas_systemds.py
@@ -0,0 +1,89 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------


import os
import shutil
import unittest
import pandas as pd
from systemds.context import SystemDSContext

class TestPandasFromToSystemds(unittest.TestCase):

sds: SystemDSContext = None
temp_dir: str = "tests/iotests/temp_write_csv/"
n_cols = 3
n_rows = 5
df = pd.DataFrame(
{
'C1': [f"col1_string_{i}" for i in range(n_rows)],
'C2': [i for i in range(n_rows)],
}
)

@classmethod
def setUpClass(cls):
cls.sds = SystemDSContext()
if not os.path.exists(cls.temp_dir):
os.makedirs(cls.temp_dir)


@classmethod
def tearDownClass(cls):
cls.sds.close()
shutil.rmtree(cls.temp_dir, ignore_errors=True)


def test_into_systemds(self):
# Transfer into SystemDS and write to CSV
frame = self.sds.from_pandas(self.df)
frame.write(self.temp_dir + "into_systemds.csv", format="csv", header=True).compute(verbose=True)

# Read the CSV file using pandas
result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")

# Verify the data
print("result_df:")
print(result_df)
print("self.df:")
print(self.df)
self.assertTrue(isinstance(result_df, pd.DataFrame))
self.assertTrue(self.df.equals(result_df))

def test_out_of_systemds(self):
# Create a CSV file to read into SystemDS
self.df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, index=False)

# Read the CSV file into SystemDS and then compute back to pandas
frame = self.sds.read(self.temp_dir + "out_of_systemds.csv", data_type="frame", format="csv")
result_df = frame.replace("xyz", "yzx").compute()

# Verify the data
print("result_df:")
result_df['C2'] = result_df['C2'].astype(int)
print(result_df)
print("self.df:")
print(self.df)
self.assertTrue(isinstance(result_df, pd.DataFrame))
self.assertTrue(self.df.equals(result_df))

if __name__ == "__main__":
unittest.main(exit=False)
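
One detail worth noting in test_out_of_systemds above: pandas DataFrame.equals is dtype-sensitive, which is why C2 is cast back to int before the comparison. A small illustration of the behavior being worked around:

import pandas as pd

a = pd.DataFrame({"C2": [0, 1, 2]})        # int64 column
b = pd.DataFrame({"C2": ["0", "1", "2"]})  # object column, as a CSV round-trip may return
assert not a.equals(b)                     # same printed values, different dtype
assert a.equals(b.astype({"C2": int}))     # equal again after normalizing the dtype
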
2 changes: 1 addition & 1 deletion src/main/python/tests/matrix/test_print.py
@@ -46,7 +46,7 @@ def tearDownClass(cls):
def test_print_01(self):
self.sds.from_numpy(np.array([1])).to_string().print().compute()
sleep(0.2)
self.assertEqual(1,float(self.sds.get_stdout()[0]))
self.assertEqual(1,float(self.sds.get_stdout()[0].replace(",", ".")))

def test_print_02(self):
self.sds.scalar(1).print().compute()
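
The replace(",", ".") added to test_print_01 guards against JVM locales that print the decimal separator as a comma, in which case float() on the raw stdout line would raise. A quick illustration, assuming such a locale prints the value 1.0 as "1,0":

s = "1,0"                                   # assumed comma-decimal output
# float(s) would raise ValueError, so the comma is normalized first:
assert float(s.replace(",", ".")) == 1.0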