Skip to content

Commit

Permalink
Handle train change of stops #114
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronClaydon committed Feb 17, 2024
1 parent c8ed00e commit c919d31
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 93 deletions.
3 changes: 0 additions & 3 deletions deploy/charts/travigo-dbwatch/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ spec:
{{- with .Values.env }}
{{- toYaml . | nindent 12 }}
{{- end }}
# TODO testing this probably isnt a good idea
- name: GOMAXPROCS
value: "24"
- name: TRAVIGO_LOG_FORMAT
value: JSON
- name: TRAVIGO_MONGODB_CONNECTION
Expand Down
14 changes: 7 additions & 7 deletions pkg/ctdf/departureboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func GenerateDepartureBoardFromJourneys(journeys []*Journey, stopRefs []string,
go func(journey *Journey) {
defer wg.Done()

var stopDeperatureTime time.Time
var stopDepartureTime time.Time
var stopPlatform string
var stopPlatformType string
var destinationDisplay string
Expand Down Expand Up @@ -85,7 +85,7 @@ func GenerateDepartureBoardFromJourneys(journeys []*Journey, stopRefs []string,
departureBoardRecordType = DepartureBoardRecordTypeCancelled
}

stopDeperatureTime = time.Date(
stopDepartureTime = time.Date(
dateTime.Year(), dateTime.Month(), dateTime.Day(), refTime.Hour(), refTime.Minute(), refTime.Second(), refTime.Nanosecond(), dateTime.Location(),
)

Expand All @@ -94,18 +94,18 @@ func GenerateDepartureBoardFromJourneys(journeys []*Journey, stopRefs []string,
}
}

if stopDeperatureTime.Before(dateTime) {
if stopDepartureTime.Before(dateTime) {
return
}

journey.GetReferences()

// If the departure is within 45 minutes then attempt to do an estimated arrival based on current vehicle realtime journey
// We estimate the current vehicle realtime journey based on the Block Number
stopDeperatureTimeFromNow := stopDeperatureTime.Sub(dateTime).Minutes()
stopDepartureTimeFromNow := stopDepartureTime.Sub(dateTime).Minutes()
if doEstimates &&
departureBoardRecordType == DepartureBoardRecordTypeScheduled &&
stopDeperatureTimeFromNow <= 45 && stopDeperatureTimeFromNow >= 0 &&
stopDepartureTimeFromNow <= 45 && stopDepartureTimeFromNow >= 0 &&
journey.OtherIdentifiers["BlockNumber"] != "" {

var blockJourneys []string
Expand Down Expand Up @@ -136,7 +136,7 @@ func GenerateDepartureBoardFromJourneys(journeys []*Journey, stopRefs []string,
if blockRealtimeJourney != nil {
// Ignore negative offsets as we assume bus will right itself when turning over
if blockRealtimeJourney.Offset.Minutes() > 0 {
stopDeperatureTime = stopDeperatureTime.Add(blockRealtimeJourney.Offset)
stopDepartureTime = stopDepartureTime.Add(blockRealtimeJourney.Offset)
}
departureBoardRecordType = DepartureBoardRecordTypeEstimated
}
Expand All @@ -153,7 +153,7 @@ func GenerateDepartureBoardFromJourneys(journeys []*Journey, stopRefs []string,
departureBoardGenerationMutex.Lock()
departureBoard = append(departureBoard, &DepartureBoard{
Journey: journey,
Time: stopDeperatureTime,
Time: stopDepartureTime,
DestinationDisplay: destinationDisplay,
Type: departureBoardRecordType,
Platform: stopPlatform,
Expand Down
2 changes: 2 additions & 0 deletions pkg/ctdf/realtimejourney.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type RealtimeJourneyStops struct {
DepartureTime time.Time `groups:"basic"`

TimeType RealtimeJourneyStopTimeType `groups:"basic"`

Cancelled bool `groups:"basic"`
}

type RealtimeJourneyStopTimeType string
Expand Down
162 changes: 79 additions & 83 deletions pkg/realtime/nationalrail/darwin/pushport.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *PushPortData) UpdateRealtimeJourneys(queue *railutils.BatchProcessingQu
}

if journey == nil {
log.Error().Str("uid", trainStatus.UID).Msg("Failed to find respective Journey for this train")
log.Error().Str("uid", trainStatus.UID).Msg("Failed to find respective Journey for this train status update")
continue
}

Expand Down Expand Up @@ -196,116 +196,112 @@ func (p *PushPortData) UpdateRealtimeJourneys(queue *railutils.BatchProcessingQu

// Schedules
for _, schedule := range p.Schedules {
if schedule.CancelReason != "" {
pretty.Println(schedule)

realtimeJourneyID := fmt.Sprintf("GB:NATIONALRAIL:%s:%s", schedule.SSD, schedule.UID)
searchQuery := bson.M{"primaryidentifier": realtimeJourneyID}

var realtimeJourney *ctdf.RealtimeJourney
realtimeJourneyID := fmt.Sprintf("GB:NATIONALRAIL:%s:%s", schedule.SSD, schedule.UID)
searchQuery := bson.M{"primaryidentifier": realtimeJourneyID}

realtimeJourneysCollection.FindOne(context.Background(), searchQuery).Decode(&realtimeJourney)
var realtimeJourney *ctdf.RealtimeJourney

newRealtimeJourney := false
if realtimeJourney == nil {
// Find the journey for this train
var journey *ctdf.Journey
cursor, _ := journeysCollection.Find(context.Background(), bson.M{"otheridentifiers.TrainUID": schedule.UID})
realtimeJourneysCollection.FindOne(context.Background(), searchQuery).Decode(&realtimeJourney)

journeyDate, _ := time.Parse("2006-01-02", schedule.SSD)
newRealtimeJourney := false
if realtimeJourney == nil {
// Find the journey for this train
var journey *ctdf.Journey
cursor, _ := journeysCollection.Find(context.Background(), bson.M{"otheridentifiers.TrainUID": schedule.UID})

for cursor.Next(context.TODO()) {
var potentialJourney *ctdf.Journey
err := cursor.Decode(&potentialJourney)
if err != nil {
log.Error().Err(err).Msg("Failed to decode Journey")
}
journeyDate, _ := time.Parse("2006-01-02", schedule.SSD)

if potentialJourney.Availability.MatchDate(journeyDate) {
journey = potentialJourney
}
for cursor.Next(context.TODO()) {
var potentialJourney *ctdf.Journey
err := cursor.Decode(&potentialJourney)
if err != nil {
log.Error().Err(err).Msg("Failed to decode Journey")
}

if journey == nil {
log.Error().Str("uid", schedule.UID).Msg("Failed to find respective Journey for this train")
continue
if potentialJourney.Availability.MatchDate(journeyDate) {
journey = potentialJourney
}
}

// Construct the base realtime journey
realtimeJourney = &ctdf.RealtimeJourney{
PrimaryIdentifier: realtimeJourneyID,
ActivelyTracked: false,
TimeoutDurationMinutes: 90,
CreationDateTime: now,
Reliability: ctdf.RealtimeJourneyReliabilityExternalProvided,

Cancelled: true,
if journey == nil {
log.Error().Str("uid", schedule.UID).Msg("Failed to find respective Journey for this train schedule update")
continue
}

DataSource: datasource,
// Construct the base realtime journey
realtimeJourney = &ctdf.RealtimeJourney{
PrimaryIdentifier: realtimeJourneyID,
ActivelyTracked: false,
TimeoutDurationMinutes: 90,
CreationDateTime: now,
Reliability: ctdf.RealtimeJourneyReliabilityExternalProvided,

Journey: journey,
JourneyRunDate: journeyDate,
DataSource: datasource,

Stops: map[string]*ctdf.RealtimeJourneyStops{},
}
Journey: journey,
JourneyRunDate: journeyDate,

newRealtimeJourney = true
Stops: map[string]*ctdf.RealtimeJourneyStops{},
}

updateMap := bson.M{
"modificationdatetime": now,
}
newRealtimeJourney = true
}

// Update database
if newRealtimeJourney {
updateMap["primaryidentifier"] = realtimeJourney.PrimaryIdentifier
updateMap["activelytracked"] = realtimeJourney.ActivelyTracked
updateMap["timeoutdurationminutes"] = realtimeJourney.TimeoutDurationMinutes
updateMap := bson.M{
"modificationdatetime": now,
}

updateMap["reliability"] = realtimeJourney.Reliability
// Calculate the new stops
scheduleStops := []ScheduleStop{
schedule.Origin,
}
scheduleStops = append(scheduleStops, schedule.Intermediate...)
scheduleStops = append(scheduleStops, schedule.Destination)

updateMap["creationdatetime"] = realtimeJourney.CreationDateTime
updateMap["datasource"] = realtimeJourney.DataSource
for _, scheduleStop := range scheduleStops {
stop := stopCache.Get("Tiploc", scheduleStop.Tiploc)

updateMap["journey"] = realtimeJourney.Journey
updateMap["journeyrundate"] = realtimeJourney.JourneyRunDate
} else {
updateMap["datasource.identifier"] = datasource.Identifier
if stop == nil {
log.Debug().Str("tiploc", scheduleStop.Tiploc).Msg("Failed to find stop")
continue
}

updateMap["cancelled"] = true

createServiceAlert(ctdf.ServiceAlert{
PrimaryIdentifier: fmt.Sprintf("GB:RAILCANCEL:%s:%s", schedule.SSD, realtimeJourney.Journey.PrimaryIdentifier),
CreationDateTime: now,
ModificationDateTime: now,

DataSource: &ctdf.DataSource{},
if scheduleStop.Cancelled == "true" {
updateMap[fmt.Sprintf("stops.%s.cancelled", stop.PrimaryIdentifier)] = true
pretty.Println(realtimeJourneyID, stop.PrimaryIdentifier)
}
}

AlertType: ctdf.ServiceAlertTypeJourneyCancelled,
// Update database
if newRealtimeJourney {
updateMap["primaryidentifier"] = realtimeJourney.PrimaryIdentifier
updateMap["activelytracked"] = realtimeJourney.ActivelyTracked
updateMap["timeoutdurationminutes"] = realtimeJourney.TimeoutDurationMinutes

Text: railutils.CancelledReasons[schedule.CancelReason],
updateMap["reliability"] = realtimeJourney.Reliability

MatchedIdentifiers: []string{fmt.Sprintf("DAYINSTANCEOF:%s:%s", schedule.SSD, realtimeJourney.Journey.PrimaryIdentifier)},
updateMap["creationdatetime"] = realtimeJourney.CreationDateTime
updateMap["datasource"] = realtimeJourney.DataSource

ValidFrom: realtimeJourney.JourneyRunDate,
ValidUntil: realtimeJourney.JourneyRunDate.Add(48 * time.Hour),
})
updateMap["journey"] = realtimeJourney.Journey
updateMap["journeyrundate"] = realtimeJourney.JourneyRunDate
} else {
updateMap["datasource.identifier"] = datasource.Identifier
}

// Create update
bsonRep, _ := bson.Marshal(bson.M{"$set": updateMap})
updateModel := mongo.NewUpdateOneModel()
updateModel.SetFilter(searchQuery)
updateModel.SetUpdate(bsonRep)
updateModel.SetUpsert(true)
// Create update
bsonRep, _ := bson.Marshal(bson.M{"$set": updateMap})
updateModel := mongo.NewUpdateOneModel()
updateModel.SetFilter(searchQuery)
updateModel.SetUpdate(bsonRep)
updateModel.SetUpsert(true)

queue.Add(updateModel)
queue.Add(updateModel)

log.Info().
Str("realtimejourneyid", realtimeJourneyID).
Str("reason", schedule.CancelReason).
Msg("Train cancelled")
}
log.Info().
Str("realtimejourneyid", realtimeJourneyID).
Str("journeyid", realtimeJourney.Journey.PrimaryIdentifier).
Msg("Train schedule updated")
}

// Station Messages
Expand Down
18 changes: 18 additions & 0 deletions pkg/realtime/nationalrail/darwin/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@ type Schedule struct {
RID string `xml:"rid,attr"`
UID string `xml:"uid,attr"`
SSD string `xml:"ssd,attr"`
TOC string `xml:"toc,attr"`

CancelReason string `xml:"cancelReason"`

Origin ScheduleStop `xml:"OR"`
Intermediate []ScheduleStop `xml:"IP"`
Destination ScheduleStop `xml:"DT"`
}

type ScheduleStop struct {
Tiploc string `xml:"tpl,attr"`
Activity string `xml:"act,attr"`

PublicDeparture string `xml:"ptd,attr"`
WorkingDeparture string `xml:"wtd,attr"`

PublicArrival string `xml:"pta,attr"`
WorkingArrival string `xml:"wta,attr"`

Cancelled string `xml:"can,attr"`
}

0 comments on commit c919d31

Please sign in to comment.