Skip to content

Commit

Permalink
Read msg from stdin and reduce sleeps
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanJarv committed Oct 26, 2021
1 parent 59b5e34 commit b8b589a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 81 deletions.
53 changes: 29 additions & 24 deletions lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -55,28 +54,32 @@ func (c *Client) sendWorker() {
c.wg.Done()
}()

b, err := ioutil.ReadAll(c.sendReader)
if err != nil {
return
}

// Iterate over each byte in the string.
for _, b := range b {
// Iterate over each bit in the byte.
for bit := 1; bit <= 8; bit++ {
// We want to check if the bit we are checking is set for the current byte. To do this we can shift it
// right n-1, bitwise and it with 1 to clear bits on the left, then check if it equals 1.
if (b >> (bit-1) & 1) == 1 {
// TODO: Debug logging of errors returned here.
Ping(&syscall.SockaddrInet4{Addr: c.Addr, Port: SEND_BASE_PORT + bit})
}
}

// Send clock ping, notifies the server we finished the last byte.
// TODO: Debug logging of errors returned here.
_, _ = Ping(&syscall.SockaddrInet4{Addr: c.Addr, Port: SEND_BASE_PORT})
time.Sleep(1)
for {
b := make([]byte, 8)
_, err := c.sendReader.Read(b)
if err == io.EOF {
break
} else if err != nil {
fmt.Printf("error reading from send pipe: %s\n", err)
continue
}

// Iterate over each byte in the string.
for _, b := range b {
// Iterate over each bit in the byte.
for bit := 1; bit <= 8; bit++ {
// We want to check if the bit we are checking is set for the current byte. To do this we can shift it
// right n-1, bitwise and it with 1 to clear bits on the left, then check if it equals 1.
if (b >> (bit - 1) & 1) == 1 {
// TODO: Debug logging of errors returned here.
Ping(&syscall.SockaddrInet4{Addr: c.Addr, Port: SEND_BASE_PORT + bit})
}
}

// Send clock ping, notifies the server we finished the last byte.
// TODO: Debug logging of errors returned here.
_, _ = Ping(&syscall.SockaddrInet4{Addr: c.Addr, Port: SEND_BASE_PORT})
}
}
return
}
Expand Down Expand Up @@ -121,17 +124,19 @@ func (c *Client) receiveWorker() {
}

func (c *Client) receiveClockPing() error {
checks := 10
checks := 9
var err error
var open bool

wait := time.Millisecond
// Try sending the Clock ping 10 times, if the port isn't open by then something went wrong.
for i := 1; i <= checks; i++ {
// Server will close this port when it is ready to send data.
// TODO: Debug logging of retries.
open, err = Ping(&syscall.SockaddrInet4{Addr: c.Addr, Port: RECEIVE_BASE_PORT})
if err != nil || !open {
time.Sleep(time.Millisecond * 10)
time.Sleep(wait)
wait *= 3
} else {
break
}
Expand Down
60 changes: 31 additions & 29 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package lib
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"sync"
Expand Down Expand Up @@ -47,7 +46,6 @@ func NewServer() (*Server, error) {
func (s *Server) receiveWorker() (err error) {
err = s.handleSendConn(SEND_BASE_PORT, func() {
if s.b == byte(0) {
fmt.Println("r.b is nil")
return
}

Expand Down Expand Up @@ -139,33 +137,37 @@ func (s *Server) sendWorker() {
}

go func() {
buf, err := ioutil.ReadAll(s.sendReader)
if err != nil {
fmt.Printf("reading from send pipe: %s\n", err)
}
for _, b := range buf {
// Opening the clock port means we're ready for the client to check the bit ports.

bCopy := b
for bit := 1; bit <= 8; bit++ {
isSet := ((bCopy >> (bit - 1)) & 1) == byte(1)
if isSet {
shouldOpen[bit] <- 1
}
}

// Opening the start clock port means we're ready for the client to check the bit ports.
err = s.waitForConn(RECEIVE_BASE_PORT)
if err != nil {
fmt.Printf("server send clock start: %s\n", err)
}

// A connection on the end clock port means the client has finished iterating the ports.
err = s.waitForConn(RECEIVE_BASE_PORT + 9)
if err != nil {
fmt.Printf("server send clock end: %s\n", err)
}
}
for {
buf := make([]byte, 8)
_, err := s.sendReader.Read(buf)
if err == io.EOF {
break
} else if err != nil {
fmt.Printf("reading from send pipe: %s\n", err)
continue
}

for _, b := range buf {
for bit := 1; bit <= 8; bit++ {
isSet := ((b >> (bit - 1)) & 1) == byte(1)
if isSet {
shouldOpen[bit] <- 1
}
}

// Opening the start clock port means we're ready for the client to check the bit ports.
err = s.waitForConn(RECEIVE_BASE_PORT)
if err != nil {
fmt.Printf("server send clock start: %s\n", err)
}

// A connection on the end clock port means the client has finished iterating the ports.
err = s.waitForConn(RECEIVE_BASE_PORT + 9)
if err != nil {
fmt.Printf("server send clock end: %s\n", err)
}
}
}
}()
}

Expand Down
74 changes: 46 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,60 @@ func main() {
}

if flag.Args()[0] == "client" {
if len(flag.Args()) < 2 {
log.Fatalf("USAGE: %s client <ip>\n", os.Args[0])
}
ip := parseIp(flag.Args()[1])
client(flag.Args()[1:]...)
} else if (len(os.Args) >= 1) && (flag.Args()[0] == "server") {
server()
} else {
help()
}
}

client := lib.NewClient(ip)
_, err := io.WriteString(client.Send, "Hello from client")
if err != nil {
log.Fatalf("failed to write to connection: %s\n", err)
}
client.Send.Close()
func server() {
server, err := lib.NewServer()
if err != nil {
log.Fatalf("starting server: %s\n", err)
}

io.Copy(os.Stdout, client.Receive)
go func() {
_, err = io.Copy(server.Send, os.Stdin)
if err != nil {
log.Fatalf("error writing to send pipe: %s\n", err)
}
err := server.Send.Close()
if err != nil {
fmt.Printf("failed to close send pipe: %s\n", err)
}
}()

//err = client.Wait()
//if err != nil {
// log.Println(err)
//}
} else if (len(os.Args) >= 1) && (flag.Args()[0] == "server") {
server, err := lib.NewServer()
_, err = io.Copy(os.Stdout, server.Recieve)
if err != nil {
log.Fatalf("error copying to stdout: %s\n", err)
}
}

func client(args ...string) {
if len(args) != 1 {
log.Fatalf("USAGE: %s client <ip>\n", os.Args[0])
}
ip := parseIp(args[0])

client := lib.NewClient(ip)

go func() {
_, err := io.Copy(client.Send, os.Stdin)
if err != nil {
log.Fatalf("starting server: %s\n", err)
log.Fatalf("error writing to send pipe: %s\n", err)
}

_, err = io.Copy(server.Send, strings.NewReader("Hello from server"))
if err != nil {
log.Fatalf("copying to server.Send: %s\n", err)
}
server.Send.Close()

_, err = io.Copy(os.Stdout, server.Recieve)
err = client.Send.Close()
if err != nil {
log.Fatalf("copying output to stdout: %s\n", err)
fmt.Printf("error closing send pipe: %s\n", err)
}
} else {
help()
}()

_, err := io.Copy(os.Stdout, client.Receive)
if err != nil {
fmt.Printf("error copying to stdout: %s\n", err)
}
}

Expand Down

0 comments on commit b8b589a

Please sign in to comment.