diff --git a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java index 79c89aee..4d7158f8 100644 --- a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java +++ b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java @@ -19,15 +19,20 @@ package org.apache.flink.training.exercises.hourlytips; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.training.exercises.common.datatypes.TaxiFare; import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.util.Collector; +import org.apache.flink.streaming.api.windowing.time.Time; /** * The Hourly Tips exercise from the Flink training. @@ -73,13 +78,30 @@ public JobExecutionResult execute() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start the data generator - DataStream fares = env.addSource(source); - - // replace this with your solution - if (true) { - throw new MissingSolutionException(); - } - + DataStream fares = + env.addSource(source) + .assignTimestampsAndWatermarks( + // taxi fares are in order + WatermarkStrategy.forMonotonousTimestamps() + .withTimestampAssigner( + (fare, t) -> fare.getEventTimeMillis())); + // compute tips per hour for each driver + DataStream> hourlyTips = + fares.keyBy((TaxiFare fare) -> fare.driverId) + .window(TumblingEventTimeWindows.of(Time.hours(1))) + .process(new AddTips()); + + // find the driver with the highest sum of tips for each hour + DataStream> hourlyMax = + hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2); + + /* You should explore how this alternative (commented out below) behaves. + * In what ways is the same as, and different from, the solution above (using a windowAll)? + */ + + // DataStream> hourlyMax = hourlyTips.keyBy(t -> t.f0).maxBy(2); + + hourlyMax.addSink(sink); // the results should be sent to the sink that was passed in // (otherwise the tests won't work) // you can end the pipeline with something like this: @@ -90,4 +112,21 @@ public JobExecutionResult execute() throws Exception { // execute the pipeline and return the result return env.execute("Hourly Tips"); } + + /* + * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. + */ + public static class AddTips + extends ProcessWindowFunction, Long, TimeWindow> { + + @Override + public void process(Long key, ProcessWindowFunction, Long, TimeWindow>.Context context, Iterable elements, Collector> out) throws Exception { + + float sumOfTips = 0F; + for (TaxiFare f : elements) { + sumOfTips += f.tip; + } + out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips)); + } + } } diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java index f555e5da..f6e55272 100644 --- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java +++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java @@ -21,6 +21,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,7 +34,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; import org.apache.flink.util.Collector; import java.time.Duration; @@ -98,17 +101,78 @@ public static void main(String[] args) throws Exception { @VisibleForTesting public static class AlertFunction extends KeyedProcessFunction { + private ValueState rideState; + @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + ValueStateDescriptor rideStateDescriptor = + new ValueStateDescriptor<>("ride event", TaxiRide.class); + //expiring state after 3 hrs to clear any state leaks + StateTtlConfig ttlConfig = StateTtlConfig + .newBuilder(Time.hours(3)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .build(); + rideStateDescriptor.enableTimeToLive(ttlConfig); + rideState = getRuntimeContext().getState(rideStateDescriptor); } @Override public void processElement(TaxiRide ride, Context context, Collector out) - throws Exception {} + throws Exception { + TaxiRide firstRideEvent = rideState.value(); + + if (firstRideEvent == null) { + // first event should be saved + rideState.update(ride); + + if (ride.isStart) { + // we will use this timer to check for rides that have gone on too long and may + // not yet have an END event (or the END event could be missing) + context.timerService().registerEventTimeTimer(getTimerTime(ride)); + } + } else { + if (ride.isStart) { + if (rideTooLong(ride, firstRideEvent)) { + out.collect(ride.rideId); + } + } else { + // the first ride was a START event, delete the timer + context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent)); + + // check if ride has gone too long + if (rideTooLong(firstRideEvent, ride)) { + out.collect(ride.rideId); + } + } + + // both events have now been seen, we can clear the state + rideState.clear(); + } + } @Override public void onTimer(long timestamp, OnTimerContext context, Collector out) - throws Exception {} + throws Exception { + // the timer only fires if the ride was too long + out.collect(rideState.value().rideId); + + // clearing state to prevent duplicate alerts + rideState.clear(); + } + private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) { + return Duration.between(startEvent.eventTime, endEvent.eventTime) + .compareTo(Duration.ofHours(2)) + > 0; + } + + private long getTimerTime(TaxiRide ride) throws RuntimeException { + if (ride.isStart) { + return ride.eventTime.plusSeconds(120 * 60).toEpochMilli(); + } else { + throw new RuntimeException("Can not get start time from END event."); + } + } } } + diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java index 1f07312f..c340f93c 100644 --- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java +++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; +import org.apache.flink.training.exercises.common.utils.GeoUtils; import org.apache.flink.training.exercises.common.utils.MissingSolutionException; /** @@ -80,7 +81,8 @@ public JobExecutionResult execute() throws Exception { public static class NYCFilter implements FilterFunction { @Override public boolean filter(TaxiRide taxiRide) throws Exception { - throw new MissingSolutionException(); + return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && + GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); } } } diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 0662dfc0..ee02ead8 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -19,6 +19,8 @@ package org.apache.flink.training.exercises.ridesandfares; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -99,19 +101,40 @@ public static void main(String[] args) throws Exception { public static class EnrichmentFunction extends RichCoFlatMapFunction { + private ValueState rideState; + private ValueState fareState; + @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + rideState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class)); + fareState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class)); } @Override public void flatMap1(TaxiRide ride, Collector out) throws Exception { - throw new MissingSolutionException(); + + TaxiFare fare = fareState.value(); + if (fare != null) { + fareState.clear(); + out.collect(new RideAndFare(ride, fare)); + } else { + rideState.update(ride); + } } @Override public void flatMap2(TaxiFare fare, Collector out) throws Exception { - throw new MissingSolutionException(); + TaxiRide ride = rideState.value(); + if (ride != null) { + rideState.clear(); + out.collect(new RideAndFare(ride, fare)); + } else { + fareState.update(fare); + } } } }