Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping
Browse files Browse the repository at this point in the history
- Decouple StreamProducer = StreamInfo + StreamProtocol
- Support typesafe DSL for StreamProducer
- Support KeyedStream and groupByKey
- Decouple ExecutionEnvironment

https://issues.apache.org/jira/browse/EAGLE-66

Author: @haoch <hao@apache.org>
Reviewer: @RalphSu <suliangfei@gmail.com>

Closes #26 #17
  • Loading branch information
haoch committed Dec 16, 2015
1 parent 2734b42 commit 52b8e58
Show file tree
Hide file tree
Showing 99 changed files with 2,535 additions and 1,363 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ public String getTplFileName() {
public void setTplFileName(String tplFileName) { public void setTplFileName(String tplFileName) {
this.tplFileName = tplFileName; this.tplFileName = tplFileName;
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -409,4 +409,4 @@ public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
} }
} }
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
* Normally storm spout is a special part of storm topology and it is implemented in underlying spout implementation * Normally storm spout is a special part of storm topology and it is implemented in underlying spout implementation
* which can be retrieved from getSpout method. * which can be retrieved from getSpout method.
*/ */
public abstract class AbstractStormSpoutProvider{ public interface StormSpoutProvider {
public abstract BaseRichSpout getSpout(Config context); public BaseRichSpout getSpout(Config context);
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package org.apache.eagle.dataproc.impl.storm.hdfs; package org.apache.eagle.dataproc.impl.storm.hdfs;


import com.typesafe.config.Config; import com.typesafe.config.Config;
import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider; import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.topology.base.BaseRichSpout;


public class HDFSSourcedStormSpoutProvider extends AbstractStormSpoutProvider { public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider {
private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class); private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class);


public abstract static class HDFSSpout extends BaseRichSpout{ public abstract static class HDFSSpout extends BaseRichSpout{
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.topology.base.BaseRichSpout;


import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider; import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;


public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{ public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class); private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);


public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
Expand Down Expand Up @@ -89,7 +89,6 @@ public BaseRichSpout getSpout(Config context){
} }


spoutConfig.scheme = getStreamScheme(deserClsName, context); spoutConfig.scheme = getStreamScheme(deserClsName, context);
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); return new KafkaSpout(spoutConfig);
return kafkaSpout;
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import backtype.storm.spout.Scheme; import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Fields;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import org.apache.eagle.datastream.utils.NameConstants;


import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Map;


/** /**
* This scheme defines how a kafka message is deserialized and the output field name for storm stream * This scheme defines how a kafka message is deserialized and the output field name for storm stream
Expand Down Expand Up @@ -56,10 +56,16 @@ public List<Object> deserialize(byte[] ser) {
// the following tasks are executed within the same process of kafka spout // the following tasks are executed within the same process of kafka spout
return Arrays.asList(tmp); return Arrays.asList(tmp);
} }


/**
* Declares only the single field f0 by default; override this method if different output fields are required
*
* TODO: Handle the schema with KeyValue based structure
*
* @return Fields
*/
@Override @Override
public Fields getOutputFields() { public Fields getOutputFields() {
// return new Fields(deserializer.getOutputFields()); return new Fields(NameConstants.FIELD_PREFIX()+"0");
throw new UnsupportedOperationException("output fields should be declared in sub class of KafkaSourcedSpoutProvider");
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;


public class EagleCustomGrouping implements CustomStreamGrouping { public class CustomPartitionGrouping implements CustomStreamGrouping {


public List<Integer> targetTasks; public List<Integer> targetTasks;
public PartitionStrategy strategy; public PartitionStrategy strategy;


public EagleCustomGrouping(PartitionStrategy strategy) { public CustomPartitionGrouping(PartitionStrategy strategy) {
this.strategy = strategy; this.strategy = strategy;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Tuple;
import org.apache.eagle.datastream.utils.NameConstants;


public class JavaMapperStormExecutor extends BaseRichBolt{ public class JavaMapperStormExecutor extends BaseRichBolt{
public static class e1 extends JavaMapperStormExecutor { public static class e1 extends JavaMapperStormExecutor {
Expand Down Expand Up @@ -75,7 +76,7 @@ public void execute(Tuple input) {
public void declareOutputFields(OutputFieldsDeclarer declarer) { public void declareOutputFields(OutputFieldsDeclarer declarer) {
List<String> fields = new ArrayList<String>(); List<String> fields = new ArrayList<String>();
for(int i=0; i<numOutputFields; i++){ for(int i=0; i<numOutputFields; i++){
fields.add(OutputFieldNameConst.FIELD_PREFIX() + i); fields.add(NameConstants.FIELD_PREFIX() + i);
} }
declarer.declare(new Fields(fields)); declarer.declare(new Fields(fields));
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ public void collect(Object o) {
}; };
delegate.flatMap(input, delegateCollector); delegate.flatMap(input, delegateCollector);
} }

} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.datastream.utils;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * Reflection helpers for recovering generic type arguments at runtime.
 *
 * @since 12/7/15
 */
class JavaReflections {
    /**
     * Returns the class bound to the {@code index}-th type argument of the nearest
     * parameterized superclass of {@code obj}'s runtime class.
     *
     * <p>The lookup starts at the direct superclass and walks up the hierarchy, so an
     * instance of a non-generic subclass of a class that binds the type arguments is
     * resolved as well. When the type argument is itself parameterized
     * (for example {@code List<Integer>}), its raw class ({@code List}) is returned.
     *
     * @param obj   instance whose class hierarchy is inspected; must not be {@code null}
     * @param index zero-based position of the type argument
     * @return the class (or raw class) of the requested type argument
     * @throws NullPointerException           if {@code obj} is {@code null}
     * @throws ArrayIndexOutOfBoundsException if {@code index} is outside the type-argument list
     * @throws IllegalArgumentException       if no superclass is parameterized, or the type
     *                                        argument is a type variable, wildcard or generic
     *                                        array that cannot be represented as a class
     */
    public static Class<?> getGenericTypeClass(final Object obj, int index) {
        ParameterizedType parameterized = null;
        // The direct superclass may be a plain (non-generic) class; keep climbing until a
        // superclass that actually carries type arguments is found.
        for (Class<?> current = obj.getClass(); current != null; current = current.getSuperclass()) {
            Type genericSuper = current.getGenericSuperclass();
            if (genericSuper instanceof ParameterizedType) {
                parameterized = (ParameterizedType) genericSuper;
                break;
            }
        }
        if (parameterized == null) {
            throw new IllegalArgumentException(
                    "No parameterized superclass found for " + obj.getClass().getName());
        }

        Type argument = parameterized.getActualTypeArguments()[index];
        if (argument instanceof Class) {
            return (Class<?>) argument;
        }
        if (argument instanceof ParameterizedType) {
            // e.g. List<Integer> -> List
            Type raw = ((ParameterizedType) argument).getRawType();
            if (raw instanceof Class) {
                return (Class<?>) raw;
            }
        }
        throw new IllegalArgumentException(
                "Type argument " + index + " of " + parameterized
                        + " cannot be resolved to a class: " + argument);
    }
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 52b8e58

Please sign in to comment.