From b06546ce79dc189758b3bea0b4e9326fc8273f14 Mon Sep 17 00:00:00 2001 From: Evyatar Date: Sun, 12 Aug 2018 09:03:31 +0300 Subject: [PATCH 1/6] fix mechanism that delays inactive schedules in order to reduce number of un-needed requests to Siri. Changed schema of JSON file (also in GTFS reader micro service) --- .../hasadna/bus/config/SchedulerConfig.java | 2 +- .../bus/service/ScheduleRetrieval.java | 92 +++++++---- .../bus/service/SiriMockServiceImpl.java | 14 ++ .../org/hasadna/bus/service/SortedQueue.java | 10 +- .../bus/service/gtfs/ReadMakatFileImpl.java | 132 ++++++++++------ .../org/hasadna/bus/util/DateTimeUtils.java | 3 + .../application-production.properties | 2 +- .../hasadna/bus/ReadSchedulingDataTest.java | 3 +- .../hasadna/bus/ValidateSchedulingTest.java | 144 ++++++++++++++++++ .../resources/application-test.properties | 1 + 10 files changed, 314 insertions(+), 89 deletions(-) create mode 100644 java/siri-0.1/src/test/java/org/hasadna/bus/ValidateSchedulingTest.java diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/config/SchedulerConfig.java b/java/siri-0.1/src/main/java/org/hasadna/bus/config/SchedulerConfig.java index 5449619a..56b1e39d 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/config/SchedulerConfig.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/config/SchedulerConfig.java @@ -19,7 +19,7 @@ @Configuration @EnableScheduling -@Profile({"production", "dev"}) +@Profile({"production", "dev", "test"}) public class SchedulerConfig implements SchedulingConfigurer { protected final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java index da119e93..a7f525b8 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; import org.hasadna.bus.entity.GetStopMonitoringServiceResponse; import org.hasadna.bus.service.gtfs.DepartureTimes; +import org.hasadna.bus.service.gtfs.ReadMakatFileImpl; import org.hasadna.bus.service.gtfs.ReadRoutesFile; import org.hasadna.bus.util.DateTimeUtils; import org.slf4j.Logger; @@ -65,16 +66,19 @@ public class ScheduleRetrieval { @PostConstruct public void init() { + logger.debug("init started"); List data = readSchedulingDataAllFiles(dataFileFullPath); - addDepartures(data); + //addDepartures(data, LocalDate.now(DEFAULT_CLOCK)); // we currently don't filter out entries - only log warnings - validateScheduling(data, LocalDateTime.now(DEFAULT_CLOCK)); + //validateScheduling(data, LocalDateTime.now(DEFAULT_CLOCK)); for (Command c : data) { queue.put(c); } logger.info("scheduler initialized. status: " + status()); + logger.debug("init completed, queue size:" + queue.getAll().length); + } // return true - means do NOT change nextExecution @@ -90,16 +94,27 @@ private boolean checkNecessityOfThisSchedulingUntilFirstDeparture(Command c, Loc // we postpone until first departure, but ONLY after 1AM !! // (this is because until 1AM we still have services from yesterday that we want to track) - if (currentTime.getHour() > 1) { // ugly, will be replaced soon + if (currentTime.getHour() > 0) { // ugly, will be replaced soon if (timeOfFirstDeparture.isAfter(currentTime.plusMinutes(30))) { // timeOfFirstDeparture is more than 30 minutes from now // set nextExecution to 30 minutes before firstDeparture LocalDateTime nextExecution = timeOfFirstDeparture.minusMinutes(30); + //c.nextExecution = nextExecution; logger.info("route {} - postpone next execution to {}, firstDeparture only at {}", c.lineRef, nextExecution, timeOfFirstDeparture); return disabled; //notNeeded } + else { + if (timeOfFirstDeparture.isBefore(currentTime)) { + logger.debug("route {} - not postponed! next execution {}, firstDeparture was at {}", c.lineRef, c.nextExecution, timeOfFirstDeparture); + } + else { + logger.debug("route {} - not postponed! next execution {}, firstDeparture is expected in less than 30 minutes at {}", c.lineRef, c.nextExecution, timeOfFirstDeparture); + } + } + } + else { + logger.debug("route {} - not postponed! hoer is before 1:00 AM. next execution {}, firstDeparture at {}", c.lineRef, c.nextExecution, timeOfFirstDeparture); } - logger.info("route {} - not postponed! next execution {}, firstDeparture at {}", c.lineRef, c.nextExecution, timeOfFirstDeparture); } return true; } @@ -132,17 +147,22 @@ private boolean checkNecessityOfThisSchedulingForRestOfToday(Command c, LocalDat if (currentTime.isAfter(lastArrival.plusMinutes(30))) { //now is more than 30 minutes after planned last arrival - stopQuerying LocalDateTime nextExecution = LocalTime.of(23, 45).atDate(currentTime.toLocalDate()); - logger.info("route {} - postpone next execution to {}, last Arrival will be at {}", c.lineRef, nextExecution, lastArrival); + logger.info("route {} - postpone next execution to {}, last Arrival was at {}", c.lineRef, nextExecution, lastArrival); return disabled; //notNeeded } + else { + logger.debug("route {} - not postponed! next execution {}, lastDeparture was at {}, expecting last arrival at {}", c.lineRef, c.nextExecution, timeOfLastDeparture, lastArrival); + } + } + else { + logger.debug("route {} - not postponed! next execution {}, lastDeparture expected at {}", c.lineRef, c.nextExecution, timeOfLastDeparture); } - logger.info("route {} - not postponed! next execution {}, firstDeparture at {}", c.lineRef, c.nextExecution, timeOfFirstDeparture); List activeRanges = calculateActiveRanges(c.weeklyDepartureTimes, today, c.lastArrivalTimes); - logger.info("active range: {}", displayActiveRanges(activeRanges)); + logger.info("route {}, active range (currently not usable): {}", c.lineRef, displayActiveRanges(activeRanges)); } else { - logger.info("route {} - not postponed! next execution {}, weekly departure times unknown", c.lineRef, c.nextExecution); + logger.debug("route {} - not postponed! next execution {}, weekly departure times unknown", c.lineRef, c.nextExecution); } return true; // keep scheduling } @@ -157,7 +177,7 @@ private String displayActiveRanges(List activeRanges) { } public ValidationResults validateScheduling(List data, LocalDateTime currentTime) { - logger.info("validating ..."); + logger.info("validating ..., currentTime={}", currentTime.toString()); data = data.stream() .filter(c -> c.isActive) // this will filter out those that their nextExecution was already changed .collect(Collectors.toList()); @@ -166,23 +186,28 @@ public ValidationResults validateScheduling(List data, LocalDateTime cu List notNeeded = new ArrayList<>(); List delayTillFirstDeparture = new ArrayList<>(); for (Command c : data) { + // schedulerInactiveMechanismEnabled=false means that this method will only log its result if (false == checkNecessityOfThisSchedulingUntilFirstDeparture(c, currentTime, !schedulerInactiveMechanismEnabled)) { delayTillFirstDeparture.add(c); - c.isActive = false; + //c.isActive = false; } else if (false == checkNecessityOfThisSchedulingForRestOfToday(c, currentTime, !schedulerInactiveMechanismEnabled)) { notNeeded.add(c); - // disabled = true means that this method will only log its result } } - logger.debug("validateScheduling - return following routes that are not active any more today: {}", notNeeded.stream().map(c -> c.lineRef).collect(Collectors.toList())); + if (!delayTillFirstDeparture.isEmpty()) { + logger.debug("validateScheduling - return following routes that can be delayed until their first departure: {}", delayTillFirstDeparture.stream().map(c -> c.lineRef).collect(Collectors.toList())); + } + if (!notNeeded.isEmpty()) { + logger.debug("validateScheduling - return following routes that are not active any more today: {}", notNeeded.stream().map(c -> c.lineRef).collect(Collectors.toList())); + } // Note that there is only one place where this returned list is actually used // to reduce number of active schedulings (in updateSchedulingDataPeriodically) return new ValidationResults(delayTillFirstDeparture, notNeeded); } - private class ValidationResults { + public class ValidationResults { public List delayTillFirstDeparture; public List notNeeded; @@ -344,6 +369,7 @@ public List findActive() { @Scheduled(fixedRate=300000) // every 5 minutes. @Async public void updateSchedulingDataPeriodically() { + logger.debug("UPDATE SCHEDULING"); try { // from validate we get : // 1.list of all routeIds that will not depart any more TODAY. @@ -360,6 +386,7 @@ public void updateSchedulingDataPeriodically() { .stream().map(c -> c.lineRef).collect(Collectors.toList()); // so we change their nextExecution to about 23:45 queue.stopQueryingToday(notNeededToday); + logger.warn("Current scheduler status: " + status()); } catch (Exception ex) { @@ -422,10 +449,10 @@ public void reReadSchedulingAndReplace() { logger.info("{} schedules were read", data.size()); - addDepartures(data); + //addDepartures(data, LocalDate.now(DEFAULT_CLOCK)); // changes data by possibly setting nextExecution to a later time - validateScheduling(data, LocalDateTime.now(DEFAULT_CLOCK)); + //validateScheduling(data, LocalDateTime.now(DEFAULT_CLOCK)); // remove ALL current schedules logger.info("removing all schedules"); @@ -440,22 +467,25 @@ public void reReadSchedulingAndReplace() { logger.warn("Queue status now: " + status()); } - public void addDepartures(List allScheduledCommands) { - try { - if (makatFile != null && makatFile.getStatus()) { - for (Command c : allScheduledCommands) { - DepartureTimes dt = makatFile.findDeparturesByRouteId(c.lineRef); - if (dt != null) { - c.weeklyDepartureTimes = dt.departures; - c.activeRanges = calculateActiveRanges(dt.departures, DayOfWeek.THURSDAY, c.lastArrivalTimes); - } - } - logger.info("departure times updated from makat file"); - } - } - catch (Exception ex) { - logger.error("absorbing exception during readin from makatFile", ex); - } + public void addDepartures(List allScheduledCommands, LocalDate date) { +// try { +// if (makatFile != null && makatFile.getStatus()) { +// for (Command c : allScheduledCommands) { +// DepartureTimes dt = ((ReadMakatFileImpl)makatFile).findDeparturesByRouteId(c.lineRef, date); +// if (dt != null) { +// c.weeklyDepartureTimes = dt.departures; +// c.activeRanges = calculateActiveRanges(dt.departures, date.getDayOfWeek(), c.lastArrivalTimes); +// } +// } +// logger.info("departure times updated from makat file"); +// } +// else { +// logger.info("makat file data not available"); +// } +// } +// catch (Exception ex) { +// logger.error("absorbing exception during readin from makatFile", ex); +// } } diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SiriMockServiceImpl.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SiriMockServiceImpl.java index dd53215a..08a9284a 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SiriMockServiceImpl.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SiriMockServiceImpl.java @@ -16,6 +16,10 @@ import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import uk.org.siri.siri.LineRefStructure; +import uk.org.siri.siri.MonitoredVehicleJourneyStructure; +import uk.org.siri.siri.NaturalLanguageStringStructure; + import static org.hasadna.bus.util.DateTimeUtils.DEFAULT_CLOCK; import javax.annotation.PostConstruct; @@ -153,6 +157,16 @@ public GetStopMonitoringServiceResponse retrieveSiri(String stopCode, String pre GetStopMonitoringServiceResponse response = (GetStopMonitoringServiceResponse)je.getValue(); logger.trace("converting done"); + MonitoredVehicleJourneyStructure x = + response.getAnswer().getStopMonitoringDelivery(). + get(0).getMonitoredStopVisit(). + get(0).getMonitoredVehicleJourney(); + NaturalLanguageStringStructure a = new NaturalLanguageStringStructure(); + a.setValue("420"); + x.setPublishedLineName(a); + LineRefStructure b = new LineRefStructure(); + b.setValue("15531"); + x.setLineRef(b); return response; } catch (Exception e) { diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java index 681dc3f3..59bff224 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java @@ -36,7 +36,7 @@ public void init() { this.registry.gaugeCollectionSize("siri.scheduler.queue", Tags.empty(), this.queue); } - void put(Command command) { + public void put(Command command) { queue.offer(command); } @@ -160,7 +160,7 @@ public List getAllSchedules() { List removeByLineRef(String lineRef) { - logger.debug("removing all schedules of lineRef {}", lineRef); + logger.debug("removing route {} from scheduler queue (will be added again at midnight or whenever schedule file is re-read)", lineRef); List candidatesToRemoval = new ArrayList<>(); Iterator iter = queue.iterator(); while (iter.hasNext()) { @@ -179,8 +179,8 @@ List removeByLineRef(String lineRef) { logger.warn("removal of {} returned false", c); } } - logger.debug("return a list of {} schedules", removed.size()); - logger.trace("return {}", removed); +// logger.debug("return a list of {} schedules", removed.size()); +// logger.trace("return {}", removed); return removed; } @@ -193,6 +193,6 @@ public void removeAll() { queue.removeIf(c -> true); } - private Queue queue = new PriorityBlockingQueue<>(20, (c1, c2) -> c1.nextExecution.isBefore(c2.nextExecution)?-1:1); + private Queue queue = new PriorityBlockingQueue<>(1000, (c1, c2) -> c1.nextExecution.isBefore(c2.nextExecution)?-1:1); } diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/gtfs/ReadMakatFileImpl.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/gtfs/ReadMakatFileImpl.java index 67fbe7e6..04d6ac61 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/gtfs/ReadMakatFileImpl.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/gtfs/ReadMakatFileImpl.java @@ -17,8 +17,10 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.DayOfWeek; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.time.format.TextStyle; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -29,7 +31,7 @@ //@Profile({"production", "dev"}) public class ReadMakatFileImpl implements ReadRoutesFile { - @Value("${gtfs.dir.location:/home/evyatar/work/hasadna/open-bus/gtfs/GTFS-2018-06-20/}") + @Value("${gtfs.dir.location:/home/evyatar/logs/}") public String dirOfGtfsFiles ; @@ -40,10 +42,8 @@ public class ReadMakatFileImpl implements ReadRoutesFile { public Map > mapByRoute = new HashMap<>(); - public Map mapDepartueTimesOfRoute = new HashMap<>(); + public Map mapDepartureTimesOfRoute = new HashMap<>(); - @Autowired - DataInit dataInit; boolean status = false; @PostConstruct @@ -55,16 +55,18 @@ public void init() { } makatFullPath = dirOfGtfsFiles + makatFileName; - try { - logger.warn("makatFile read starting"); - mapByRoute = readMakatFile(); - mapDepartueTimesOfRoute = processMapByRoute(mapByRoute); - logger.warn("makatFile read done"); - status = true; - } - catch (Exception ex) { - logger.error("absorbing unhandled exception during reading makat file"); - } +// try { +// logger.warn("makatFile read starting"); +// mapByRoute = readMakatFile(); +// logger.info("makat file processing"); +// LocalDate today = LocalDate.now(DEFAULT_CLOCK); +// mapDepartureTimesOfRoute = processMapByRoute(mapByRoute, today); +// logger.warn("makatFile done"); +// status = true; +// } +// catch (Exception ex) { +// logger.error("absorbing unhandled exception during reading makat file"); +// } logger.info("init in PostConstruct completed"); } @@ -73,28 +75,54 @@ public boolean getStatus() { return status; } - private Map processMapByRoute(Map> mapByRoute) { - // key is routeId - Map mapDepartueTimesOfRoute = new HashMap<>(); + private Map processMapByRoute(Map> mapByRoute, LocalDate startAtDate) { + // key is routeId+date + Map mapDepartureTimesOfRoute = new HashMap<>(); // find all makats for (String routeId : mapByRoute.keySet()) { - Map> departueTimesForDay = new HashMap<>(); - for (DayOfWeek day : DayOfWeek.values()) { - departueTimesForDay.put(day, new ArrayList<>()); + logger.trace("processing route {}", routeId); + Map> departureTimesForDate = new HashMap<>(); + for (int count = 0 ; count < 7 ; count++) { + LocalDate date = startAtDate.plusDays(count); + logger.trace("day {}, date {}, weekday {}", count, date, date.getDayOfWeek()); + departureTimesForDate.put(date, new ArrayList<>()); + List linesOfRoute = mapByRoute.get(routeId).stream() + .filter(makatData -> !makatData.fromDate.isAfter(date) && + !makatData.toDate.isBefore(date) + ).collect(Collectors.toList()); + logger.trace("found {} records for route {}", linesOfRoute.size(), routeId); + if (!linesOfRoute.isEmpty()) { + for (MakatData makatData : linesOfRoute) { + departureTimesForDate.get(date).add(makatData.departure); + } + logger.trace("departureTimesForDate has {} departures", departureTimesForDate.get(date).size()); + logger.trace("departures for date {}: {}", date, departureTimesForDate.get(date)); + MakatData template = linesOfRoute.get(0); + template.departureTimesForDate = departureTimesForDate; + template.tripId = ""; + String key = generateKey(routeId, date); + mapDepartureTimesOfRoute.put(key, template); + logger.trace("added to mapDepartureTimesOfRoute for key {}: {}", key, template); + } } - List linesOfRoute = mapByRoute.get(routeId); - for (MakatData makatData : linesOfRoute) { - departueTimesForDay.get(makatData.dayOfWeek).add(makatData.departure); - } - for (DayOfWeek day : DayOfWeek.values()) { - departueTimesForDay.get(day).sort(Comparator.naturalOrder()); // sorts the list in-place - } - MakatData template = linesOfRoute.get(0); - template.departureTimesForDay = departueTimesForDay; - template.tripId = ""; - mapDepartueTimesOfRoute.put(routeId, template); +// +// Map> departueTimesForDay = new HashMap<>(); +// for (DayOfWeek day : DayOfWeek.values()) { +// departueTimesForDay.put(day, new ArrayList<>()); +// } +// List linesOfRoute = mapByRoute.get(routeId); +// for (MakatData makatData : linesOfRoute) { +// departueTimesForDay.get(makatData.dayOfWeek).add(makatData.departure); +// } +// for (DayOfWeek day : DayOfWeek.values()) { +// departueTimesForDay.get(day).sort(Comparator.naturalOrder()); // sorts the list in-place +// } } - return mapDepartueTimesOfRoute; + return mapDepartureTimesOfRoute; + } + + private String generateKey(String routeId, LocalDate date) { + return routeId + "@" + date.toString(); } private Map > readMakatFile() { @@ -119,8 +147,8 @@ private MakatData parseMakatLine(String line) { fields[0], fields[2], fields[3], - LocalDateTime.parse(fromDate, formatter), - LocalDateTime.parse(toDate, formatter), + LocalDateTime.parse(fromDate, formatter).toLocalDate(), + LocalDateTime.parse(toDate, formatter).toLocalDate(), fields[6], dayFromNumber(dayInWeek), fields[8] @@ -177,24 +205,28 @@ public List findLastStopCodeByRouteId(String routeId, boolean shortProce @Override public DepartureTimes findDeparturesByRouteId(String routeId) { - if ( (mapDepartueTimesOfRoute == null) || mapDepartueTimesOfRoute.isEmpty()) { + logger.warn("not implemented for Makat file. Use findDeparturesByRouteId(RouteId, LocalDate"); + return null; + } + + public DepartureTimes findDeparturesByRouteId(String routeId, LocalDate date) { + if ( (mapDepartureTimesOfRoute == null) || mapDepartureTimesOfRoute.isEmpty()) { return null; } if (StringUtils.isEmpty(routeId)) { return null; } - MakatData md = mapDepartueTimesOfRoute.get(routeId); + MakatData md = mapDepartureTimesOfRoute.get(generateKey(routeId, date)); if (md == null) { return null; } - // TODO check if now is between fromDate and toDate - - DepartureTimes dt = new DepartureTimes(routeId, ""); - for (DayOfWeek day : md.departureTimesForDay.keySet()) { - dt.addDepartureTimes(day, md.departureTimesForDay.get(day)); + Map> departures = new ConcurrentHashMap<>(); + for (LocalDate theDate : md.departureTimesForDate.keySet()) { + departures.put(theDate.getDayOfWeek(), md.departureTimesForDate.get(theDate)); } - - return dt; + DepartureTimes x = new DepartureTimes(routeId, md.makat); + x.departures = departures; + return x; } // @Override @@ -240,14 +272,14 @@ private class MakatData { public String routeId; public String direction; public String alternative; - public LocalDateTime fromDate; - public LocalDateTime toDate; + public LocalDate fromDate; + public LocalDate toDate; public String tripId; public DayOfWeek dayOfWeek; public String departure; - public Map> departureTimesForDay; + public Map> departureTimesForDate; - public MakatData(String makat, String routeId, String direction, String alternative, LocalDateTime fromDate, LocalDateTime toDate, String tripId, DayOfWeek dayOfWeek, String departure) { + public MakatData(String makat, String routeId, String direction, String alternative, LocalDate fromDate, LocalDate toDate, String tripId, DayOfWeek dayOfWeek, String departure) { this.makat = makat; this.routeId = routeId; this.direction = direction; @@ -260,7 +292,7 @@ public MakatData(String makat, String routeId, String direction, String alternat } public MakatData() { - new MakatData("", "", "", "", LocalDateTime.now(DEFAULT_CLOCK), LocalDateTime.now(DEFAULT_CLOCK), "", DayOfWeek.SATURDAY, ""); + new MakatData("", "", "", "", LocalDate.now(DEFAULT_CLOCK), LocalDate.now(DEFAULT_CLOCK), "", DayOfWeek.SATURDAY, ""); } @@ -278,13 +310,13 @@ public String toString() { ", fromDate=" + fromDate + ", toDate=" + toDate + ", tripId='" + tripId + '\''; - if (departureTimesForDay == null) { + if (departureTimesForDate == null) { s = s + ", dayOfWeek=" + dayOfWeek + ", departure='" + departure + '\'' + '}'; } else { - s = s + ", departures='" + departureTimesForDay.toString() + '\'' + + s = s + ", departures='" + departureTimesForDate.toString() + '\'' + '}'; } return s ; diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/util/DateTimeUtils.java b/java/siri-0.1/src/main/java/org/hasadna/bus/util/DateTimeUtils.java index 513488a7..504b87d7 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/util/DateTimeUtils.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/util/DateTimeUtils.java @@ -18,6 +18,9 @@ public class DateTimeUtils { public static Clock DEFAULT_CLOCK = Clock.systemDefaultZone(); + // for Debugging in dev env, you can use this to simulate a different date-time then the current time + //public static Clock DEFAULT_CLOCK = Clock.offset( Clock.systemDefaultZone(), Duration.ofDays(-2)); + public static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("HH:mm"); public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public static final String START_OF_DAY = "00:00"; diff --git a/java/siri-0.1/src/main/resources/application-production.properties b/java/siri-0.1/src/main/resources/application-production.properties index 1e966907..4efc36cf 100644 --- a/java/siri-0.1/src/main/resources/application-production.properties +++ b/java/siri-0.1/src/main/resources/application-production.properties @@ -5,7 +5,7 @@ pool.http.retrieve.max.pool.size=15 pool.http.retrieve.core.pool.size=15 #following line: value must end with / -gtfs.dir.location=/home/evyatar/logs/ +gtfs.dir.location=/tmp/ #read all files siri.schedule.*json in this dir scheduler.data.file=/home/evyatar/logs/ diff --git a/java/siri-0.1/src/test/java/org/hasadna/bus/ReadSchedulingDataTest.java b/java/siri-0.1/src/test/java/org/hasadna/bus/ReadSchedulingDataTest.java index 61a9acba..6f2cd156 100644 --- a/java/siri-0.1/src/test/java/org/hasadna/bus/ReadSchedulingDataTest.java +++ b/java/siri-0.1/src/test/java/org/hasadna/bus/ReadSchedulingDataTest.java @@ -15,6 +15,7 @@ import org.springframework.test.context.junit4.SpringRunner; import java.time.DayOfWeek; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.List; @@ -54,7 +55,7 @@ public void test1() { public void test2() { String file = this.getClass().getClassLoader().getResource("siri.schedule.18.Thursday.json").getFile(); List list = scheduleRetrieval.readSchedulingData(file); - scheduleRetrieval.addDepartures(list); + scheduleRetrieval.addDepartures(list, LocalDate.now()); logger.info("weeklyDepartureTimes {}", list.get(0).weeklyDepartureTimes ); logger.info("departureTimes {}", list.get(0).departureTimes ); logger.info("activeRanges {} to {}", list.get(0).activeRanges.get(0)[0], list.get(0).activeRanges.get(0)[1] ); diff --git a/java/siri-0.1/src/test/java/org/hasadna/bus/ValidateSchedulingTest.java b/java/siri-0.1/src/test/java/org/hasadna/bus/ValidateSchedulingTest.java new file mode 100644 index 00000000..9bca7db3 --- /dev/null +++ b/java/siri-0.1/src/test/java/org/hasadna/bus/ValidateSchedulingTest.java @@ -0,0 +1,144 @@ +package org.hasadna.bus; + +import org.assertj.core.api.Assertions; +import org.hasadna.bus.config.SchedulerConfig; +import org.hasadna.bus.service.Command; +import org.hasadna.bus.service.ScheduleRetrieval; +import org.hasadna.bus.service.SortedQueue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.test.context.junit4.SpringRunner; + +import java.time.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.hasadna.bus.util.DateTimeUtils.DEFAULT_CLOCK; +import static org.hasadna.bus.util.DateTimeUtils.timeAsStr; +import static org.hasadna.bus.util.DateTimeUtils.toDateTime; + +@RunWith(SpringRunner.class) +@SpringBootTest //(classes = {SchedulerConfig.class, BusApplication.class}) +public class ValidateSchedulingTest { + + @Autowired + ScheduleRetrieval scheduleRetrieval; + + @Autowired + SortedQueue sq; + + List data = new ArrayList<>(); + LocalDateTime currentTime; + + @Before + public void setup() { + + currentTime = LocalDateTime.of(2018, 8, 7, 10, 0); + DayOfWeek dayOfWeek = currentTime.getDayOfWeek(); + + Command c = new Command("stopCode1"); + c.lineRef = "lineRef1"; + c.isActive = true; + c.weeklyDepartureTimes = new HashMap<>(); + c.weeklyDepartureTimes.put(dayOfWeek, Arrays.asList("06:00", "07:00")); + c.lastArrivalTimes = new HashMap<>(); + c.lastArrivalTimes.put(dayOfWeek, "09:00"); + data.add(c); + + } + + @Test + public void test1() { + + ScheduleRetrieval.ValidationResults validationResults = scheduleRetrieval.validateScheduling( + data, + currentTime); + + Assertions.assertThat( validationResults.delayTillFirstDeparture ).isNotNull().isEmpty(); + Assertions.assertThat( validationResults.notNeeded).isNotNull().isNotEmpty(); + Assertions.assertThat( validationResults.notNeeded.get(0).stopCode).isEqualTo("stopCode1"); + } + + @Test + public void test2() { + + currentTime = LocalDateTime.of(2018, 8, 7, 2, 0); + ScheduleRetrieval.ValidationResults validationResults = scheduleRetrieval.validateScheduling( + data, + currentTime); + + Assertions.assertThat( validationResults.notNeeded ).isNotNull().isEmpty(); + Assertions.assertThat( validationResults.delayTillFirstDeparture).isNotNull().isNotEmpty(); + Assertions.assertThat( validationResults.delayTillFirstDeparture.get(0).stopCode).isEqualTo("stopCode1"); +//// Assertions.assertThat( validationResults.delayTillFirstDeparture.get(0).nextExecution).isNotNull(); +// Assertions.assertThat( currentTime).isNotNull(); +// Assertions.assertThat( timeAsStr( currentTime.withHour(5).toLocalTime())).isEqualTo("05:00"); +// Assertions.assertThat( validationResults.delayTillFirstDeparture.get(0).nextExecution) +// .isAfter(currentTime.withHour(5)) +// .isBefore(currentTime.withHour(6)); + } + + @Test + public void test3() throws InterruptedException { + currentTime = LocalDateTime.of(2018, 8, 7, 2, 0); + DEFAULT_CLOCK = Clock.fixed(currentTime.toInstant(ZoneOffset.UTC), ZoneOffset.UTC); + sq.removeAll(); + //System.out.println("a "+sq.getAllSchedules()); + data.forEach(c -> sq.put(c)); + + //System.out.println("b "+sq.getAllSchedules()); + scheduleRetrieval.updateSchedulingDataPeriodically(); + Thread.sleep(2000); // updateScheduling happens in a different thread, so give it some time to happen + //System.out.println("c "+sq.getAllSchedules()); + Assertions.assertThat(sq.getAllSchedules()).isNotNull().isNotEmpty(); + //System.out.println("1 "+sq.getAllSchedules()); + //List list = sq.getAllSchedules(); + Assertions.assertThat( currentTime).isNotNull(); + Assertions.assertThat( timeAsStr( currentTime.withHour(5).toLocalTime())).isEqualTo("05:00"); + //System.out.println("2 "+list); + Assertions.assertThat(sq.getAllSchedules().get(0).nextExecution).isNotNull(); + //System.out.println("3 "+sq.getAllSchedules()); + Assertions.assertThat(sq.getAllSchedules().get(0).nextExecution).isNotNull(); + LocalDateTime ne = sq.getAllSchedules().get(0).nextExecution; + LocalDateTime be = toDateTime("05:31", currentTime.toLocalDate()); + LocalDateTime af = toDateTime("05:29", currentTime.toLocalDate()); + Assertions.assertThat(ne) + .isBefore(be) + .isAfter(af); +// Assertions.assertThat(sq.getAllSchedules().get(0).nextExecution) +// .isBefore(toDateTime("05:31", currentTime.toLocalDate())) +// .isAfter(toDateTime("05:29", currentTime.toLocalDate())); + + } + + @Test + public void test4() throws InterruptedException { + //currentTime = LocalDateTime.of(2018, 8, 7, 2, 0); + DEFAULT_CLOCK = + Clock.offset(Clock.systemDefaultZone(), Duration.ofHours(-55) ); + //Clock.fixed(Instant.parse("2018-04-29T10:15:30.00Z"), + // ZoneId.of("Asia/Calcutta")); + //Clock myClock = DEFAULT_CLOCK; + System.out.println(DEFAULT_CLOCK); + currentTime = LocalDateTime.now(DEFAULT_CLOCK); + System.out.println("now is " + currentTime.toString()); + + while (currentTime.toString().equals("1")) { + try { + if (sq.getAll() == null) System.out.println("sq.getAll is null"); + System.out.println("" + LocalDateTime.now() + " queue:" + sq.getAll().length + ", active:" + sq.showActive().size()); + System.out.println(" next at:" + sq.peek().nextExecution.toLocalTime().toString()); + } + catch ( Exception ex) { + System.out.println("absorbing exception " + ex.getMessage()); + } + Thread.sleep(10000); + } + } +} diff --git a/java/siri-0.1/src/test/resources/application-test.properties b/java/siri-0.1/src/test/resources/application-test.properties index c9f06cdd..a5899ce3 100644 --- a/java/siri-0.1/src/test/resources/application-test.properties +++ b/java/siri-0.1/src/test/resources/application-test.properties @@ -11,5 +11,6 @@ management.metrics.export.datadog.apiKey=dummy datadog.apiKey=dummy management.metrics.export.datadog.application-key=dummy +spring.profiles.active=test From 927ec3eadac5add05553b4bf23cf648138576053 Mon Sep 17 00:00:00 2001 From: Evyatar Date: Sun, 12 Aug 2018 09:22:03 +0300 Subject: [PATCH 2/6] reduce logging --- java/siri-0.1/src/main/resources/logback.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/siri-0.1/src/main/resources/logback.xml b/java/siri-0.1/src/main/resources/logback.xml index aa30b6ad..ba21bb26 100644 --- a/java/siri-0.1/src/main/resources/logback.xml +++ b/java/siri-0.1/src/main/resources/logback.xml @@ -49,7 +49,7 @@ - + From 78984c5baaecc512322d030995cfb4b91ed47e66 Mon Sep 17 00:00:00 2001 From: evyatar Date: Sun, 12 Aug 2018 09:24:13 +0300 Subject: [PATCH 3/6] fix logging, reduce level --- java/siri-0.1/src/main/resources/logback.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/siri-0.1/src/main/resources/logback.xml b/java/siri-0.1/src/main/resources/logback.xml index aa30b6ad..66a52bf7 100644 --- a/java/siri-0.1/src/main/resources/logback.xml +++ b/java/siri-0.1/src/main/resources/logback.xml @@ -49,7 +49,7 @@ - + @@ -61,4 +61,4 @@ - \ No newline at end of file + From 941e716eb2bdd81b9b79af24e042ff97a1b0db3b Mon Sep 17 00:00:00 2001 From: Evyatar Date: Mon, 13 Aug 2018 07:42:45 +0300 Subject: [PATCH 4/6] fix bug - handle exception that might cause schedules to not be added to queue --- .../bus/service/ScheduleRetrieval.java | 11 ++++++++++- .../org/hasadna/bus/service/SortedQueue.java | 19 ++++++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java index a7f525b8..ff6dab67 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java @@ -27,6 +27,7 @@ import java.time.format.DateTimeParseException; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.hasadna.bus.util.DateTimeUtils.*; @@ -61,7 +62,8 @@ public class ScheduleRetrieval { @Autowired ReadRoutesFile makatFile ; - + // a switch that enables other methods to signify that we need a reRead of schedule files + private AtomicBoolean requireReRead = new AtomicBoolean(false); @PostConstruct @@ -391,6 +393,12 @@ public void updateSchedulingDataPeriodically() { } catch (Exception ex) { logger.error("absorbing exception when updating Scheduling Data. You should initiate re-read of all schedules", ex); + requireReRead.set(true); + } + // if anyone wants to reRead all schedule files, do it now + if (requireReRead.get()) { + reReadSchedulingAndReplace(); + requireReRead.set(false); } } @@ -463,6 +471,7 @@ public void reReadSchedulingAndReplace() { for (Command c : data) { queue.put(c); } + requireReRead.set(false); logger.warn("Queue status now: " + status()); } diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java index 59bff224..6ab1a147 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/SortedQueue.java @@ -82,11 +82,20 @@ public void delayNextExecution(final List delayTillFirstDeparture) { LocalDateTime currentTime = LocalDateTime.now(DEFAULT_CLOCK); DayOfWeek today = currentTime.getDayOfWeek(); for (Command c : candidatesForUpdatingNextExecution) { - String firstDeparture = c.weeklyDepartureTimes.get(today).get(0); - String evaluateAt = subtractMinutesStopAtMidnight(firstDeparture, 30); - c.nextExecution = toDateTime(evaluateAt); - c.isActive = false; - updatedNextExecution.add(c); + try { + if ((c.weeklyDepartureTimes != null) && + c.weeklyDepartureTimes.containsKey(today) && + !c.weeklyDepartureTimes.get(today).isEmpty()) { + String firstDeparture = c.weeklyDepartureTimes.get(today).get(0); + String evaluateAt = subtractMinutesStopAtMidnight(firstDeparture, 30); + c.nextExecution = toDateTime(evaluateAt); + c.isActive = false; + updatedNextExecution.add(c); + } + } + catch (Exception ex) { + logger.error("absorbing unhandled exception while calculating next execution of route " + c.lineRef, ex); + } } int count = addBackToQueue(updatedNextExecution); From fc3e4a32eb02d41bc0f967901b008b37e0784cff Mon Sep 17 00:00:00 2001 From: Evyatar Date: Thu, 16 Aug 2018 21:30:17 +0300 Subject: [PATCH 5/6] increase throughput by increasing scheduler frequency to 1 every 20 ms. increasing number of threads --- .../main/java/org/hasadna/bus/service/ScheduleRetrieval.java | 2 +- .../src/main/resources/application-production.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java index ff6dab67..35a86e24 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java @@ -414,7 +414,7 @@ public void updateSchedulingDataPeriodically() { * Before executing, it adds the same task again to the queue (with an updated * date for next execution) */ - @Scheduled(fixedRate=50) // every 50 ms. + @Scheduled(fixedRate=20) // every 20 ms. ==> max throughput 3000 per minute @Async("http-retrieve") public void retrieveCommandPeriodically() { if (!schedulerEnable) return; diff --git a/java/siri-0.1/src/main/resources/application-production.properties b/java/siri-0.1/src/main/resources/application-production.properties index 4efc36cf..ed55887e 100644 --- a/java/siri-0.1/src/main/resources/application-production.properties +++ b/java/siri-0.1/src/main/resources/application-production.properties @@ -1,8 +1,8 @@ server.port=8080 scheduler.default.interval.between.executions.minutes=1 scheduler.thread.pool.size=20 -pool.http.retrieve.max.pool.size=15 -pool.http.retrieve.core.pool.size=15 +pool.http.retrieve.max.pool.size=20 +pool.http.retrieve.core.pool.size=20 #following line: value must end with / gtfs.dir.location=/tmp/ From d868f2350796171bb1c3c76d0d3a063f8812b22d Mon Sep 17 00:00:00 2001 From: Evyatar Date: Mon, 20 Aug 2018 09:57:26 +0300 Subject: [PATCH 6/6] increase throughput by increasing scheduler frequency to 1 every 10 ms = max 6000 requests per minute. --- .../main/java/org/hasadna/bus/service/ScheduleRetrieval.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java index 35a86e24..7584e9ab 100644 --- a/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java +++ b/java/siri-0.1/src/main/java/org/hasadna/bus/service/ScheduleRetrieval.java @@ -414,7 +414,7 @@ public void updateSchedulingDataPeriodically() { * Before executing, it adds the same task again to the queue (with an updated * date for next execution) */ - @Scheduled(fixedRate=20) // every 20 ms. ==> max throughput 3000 per minute + @Scheduled(fixedRate=10) // every 20 ms. ==> max throughput 3000 per minute @Async("http-retrieve") public void retrieveCommandPeriodically() { if (!schedulerEnable) return;