Skip to content

Commit

Permalink
Improving concurrency, synchronizing as little as possible.
Browse files Browse the repository at this point in the history
  • Loading branch information
datumbox committed Jan 11, 2016
1 parent b5f3f3e commit 47e3fd8
Show file tree
Hide file tree
Showing 28 changed files with 577 additions and 738 deletions.
6 changes: 4 additions & 2 deletions TODO.txt
@@ -1,14 +1,16 @@
CODE IMPROVEMENTS CODE IMPROVEMENTS
================= =================


- Profiling & benchmarking.
- Add multithreading support on the training for the following models: - Add multithreading support on the training for the following models:
- OrdinalRegression - OrdinalRegression
- SoftMaxRegression - SoftMaxRegression
- HierarchicalAgglomerative - HierarchicalAgglomerative
- Kmeans - Kmeans
- NLMS - NLMS
- Search for getFeatureIds() and see if the estimation should change. We can use the Dataframe.xDataTypes instead? - Try if possible to use atomic int in dataframe: a.updateAndGet((x) -> max(x+1,rId+1));
- Add a from-to or limit option on the Dataframe.Builder
- Consider changing the policy of copying the data on the constructor of Record and start using unmodifiable without copy instead.
- Profiling & benchmarking.


- Update all maven plugins and dependencies to their latest versions. - Update all maven plugins and dependencies to their latest versions.
- Add support for MapDB 2.0 once a stable version is released. - Add support for MapDB 2.0 once a stable version is released.
Expand Down
Expand Up @@ -72,10 +72,9 @@ public static <T> Stream<T> stream(Iterator<T> iterator, boolean parallel) {
* *
* @param <T> * @param <T>
* @param stream * @param stream
* @param parallel
* @return * @return
*/ */
public static <T> Stream<Map.Entry<Integer, T>> enumerate(Stream<T> stream, boolean parallel) { public static <T> Stream<Map.Entry<Integer, T>> enumerate(Stream<T> stream) {
Iterator<Map.Entry<Integer, T>> iterator = new Iterator<Map.Entry<Integer, T>>() { Iterator<Map.Entry<Integer, T>> iterator = new Iterator<Map.Entry<Integer, T>>() {
private int counter = 0; private int counter = 0;


Expand All @@ -99,7 +98,7 @@ public void remove() {
throw new UnsupportedOperationException("This is a read-only iterator, remove operation is not supported."); throw new UnsupportedOperationException("This is a read-only iterator, remove operation is not supported.");
} }
}; };
return stream(iterator, parallel); return stream(iterator, stream.isParallel());
} }


} }
124 changes: 62 additions & 62 deletions src/main/java/com/datumbox/common/concurrency/ThreadMethods.java 100644 → 100755
@@ -1,62 +1,62 @@
/** /**
* Copyright (C) 2013-2016 Vasilis Vryniotis <bbriniotis@datumbox.com> * Copyright (C) 2013-2016 Vasilis Vryniotis <bbriniotis@datumbox.com>
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.datumbox.common.concurrency; package com.datumbox.common.concurrency;


import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;


/** /**
* This class contains a number of helper methods for Java 8 Threads. * This class contains a number of helper methods for Java 8 Threads.
* *
* @author Vasilis Vryniotis <bbriniotis@datumbox.com> * @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/ */
public class ThreadMethods { public class ThreadMethods {


/** /**
* Takes the items of the stream in a throttled way and provides them to the * Takes the items of the stream in a throttled way and provides them to the
* consumer. It uses as many threads as the available processors and it does * consumer. It uses as many threads as the available processors and it does
* not start more tasks than 2 times the number of threads. * not start more tasks than 2 times the number of threads.
* *
* @param <T> * @param <T>
* @param stream * @param stream
* @param consumer * @param consumer
*/ */
public static <T> void throttledExecution(Stream<T> stream, Consumer<T> consumer) { public static <T> void throttledExecution(Stream<T> stream, Consumer<T> consumer) {
int maxThreads = Runtime.getRuntime().availableProcessors(); int maxThreads = Runtime.getRuntime().availableProcessors();
int maxTasks = 2*maxThreads; int maxTasks = 2*maxThreads;


ExecutorService executorService = Executors.newFixedThreadPool(maxThreads); ExecutorService executorService = Executors.newFixedThreadPool(maxThreads);
ThrottledExecutor executor = new ThrottledExecutor(executorService, maxTasks); ThrottledExecutor executor = new ThrottledExecutor(executorService, maxTasks);


stream.forEach(i -> { stream.sequential().forEach(i -> {
executor.execute(() -> { executor.execute(() -> {
consumer.accept(i); consumer.accept(i);
}); });
}); });


executorService.shutdown(); executorService.shutdown();
try { try {
executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }


} }
144 changes: 72 additions & 72 deletions src/main/java/com/datumbox/common/concurrency/ThrottledExecutor.java 100644 → 100755
@@ -1,73 +1,73 @@
/** /**
* Copyright (C) 2013-2016 Vasilis Vryniotis <bbriniotis@datumbox.com> * Copyright (C) 2013-2016 Vasilis Vryniotis <bbriniotis@datumbox.com>
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.datumbox.common.concurrency; package com.datumbox.common.concurrency;


import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;


/** /**
* The ThrottledExecutor enables us to throttle the input of the executor. This * The ThrottledExecutor enables us to throttle the input of the executor. This
* can be useful when we don't wish to submit all the tasks at once in order to * can be useful when we don't wish to submit all the tasks at once in order to
* preserve memory. * preserve memory.
* *
* @author Vasilis Vryniotis <bbriniotis@datumbox.com> * @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/ */
public class ThrottledExecutor implements Executor { public class ThrottledExecutor implements Executor {


private final Executor wrappedExecutor; private final Executor wrappedExecutor;


private final Semaphore semaphore; private final Semaphore semaphore;


/** /**
* This Executor will block the main thread (when execute() is called) if the * This Executor will block the main thread (when execute() is called) if the
* number of submitted and unfinished tasks reaches the provided limit. This * number of submitted and unfinished tasks reaches the provided limit. This
* enables us to pace the input and reduce memory consumption. * enables us to pace the input and reduce memory consumption.
* *
* @param executor * @param executor
* @param maxConcurrentTasks * @param maxConcurrentTasks
*/ */
public ThrottledExecutor(Executor executor, int maxConcurrentTasks) { public ThrottledExecutor(Executor executor, int maxConcurrentTasks) {
this.wrappedExecutor = executor; this.wrappedExecutor = executor;
this.semaphore = new Semaphore(maxConcurrentTasks); this.semaphore = new Semaphore(maxConcurrentTasks);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public void execute(final Runnable command) { public void execute(final Runnable command) {
try { try {
semaphore.acquire(); semaphore.acquire();
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }


try { try {
wrappedExecutor.execute(() -> { wrappedExecutor.execute(() -> {
try { try {
command.run(); command.run();
} }
finally { finally {
semaphore.release(); semaphore.release();
} }
}); });
} }
catch (RejectedExecutionException ex) { catch (RejectedExecutionException ex) {
semaphore.release(); semaphore.release();
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
} }
8 changes: 4 additions & 4 deletions src/main/java/com/datumbox/common/dataobjects/Dataframe.java
Expand Up @@ -98,7 +98,7 @@ public static Dataframe parseTextFiles(Map<Object, URI> textFilesMap, AbstractTe


try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(datasetURI)), "UTF8"))) { try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(datasetURI)), "UTF8"))) {
final int baseCounter = dataset.size(); //because we read multiple files we need to keep track of all records added earlier final int baseCounter = dataset.size(); //because we read multiple files we need to keep track of all records added earlier
ThreadMethods.throttledExecution(StreamMethods.enumerate(br.lines(), false), e -> { //WARNING: Do NOT turn on the parallel flags. Any parallelism is taken care of by the throttledExecution() method ThreadMethods.throttledExecution(StreamMethods.enumerate(br.lines()), e -> {
Integer rId = baseCounter + e.getKey(); Integer rId = baseCounter + e.getKey();
String line = e.getValue(); String line = e.getValue();


Expand Down Expand Up @@ -164,7 +164,7 @@ public static Dataframe parseCSVFile(Reader reader, String yVariable, Map<String
.withRecordSeparator(recordSeparator); .withRecordSeparator(recordSeparator);


try (final CSVParser parser = new CSVParser(reader, format)) { try (final CSVParser parser = new CSVParser(reader, format)) {
ThreadMethods.throttledExecution(StreamMethods.enumerate(StreamMethods.stream(parser.spliterator(), false), false), e -> { //WARNING: Do NOT turn on the parallel flags. Any parallelism is taken care of by the throttledExecution() method ThreadMethods.throttledExecution(StreamMethods.enumerate(StreamMethods.stream(parser.spliterator(), false)), e -> {
Integer rId = e.getKey(); Integer rId = e.getKey();
CSVRecord row = e.getValue(); CSVRecord row = e.getValue();


Expand Down Expand Up @@ -311,9 +311,9 @@ public boolean contains(Object o) {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public boolean addAll(Collection<? extends Record> c) { public boolean addAll(Collection<? extends Record> c) {
for(Record r : c) { c.stream().forEach(r -> {
add(r); add(r);
} });
return true; return true;
} }


Expand Down

0 comments on commit 47e3fd8

Please sign in to comment.