Skip to content

Commit eca88af

Browse files
committed
Add go library
1 parent cdefa03 commit eca88af

File tree

2 files changed

+378
-0
lines changed

2 files changed

+378
-0
lines changed

lib/go/buse/buse.go

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
// Copyright (C) 2021 Vojtech Aschenbrenner <v@asch.cz>
2+
3+
package buse
4+
5+
import (
6+
"encoding/binary"
7+
"errors"
8+
"fmt"
9+
"io/ioutil"
10+
"os"
11+
"runtime"
12+
"sync"
13+
"syscall"
14+
)
15+
16+
const (
17+
// Character device for buse device %d and read queue %d.
18+
buseReadPathFmt = "/dev/buse%d-r%d"
19+
20+
// Character device for buse device %d and write queue %d.
21+
buseWritePathFmt = "/dev/buse%d-w%d"
22+
23+
// Path to the configfs directory.
24+
configFsPath = "/sys/kernel/config/buse"
25+
26+
// Size of write request in write queue.
27+
writeRequestSize = 16
28+
29+
// Size of read request in read queue.
30+
readRequestSize = 24
31+
)
32+
33+
// Provides functions which are called by buse as a reaction to the received
34+
// command.
35+
type BuseReadWriter interface {
36+
// Read extent starting at sector with length lenth. Data should be
37+
// read to chunk which has appropriate size. Called as a reaction to
38+
// the read command in the read queue.
39+
BuseRead(sector, length int64, chunk []byte) error
40+
41+
// Write batched writes stored in chunk. writes is the number of
42+
// individual writes in the metadata part of the chunk. Called as a
43+
// reaction to the batched writes command in the write queue.
44+
BuseWrite(writes int64, chunk []byte) error
45+
46+
// Called just before the block device is started.
47+
BusePreRun()
48+
49+
// Called after the block device is removed.
50+
BusePostRemove()
51+
}
52+
53+
// Options for created buse device.
54+
type Options struct {
55+
Durable bool
56+
WriteChunkSize int64
57+
BlockSize int64
58+
Threads int
59+
Major int64
60+
WriteShmSize int64
61+
ReadShmSize int64
62+
Size int64
63+
CollisionArea int64
64+
QueueDepth int64
65+
Scheduler bool
66+
}
67+
68+
// Buse is a library wrapping the low level interaction with buse kernel module
69+
// and provides simple API to for creating a block device in user space.
70+
type Buse struct {
71+
ReadWriter BuseReadWriter
72+
Options Options
73+
}
74+
75+
// Returns new instance of Buse configured with options o.
76+
func New(rw BuseReadWriter, o Options) (Buse, error) {
77+
buse := Buse{
78+
ReadWriter: rw,
79+
Options: o,
80+
}
81+
82+
err := buse.checkOptions()
83+
if err != nil {
84+
return Buse{}, err
85+
}
86+
87+
err = buse.configure()
88+
if err != nil {
89+
return Buse{}, err
90+
}
91+
92+
return buse, nil
93+
}
94+
95+
// Returns total memory presented to the system.
96+
func totalMemory() (uint64, error) {
97+
sysInfo := &syscall.Sysinfo_t{}
98+
99+
if err := syscall.Sysinfo(sysInfo); err != nil {
100+
return 0, err
101+
}
102+
103+
// On 32-bit architectures the result is uint, hence we need to type it
104+
// to uint64 to conform with function signature.
105+
totalMemory := uint64(sysInfo.Totalram) * uint64(sysInfo.Unit)
106+
107+
return totalMemory, nil
108+
}
109+
110+
// Validates passed options.
111+
func (b *Buse) checkOptions() error {
112+
o := &b.Options
113+
114+
if o.Threads == 0 || o.Threads > runtime.NumCPU() {
115+
o.Threads = runtime.NumCPU()
116+
}
117+
118+
totalMem, err := totalMemory()
119+
if err != nil {
120+
return errors.New("Cannot read total amount of ram!")
121+
}
122+
123+
neededMemory := uint64(o.Threads) * uint64(o.WriteShmSize+o.ReadShmSize)
124+
if neededMemory > totalMem {
125+
return errors.New("Not enough memory!")
126+
}
127+
128+
if o.WriteShmSize%o.WriteChunkSize != 0 {
129+
return errors.New("Write buffer size has to be a multiple of chunk size!")
130+
}
131+
132+
if o.BlockSize != 512 && o.BlockSize != 4096 {
133+
return errors.New("Block size has to 512 or 4096!")
134+
}
135+
136+
return nil
137+
}
138+
139+
// Performs configuration of the block device which is just being created. It
140+
// configures buse device via configs according to the options passed to the
141+
// New() function. When configuration succeed the device is power on.
142+
func (b *Buse) configure() error {
143+
var noScheduler int64
144+
if !b.Options.Scheduler {
145+
noScheduler = 1
146+
}
147+
148+
configFsPath := fmt.Sprint(configFsPath, "/", b.Options.Major)
149+
if _, err := os.Stat(configFsPath); !os.IsNotExist(err) {
150+
return errors.New(fmt.Sprintf("Device buse%d already exists!", b.Options.Major))
151+
}
152+
153+
if err := os.Mkdir(configFsPath, 0755); err != nil {
154+
return err
155+
}
156+
157+
kernelParams := map[string]int64{
158+
"size": b.Options.Size,
159+
"collision_area_size": int64(b.Options.CollisionArea),
160+
"read_shm_size": int64(b.Options.ReadShmSize),
161+
"write_shm_size": int64(b.Options.WriteShmSize),
162+
"write_chunk_size": int64(b.Options.WriteChunkSize),
163+
"hw_queues": int64(b.Options.Threads),
164+
"blocksize": int64(b.Options.BlockSize),
165+
"queue_depth": int64(b.Options.QueueDepth),
166+
"no_scheduler": noScheduler,
167+
}
168+
169+
for variable, value := range kernelParams {
170+
if err := b.setConfig(variable, value); err != nil {
171+
return err
172+
}
173+
}
174+
175+
if err := b.setConfig("power", 1); err != nil {
176+
return err
177+
}
178+
179+
return nil
180+
}
181+
182+
// Opens control file and mmap it. Returns file and mmapped memory.
183+
func openAndMmapControlFile(chardev string, shm_size int) (*os.File, []byte, error) {
184+
f, err := os.OpenFile(chardev, os.O_RDWR, 0644)
185+
if err != nil {
186+
return nil, nil, err
187+
}
188+
189+
shmem, err := syscall.Mmap(int(f.Fd()), 0, shm_size,
190+
syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
191+
if err != nil {
192+
f.Close()
193+
return nil, nil, err
194+
}
195+
196+
return f, shmem, err
197+
}
198+
199+
// Parses request reading from write queue character device.
200+
func (b *Buse) parseWriteRequest(request []byte) ([]byte, uint64, uint64) {
201+
raw := make([]byte, 8)
202+
copy(raw, request[:8])
203+
offset := binary.LittleEndian.Uint64(raw)
204+
writesLen := binary.LittleEndian.Uint64(request[8:16])
205+
206+
return raw, offset, writesLen
207+
}
208+
209+
// Parses request reading from read queue character device.
210+
func (b *Buse) parseReadRequest(request []byte) ([]byte, uint64, uint64, uint64) {
211+
raw := make([]byte, 8)
212+
copy(raw, request[16:24])
213+
offset := binary.LittleEndian.Uint64(raw)
214+
215+
sector := binary.LittleEndian.Uint64(request[:8]) * 512 / uint64(b.Options.BlockSize)
216+
length := binary.LittleEndian.Uint64(request[8:16]) * 512 / uint64(b.Options.BlockSize)
217+
218+
return raw, offset, sector, length
219+
}
220+
221+
// True if the request means termination of the device.
222+
func isTermination(offset uint64) bool {
223+
return offset == ^uint64(0)
224+
}
225+
226+
// True if the request is flush.
227+
func isFlush(offset uint64) bool {
228+
return offset > (1 << 32)
229+
}
230+
231+
// Infinite loop reading from write queue character device and calling
232+
// BuseWrite() callback provided by calling application. When the BuseWrite()
233+
// returns then the batched write is confirmed to the kernel leading to the
234+
// recycling of the buffer in shared memory.
235+
func (b *Buse) writer(chardev string, wgFunc *sync.WaitGroup, shm_size int) {
236+
defer wgFunc.Done()
237+
238+
controlFile, shmem, err := openAndMmapControlFile(chardev, shm_size)
239+
if err != nil {
240+
panic(err)
241+
}
242+
defer controlFile.Close()
243+
defer syscall.Munmap(shmem)
244+
245+
requestBuffer := make([]byte, writeRequestSize)
246+
wg := sync.WaitGroup{}
247+
for {
248+
_, err := controlFile.Read(requestBuffer)
249+
if err != nil {
250+
continue
251+
}
252+
253+
offsetRaw, offset, writesLen := b.parseWriteRequest(requestBuffer)
254+
255+
if isTermination(offset) {
256+
wg.Wait()
257+
return
258+
}
259+
260+
if isFlush(offset) {
261+
if b.Options.Durable {
262+
wg.Wait()
263+
}
264+
controlFile.Write(offsetRaw)
265+
continue
266+
}
267+
268+
dataRegion := shmem[offset : offset+uint64(b.Options.WriteChunkSize)]
269+
wg.Add(1)
270+
go func() {
271+
defer wg.Done()
272+
273+
err := b.ReadWriter.BuseWrite(int64(writesLen), dataRegion)
274+
if err != nil {
275+
fmt.Fprintf(os.Stderr, "Chunk write (%d writes) failed!\n", writesLen)
276+
fmt.Fprint(os.Stderr, err)
277+
}
278+
279+
n, err := controlFile.Write(offsetRaw)
280+
if err != nil {
281+
fmt.Fprint(os.Stderr, "Read ack error, n =", n, "err=", err.Error())
282+
fmt.Fprint(os.Stderr, err)
283+
}
284+
}()
285+
}
286+
}
287+
288+
// Infinite loop reading from read queue character device and calling
289+
// BuseRead() callback provided by calling application. When the BuseRead()
290+
// returns then the read request is acknowledged to the kernel.
291+
func (b *Buse) reader(chardev string, wgFunc *sync.WaitGroup, shm_size int) {
292+
defer wgFunc.Done()
293+
294+
controlFile, shmem, err := openAndMmapControlFile(chardev, shm_size)
295+
if err != nil {
296+
panic(err)
297+
}
298+
defer controlFile.Close()
299+
defer syscall.Munmap(shmem)
300+
301+
requestBuffer := make([]byte, readRequestSize)
302+
var wg sync.WaitGroup
303+
for {
304+
_, err := controlFile.Read(requestBuffer)
305+
if err != nil {
306+
continue
307+
}
308+
309+
offsetRaw, offset, sector, length := b.parseReadRequest(requestBuffer)
310+
311+
if isTermination(offset) {
312+
wg.Wait()
313+
return
314+
}
315+
316+
size := int64(length) * b.Options.BlockSize
317+
dataRegion := shmem[int64(offset) : int64(offset)+size]
318+
319+
wg.Add(1)
320+
go func() {
321+
defer wg.Done()
322+
323+
err := b.ReadWriter.BuseRead(int64(sector), int64(length), dataRegion)
324+
if err != nil {
325+
fmt.Fprint(os.Stderr, err)
326+
}
327+
328+
_, err = controlFile.Write(offsetRaw)
329+
if err != nil {
330+
fmt.Fprint(os.Stderr, err)
331+
}
332+
}()
333+
}
334+
}
335+
336+
// Bind all the control queues and start processing read and write commands.
337+
// This is done via multiple readers and writers. One worker per queue.
338+
func (b *Buse) Run() {
339+
b.ReadWriter.BusePreRun()
340+
341+
var wg sync.WaitGroup
342+
wg.Add(int(b.Options.Threads) * 2)
343+
for i := 0; i < int(b.Options.Threads); i++ {
344+
w := fmt.Sprintf(buseWritePathFmt, b.Options.Major, i)
345+
r := fmt.Sprintf(buseReadPathFmt, b.Options.Major, i)
346+
347+
go b.writer(w, &wg, int(b.Options.WriteShmSize))
348+
go b.reader(r, &wg, int(b.Options.ReadShmSize))
349+
}
350+
wg.Wait()
351+
}
352+
353+
// Write value to configfs variable.
354+
func (b *Buse) setConfig(variable string, value int64) error {
355+
configFsPath := fmt.Sprint(configFsPath, "/", b.Options.Major, "/", variable)
356+
byteValue := []byte(fmt.Sprint(value))
357+
358+
err := ioutil.WriteFile(configFsPath, byteValue, 0644)
359+
360+
return err
361+
}
362+
363+
// Stop buse device. All requests are refused but the device is still visible
364+
// and can be started again.
365+
func (b *Buse) StopDevice() error {
366+
err := b.setConfig("power", 0)
367+
return err
368+
}
369+
370+
// Remove the device. The device is unregistered as block device.
371+
func (b *Buse) RemoveDevice() error {
372+
err := syscall.Rmdir(fmt.Sprint(configFsPath, "/", b.Options.Major))
373+
b.ReadWriter.BusePostRemove()
374+
return err
375+
}

lib/go/buse/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/asch/buse/lib/go/buse
2+
3+
go 1.16

0 commit comments

Comments
 (0)