Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

ENH: right and full outer join for Bloom filter strategy #9

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.lib.join;

import org.junit.AfterClass;
import org.junit.BeforeClass;

public class BloomFilterFullOuterJoinIT extends AbstractFullOuterJoinIT {

private static String saveTempDir;

@BeforeClass
public static void setUpClass(){

// Ensure a consistent temporary directory for use of the DistributedCache.

// The DistributedCache technically isn't supported when running in local mode, and the default
// temporary directiory "/tmp" is used as its location. This typically only causes an issue when
// running integration tests on Mac OS X, as OS X doesn't use "/tmp" as it's default temporary
// directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
}

@AfterClass
public static void tearDownClass(){
System.setProperty("java.io.tmpdir", saveTempDir);
}

@Override
protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
return new BloomFilterJoinStrategy<K, U, V>(20000);
}

}
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.lib.join;

import org.junit.AfterClass;
import org.junit.BeforeClass;

public class BloomFilterRightOuterJoinIT extends AbstractRightOuterJoinIT {

private static String saveTempDir;

@BeforeClass
public static void setUpClass(){

// Ensure a consistent temporary directory for use of the DistributedCache.

// The DistributedCache technically isn't supported when running in local mode, and the default
// temporary directiory "/tmp" is used as its location. This typically only causes an issue when
// running integration tests on Mac OS X, as OS X doesn't use "/tmp" as it's default temporary
// directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
}

@AfterClass
public static void tearDownClass(){
System.setProperty("java.io.tmpdir", saveTempDir);
}

@Override
protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
return new BloomFilterJoinStrategy<K, U, V>(20000);
}

}
Expand Up @@ -23,8 +23,9 @@
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/**
* Wrapper function for converting a {@code MapFn} into a key-value pair that is
* used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
* Wrapper function for converting a key-from-value extractor {@code MapFn<V, K>} into a
* key-value pair extractor that is used to convert from a {@code PCollection<V>} to a
* {@code PTable<K, V>}.
*/
public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {

Expand Down
Expand Up @@ -28,8 +28,7 @@ public static <T> IdentityFn<T> getInstance() {
return (IdentityFn<T>) INSTANCE;
}

// Non-instantiable
private IdentityFn() {
protected IdentityFn() {
}

@Override
Expand Down
Expand Up @@ -33,6 +33,9 @@
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
import org.apache.crunch.fn.ExtractKeyFn;
import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroMode;
Expand All @@ -57,6 +60,10 @@
* This strategy is useful in cases where the right-side table contains many keys that are not
* present in the left-side table. In this case, the use of the Bloom filter avoids a
* potentially costly shuffle phase for data that would never be joined to the left side.
* <p>
* Implementation Note: right and full outer join type are handled by splitting the right-side
* table (the bigger one) into two disjunctive streams: negatively filtered (right outer part)
* and positively filtered (passed to delegate strategy).
*/
public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {

Expand Down Expand Up @@ -120,30 +127,42 @@ private static int getOptimalNumHash(int numElements, float vectorSize) {
@Override
public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {

if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) {
throw new IllegalStateException("JoinType " + joinType + " is not supported for BloomFilter joins");
}

PTable<K,V> filteredRightSide;
PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily());
PCollection<BloomFilter> bloomFilters = left.keys().parallelDo(
"Create bloom filters",
new CreateBloomFilterFn(vectorSize, nbHash, left.getKeyType()),
new CreateBloomFilterFn<>(vectorSize, nbHash, left.getKeyType()),
bloomFilterType);

ReadableData<BloomFilter> bloomData = bloomFilters.asReadable(true);
FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K, V>(
bloomData,
vectorSize, nbHash,
left.getKeyType());
FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<>(
bloomData, vectorSize, nbHash, left.getKeyType());

if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) {
right = right.parallelDo(
"disable deep copy", new DeepCopyDisablerFn<Pair<K, V>>(), right.getPTableType());
}

ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
optionsBuilder.sourceTargets(bloomData.getSourceTargets());
ParallelDoOptions options = ParallelDoOptions.builder()
.sourceTargets(bloomData.getSourceTargets()).build();
PTable<K, V> filteredRightSide = right.parallelDo(
"Filter right-side with BloomFilters",
filterKeysFn, right.getPTableType(), options);

filteredRightSide = right.parallelDo("Filter right side with BloomFilters",
filterKeysFn, right.getPTableType(), optionsBuilder.build());
PTable<K, Pair<U, V>> leftJoinedWithFilteredRight = delegateJoinStrategy
.join(left, filteredRightSide, joinType);

return delegateJoinStrategy.join(left, filteredRightSide, joinType);
if (joinType == JoinType.INNER_JOIN || joinType == JoinType.LEFT_OUTER_JOIN) {
return leftJoinedWithFilteredRight;
}

return leftJoinedWithFilteredRight.union(
right
.parallelDo(
"Negatively filter right-side with BloomFilters",
FilterFns.not(filterKeysFn), right.getPTableType(), options)
.mapValues(
"Right outer join: attach null as left-value",
new NullKeyFn<U, V>(), leftJoinedWithFilteredRight.getValueType()));
}

/**
Expand Down Expand Up @@ -324,4 +343,36 @@ public byte[] map(T input) {

}

/**
* Converts value into a null-value pair. It is used to convert negatively filtered
* right-side values into right outer join part.
*/
private static class NullKeyFn<K, V> extends ExtractKeyFn<K, V> {
public NullKeyFn() {
super(new MapFn<V, K>() {
@Override public K map(V input) {
return null;
}

@Override public float scaleFactor() {
return 0.0001f;
}
});
}
}

/**
* Right and full outer join types are handled by splitting the right-side table (the bigger one)
* into two disjunctive streams: negatively filtered (right outer part) and positively filtered.
* To prevent concurrent modification Crunch performs a deep copy of such a splitted stream by
* default (see {@link DoFn#disableDeepCopy()}), which introduces an extra overhead. Since Bloom
* Filter directs every record to exactly one of these streams, making concurrent modification
* impossible, we can safely disable this feature. To achieve this we put the {@code right} PTable
* through a {@code parallelDo} call with this {@code DoFn}.
*/
private static class DeepCopyDisablerFn<T> extends IdentityFn<T> {
@Override public boolean disableDeepCopy() {
return true;
}
}
}