Skip to content
Permalink
Browse files
Enabling Filters on Active Datasets
Change-Id: I499b1d9a1525ba5ea9396d21118c6f6cf3e870d2
  • Loading branch information
idleft committed Sep 19, 2020
1 parent 0bc1dfd commit 9ffb0122ca9cec651d5f95b0770fd07baebcf019
Showing 33 changed files with 830 additions and 305 deletions.
@@ -1,18 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@@ -135,6 +135,7 @@ public void initializeMetadata(INCServiceContext appCtx)
BuiltinFunctions.addFunction(BADFunctions.CURRENT_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
BuiltinFunctions.addFunction(BADFunctions.PREVIOUS_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
BuiltinFunctions.addFunction(BADFunctions.IS_NEW, ABooleanTypeComputer.INSTANCE, true);
BuiltinFunctions.addFunction(BADFunctions.ACTIVE_TIMESTAMP, ADateTimeTypeComputer.INSTANCE, true);

// to shadow the master feed rewriter
BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, BADFeedRewriter.INSTANCE, true);
@@ -44,13 +44,15 @@ public BADLSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int part

@Override
protected void beforeModification(ITupleReference tuple) {
if ((tuple.getFieldCount() == 3
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
|| (tuple.getFieldCount() == 4
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
int targetIdx = tuple.getFieldStart(2) + 14;
if (tuple.getFieldCount() >= 3
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
&& tuple.getFieldLength(2) == 22) {
long currMilli = System.currentTimeMillis();
ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
tupleBuff.putLong(targetIdx, System.currentTimeMillis());
tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
if (tuple.getFieldCount() == 4) {
tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
}
}
}
}
@@ -37,19 +37,19 @@ public BADLSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, R
IMissingWriterFactory missingWriterFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackFactory,
IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, ARecordType recordType,
int filterIndex, boolean hasSecondaries) {
IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
ARecordType itemType, int filterIndex, boolean hasSecondaries) {
super(spec, outRecDesc, fieldPermutation, indexHelperFactory, missingWriterFactory,
modificationOpCallbackFactory, searchOpCallbackFactory, frameOpCallbackFactory, numPrimaryKeys,
recordType, filterIndex, hasSecondaries);
filterSourceIndicator, itemType, filterIndex, hasSecondaries);
}

@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new BADLSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, recordType, filterIndex,
frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
}
}
@@ -36,23 +36,25 @@ public class BADLSMPrimaryUpsertOperatorNodePushable extends LSMPrimaryUpsertOpe
public BADLSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
IModificationOperationCallbackFactory modCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, modCallbackFactory,
searchCallbackFactory, numOfPrimaryKeys, recordType, filterFieldIndex, frameOpCallbackFactory,
missingWriterFactory, hasSecondaries);
searchCallbackFactory, numOfPrimaryKeys, filterSourceIndicator, recordType, filterFieldIndex,
frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
}

@Override
protected void beforeModification(ITupleReference tuple) {
if ((tuple.getFieldCount() == 3
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
|| (tuple.getFieldCount() == 4
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
int targetIdx = tuple.getFieldStart(2) + 14;
if (tuple.getFieldCount() >= 3
&& tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
&& tuple.getFieldLength(2) == 22) {
long currMilli = System.currentTimeMillis();
ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
tupleBuff.putLong(targetIdx, System.currentTimeMillis());
tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
if (tuple.getFieldCount() == 4) {
tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
}
}
}
}
@@ -1,22 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.asterix.bad.function;
@@ -1,22 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.function;

@@ -1,22 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.asterix.bad.function;
@@ -1,22 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.function.runtime;

@@ -1,22 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.function.runtime;

0 comments on commit 9ffb012

Please sign in to comment.