Skip to content
This repository has been archived by the owner on Aug 15, 2024. It is now read-only.
/ knetty Public archive

Knetty is a network communication framework written in Go based on the event-driven architecture.

License

Notifications You must be signed in to change notification settings

Softwarekang/knetty

Repository files navigation

knetty

English | 中文

Introduction

knetty is a network communication framework written in Go and based on an event-driven architecture. It supports the TCP, UDP, and WebSocket protocols and is as easy to use as Netty, which is written in Java.

Contents

Installation

To install the knetty package, you need to install Go and set up your Go workspace first.

  • You first need Go installed (version 1.18+ is required); then you can use the Go command below to install knetty.
go get -u  github.com/Softwarekang/knetty
  • Import it in your code:
import "github.com/Softwarekang/knetty"

Quick Start

# View knetty code examples
# work dir in knetty
cd /example/server
# view server start up code examples
cat server.go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main starts a knetty TCP server on 127.0.0.1:8000 and shuts it down
// gracefully once the process receives SIGINT or SIGTERM.
func main() {
	knetty.SetPollerNums(8)
	// Optional settings for the server.
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// Create a new server from the network type (tcp/udp), the listen address
	// (such as 127.0.0.1:8000), and the optional settings.
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)
	// Run the server in a goroutine so that it won't block the graceful
	// shutdown handling below.
	go func() {
		// http.ErrServerClosed is treated as the expected result of a shutdown,
		// so every other error is logged.
		// NOTE(review): confirm that knetty reports shutdown with this sentinel.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal, then gracefully shut down the server.
	// The channel is buffered because signal.Notify never blocks when sending:
	// with an unbuffered channel a signal that arrives before the receive
	// below is ready would be dropped.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	log.Printf("server pid:%d", os.Getpid())
	<-quit
	log.Println("shutting down server...")

	// The context gives Shutdown a 5 second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		log.Fatal("server starting shutdown:", err)
	}

	log.Println("server exiting")
}

// newSessionCallBackFn gives a session the codec and event listener it
// needs in order to run. It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	var (
		sessionCodec = &codec{}
		listener     = &helloWorldListener{}
	)
	s.SetCodec(sessionCodec)
	s.SetEventListener(listener)
	return nil
}

// helloWorldListener is the server-side session event listener: it echoes
// every received package back to the peer and prints session events.
type helloWorldListener struct{}

// OnMessage writes pkg back to the session and flushes the session buffer.
// Write and flush errors are printed instead of being silently dropped; a
// write error of EAGAIN is not reported, matching the client's sendHello.
// It always returns session.Normal.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	if _, err := s.WritePkg(pkg); err != nil && !errors.Is(err, syscall.EAGAIN) {
		fmt.Printf("session: %s got err :%v\n", s.Info(), err)
	}
	if err := s.FlushBuffer(); err != nil {
		fmt.Printf("session: %s got err :%v\n", s.Info(), err)
	}
	return session.Normal
}

// OnConnect prints the local and remote addresses of the session.
func (e *helloWorldListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
}

// OnClose prints the info of the closed session.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Printf("server session: %s closed\n", s.Info())
}

// OnError prints the session info together with the error it received.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	fmt.Printf("session: %s got err :%v\n", s.Info(), err)
}

// codec is the server's session codec. It treats every package as a plain
// string with no framing.
type codec struct{}

// Encode converts pkg, which must be a string, into its raw bytes.
// Any other type (including nil) yields an error instead of being silently
// encoded as an empty payload.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	data, ok := pkg.(string)
	if !ok {
		return nil, errors.New("pkg type must be string")
	}

	return []byte(data), nil
}

// Decode converts the received bytes into a string package.
// It returns (nil, 0, err) for a nil slice, (nil, 0, nil) for an empty slice
// (nothing to decode yet), and otherwise the whole input as a string together
// with its length in bytes.
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	if bytes == nil {
		return nil, 0, errors.New("bytes is nil")
	}

	if len(bytes) == 0 {
		return nil, 0, nil
	}

	return string(bytes), len(bytes), nil
}
# start up server 
go run ./example/server/server/server.go
# view client start up code examples
cat client.go
/*
	Copyright 2022 Phoenix

	Licensed under the Apache License, Version 2.0 (the "License");
	you may not use this file except in compliance with the License.
	You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

	Unless required by applicable law or agreed to in writing, software
	distributed under the License is distributed on an "AS IS" BASIS,
	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
	See the License for the specific language governing permissions and
	limitations under the License.
*/

package main

import (
	"errors"
	"fmt"
	"log"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main creates a knetty TCP client for 127.0.0.1:8000 and runs it, logging
// the error if Run fails.
func main() {
	// Optional settings for the client: newSessionCallBackFn is registered as
	// the new-session callback.
	client := knetty.NewClient(
		"tcp",
		"127.0.0.1:8000",
		knetty.WithClientNewSessionCallBackFunc(newSessionCallBackFn),
	)

	err := client.Run()
	if err != nil {
		log.Printf("run client: %s\n", err)
	}
}

// newSessionCallBackFn gives a session the codec and event listener it
// needs in order to run. It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	var (
		sessionCodec = codec{}
		listener     = &pkgListener{}
	)
	s.SetCodec(sessionCodec)
	s.SetEventListener(listener)
	return nil
}

// sendHello writes the "hello" package to the session, prints the number of
// bytes reported by WritePkg, and flushes the session buffer.
// A write error is logged unless it is (or wraps) syscall.EAGAIN; a flush
// error is always logged.
func sendHello(s session.Session) {
	n, err := s.WritePkg("hello")
	// errors.Is also matches EAGAIN when it has been wrapped by another error.
	if err != nil && !errors.Is(err, syscall.EAGAIN) {
		log.Println(err)
	}

	fmt.Printf("client session send %v bytes data to server\n", n)
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}
}

// codec is the client's session codec. Packages are strings, and Decode cuts
// the incoming byte stream into 5-byte packages.
type codec struct{}

// Encode converts pkg into its raw bytes. It returns an error when pkg is
// nil or is not a string.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one 5-byte package from bytes.
// It returns (nil, 0, err) for a nil slice, (nil, 0, nil) while fewer than
// 5 bytes are available, and otherwise the first 5 bytes as a string together
// with the length 5; any bytes after that are left for a later call.
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const pkgLen = 5

	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < pkgLen:
		return nil, 0, nil
	}

	return string(bytes[:pkgLen]), pkgLen, nil
}

// pkgListener is the client-side session event listener: it prints every
// received package, writes it back to the server, and prints session events.
type pkgListener struct{}

// OnMessage prints the received string package, writes it back to the
// session buffer, and flushes it. A pkg that is not a string is logged and
// skipped instead of panicking. A write error is logged unless it is (or
// wraps) syscall.EAGAIN, matching sendHello; a flush error is always logged.
// It always returns session.Normal.
func (e *pkgListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	data, ok := pkg.(string)
	if !ok {
		log.Printf("client got unexpected pkg type %T\n", pkg)
		return session.Normal
	}

	fmt.Printf("client got data:%s\n", data)
	if _, err := s.WriteBuffer([]byte(data)); err != nil && !errors.Is(err, syscall.EAGAIN) {
		log.Println(err)
	}
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}
	// Pause for 2 seconds before returning; presumably this throttles the
	// echo loop between client and server — confirm.
	time.Sleep(2 * time.Second)
	return session.Normal
}

// OnConnect prints the local and remote addresses of the session and sends
// the first "hello" package.
func (e *pkgListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
	sendHello(s)
}

// OnClose prints the info of the closed session.
func (e *pkgListener) OnClose(s session.Session) {
	fmt.Printf("client session: %s closed\n", s.Info())
}

// OnError prints the session info together with the error it received.
func (e *pkgListener) OnError(s session.Session, err error) {
	fmt.Printf("client session: %s got err :%v\n", s.Info(), err)
}
# start up client
go run ./example/server/client/client.go

More Detail

Using NewSessionCallBackFunc

definition

/*
	NewSessionCallBackFunc is executed when a new session is established.
	Use it to set the parameters the session needs (such as its codec and
	event listener) so that the session starts properly.
*/
type NewSessionCallBackFunc func(s session.Session) error

You can set parameters such as codec, event listener and more for the session via the provided API.

// newSessionCallBackFn gives a session the codec and event listener it
// needs in order to run. It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	var (
		sessionCodec = &codec{}
		listener     = &helloWorldListener{}
	)
	s.SetCodec(sessionCodec)
	s.SetEventListener(listener)
	return nil
}

Using Codec

definition

// Codec converts between upper-layer protocol objects and binary network
// data for a session.
type Codec interface {
	// Encode converts an object into binary network data.
	Encode(pkg interface{}) ([]byte, error)

	// Decode converts binary network data into an upper-layer protocol object.
	// The three return shapes below distinguish a decoding error, a half
	// (incomplete) packet, and a normal or sticky packet:
	//   Error:                  nil, 0, err
	//   Half packet:            nil, 0, nil
	//   Normal / sticky packet: pkg, pkgLen, nil
	Decode([]byte) (interface{}, int, error)
}

Here is a codec for fixed-length "hello" messages. It shows how to handle half packets (incomplete data), sticky packets (several messages in one read), and invalid network data.

// Encode converts pkg into its raw bytes. The only accepted package is the
// string "hello"; nil, a non-string value, or any other string yields an error.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		if v != "hello" {
			return nil, errors.New("pkg string must be \"hello\"")
		}
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one "hello" package from bytes.
//   - nil input:             nil, 0, err
//   - fewer than 5 bytes:    nil, 0, nil (half packet)
//   - first 5 bytes differ:  nil, 0, err
//   - otherwise:             "hello", 5, nil (extra bytes are left over)
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const helloLen = 5

	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < helloLen:
		return nil, 0, nil
	}

	head := string(bytes[:helloLen])
	if head != "hello" {
		return nil, 0, errors.New("data is not 'hello'")
	}
	return head, helloLen, nil
}

Using Custom Logger

definition

// Logger is a minimalistic interface for knetty to log messages to. It
// should be used to provide custom logging writers for knetty to use.
type Logger interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Warnf(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Debug(args ...interface{})
}

// SetLogger replaces the default logger (log.DefaultLogger) with the given
// custom logger.
func SetLogger(logger log.Logger) {
	log.DefaultLogger = logger
}

Set a custom logger

// logger must implement the Logger interface.
knetty.SetLogger(logger)

Using EventListener

definition

// EventListener receives the events of a session.
// NOTE(review): the Quick Start listeners declare OnMessage with a
// session.ExecStatus return value, while this definition has none — confirm
// which signature is current.
type EventListener interface {
	// OnMessage runs when the session receives a pkg.
	OnMessage(s Session, pkg interface{})
	// OnConnect runs when the connection is initialized.
	OnConnect(s Session)
	// OnClose runs before the session is closed.
	OnClose(s Session)
	// OnError runs when the session encounters an error.
	OnError(s Session, e error)
}

Below is a typical event listener.

// helloWorldListener is a typical session event listener that prints every
// event it receives.
// NOTE(review): the Quick Start listeners return session.ExecStatus from
// OnMessage; confirm which signature the current EventListener requires.
type helloWorldListener struct{}

// OnMessage prints the received package. The package is expected to be a
// string; any other type is reported instead of causing a panic.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) {
	data, ok := pkg.(string)
	if !ok {
		fmt.Printf("session got unexpected pkg type %T\n", pkg)
		return
	}
	fmt.Println(data)
}

// OnConnect prints the local and remote addresses of the session.
func (e *helloWorldListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
}

// OnClose prints a notice that the session closed.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Printf("session close\n")
}

// OnError prints the error the session received.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	fmt.Printf("session got err :%v\n", err)
}

Graceful shutdown

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main starts a knetty TCP server on 127.0.0.1:8000 and shuts it down
// gracefully once the process receives SIGINT or SIGTERM.
func main() {
	// Optional settings for the server.
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// Create a new server from the network type (tcp/udp), the listen address
	// (such as 127.0.0.1:8000), and the optional settings.
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)
	// Run the server in a goroutine so that it won't block the graceful
	// shutdown handling below.
	go func() {
		// http.ErrServerClosed is treated as the expected result of a shutdown,
		// so every other error is logged.
		// NOTE(review): confirm that knetty reports shutdown with this sentinel.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal, then gracefully shut down the server.
	// The channel is buffered because signal.Notify never blocks when sending:
	// with an unbuffered channel a signal that arrives before the receive
	// below is ready would be dropped.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("shutting down server...")

	// The context gives Shutdown a 5 second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		log.Fatal("server starting shutdown:", err)
	}

	log.Println("server exiting")
}

Benchmarks

About

Knetty is a network communication framework written in Go based on the event-driven architecture.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published