-
Notifications
You must be signed in to change notification settings - Fork 635
/
UnbufferedFileStream.cs
288 lines (254 loc) · 8.7 KB
/
UnbufferedFileStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
using System;
using System.IO;
using System.Runtime.InteropServices;
using EventStore.Common.Utils;
using Microsoft.Win32.SafeHandles;
namespace EventStore.Core.TransactionLog.Unbuffered {
//NOTE THIS DOES NOT SUPPORT ALL STREAM OPERATIONS AS YOU MIGHT EXPECT IT SUPPORTS WHAT WE USE!
public unsafe class UnbufferedFileStream : Stream {
private static readonly INativeFile NativeFile;
private byte* _writeBuffer;
private byte* _readBuffer;
private readonly int _writeBufferSize;
private readonly int _readBufferSize;
private readonly IntPtr _writeBufferOriginal;
private readonly IntPtr _readBufferOriginal;
private readonly uint _blockSize;
private long _bufferedCount;
private bool _aligned;
private long _lastPosition;
private bool _needsFlush;
private SafeFileHandle _handle;
private long _readLocation = -1;
private bool _needsRead;
static UnbufferedFileStream() {
if (Runtime.IsWindows) {
NativeFile = new NativeFileWindows();
} else {
NativeFile = new NativeFileUnix();
}
}
private UnbufferedFileStream(SafeFileHandle handle, uint blockSize, int internalWriteBufferSize,
int internalReadBufferSize) {
_handle = handle;
_readBufferSize = internalReadBufferSize;
_writeBufferSize = internalWriteBufferSize;
_writeBufferOriginal = Marshal.AllocHGlobal((int)(internalWriteBufferSize + blockSize));
_readBufferOriginal = Marshal.AllocHGlobal((int)(internalReadBufferSize + blockSize));
_readBuffer = Align(_readBufferOriginal, blockSize);
_writeBuffer = Align(_writeBufferOriginal, blockSize);
_blockSize = blockSize;
}
private byte* Align(IntPtr buf, uint alignTo) {
//This makes an aligned buffer linux needs this.
//The buffer must originally be at least one alignment bigger!
var diff = alignTo - (buf.ToInt64() % alignTo);
var aligned = (IntPtr)(buf.ToInt64() + diff);
return (byte*)aligned;
}
public static UnbufferedFileStream Create(string path,
FileMode mode,
FileAccess acc,
FileShare share,
bool sequential,
int internalWriteBufferSize,
int internalReadBufferSize,
bool writeThrough,
uint minBlockSize) {
var blockSize = NativeFile.GetDriveSectorSize(path);
blockSize = blockSize > minBlockSize ? blockSize : minBlockSize;
if (internalWriteBufferSize % blockSize != 0)
throw new Exception("write buffer size must be aligned to block size of " + blockSize + " bytes");
if (internalReadBufferSize % blockSize != 0)
throw new Exception("read buffer size must be aligned to block size of " + blockSize + " bytes");
var handle = NativeFile.CreateUnbufferedRW(path, acc, share, mode, writeThrough);
return new UnbufferedFileStream(handle, blockSize, internalWriteBufferSize, internalReadBufferSize);
}
public override void Flush() {
CheckDisposed();
if (!_needsFlush) return;
var alignedbuffer = (int)GetLowestAlignment(_bufferedCount);
var positionAligned = GetLowestAlignment(_lastPosition);
if (!_aligned) {
SeekInternal(positionAligned, SeekOrigin.Begin);
}
if (_bufferedCount == alignedbuffer) {
InternalWrite(_writeBuffer, (uint)_bufferedCount);
_lastPosition = positionAligned + _bufferedCount;
_bufferedCount = 0;
_aligned = true;
} else {
var left = _bufferedCount - alignedbuffer;
InternalWrite(_writeBuffer, (uint)(alignedbuffer + _blockSize));
_lastPosition = positionAligned + alignedbuffer;
SetBuffer(alignedbuffer, left);
_bufferedCount = left;
_aligned = false;
}
_needsFlush = false;
}
private static void MemCopy(byte[] src, long srcOffset, byte* dest, long destOffset, long count) {
fixed (byte* p = src) {
MemCopy(p, srcOffset, dest, destOffset, count);
}
}
private static void MemCopy(byte* src, long srcOffset, byte[] dest, long destOffset, long count) {
fixed (byte* p = dest) {
MemCopy(src, srcOffset, p, destOffset, count);
}
}
private static void MemCopy(byte* src, long srcOffset, byte* dest, long destOffset, long count) {
byte* psrc = src + srcOffset;
byte* pdest = dest + destOffset;
for (var i = 0; i < count; i++) {
*pdest = *psrc;
pdest++;
psrc++;
}
}
private void SeekInternal(long positionAligned, SeekOrigin origin) {
NativeFile.Seek(_handle, positionAligned, origin);
}
private void InternalWrite(byte* buffer, uint count) {
var written = 0;
NativeFile.Write(_handle, buffer, count, ref written);
}
public override long Seek(long offset, SeekOrigin origin) {
long mungedOffset = offset;
CheckDisposed();
if (origin == SeekOrigin.Current) throw new NotImplementedException("only supports seek origin begin/end");
if (origin == SeekOrigin.End) mungedOffset = Length + offset;
var aligned = GetLowestAlignment(mungedOffset);
var left = (int)(mungedOffset - aligned);
Flush();
_bufferedCount = left;
_aligned = aligned == left;
_lastPosition = aligned;
//TODO cant do two seeks + a read here.
SeekInternal(aligned, SeekOrigin.Begin);
_needsRead = true;
return offset;
}
private long GetLowestAlignment(long offset) {
return offset - (offset % _blockSize);
}
public override void SetLength(long value) {
CheckDisposed();
var aligned = GetLowestAlignment(value);
aligned = aligned == value ? aligned : aligned + _blockSize;
NativeFile.SetFileSize(_handle, aligned);
if(Position > aligned)
Seek(aligned, SeekOrigin.Begin);
}
public override int Read(byte[] buffer, int offset, int count) {
CheckDisposed();
if (offset < 0 || buffer.Length < offset) throw new ArgumentException("offset");
if (count < 0 || buffer.Length < count) throw new ArgumentException("offset");
if (offset + count > buffer.Length)
throw new ArgumentException("offset + count must be less than size of array");
var position = GetLowestAlignment(Position);
var roffset = (int)(Position - position);
var bytesRead = _readBufferSize;
if (_readLocation + _readBufferSize <= position || _readLocation > position || _readLocation == -1) {
SeekInternal(position, SeekOrigin.Begin);
bytesRead = NativeFile.Read(_handle, _readBuffer, 0, _readBufferSize);
_readLocation = position;
} else if (_readLocation != position) {
roffset += (int)(position - _readLocation);
}
var bytesAvailable = bytesRead - roffset;
if (bytesAvailable <= 0) return 0;
var toCopy = count > bytesAvailable ? bytesAvailable : count;
MemCopy(_readBuffer, roffset, buffer, offset, toCopy);
_bufferedCount += toCopy;
if (count - toCopy == 0) return toCopy;
return toCopy + Read(buffer, offset + toCopy, count - toCopy);
}
public override void Write(byte[] buffer, int offset, int count) {
CheckDisposed();
var done = false;
long left = count;
long current = offset;
if (_needsRead) {
SeekInternal(_lastPosition, SeekOrigin.Begin);
NativeFile.Read(_handle, _writeBuffer, 0, (int)_blockSize);
SeekInternal(_lastPosition, SeekOrigin.Begin);
_needsRead = false;
}
while (!done) {
_needsFlush = true;
if (_bufferedCount + left < _writeBufferSize) {
CopyBuffer(buffer, current, left);
done = true;
current += left;
} else {
var toFill = _writeBufferSize - _bufferedCount;
CopyBuffer(buffer, current, toFill);
Flush();
left -= toFill;
current += toFill;
done = left == 0;
}
}
}
private void CopyBuffer(byte[] buffer, long offset, long count) {
MemCopy(buffer, offset, _writeBuffer, _bufferedCount, count);
_bufferedCount += count;
}
public override bool CanRead {
get {
CheckDisposed();
return true;
}
}
public override bool CanSeek {
get {
CheckDisposed();
return true;
}
}
public override bool CanWrite {
get {
CheckDisposed();
return true;
}
}
public override long Length {
get {
CheckDisposed();
return NativeFile.GetFileSize(_handle);
}
}
public override long Position {
get {
CheckDisposed();
if (_aligned)
return _lastPosition + _bufferedCount;
return GetLowestAlignment(_lastPosition) + _bufferedCount;
}
set {
CheckDisposed();
Seek(value, SeekOrigin.Begin);
}
}
private void SetBuffer(long alignedbuffer, long left) {
MemCopy(_writeBuffer, alignedbuffer, _writeBuffer, 0, left);
}
[System.Diagnostics.Conditional("DEBUG")]
private void CheckDisposed() {
//only check in debug
if (_handle == null) throw new ObjectDisposedException("object is disposed.");
}
protected override void Dispose(bool disposing) {
if (_handle == null) return;
Flush();
_handle.Close();
_handle = null;
_readBuffer = (byte*)IntPtr.Zero;
_writeBuffer = (byte*)IntPtr.Zero;
Marshal.FreeHGlobal(_readBufferOriginal);
Marshal.FreeHGlobal(_writeBufferOriginal);
GC.SuppressFinalize(this);
}
}
}