FLINK-1420 Small cleanup on code after branch for 0.8 release
Small cleanup on code after branch for 0.8 release:
-) Remove semicolons in Scala code for consistency.
-) Wrap some Java code that is too long, to make it easier to read.
-) Fix constant typo (from GENRAL_OPTIONS to GENERAL_OPTIONS)
-) Remove some unused imports in Scala and Java code.

Author: Henry Saputra <henry.saputra@gmail.com>

Closes #302 from hsaputra/cleanup_code_simple_1 and squashes the following commits:

f98431e [Henry Saputra] Remove not needed semicolons from Scala code for consistency.
73fa587 [Henry Saputra] Move full class name to import for Serializable interface as import in InputSplitSource interface.
a403136 [Henry Saputra] Move full package name to import statement for consistency.
387e0c8 [Henry Saputra] Remove unnecessary parentheses for consistency.
47b6b4c [Henry Saputra] Small cleanup on code after branch for 0.8 release: Remove semicolons in Scala code. Remove some unused imports in Scala and Java code.
hsaputra committed Jan 19, 2015
1 parent bd7819a commit 58e1e44
Showing 32 changed files with 84 additions and 93 deletions.
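As a small illustration of the Scala style this cleanup moves toward, here is a hypothetical, self-contained sketch (CleanupStyleExample and its values are made up and are not part of the commit): braces around single-type imports and trailing semicolons are dropped.

// Hypothetical sketch, not from this commit: the style the cleanup moves toward.
import java.net.InetSocketAddress            // was: import java.net.{InetSocketAddress}

object CleanupStyleExample {
  def main(args: Array[String]): Unit = {
    val maxIterations = 100                  // was: val maxIterations = 100;
    val address = new InetSocketAddress("localhost", 6123)
    println(s"$address, $maxIterations")     // no trailing semicolons
  }
}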
@@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) {

     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
-        getTypeClass()));
+        getTypeClass())
       case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }

@@ -61,7 +61,7 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
     val javaStream = input1.connect(input2).addGeneralWindowCombine(
       crossWindowFunction,
       returnType, windowSize,
-      slideInterval, timeStamp1, timeStamp2);
+      slideInterval, timeStamp1, timeStamp2)

     new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
   }
@@ -49,7 +49,7 @@ object StreamJoinOperator {
   class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends
     TemporalWindow[JoinWindow[I1, I2]] {

-    private[flink] val type1 = op.input1.getType();
+    private[flink] val type1 = op.input1.getType()

     /**
      * Continues a temporal Join transformation by defining
@@ -102,7 +102,7 @@ object StreamJoinOperator {
   class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
       private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType();
+    private[flink] val type2 = op.input2.getType()

     /**
      * Creates a temporal join transformation by defining the second join key.
@@ -238,7 +238,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {

     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
-        outType.getTypeAt(position).getTypeClass()));
+        outType.getTypeAt(position).getTypeClass())
       case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }

@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala.windowing

 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
 import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction

 object Delta {
@@ -21,8 +21,7 @@ package org.apache.flink.streaming.api.scala.windowing
 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }

-import org.apache.commons.net.ntp.TimeStamp
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
 import org.apache.commons.lang.Validate

@@ -104,7 +104,7 @@ public class CliFrontend {
     }

     // general options
-    private static final Options GENRAL_OPTIONS = createGeneralOptions();
+    private static final Options GENERAL_OPTIONS = createGeneralOptions();

     // action options all include the general options
     private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions());
@@ -874,7 +874,7 @@ private void printHelp() {
         formatter.setWidth(80);
         formatter.setLeftPadding(5);
         formatter.setSyntaxPrefix("  general options:");
-        formatter.printHelp(" ", GENRAL_OPTIONS);
+        formatter.printHelp(" ", GENERAL_OPTIONS);

         printHelpForRun();
         printHelpForInfo();
@@ -301,8 +301,7 @@ public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInv

         ActorRef client = pair._2();

-        String hostname = configuration.getString(ConfigConstants
-                .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+        String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);

         FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants
                 .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
@@ -320,8 +319,7 @@ public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInv
         try {

             if (wait) {
-                return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client,
-                        timeout);
+                return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
             }
             else {
                 SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout);
@@ -491,7 +491,6 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws
             throw new NullPointerException();
         }

-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
         }
@@ -18,10 +18,12 @@

 package org.apache.flink.core.io;

+import java.io.Serializable;
+
 /**
  * This interface must be implemented by all kind of input splits that can be assigned to input formats.
  */
-public interface InputSplit extends IOReadableWritable, java.io.Serializable {
+public interface InputSplit extends IOReadableWritable, Serializable {

     /**
      * Returns the number of this input split.
@@ -18,13 +18,15 @@

 package org.apache.flink.core.io;

+import java.io.Serializable;
+
 /**
  * InputSplitSources create {@link InputSplit}s that define portions of data to be produced
  * by {@link org.apache.flink.api.common.io.InputFormat}s.
  *
  * @param <T> The type of the input splits created by the source.
  */
-public interface InputSplitSource<T extends InputSplit> extends java.io.Serializable {
+public interface InputSplitSource<T extends InputSplit> extends Serializable {

     /**
      * Computes the input splits. The given minimum number of splits is a hint as to how
@@ -34,7 +34,7 @@ object DeltaPageRank {

   def main(args: Array[String]) {

-    val maxIterations = 100;
+    val maxIterations = 100

     val env = ExecutionEnvironment.getExecutionEnvironment

@@ -64,10 +64,10 @@
           }

           // random jump to self
-          out.collect((adj._1, RANDOM_JUMP));
+          out.collect((adj._1, RANDOM_JUMP))
         }
     }
-      .groupBy(0).sum(1);
+      .groupBy(0).sum(1)

     val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) }

@@ -78,16 +78,16 @@
       val deltas = workset.join(adjacency).where(0).equalTo(0) {
         (lastDeltas, adj, out: Collector[Page]) =>
           {
-            val targets = adj._2;
-            val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length;
+            val targets = adj._2
+            val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length

             for (target <- targets) {
-              out.collect((target, deltaPerTarget));
+              out.collect((target, deltaPerTarget))
             }
           }
       }
         .groupBy(0).sum(1)
-        .filter(x => Math.abs(x._2) > THRESHOLD);
+        .filter(x => Math.abs(x._2) > THRESHOLD)

       val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) {
         (current, delta) => (current._1, current._2 + delta._2)
@@ -99,6 +99,6 @@

     iteration.print()

-    env.execute("Page Rank");
+    env.execute("Page Rank")
   }
 }
@@ -146,7 +146,7 @@ object TPCHQuery10 {
         " Due to legal restrictions, we can not ship generated data.\n" +
         " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
         " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " +
-        "<lineitem-csv path> <nation-csv path> <result path>");
+        "<lineitem-csv path> <nation-csv path> <result path>")
       false
     }
   }
@@ -143,7 +143,7 @@ object TPCHQuery3 {
         " Due to legal restrictions, we can not ship generated data.\n" +
         " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
         " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" +
-        "<orders-csv path> <result path>");
+        "<orders-csv path> <result path>")
       false
     }
   }
@@ -53,11 +53,12 @@ protected MapFunction<IN, OUT> getFunction() {
     }

     @Override
-    protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+    protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {

         String name = getName() != null ? getName() : "Map at "+defaultName;
         // create operator
-        MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+        MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
+            new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
         // set input
         po.setInput(input);
         // set dop
@@ -38,8 +38,8 @@ import scala.concurrent.{TimeoutException, Await}
 import scala.concurrent.duration.{FiniteDuration}


-class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages
-with ActorLogging{
+class JobClient(jobManagerURL: String, timeout: FiniteDuration)
+  extends Actor with ActorLogMessages with ActorLogging{
   import context._

   val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
@@ -100,9 +100,9 @@ object JobClient{
       case url: String => url
       case _ =>
         val jobManagerAddress = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
         val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)

         if (jobManagerAddress == null) {
           throw new RuntimeException("JobManager address has not been specified in the " +
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager

 import java.io.File
-import java.net.{InetSocketAddress}
+import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit

 import akka.actor._
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{InstanceManager}
+import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
@@ -45,18 +45,18 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.slf4j.LoggerFactory

-import scala.concurrent.{Future}
+import scala.concurrent.Future
 import scala.concurrent.duration._

-class JobManager(val configuration: Configuration) extends
-Actor with ActorLogMessages with ActorLogging {
+class JobManager(val configuration: Configuration)
+  extends Actor with ActorLogMessages with ActorLogging {
   import context._
   import scala.collection.JavaConverters._

   implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
     ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)

-  Execution.timeout = timeout;
+  Execution.timeout = timeout

   log.info(s"Starting job manager at ${self.path}.")

@@ -115,7 +115,7 @@ Actor with ActorLogMessages with ActorLogging {
         hardwareInformation, numberOfSlots)

       // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager);
+      context.watch(taskManager)

       taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
     }
@@ -532,7 +532,7 @@ object JobManager {
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000

     val executionRetries = configuration.getInteger(ConfigConstants
-      .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+      .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES)

     val delayBetweenRetries = 2 * configuration.getLong(
       ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
@@ -123,7 +123,7 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
   def stop(): Unit = {
     LOG.info("Stopping FlinkMiniCluster.")
     shutdown()
-    awaitTermination();
+    awaitTermination()
   }

   def shutdown(): Unit = {
