Skip to content

Commit

Permalink
Merge 6b295b8 into 0c4f798
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Jun 21, 2017
2 parents 0c4f798 + 6b295b8 commit 4c60730
Show file tree
Hide file tree
Showing 9 changed files with 1,269 additions and 38 deletions.
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.scala.pattern

import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}

/**
* Base class for a group pattern definition.
*
* @param jGroupPattern Underlying Java API GroupPattern
* @tparam T Base type of the elements appearing in the pattern
* @tparam F Subtype of T to which the current pattern operator is constrained
*/
class GroupPattern[T , F <: T](jGroupPattern: JGroupPattern[T, F])
extends Pattern[T , F](jGroupPattern) {

override def where(condition: IterativeCondition[F] ) =
throw new UnsupportedOperationException ("GroupPattern does not support where clause.")

override def or(condition: IterativeCondition[F] ) =
throw new UnsupportedOperationException ("GroupPattern does not support or clause.")

override def subtype[S <: F](clazz: Class[S]) =
throw new UnsupportedOperationException("GroupPattern does not support subtype clause.")

}

object GroupPattern {

/**
* Constructs a new GroupPattern by wrapping a given Java API GroupPattern
*
* @param jGroupPattern Underlying Java API GroupPattern.
* @tparam T Base type of the elements appearing in the pattern
* @tparam F Subtype of T to which the current pattern operator is constrained
* @return New wrapping GroupPattern object
*/
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]) = new GroupPattern[T, F](jGroupPattern)

}
Expand Up @@ -20,7 +20,7 @@ package org.apache.flink.cep.scala.pattern
import org.apache.flink.cep
import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern}
import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, GroupPattern => JGroupPattern, Pattern => JPattern}
import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.streaming.api.windowing.time.Time

Expand Down Expand Up @@ -369,6 +369,40 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
this
}

/**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.followedBy(pattern.jPattern))

/**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.followedByAny(pattern.jPattern))

/**
* Appends a new pattern to the existing one. The new pattern enforces strict
* temporal contiguity. This means that the whole pattern sequence matches only
* if an event which matches this pattern directly follows the preceding matching
* event. Thus, there cannot be any events in between two matching events.
*
* @param pattern the pattern to append
* @return A new pattern which is appended to this one
*/
def next(pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](jPattern.next(pattern.jPattern))

}

object Pattern {
Expand All @@ -393,4 +427,13 @@ object Pattern {
*/
def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name))

/**
* Starts a new pattern sequence. The provided pattern is the initial pattern
* of the new sequence.
*
* @param pattern the pattern to begin with
* @return the first pattern of a pattern sequence
*/
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
GroupPattern[T, F](new JGroupPattern[T, F](null, pattern.wrappedPattern))
}

0 comments on commit 4c60730

Please sign in to comment.