Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: each Read, ReadFrom variants doesn't take care of a shared buffer which is used by multiple goroutines #8858

Closed
gopherbot opened this issue Oct 3, 2014 · 5 comments

Comments

Projects
None yet
3 participants
@gopherbot
Copy link

commented Oct 3, 2014

by rpzrpzrpz:

[root@rpzcentos rpzcache]# go version
go version go1.3.1 linux/amd64

net.ListenUDP("udp4",@gsrvaddr)
glen,grxadd, gerr := gconn.ReadFromUDP(gbuf[0:])

go funcprocessbytes(grxadd,gbuf[0:glen])

There is no way to call a go routine after calling ReadFromUDP since those bytes are not
safe in the go routine.  

The go routine will pass along the proper values in grxadd, but the bytes in gbuf 
are sometimes duplicated on successive calls to the funcprocessbytes( ).

Changing the code and removing the "go" directive, the bytes are processed OK
because the bytes are left in the network stack and each call to funcprocessbytes() is
called serially instead of in parallel.

I will try to work on a sample code program for this.
@gopherbot

This comment has been minimized.

Copy link
Author

commented Oct 3, 2014

Comment 1 by rpzrpzrpz:

Here is a sample Google Go Code Plus Python Code to generate the test.
You must set up a linux server with 5 IP addresses on the ethernet port. I am sure you
could probably just change the ports around and use a single IP address, but that is not
the test that I ran to produce the bugs.
You will observe the "Mangled Bytes" error message and that will indicate that the UDP
payload does not match the IP Address.
##############GO CODE##################
// testudp project main.go
package main
import (
    "fmt"
    "net"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"
)
var gmax int = 0
var gstop bool = false
func main() {
    fmt.Println("TestUdp Starting up:", time.Now().UTC().String())
    gsigchan := make(chan os.Signal, 3)
    signal.Notify(gsigchan, os.Interrupt, syscall.SIGTERM)
    signal.Notify(gsigchan, os.Interrupt, syscall.SIGUSR1)
    go func() {
        <-gsigchan
        fmt.Println("TestUDP SIGTERM:", time.Now().UTC().String())
        gstop = true
    }()
    go udplisten("192.168.7.20", 8500, &gstop)
    for {
        time.Sleep(1000 * time.Millisecond)
        if gstop == true {
            break
        }
        //fmt.Println("TestUdp Running:", time.Now().UTC().String())
    }
    fmt.Println("TestUdp Exiting:", time.Now().UTC().String())
    time.Sleep(100 * time.Millisecond)
}
func udplisten(gip string, gport int, gstop *bool) bool {
    //Parameters
    //gip - IP Address will listen to
    //gport - Port we will listen on
    //gstop - pointer to control variable to terminate thread
    var gbuf [1024]byte
    if gip == "" {
        fmt.Println("Invalid IP Address")
        return false
    }
    if gport < 3000 {
        fmt.Println("Invalid Port")
        return false
    }
    if gstop == nil {
        fmt.Println("Invalid Stop variable")
        return false
    }
    gsrvaddr := net.UDPAddr{
        Port: gport,
        IP:   net.ParseIP(gip),
    }
    fmt.Println("UDP Listen ENTER:", gsrvaddr)
    for *gstop == false {
        gconn, gerr := net.ListenUDP("udp4", &gsrvaddr)
        if gerr != nil {
            fmt.Println("Cluster UDP Listen Error:", gerr)
            time.Sleep(1000 * time.Millisecond)
            continue
        }
        for *gstop == false {
            //#give us 1 second to read the bytes
            gconn.SetReadDeadline(time.Now().Add(time.Second))
            glen, grxadd, gerr := gconn.ReadFromUDP(gbuf[0:])
            if gerr != nil {
                //Remove timeout from this loop
                if strings.Contains(gerr.Error(), "i/o timeout") == true {
                    continue
                }
                fmt.Println("UDP Read Error:", gerr)
                gconn.Close()
                break
            }
            if grxadd == nil {
                time.Sleep(200 * time.Millisecond)
                fmt.Println("UDP No Address returned on RX")
                continue
            }
            go udpcallbackfunc(gconn, grxadd, glen, gbuf[0:glen])
        }
    }
    return true
}
func udpcallbackfunc(grxconn *net.UDPConn, grxadd *net.UDPAddr, grxlen int, grx []byte) {
    gstr := string(grx)
    if gstr == grxadd.IP.String() {
        //Our grx payload is good
        return
    }
    fmt.Println("Bytes Mangled:", gstr, grxadd.IP.String())
    gmax++
    if gmax > 200 {
        gstop = true
    }
    return
}
##############PYTHON CODE##################
import time
from socket import *
from threading import Thread
gstop = 0
def udpcli(srvaddr,cliaddr,payload):
    sock = socket(AF_INET,SOCK_DGRAM)
    sock.bind(cliaddr)
    print "UDP Thread going..."
    print cliaddr
    print srvaddr
    print payload
    while True:
        #Just send it a lot
        sock.sendto(payload,srvaddr)
        if gstop == 1:
            break
    print "UDP Thread stopping"
    print payload
saddr = ("192.168.7.20",8500)
caddr = ("192.168.7.21",8500)
ta = Thread(target=udpcli,args=(saddr,caddr,"192.168.7.21"))
ta.start()
caddr = ("192.168.7.22",8500)
tb = Thread(target=udpcli,args=(saddr,caddr,"192.168.7.22"))
tb.start()
caddr = ("192.168.7.23",8500)
tc = Thread(target=udpcli,args=(saddr,caddr,"192.168.7.23"))
tc.start()
caddr = ("192.168.7.24",8500)
td = Thread(target=udpcli,args=(saddr,caddr,"192.168.7.24"))
td.start()
print "Started UDP client"
while 1:
    if gstop == 1:
        break
    try:
        time.sleep(1)
        print "Running UDP client"
    except KeyboardInterrupt:
        print "Keyboard Break"
        gstop = 1
        break 
print "Broken Out Break"
time.sleep(1)
print "Stopped..."
@gopherbot

This comment has been minimized.

Copy link
Author

commented Oct 3, 2014

Comment 2 by rpzrpzrpz:

If you want to "fix" the sample, just remove the word "go" in front of the callbackfunc
in the go program.
go udpcallbackfunc(gconn, grxadd, glen, gbuf[0:glen])
to --> udpcallbackfunc(gconn, grxadd, glen, gbuf[0:glen])
without the word "go" infront of it.
@gopherbot

This comment has been minimized.

Copy link
Author

commented Oct 3, 2014

Comment 3 by rpzrpzrpz:

I did not mean to suggest that "fix" meant the bug was gone. Its just that a go routine
enabled server is out of the question.  
It would just be easier for the rx bytes to get copied correctly into the gbuf from
ReadFromUDP( ), but that does not seem to be the case.
@minux

This comment has been minimized.

Copy link
Member

commented Oct 3, 2014

Comment 4:

each udpcallback goroutine is using the same underlying backing array that is backing
gbuf, so data corruption is expected if you introduce concurrency.
run you code under the race detector and this bug will be pointed out.
go build -race

Status changed to WorkingAsIntended.

@mikioh

This comment has been minimized.

Copy link
Contributor

commented Oct 7, 2014

Comment 5:

Status changed to Duplicate.

Merged into issue #8881.

@gopherbot gopherbot added the duplicate label Oct 7, 2014

@golang golang locked and limited conversation to collaborators Jun 25, 2016

This issue was closed.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
You can’t perform that action at this time.