Skip to content

Commit

Permalink
upload file cache
Browse files Browse the repository at this point in the history
  • Loading branch information
devinyf committed May 16, 2024
1 parent fea3c9e commit 020ba64
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 129 deletions.
14 changes: 8 additions & 6 deletions example/qwen_vl/stream_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ func main() {
panic("token is empty")
}

cli := dashscopego.NewTongyiClient(model, token)
cli := dashscopego.NewTongyiClient(model, token).
SetUploadCache(qwen.NewMemoryFileCache()) // 可以通过 UploadCacher 接口 自定义缓存实现 避免重复上传, 默认使用内存缓存

homedir, _ := os.UserHomeDir()

sysContent := qwen.VLContentList{
{
Text: "You are a helpful assistant.",
},
}

userContent := qwen.VLContentList{
{
// Text: "describe the image",
Text: "用唐诗体说明一下这张图片中的内容", //nolint:gosmopolitan
},
{
// Image: "https://pic.ntimg.cn/20140113/8800276_184351657000_2.jpg",
// Image: "file:///Users/xxxx/xxxx.png",
Image: "https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg",
Image: "file://" + homedir + "/Downloads/pandas_img.jpg",
// Image: "https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg",
},
}

Expand All @@ -42,12 +44,12 @@ func main() {
{Role: qwen.RoleUser, Content: &userContent},
},
}

// callback function: print stream result
streamCallbackFn := func(ctx context.Context, chunk []byte) error {
log.Print(string(chunk))
return nil
}

req := &dashscopego.VLRequest{
Input: input,
StreamingFn: streamCallbackFn,
Expand Down
9 changes: 8 additions & 1 deletion qwen/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ type IBlobContent interface {

type IBlobListConvert interface {
ConvertToBlobList() []IBlobContent
ConvertBackFromBlobList(list []IBlobContent)
}

func popBlobContent(rawList IBlobListConvert) (IBlobContent, bool) {
// TODO: rawList must be a pointer, otherwise it will panic.
list := rawList.ConvertToBlobList()
return innerGetBlob(&list)
content, hasBlob := innerGetBlob(&list)

rawList.ConvertBackFromBlobList(list)

return content, hasBlob
}

func innerGetBlob(list *[]IBlobContent) (IBlobContent, bool) {
Expand All @@ -36,6 +42,7 @@ func innerGetBlob(list *[]IBlobContent) (IBlobContent, bool) {
if v.GetBlob() != "" {
hasBlob = true
preSlice := (*list)[:i]

if i == len(*list)-1 {
*list = preSlice
} else {
Expand Down
13 changes: 13 additions & 0 deletions qwen/content_audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,16 @@ func (acList *AudioContentList) ConvertToBlobList() []IBlobContent {
}
return list
}

func (acList *AudioContentList) ConvertBackFromBlobList(list []IBlobContent) {
if acList == nil {
panic("VLContentList is nil or empty")
}

*acList = make([]AudioContent, len(list))
for i, v := range list {
if content, ok := v.(AudioContent); ok {
(*acList)[i] = content
}
}
}
13 changes: 13 additions & 0 deletions qwen/content_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,16 @@ func (fclist *FileContentList) ConvertToBlobList() []IBlobContent {
}
return list
}

func (fclist *FileContentList) ConvertBackFromBlobList(list []IBlobContent) {
if fclist == nil {
panic("VLContentList is nil or empty")
}

*fclist = make([]FileContent, len(list))
for i, v := range list {
if content, ok := v.(FileContent); ok {
(*fclist)[i] = content
}
}
}

Check failure on line 93 in qwen/content_file.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
13 changes: 13 additions & 0 deletions qwen/content_vl.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,16 @@ func (vlist *VLContentList) ConvertToBlobList() []IBlobContent {
}
return list
}

func (vlist *VLContentList) ConvertBackFromBlobList(list []IBlobContent) {
if vlist == nil {
panic("VLContentList is nil or empty")
}

*vlist = make([]VLContent, len(list))
for i, v := range list {
if content, ok := v.(VLContent); ok {
(*vlist)[i] = content
}
}
}
94 changes: 0 additions & 94 deletions qwen/filecachemgr.go

This file was deleted.

82 changes: 82 additions & 0 deletions qwen/upload_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package qwen

import (
"crypto/sha1"

Check failure on line 4 in qwen/upload_cache.go

View workflow job for this annotation

GitHub Actions / lint

G505: Blocklisted import crypto/sha1: weak cryptographic primitive (gosec)
"encoding/hex"
"time"
)

// UploadCacher is an interface for caching uploaded file url to prevent duplicate upload.
// By default we provide Sample MemoryFileCache as the implementation.
// Customize your own cache manager by implementing this interface.
type UploadCacher interface {
SaveCache(buf []byte, url string) error
GetCache(buf []byte) string
}

// ==================== Sample MemoryFileCache ====================

Check failure on line 17 in qwen/upload_cache.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
type FileCacheInfo struct {
URL string
UploadTime int64
}

// MemoryFileCache is a simple in-memory-cache implementation for UploadCacher interface.
type MemoryFileCache struct {
MapFiles map[string]*FileCacheInfo
MaxFileCacheLifeTime time.Duration
}

func NewMemoryFileCache() *MemoryFileCache {

Check failure on line 29 in qwen/upload_cache.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

Check failure on line 30 in qwen/upload_cache.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
mgr := &MemoryFileCache{
MapFiles: make(map[string]*FileCacheInfo),
MaxFileCacheLifeTime: time.Hour*2 - time.Minute*5,
}

// cron job to clean up outdated memory cache.
go mgr.cronMemoryCleaner()

return mgr
}

func (mgr *MemoryFileCache) SaveCache(buf []byte, url string) error {
key := mgr.hash(buf)

mgr.MapFiles[key] = &FileCacheInfo{
URL: url,
UploadTime: time.Now().Unix(),
}

return nil
}

func (mgr *MemoryFileCache) GetCache(buf []byte) string {
key := mgr.hash(buf)

cache, isok := mgr.MapFiles[key]
if isok {
return cache.URL
}

return ""
}

func (mgr *MemoryFileCache) cronMemoryCleaner() {
for {
time.Sleep(time.Minute * 5)

curtime := time.Now().Unix()

for k, v := range mgr.MapFiles {
if curtime-v.UploadTime > int64(mgr.MaxFileCacheLifeTime) {
delete(mgr.MapFiles, k)
}
}
}
}

func (mgr *MemoryFileCache) hash(buf []byte) string {
h := sha1.New()

Check failure on line 79 in qwen/upload_cache.go

View workflow job for this annotation

GitHub Actions / lint

G401: Use of weak cryptographic primitive (gosec)
h.Write(buf)
return hex.EncodeToString(h.Sum(nil))
}
49 changes: 35 additions & 14 deletions qwen/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"io"
"log"
"mime/multipart"
"net/http"
"os"
Expand Down Expand Up @@ -92,36 +93,60 @@ type UploadRequest struct {
*/

// uploading local image to aliyun oss, a oss url will be returned.
func UploadLocalFile(ctx context.Context, filePath, model, apiKey string) (string, error) {
func UploadLocalFile(ctx context.Context, filePath, model, apiKey string, uploadCacher UploadCacher) (string, error) {
fileBytes, mimeType, err := loadLocalFileWithMimeType(filePath)
if err != nil {
return "", err
}

fileName := filepath.Base(filePath)

return uploadFIle(ctx, fileBytes, fileName, mimeType, model, apiKey)
if uploadCacher != nil {
return uploadFileWithCache(ctx, fileBytes, fileName, mimeType, model, apiKey, uploadCacher)
} else {

Check warning on line 106 in qwen/uploader.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return uploadFile(ctx, fileBytes, fileName, mimeType, model, apiKey)
}
}

// download and uploading a online image to aliyun oss, a oss url will be returned.
func UploadFileFromURL(ctx context.Context, fileURL, model, apiKey string) (string, error) {
func UploadFileFromURL(ctx context.Context, fileURL, model, apiKey string, uploadCacher UploadCacher) (string, error) {
fileBytes, mimeType, err := downloadFileWithMimeType(fileURL)
if err != nil {
return "", err
}
fileName := filepath.Base(fileURL)

return uploadFIle(ctx, fileBytes, fileName, mimeType, model, apiKey)
if uploadCacher != nil {
return uploadFileWithCache(ctx, fileBytes, fileName, mimeType, model, apiKey, uploadCacher)
} else {

Check warning on line 121 in qwen/uploader.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return uploadFile(ctx, fileBytes, fileName, mimeType, model, apiKey)
}
}

func uploadFIle(ctx context.Context, fileBytes []byte, fileName, mimeType, model, apiKey string) (string, error) {
if gFileCacheMgr != nil {
url := gFileCacheMgr.Get(fileBytes)
if url != "" {
return url, nil
}
func uploadFileWithCache(ctx context.Context, fileBytes []byte, fileName, mimeType, model, apiKey string, uploadCacher UploadCacher) (string, error) {
var ossUrl string

Check warning on line 127 in qwen/uploader.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var ossUrl should be ossURL (revive)
var err error

ossUrl = uploadCacher.GetCache(fileBytes)
if ossUrl != "" {
return ossUrl, nil
}

ossUrl, err = uploadFile(ctx, fileBytes, fileName, mimeType, model, apiKey)
if err != nil {
return "", err
}

err = uploadCacher.SaveCache(fileBytes, ossUrl)
if err != nil {
log.Printf("save upload cache error: %v\n", err)
}

return ossUrl, nil
}

func uploadFile(ctx context.Context, fileBytes []byte, fileName, mimeType, model, apiKey string) (string, error) {

Check failure on line 148 in qwen/uploader.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

Check failure on line 149 in qwen/uploader.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
certInfo, err := getUploadCertificate(ctx, model, apiKey)
if err != nil {
return "", &WrapMessageError{Message: "upload Cert Error", Cause: err}
Expand Down Expand Up @@ -150,10 +175,6 @@ func uploadFIle(ctx context.Context, fileBytes []byte, fileName, mimeType, model

ossURL := "oss://" + ossKey

if gFileCacheMgr != nil {
gFileCacheMgr.Cache(fileBytes, ossURL)
}

return ossURL, nil
}

Expand Down
Loading

0 comments on commit 020ba64

Please sign in to comment.