Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add segments to Flow to update traffic on part of way #322

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-FORK.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Changes from v10.2.0
- CHANGED for internal refactoring, move `integration/pkg/api` to `integration/api`, and `integration/pkg/backend` to `integration/util/backend` [#315](https://github.com/Telenav/osrm-backend/pull/315)
- CHANGED for internal refactoring, rename `cmd/osrm-ranking` to `cmd/osrm-rankd` [#317](https://github.com/Telenav/osrm-backend/pull/317)
- ADDED versioning on golang binaries [#320](https://github.com/Telenav/osrm-backend/pull/320)
- ADDED segments to `Flow` to update traffic on part of way [#322](https://github.com/Telenav/osrm-backend/pull/322)
- Bugfix:
- CHANGED `osrm-ranking` parsing of OSRM route response to compatible with `string` array `annotation/nodes` [#296](https://github.com/Telenav/osrm-backend/pull/296)
- FIXED wrong variable `docker-entrypoint.sh` [#311](https://github.com/Telenav/osrm-backend/pull/311)
Expand Down
2 changes: 1 addition & 1 deletion integration/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ cmd/oasis/oasis
cmd/trafficproxy-cli/*_flows.csv
cmd/trafficproxy-cli/*_incidents.csv
cmd/historicalspeed-timezone-builder/*mapping.csv
cmd/osrm-traffic-updater/testdata/target.csv
cmd/osrm-traffic-updater/testdata/target*.csv
8 changes: 6 additions & 2 deletions integration/cmd/osrm-traffic-updater/dumper_statistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type dumperStatisticItems struct {
nodeMatchedCnt uint64
fwdTrafficMatchedCnt uint64
bwdTrafficMatchedCnt uint64
skippedSegmentsCnt uint64
}

type dumperStatistic struct {
Expand Down Expand Up @@ -41,6 +42,7 @@ func (d *dumperStatistic) Close() {
d.sum.nodeMatchedCnt += item.nodeMatchedCnt
d.sum.fwdTrafficMatchedCnt += item.fwdTrafficMatchedCnt
d.sum.bwdTrafficMatchedCnt += item.bwdTrafficMatchedCnt
d.sum.skippedSegmentsCnt += item.skippedSegmentsCnt
}
d.close = true
}
Expand All @@ -51,12 +53,12 @@ func (d *dumperStatistic) Sum() dumperStatisticItems {

func (d *dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64,
bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64,
fwdTrafficMatchedCnt uint64, bwdTrafficMatchedCnt uint64) {
fwdTrafficMatchedCnt uint64, bwdTrafficMatchedCnt uint64, skippedSegmentsCnt uint64) {
if !d.init {
fmt.Printf("dumperStatistic->Update() failed, please call Init() first otherwise will block all functions. \n")
return
}
d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt, fwdTrafficMatchedCnt, bwdTrafficMatchedCnt})
d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt, fwdTrafficMatchedCnt, bwdTrafficMatchedCnt, skippedSegmentsCnt})
}

func (d *dumperStatistic) Output() {
Expand All @@ -69,6 +71,8 @@ func (d *dumperStatistic) Output() {
fmt.Printf("Load %d way from data with %d nodes.\n", d.sum.wayCnt, d.sum.nodeCnt)
fmt.Printf("%d way with %d nodes matched with traffic record.\n",
d.sum.wayMatchedCnt, d.sum.nodeMatchedCnt)
fmt.Printf("%d traffic segments have been skipped.\n",
d.sum.skippedSegmentsCnt)
fmt.Printf("%d traffic records(%d forward and %d backward) have been matched.\n",
d.sum.fwdTrafficMatchedCnt+d.sum.bwdTrafficMatchedCnt, d.sum.fwdTrafficMatchedCnt, d.sum.bwdTrafficMatchedCnt)
fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func TestDumperStatistic(t *testing.T) {
}

func accumulateDumper(d *dumperStatistic, wg *sync.WaitGroup) {
d.Update(1, 2, 3, 4, 5, 6, 7, 8)
d.Update(1, 2, 3, 4, 5, 6, 7, 8, 9)
wg.Done()
}
10 changes: 6 additions & 4 deletions integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ func main() {

isFlowDoneChan := make(chan bool, 1)
wayid2speed := make(map[int64]int)
segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)

go func() {
trafficData, err := trafficproxyclient.GetFlowsIncidents(nil)
if err != nil {
log.Println(err)
isFlowDoneChan <- false
return
}

trafficData2map(*trafficData, wayid2speed)
trafficData2map(*trafficData, wayid2speed, segmentsOfWay)
isFlowDoneChan <- true
}()

Expand All @@ -60,7 +61,7 @@ func main() {
if isFlowDone {
var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile, &ds)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, flags.csvFile, &ds)
ds.Output()
}
}
Expand All @@ -83,7 +84,7 @@ loop:
return isFlowDone
}

func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int) {
func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int, s map[int64][]*trafficproxy.SegmentedFlow) {
startTime := time.Now()
defer func() {
log.Printf("Processing time for building traffic map takes %f seconds\n", time.Now().Sub(startTime).Seconds())
Expand All @@ -102,6 +103,7 @@ func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int)

wayid := flow.Flow.WayID
m[wayid] = int(flow.Flow.Speed)
s[wayid] = flow.Flow.SegmentedFlow

if wayid > 0 {
fwdCnt++
Expand Down
62 changes: 52 additions & 10 deletions integration/cmd/osrm-traffic-updater/speed_table_dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"bufio"
"fmt"
"log"
"math"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/Telenav/osrm-backend/integration/traffic/livetraffic/trafficproxy"
"github.com/golang/glog"
)

var tasksWg sync.WaitGroup
var dumpFinishedWg sync.WaitGroup

func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string,
func dumpSpeedTable4Customize(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, sources [TASKNUM]chan string,
outputPath string, ds *dumperStatistic) {
startTime := time.Now()

Expand All @@ -23,19 +27,19 @@ func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan s
}

sink := make(chan string)
startTasks(wayid2speed, sources, sink, ds)
startTasks(wayid2speed, segmentsOfWay, sources, sink, ds)
startDump(outputPath, sink)
wait4AllTasksFinished(sink, ds)

endTime := time.Now()
fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds())
}

func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string,
func startTasks(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, sources [TASKNUM]chan string,
sink chan<- string, ds *dumperStatistic) {
tasksWg.Add(TASKNUM)
for i := 0; i < TASKNUM; i++ {
go task(wayid2speed, sources[i], sink, ds)
go task(wayid2speed, segmentsOfWay, sources[i], sink, ds)
}
}

Expand All @@ -51,13 +55,15 @@ func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) {
dumpFinishedWg.Wait()
}

func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) {
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched uint64
func task(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, source <-chan string, sink chan<- string, ds *dumperStatistic) {
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched, skippedSegmentsCnt uint64
var err error
for str := range source {
elements := strings.Split(str, ",")
wayCnt += 1
nodeCnt += (uint64)(len(elements) - 1)
nodesInWayCnt := (uint64)(len(elements) - 1)
nodeCnt += nodesInWayCnt

if len(elements) < 3 {
continue
}
Expand All @@ -71,6 +77,22 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
speedFwd, okFwd := wayid2speed[(int64)(wayid)]
speedBwd, okBwd := wayid2speed[(int64)(-wayid)]

speedsFwd := make([]int, nodesInWayCnt)
speedsBwd := make([]int, nodesInWayCnt)

for i := range elements[1:] {
speedsFwd[i] = speedFwd
speedsBwd[i] = speedBwd
}

segmentsFwd, okSegFwd := segmentsOfWay[(int64)(wayid)]
segmentsBwd, okSegBwd := segmentsOfWay[(int64)(-wayid)]

if okSegFwd || okSegBwd {
getSpeedOfSegments(segmentsFwd, speedsFwd, nodesInWayCnt, &skippedSegmentsCnt)
getSpeedOfSegments(segmentsBwd, speedsBwd, nodesInWayCnt, &skippedSegmentsCnt)
}

if okFwd || okBwd {
var nodes []string = elements[1:]
wayMatched += 1
Expand All @@ -94,21 +116,41 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
}
if okFwd {
fwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedFwd, true)
sink <- generateSingleRecord(n1, n2, speedsFwd[i], true)
}
if okBwd {
bwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedBwd, false)
sink <- generateSingleRecord(n1, n2, speedsBwd[i], false)
}

}
}
}

ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched)
ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched, skippedSegmentsCnt)
tasksWg.Done()
}

func getSpeedOfSegments(segments []*trafficproxy.SegmentedFlow, speeds []int, nodesCnt uint64, skippedSegmentsCnt *uint64) {
for _, segment := range segments {
if 0 > segment.Begin || segment.Begin > segment.End || segment.End > 100 {
glog.Warningf("Unexpected segment length begin: %v, end: %v, should be between 0..100 \n", segment.Begin, segment.End)
continue
}

indexOfBegin := int(math.Floor(float64(nodesCnt) * float64(segment.Begin) / 100))
indexOfEnd := int(math.Floor(float64(nodesCnt) * float64(segment.End) / 100))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here might be some problem due to floating calculation. E.g., totally 11 nodes and 3 segmented flows {Speed: 10, Begin: 0, End: 27},{Speed: 25, Begin: 28, End: 75},{Speed: 60, Begin: 76, End: 100} on a way, and the segmented flows will be applied to node 0~2, 3~8, 8~11, but the 2~3 is missed.
To solve the problem, it might be better to sort the segmented flows of the way first, then adjust the nodes offset, i.e. 3~8 -> 2~8.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyoucao577 I got it. But maybe it is ok.

For example, if first End and second Begin have the same osm nodeID, the segments will be {Speed: 10, Begin: 0, End: 27}, {Speed: 20, Begin: 27, End: 75}. It works for the same dataset on each side (traffic-updater and traffic-proxy).

On the other hand, I could add this condition

if lastEnd - curBegin == 1 {
	indexOfBegin + 1
}

But it will affect all segments i.e 8~11 -> 7~11

May be

if lastEnd - curBegin == 1 && lastIndexOfEnd != indexOfBegin {
	indexOfBegin + 1
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Maybe this issue will gone if we simply to make sure next begin == last end. I.e.,
first segment: {Speed: 10, Begin: 0, End: 27}, then {Speed: 25, Begin: 27, End: 75} instead of {Speed: 25, Begin: 28, End: 75} as second segment(Begin: 28 ->Begin: 27).
How's your opinion?


if indexOfBegin == indexOfEnd {
*skippedSegmentsCnt += 1
}

for i := indexOfBegin; i < indexOfEnd; i++ {
speeds[i] = int(segment.Speed)
}
}
}

// format
// if dir = true, means traffic for forward, generate: from, to, speed
// if dir = false, means this speed is for backward flow, generate: to, from, speed
Expand Down
42 changes: 41 additions & 1 deletion integration/cmd/osrm-traffic-updater/speed_table_dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strconv"
"strings"
"testing"

"github.com/Telenav/osrm-backend/integration/traffic/livetraffic/trafficproxy"
)

func TestSpeedTableDumper1(t *testing.T) {
Expand All @@ -23,16 +25,41 @@ func TestSpeedTableDumper1(t *testing.T) {

// construct mock traffic
wayid2speed := make(map[int64]int)
segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)
loadMockTrafficFlow2Map(wayid2speed)

var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv", &ds)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, "./testdata/target.csv", &ds)

compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t)
validateStatistic(&ds, t)
}

func TestSpeedTableDumper2(t *testing.T) {
// load result into sources
var sources [TASKNUM]chan string
for i := range sources {
sources[i] = make(chan string, 10000)
}
go loadWay2NodeidsTable("./testdata/id-mapping-segment.csv.snappy", sources)

// construct mock traffic
wayid2speed := make(map[int64]int)
wayid2speed[733690162] = 60
wayid2speed[-733689924] = 60

segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)
loadMockTrafficFlowSegment2Map(segmentsOfWay)

var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, "./testdata/target-segment.csv", &ds)

compareFileContentUnstable("./testdata/target-segment.csv", "./testdata/expect-segment.csv", t)
// validateStatistic(&ds, t)
}

func TestGenerateSingleRecord1(t *testing.T) {
str := generateSingleRecord(12345, 54321, 33, true)
if strings.Compare(str, "12345,54321,33\n") != 0 {
Expand Down Expand Up @@ -62,6 +89,19 @@ func loadMockTrafficFlow2Map(wayid2speed map[int64]int) {
wayid2speed[-24418344] = 59
}

func loadMockTrafficFlowSegment2Map(segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow) {
segmentsOfWay[733690162] = []*trafficproxy.SegmentedFlow{
{Speed: 25, Begin: 25, End: 75},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It tests part of way have segmented flow. I'm thinking about the case that every segments of way have segmented flow. E.g., {Speed: 10, Begin: 0, End: 24},{Speed: 25, Begin: 25, End: 75},{Speed: 60, Begin: 76, End: 100}. Is it possible in your application?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible. I've added the test case.

{Speed: 42, Begin: 80, End: 175}, // negative case
}
segmentsOfWay[-733689924] = []*trafficproxy.SegmentedFlow{
{Speed: 10, Begin: 0, End: 25},
{Speed: 20, Begin: 26, End: 50},
{Speed: 30, Begin: 51, End: 100},
{Speed: 42, Begin: 50, End: 25}, // negative case
}
}

type tNodePair struct {
f, t uint64
}
Expand Down
11 changes: 11 additions & 0 deletions integration/cmd/osrm-traffic-updater/testdata/expect-segment.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
1253042677,6871726226,10
6871775001,1253042677,20
6871775003,6871775001,30
6871726248,6871775003,30
6871726238,6871744979,60
6871744979,6871744978,60
6871744978,6871744977,25
6871744977,6871744976,25
6871744976,6871744975,25
6871744975,6871744974,25
6871744974,6871744973,60
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
733690162,6871726238,6871744979,6871744978,6871744977,6871744976,6871744975,6871744974,6871744973
733689924,6871726226,1253042677,6871775001,6871775003,6871726248
Binary file not shown.
11 changes: 7 additions & 4 deletions integration/traffic/livetraffic/trafficproxy/csv_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ func TestFlowCSVString(t *testing.T) {
humanFriendlyCSVString string
}{
{
Flow{WayID: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000},
"829733412,20.280001,7,1579419488000",
"829733412,20.280001,FREE_FLOW,1579419488000",
Flow{WayID: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000, SegmentedFlow: []*SegmentedFlow{
{Speed: 25, TrafficLevel: TrafficLevel_NO_LEVELS, Begin: 51, End: 75},
{Speed: 37, TrafficLevel: TrafficLevel_NO_LEVELS, Begin: 25, End: 50}},
},
"829733412,20.280001,7,1579419488000,37.000000,0,25,50,25.000000,0,51,75",
"829733412,20.280001,FREE_FLOW,1579419488000,37.000000,NO_LEVELS,25,50,25.000000,NO_LEVELS,51,75",
},
{
Flow{WayID: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000},
Flow{WayID: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000, SegmentedFlow: []*SegmentedFlow{}},
"-129639168,31.389999,7,1579419488000",
"-129639168,31.389999,FREE_FLOW,1579419488000",
},
Expand Down
Loading