Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package org.apache.flink.training.exercises.hourlytips;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
* The Hourly Tips exercise from the Flink training.
Expand Down Expand Up @@ -73,13 +78,30 @@ public JobExecutionResult execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start the data generator
DataStream<TaxiFare> fares = env.addSource(source);

// replace this with your solution
if (true) {
throw new MissingSolutionException();
}

DataStream<TaxiFare> fares =
env.addSource(source)
.assignTimestampsAndWatermarks(
// taxi fares are in order
WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
.withTimestampAssigner(
(fare, t) -> fare.getEventTimeMillis()));
// compute tips per hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips =
fares.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());

// find the driver with the highest sum of tips for each hour
DataStream<Tuple3<Long, Long, Float>> hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

/* You should explore how this alternative (commented out below) behaves.
* In what ways is the same as, and different from, the solution above (using a windowAll)?
*/

// DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips.keyBy(t -> t.f0).maxBy(2);

hourlyMax.addSink(sink);
// the results should be sent to the sink that was passed in
// (otherwise the tests won't work)
// you can end the pipeline with something like this:
Expand All @@ -90,4 +112,21 @@ public JobExecutionResult execute() throws Exception {
// execute the pipeline and return the result
return env.execute("Hourly Tips");
}

/*
* Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
*/
public static class AddTips
extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

@Override
public void process(Long key, ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow>.Context context, Iterable<TaxiFare> elements, Collector<Tuple3<Long, Long, Float>> out) throws Exception {

float sumOfTips = 0F;
for (TaxiFare f : elements) {
sumOfTips += f.tip;
}
out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -30,7 +34,6 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;

import java.time.Duration;
Expand Down Expand Up @@ -98,17 +101,78 @@ public static void main(String[] args) throws Exception {
@VisibleForTesting
public static class AlertFunction extends KeyedProcessFunction<Long, TaxiRide, Long> {

private ValueState<TaxiRide> rideState;

@Override
public void open(Configuration config) throws Exception {
throw new MissingSolutionException();
ValueStateDescriptor<TaxiRide> rideStateDescriptor =
new ValueStateDescriptor<>("ride event", TaxiRide.class);
//expiring state after 3 hrs to clear any state leaks
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
rideStateDescriptor.enableTimeToLive(ttlConfig);
rideState = getRuntimeContext().getState(rideStateDescriptor);
}

@Override
public void processElement(TaxiRide ride, Context context, Collector<Long> out)
throws Exception {}
throws Exception {
TaxiRide firstRideEvent = rideState.value();

if (firstRideEvent == null) {
// first event should be saved
rideState.update(ride);

if (ride.isStart) {
// we will use this timer to check for rides that have gone on too long and may
// not yet have an END event (or the END event could be missing)
context.timerService().registerEventTimeTimer(getTimerTime(ride));
}
} else {
if (ride.isStart) {
if (rideTooLong(ride, firstRideEvent)) {
out.collect(ride.rideId);
}
} else {
// the first ride was a START event, delete the timer
context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));

// check if ride has gone too long
if (rideTooLong(firstRideEvent, ride)) {
out.collect(ride.rideId);
}
}

// both events have now been seen, we can clear the state
rideState.clear();
}
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out)
throws Exception {}
throws Exception {
// the timer only fires if the ride was too long
out.collect(rideState.value().rideId);

// clearing state to prevent duplicate alerts
rideState.clear();
}
private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
return Duration.between(startEvent.eventTime, endEvent.eventTime)
.compareTo(Duration.ofHours(2))
> 0;
}

private long getTimerTime(TaxiRide ride) throws RuntimeException {
if (ride.isStart) {
return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
} else {
throw new RuntimeException("Can not get start time from END event.");
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;

/**
Expand Down Expand Up @@ -80,7 +81,8 @@ public JobExecutionResult execute() throws Exception {
public static class NYCFilter implements FilterFunction<TaxiRide> {
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
throw new MissingSolutionException();
return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.training.exercises.ridesandfares;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -99,19 +101,40 @@ public static void main(String[] args) throws Exception {
public static class EnrichmentFunction
extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {

private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;

@Override
public void open(Configuration config) throws Exception {
throw new MissingSolutionException();
rideState =
getRuntimeContext()
.getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
fareState =
getRuntimeContext()
.getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
}

@Override
public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
throw new MissingSolutionException();

TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
out.collect(new RideAndFare(ride, fare));
} else {
rideState.update(ride);
}
}

@Override
public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
throw new MissingSolutionException();
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
out.collect(new RideAndFare(ride, fare));
} else {
fareState.update(fare);
}
}
}
}