Skip to content

Commit

Permalink
feat(knetty): add listener ,dialer for tcp protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Softwarekang committed Apr 16, 2023
1 parent 444212a commit 55c249e
Show file tree
Hide file tree
Showing 19 changed files with 294 additions and 197 deletions.
21 changes: 8 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package knetty
import (
"context"
"fmt"
"net"

"github.com/Softwarekang/knetty/net/connection"
"github.com/Softwarekang/knetty/net/poll"
merr "github.com/Softwarekang/knetty/pkg/err"
"github.com/Softwarekang/knetty/internal/net"
"github.com/Softwarekang/knetty/internal/net/connection"
"github.com/Softwarekang/knetty/internal/net/poll"
errors "github.com/Softwarekang/knetty/pkg/err"
"github.com/Softwarekang/knetty/pkg/log"
"github.com/Softwarekang/knetty/session"
)
Expand Down Expand Up @@ -53,7 +53,7 @@ func NewClient(network, address string, opts ...ClientOption) *Client {

func (c *Client) Run() error {
if !c.isActive() {
return merr.ClientClosedErr
return errors.ClientClosedErr
}

switch c.network {
Expand Down Expand Up @@ -91,16 +91,11 @@ func (c *Client) dicTcp() (connection.Connection, error) {
return nil, err
}

tcpConn, err := connection.NewTcpConn(netConn)
if err != nil {
return nil, err
}

if err := tcpConn.Register(poll.Read); err != nil {
if err := netConn.Register(poll.Read); err != nil {
return nil, err
}

return tcpConn, nil
return netConn, nil
}

func (c *Client) waitQuit() {
Expand All @@ -127,7 +122,7 @@ func (c *Client) Shutdown(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("server shutdown caused by:%s", ctx.Err())
case <-c.closeCh:
return merr.ClientClosedErr
return errors.ClientClosedErr
default:
c.quit(nil)
if c.session != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
// Package connection implements tcp, udp and other protocols for network connection.
package connection

import "go.uber.org/atomic"
import (
"github.com/Softwarekang/knetty/internal/net/poll"

"go.uber.org/atomic"
)

type ConnType int

Expand All @@ -43,6 +47,8 @@ type EventTrigger interface {
type Connection interface {
// ID return a uin-type value that uniquely identifies each stream connection。
ID() uint64
// FD return socket fd.
FD() int
// LocalAddr return the actual local connection address.
LocalAddr() string
// RemoteAddr return the actual remote connection address.
Expand All @@ -60,6 +66,8 @@ type Connection interface {
Len() int
// Type Return the current connection network type tcp/udp/ws.
Type() ConnType
// Register register conn in poller with event.
Register(eventType poll.EventType) error
// Close the network connection, regardless of the ongoing blocking non-blocking read and write will return an error.
Close() error
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package connection

import (
"github.com/Softwarekang/knetty/net/poll"
"github.com/Softwarekang/knetty/internal/net/poll"
"github.com/Softwarekang/knetty/pkg/buffer"

"go.uber.org/atomic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package connection

import (
"github.com/Softwarekang/knetty/net/poll"
"github.com/Softwarekang/knetty/internal/net/poll"

"golang.org/x/sys/unix"
)
Expand Down
39 changes: 13 additions & 26 deletions net/connection/tcp.go → internal/net/connection/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,29 @@
package connection

import (
"errors"
"net"
"syscall"

"github.com/Softwarekang/knetty/net/poll"
"github.com/Softwarekang/knetty/internal/net/poll"
"github.com/Softwarekang/knetty/pkg/buffer"
netutil "github.com/Softwarekang/knetty/pkg/net"
syscallutil "github.com/Softwarekang/knetty/pkg/syscall"
)

// TcpConn tcp connection implements the Connection interface.
type TcpConn struct {
knettyConn

conn net.Conn
}

// NewTcpConn create a new tcp connection, conn implements net.Conn in the standard library.
func NewTcpConn(conn net.Conn) (*TcpConn, error) {
if conn == nil {
return nil, errors.New("NewTcpConn(conn net.Conn): conn is nil")
}

// NewTcpConn create a new tcp connection, conn implements Connection.
func NewTcpConn(fd int, lsr, rsr net.Addr) *TcpConn {
var localAddress, remoteAddress string
if conn.LocalAddr() != nil {
localAddress = conn.LocalAddr().String()
}

if conn.RemoteAddr() != nil {
remoteAddress = conn.RemoteAddr().String()
if lsr != nil {
localAddress = lsr.String()
}

// get real fd from conn
fd, err := netutil.ResolveConnFileDesc(conn)
if err != nil {
return nil, err
if rsr != nil {
remoteAddress = rsr.String()
}

// set conn no block
_ = syscallutil.SetConnectionNoBlock(fd)
return &TcpConn{
knettyConn: knettyConn{
id: idBuilder.Inc(),
Expand All @@ -68,15 +51,19 @@ func NewTcpConn(conn net.Conn) (*TcpConn, error) {
inputBuffer: buffer.NewRingBuffer(),
outputBuffer: buffer.NewRingBuffer(),
},
conn: conn,
}, nil
}
}

// ID implements Connection.
func (t *TcpConn) ID() uint64 {
return t.id
}

// FD implements Connection.
func (t *TcpConn) FD() int {
return t.fd
}

// LocalAddr implements Connection.
func (t *TcpConn) LocalAddr() string {
return t.localAddress
Expand Down
96 changes: 96 additions & 0 deletions internal/net/listener/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2022 Phoenix
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package listener

import (
"net"
"time"

"github.com/Softwarekang/knetty/internal/net/connection"
errors "github.com/Softwarekang/knetty/pkg/err"
netutil "github.com/Softwarekang/knetty/pkg/net"

"golang.org/x/sys/unix"
)

// Listener A Listener is a generic network listener for stream-oriented protocols.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (connection.Connection, error)

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error

// Addr returns the listener's network address.
Addr() net.Addr

// FD returns the listener's fd
FD() int
}

// TcpListener tcp network listener.
type TcpListener struct {
Fd int
TcpAddr *net.TCPAddr
keepAlive time.Duration

Check failure on line 50 in internal/net/listener/listener.go

View workflow job for this annotation

GitHub Actions / Lint

field `keepAlive` is unused (unused)
}

// Accept implements Listener.
func (t *TcpListener) Accept() (connection.Connection, error) {
if !t.ok() {
return nil, errors.IllegalListenerErr("tcp")
}

cfd, sa, err := unix.Accept(t.Fd)
if err != nil {
if err == unix.EAGAIN {
return nil, nil
}
return nil, err
}

rsa := netutil.SocketAddrToAddr(sa)
return connection.NewTcpConn(cfd, t.TcpAddr, rsa), unix.SetNonblock(cfd, true)
}

// Close implements Listener.
func (t *TcpListener) Close() error {
if t.Fd != 0 {
return unix.Close(t.Fd)
}

return nil
}

// Addr implements Listener.
func (t *TcpListener) Addr() net.Addr {
return t.TcpAddr
}

// FD implements Listener.
func (t *TcpListener) FD() int {
return t.Fd
}

func (t *TcpListener) ok() bool {
if t.Fd != 0 && t.TcpAddr != nil {
return true
}

return false
}
103 changes: 103 additions & 0 deletions internal/net/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2022 Phoenix
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package net

import (
"fmt"
"net"

"github.com/Softwarekang/knetty/internal/net/connection"
"github.com/Softwarekang/knetty/internal/net/listener"
errors "github.com/Softwarekang/knetty/pkg/err"
netutil "github.com/Softwarekang/knetty/pkg/net"

"golang.org/x/sys/unix"
)

func Listen(network, address string) (listener.Listener, error) {
switch network {
case "tcp":
return listenTcp(network, address)
default:
return nil, errors.UnKnowNetworkErr(network)
}

}

func listenTcp(network, address string) (*listener.TcpListener, error) {
tcpAddr, err := net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}

fd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
if err != nil {
return nil, err
}

if err := unix.Bind(fd, &unix.SockaddrInet4{
Port: tcpAddr.Port,
Addr: tcpAddr.AddrPort().Addr().As4(),
}); err != nil {
return nil, err
}

if err := unix.Listen(fd, unix.SOMAXCONN); err != nil {
return nil, err
}
return &listener.TcpListener{
Fd: fd,
TcpAddr: tcpAddr,
}, unix.SetNonblock(fd, true)
}

func Dial(network, address string) (connection.Connection, error) {
switch network {
case "tcp":
return dialTcp(network, address)
default:
return nil, errors.UnKnowNetworkErr(network)
}

}

func dialTcp(network string, address string) (*connection.TcpConn, error) {
tcpAddr, err := net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}

fd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
if err != nil {
return nil, err
}

rsa, err := netutil.ResolveNetAddrToSocketAddr(tcpAddr)
if err != nil {
return nil, err
}
fmt.Println(rsa)
if err = unix.Connect(fd, rsa); err != nil {
return nil, err
}

lsa, err := unix.Getsockname(fd)
if err != nil {
return nil, err
}
return connection.NewTcpConn(fd, netutil.SocketAddrToAddr(lsa), netutil.SocketAddrToAddr(rsa)), unix.SetNonblock(fd, true)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion knetty.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package knetty

import (
"github.com/Softwarekang/knetty/net/poll"
"github.com/Softwarekang/knetty/internal/net/poll"
"github.com/Softwarekang/knetty/pkg/log"
)

Expand Down

0 comments on commit 55c249e

Please sign in to comment.