Skip to content

Commit

Permalink
Changed BufferedInputStream from a protocol to a class; removed a bun…
Browse files Browse the repository at this point in the history
…ch of redundant code; changed the InputStream interface to work more like the read(2) system call; fixed a bug with restoring large files when network errors occur.
  • Loading branch information
Stefan Reitshamer authored and Stefan Reitshamer committed Aug 20, 2010
1 parent 5b990ad commit 846d391
Show file tree
Hide file tree
Showing 93 changed files with 1,394 additions and 1,070 deletions.
8 changes: 6 additions & 2 deletions ArqRepo.m
Expand Up @@ -82,7 +82,9 @@ - (Commit *)commitForSHA1:(NSString *)theSHA1 error:(NSError **)error {
return nil;
}
DataInputStream *dis = [[DataInputStream alloc] initWithData:data];
Commit *commit = [[[Commit alloc] initWithBufferedInputStream:dis error:error] autorelease];
BufferedInputStream *bis = [[BufferedInputStream alloc] initWithUnderlyingStream:dis];
Commit *commit = [[[Commit alloc] initWithBufferedInputStream:bis error:error] autorelease];
[bis release];
[dis release];
return commit;
}
Expand All @@ -109,7 +111,9 @@ - (Tree *)treeForSHA1:(NSString *)theSHA1 error:(NSError **)error {
return nil;
}
DataInputStream *dis = [[DataInputStream alloc] initWithData:data];
Tree *tree = [[[Tree alloc] initWithBufferedInputStream:dis error:error] autorelease];
BufferedInputStream *bis = [[BufferedInputStream alloc] initWithUnderlyingStream:dis];
Tree *tree = [[[Tree alloc] initWithBufferedInputStream:bis error:error] autorelease];
[bis release];
[dis release];
return tree;
}
Expand Down
2 changes: 1 addition & 1 deletion Commit.h
Expand Up @@ -48,7 +48,7 @@
NSDate *_creationDate;
NSArray *_commitFailedFiles;
}
- (id)initWithBufferedInputStream:(id <BufferedInputStream>)is error:(NSError **)error;
- (id)initWithBufferedInputStream:(BufferedInputStream *)is error:(NSError **)error;

@property(readonly,copy) NSString *author;
@property(readonly,copy) NSString *comment;
Expand Down
13 changes: 7 additions & 6 deletions Commit.m
Expand Up @@ -35,6 +35,7 @@
#import "StringIO.h"
#import "Commit.h"
#import "DataInputStream.h"
#import "BufferedInputStream.h"
#import "RegexKitLite.h"
#import "SetNSError.h"
#import "NSErrorCodes.h"
Expand All @@ -43,7 +44,7 @@
#define HEADER_LENGTH (10)

@interface Commit (internal)
- (BOOL)readHeader:(id <BufferedInputStream>)is error:(NSError **)error;
- (BOOL)readHeader:(BufferedInputStream *)is error:(NSError **)error;
@end

@implementation Commit
Expand All @@ -57,7 +58,7 @@ @implementation Commit
creationDate = _creationDate,
commitFailedFiles = _commitFailedFiles;

- (id)initWithBufferedInputStream:(id <BufferedInputStream>)is error:(NSError **)error {
- (id)initWithBufferedInputStream:(BufferedInputStream *)is error:(NSError **)error {
if (self = [super init]) {
_parentCommitSHA1s = [[NSMutableSet alloc] init];
if (![self readHeader:is error:error]) {
Expand Down Expand Up @@ -158,12 +159,12 @@ - (void)dealloc {
@end

@implementation Commit (internal)
- (BOOL)readHeader:(id <BufferedInputStream>)is error:(NSError **)error {
unsigned char *headerBytes = [is readExactly:HEADER_LENGTH error:error];
if (headerBytes == NULL) {
- (BOOL)readHeader:(BufferedInputStream *)is error:(NSError **)error {
NSData *headerData = [is readExactly:HEADER_LENGTH error:error];
if (headerData == nil) {
return NO;
}
NSString *header = [[[NSString alloc] initWithBytes:headerBytes length:HEADER_LENGTH encoding:NSASCIIStringEncoding] autorelease];
NSString *header = [[[NSString alloc] initWithData:headerData encoding:NSUTF8StringEncoding] autorelease];
NSRange versionRange = [header rangeOfRegex:@"^CommitV(\\d{3})$" capture:1];
commitVersion = 0;
if (versionRange.location != NSNotFound) {
Expand Down
4 changes: 2 additions & 2 deletions CommitFailedFile.h
Expand Up @@ -31,13 +31,13 @@
*/

#import <Cocoa/Cocoa.h>
@protocol BufferedInputStream;
@class BufferedInputStream;

@interface CommitFailedFile : NSObject {
NSString *relativePath;
NSString *errorMessage;
}
- (id)initWithInputStream:(id <BufferedInputStream>)is error:(NSError **)error;
- (id)initWithInputStream:(BufferedInputStream *)is error:(NSError **)error;
- (NSString *)relativePath;
- (NSString *)errorMessage;
- (void)writeTo:(NSMutableData *)data;
Expand Down
2 changes: 1 addition & 1 deletion CommitFailedFile.m
Expand Up @@ -35,7 +35,7 @@
#import "BufferedInputStream.h"

@implementation CommitFailedFile
- (id)initWithInputStream:(id <BufferedInputStream>)is error:(NSError **)error {
- (id)initWithInputStream:(BufferedInputStream *)is error:(NSError **)error {
if (self = [super init]) {
if (![StringIO read:&relativePath from:is error:error]
|| ![StringIO read:&errorMessage from:is error:error]) {
Expand Down
22 changes: 13 additions & 9 deletions DiskPack.m
Expand Up @@ -34,6 +34,7 @@
#import "DiskPack.h"
#import "SetNSError.h"
#import "FDInputStream.h"
#import "BufferedInputStream.h"
#import "StringIO.h"
#import "IntegerIO.h"
#import "ServerBlob.h"
Expand All @@ -51,7 +52,7 @@

@interface DiskPack (internal)
- (BOOL)savePack:(ServerBlob *)sb bytesWritten:(unsigned long long *)written error:(NSError **)error;
- (NSArray *)sortedPackIndexEntriesFromStream:(id <BufferedInputStream>)fis error:(NSError **)error;
- (NSArray *)sortedPackIndexEntriesFromStream:(BufferedInputStream *)fis error:(NSError **)error;
@end

@implementation DiskPack
Expand Down Expand Up @@ -113,33 +114,34 @@ - (ServerBlob *)newServerBlobForObjectAtOffset:(unsigned long long)offset error:
}
ServerBlob *ret = nil;
FDInputStream *fdis = [[FDInputStream alloc] initWithFD:fd];
BufferedInputStream *bis = [[BufferedInputStream alloc] initWithUnderlyingStream:fdis];
do {
if (lseek(fd, offset, SEEK_SET) == -1) {
SETNSERROR(@"UnixErrorDomain", errno, @"lseek(%@, %qu): %s", localPath, offset, strerror(errno));
break;
}
NSString *mimeType;
NSString *downloadName;
if (![StringIO read:&mimeType from:fdis error:error] || ![StringIO read:&downloadName from:fdis error:error]) {
if (![StringIO read:&mimeType from:bis error:error] || ![StringIO read:&downloadName from:bis error:error]) {
break;
}
uint64_t dataLen = 0;
if (![IntegerIO readUInt64:&dataLen from:fdis error:error]) {
if (![IntegerIO readUInt64:&dataLen from:bis error:error]) {
break;
}
NSData *data = nil;
if (dataLen > 0) {
const unsigned char *bytes = [fdis readExactly:dataLen error:error];
if (bytes == NULL) {
data = [bis readExactly:dataLen error:error];
if (data == nil) {
break;
}
data = [NSData dataWithBytes:bytes length:dataLen];
} else {
data = [NSData data];
}
ret = [[ServerBlob alloc] initWithData:data mimeType:mimeType downloadName:downloadName];
} while (0);
close(fd);
[bis release];
[fdis release];
return ret;
}
Expand All @@ -158,7 +160,9 @@ - (NSArray *)sortedPackIndexEntries:(NSError **)error {
return NO;
}
FileInputStream *fis = [[FileInputStream alloc] initWithPath:localPath offset:0 length:length];
NSArray *ret = [self sortedPackIndexEntriesFromStream:fis error:error];
BufferedInputStream *bis = [[BufferedInputStream alloc] initWithUnderlyingStream:fis];
NSArray *ret = [self sortedPackIndexEntriesFromStream:bis error:error];
[bis release];
[fis release];
return ret;
}
Expand All @@ -183,7 +187,7 @@ - (BOOL)savePack:(ServerBlob *)sb bytesWritten:(unsigned long long *)written err
[is release];
return ret;
}
- (NSArray *)sortedPackIndexEntriesFromStream:(id <BufferedInputStream>)is error:(NSError **)error {
- (NSArray *)sortedPackIndexEntriesFromStream:(BufferedInputStream *)is error:(NSError **)error {
uint32_t packSig;
uint32_t packVersion;
if (![IntegerIO readUInt32:&packSig from:is error:error] || ![IntegerIO readUInt32:&packVersion from:is error:error]) {
Expand All @@ -210,7 +214,7 @@ - (NSArray *)sortedPackIndexEntriesFromStream:(id <BufferedInputStream>)is error
if (![StringIO read:&mimeType from:is error:error] || ![StringIO read:&name from:is error:error] || ![IntegerIO readUInt64:&length from:is error:error]) {
return NO;
}
NSString *objectSHA1 = [SHA1Hash hashStream:is withlength:length error:error];
NSString *objectSHA1 = [SHA1Hash hashStream:is withLength:length error:error];
if (objectSHA1 == nil) {
return NO;
}
Expand Down
2 changes: 1 addition & 1 deletion Node.m
Expand Up @@ -42,7 +42,7 @@ @implementation Node
@synthesize ctime_sec, ctime_nsec, createTime_sec, createTime_nsec, st_nlink, st_ino;
@dynamic treeSHA1, dataSHA1s;

- (id)initWithInputStream:(id <BufferedInputStream>)is treeVersion:(int)theTreeVersion error:(NSError **)error {
- (id)initWithInputStream:(BufferedInputStream *)is treeVersion:(int)theTreeVersion error:(NSError **)error {
if (self = [super init]) {
treeVersion = theTreeVersion;
dataSHA1s = [[NSMutableArray alloc] init];
Expand Down
1 change: 1 addition & 0 deletions Restorer.h
Expand Up @@ -42,6 +42,7 @@
NSString *rootPath;
NSMutableArray *restoreNodes;
NSMutableDictionary *hardlinks;
unsigned long long writtenToCurrentFile;
}
- (id)initWithS3Service:(S3Service *)theS3 s3BucketName:(NSString *)theS3BucketName computerUUID:(NSString *)theComputerUUID bucketUUID:(NSString *)theBucketUUID bucketName:(NSString *)theBucketName encryptionKey:(NSString *)theEncryptionKey;
- (BOOL)restore:(NSError **)error;
Expand Down
108 changes: 68 additions & 40 deletions Restorer.m
Expand Up @@ -47,14 +47,19 @@
#import "NSFileManager_extra.h"
#import "CFStreamPair.h"
#import "NSErrorCodes.h"
#import "BufferedInputStream.h"
#import "StreamPairFactory.h"

#define MAX_RETRIES (10)
#define MY_BUF_SIZE (8192)

@interface Restorer (internal)
- (BOOL)addRestoreNodesForTreeSHA1:(NSString *)treeSHA1 relativePath:(NSString *)relativePath error:(NSError **)error;
- (BOOL)restoreRestoreNode:(RestoreNode *)rn error:(NSError **)error;
- (BOOL)createFile:(Node *)node atPath:(NSString *)path error:(NSError **)error;
- (BOOL)createFileAtPath:(NSString *)path fromSHA1s:(NSArray *)dataSHA1s error:(NSError **)error;
- (BOOL)createFileOnceAtPath:(NSString *)path fromSHA1s:(NSArray *)dataSHA1s error:(NSError **)error;
- (BOOL)appendBlobForSHA1:(NSString *)sha1 toFile:(FileOutputStream *)fos error:(NSError **)error;
- (BOOL)doAppendBlobForSHA1:(NSString *)sha1 toFile:(FileOutputStream *)fos error:(NSError **)error;
- (BOOL)applyTree:(Tree *)tree toPath:(NSString *)restorePath error:(NSError **)error;
- (BOOL)applyNode:(Node *)node toPath:(NSString *)restorePath error:(NSError **)error;
- (BOOL)applyACLSHA1:(NSString *)aclSHA1 toFileAttributes:(FileAttributes *)fa error:(NSError **)error;
Expand Down Expand Up @@ -354,26 +359,6 @@ - (BOOL)createFile:(Node *)node atPath:(NSString *)path error:(NSError **)error
return YES;
}
- (BOOL)createFileAtPath:(NSString *)path fromSHA1s:(NSArray *)dataSHA1s error:(NSError **)error {
BOOL ret = YES;
for (;;) {
NSError *myError = nil;
if (![self createFileOnceAtPath:path fromSHA1s:dataSHA1s error:&myError]) {
if ([[myError domain] isEqualToString:[CFStreamPair errorDomain]]) {
HSLogDebug(@"network error restoring %@ (retrying): %@", path, [myError localizedDescription]);
} else {
if (error != NULL) {
*error = myError;
}
ret = NO;
break;
}
} else {
break;
}
}
return ret;
}
- (BOOL)createFileOnceAtPath:(NSString *)path fromSHA1s:(NSArray *)dataSHA1s error:(NSError **)error {
FileOutputStream *fos = [[FileOutputStream alloc] initWithPath:path append:NO];
BOOL ret = YES;
for (NSString *sha1 in dataSHA1s) {
Expand All @@ -386,35 +371,76 @@ - (BOOL)createFileOnceAtPath:(NSString *)path fromSHA1s:(NSArray *)dataSHA1s err
return ret;
}
- (BOOL)appendBlobForSHA1:(NSString *)sha1 toFile:(FileOutputStream *)fos error:(NSError **)error {
ServerBlob *dataBlob = [repo newServerBlobForSHA1:sha1 error:error];
if (dataBlob == nil) {
HSLogError(@"error getting server blob for %@", sha1);
int i = 0;
BOOL ret = NO;
NSError *myError = nil;
NSAutoreleasePool *pool = nil;
for (;;) {
[pool drain];
pool = [[NSAutoreleasePool alloc] init];
if ([self doAppendBlobForSHA1:sha1 toFile:fos error:&myError]) {
ret = YES;
break;
}
[[StreamPairFactory theFactory] clear];
BOOL isNetworkError = [[myError domain] isEqualToString:[CFStreamPair errorDomain]];
// Retry indefinitely on network errors:
if (!isNetworkError && (++i >= MAX_RETRIES)) {
HSLogError(@"failed to get blob %@ after %d retries: %@", sha1, i, [myError localizedDescription]);
break;
}
HSLogWarn(@"error appending blob %@ to file %@ (retrying): %@", sha1, [fos path], [myError localizedDescription]);
// Seek back to the starting offset for this blob:
if (![fos seekTo:writtenToCurrentFile error:&myError]) {
ret = NO;
break;
}
}
[myError retain];
[pool drain];
[myError autorelease];
if (error != NULL) {
*error = myError;
}
return ret;
}
- (BOOL)doAppendBlobForSHA1:(NSString *)sha1 toFile:(FileOutputStream *)fos error:(NSError **)error {
if (error != NULL) {
*error = nil;
}
ServerBlob *sb = [[repo newServerBlobForSHA1:sha1 error:error] autorelease];
if (sb == nil) {
return NO;
}
id <InputStream> is = [dataBlob newInputStream];
[dataBlob release];
id <InputStream> is = [[sb newInputStream] autorelease];
BOOL ret = YES;
NSAutoreleasePool *pool = nil;
unsigned char *buf = (unsigned char *)malloc(MY_BUF_SIZE);
for (;;) {
NSUInteger received = 0;
NSError *myError = nil;
unsigned char *buf = [is read:&received error:&myError];
if (buf == nil) {
if ([myError code] != ERROR_EOF) {
ret = NO;
HSLogError(@"error reading from stream for blob %@: %@", sha1, [myError localizedDescription]);
if (error != NULL) {
*error = myError;
}
}
[pool drain];
pool = [[NSAutoreleasePool alloc] init];
NSUInteger received = [is read:buf bufferLength:MY_BUF_SIZE error:error];
if (received < 0) {
ret = NO;
break;
}
if (received == 0) {
break;
}
if (![fos write:buf length:received error:error]) {
ret = NO;
break;
}
[NSThread sleepForTimeInterval:0.01];
writtenToCurrentFile += received;
}
free(buf);
if (error != NULL) {
[*error retain];
}
[pool drain];
if (error != NULL) {
[*error autorelease];
}
[is release];
return ret;
}
- (BOOL)createSymLink:(Node *)node path:(NSString *)symLinkFile target:(NSString *)target error:(NSError **)error {
Expand Down Expand Up @@ -450,7 +476,9 @@ - (BOOL)applyXAttrsSHA1:(NSString *)xattrsSHA1 toFile:(NSString *)path error:(NS
return NO;
}
DataInputStream *is = [xattrsData newInputStream];
XAttrSet *set = [[[XAttrSet alloc] initWithBufferedInputStream:is error:error] autorelease];
BufferedInputStream *bis = [[BufferedInputStream alloc] initWithUnderlyingStream:is];
XAttrSet *set = [[[XAttrSet alloc] initWithBufferedInputStream:bis error:error] autorelease];
[bis release];
[is release];
if (!set) {
return NO;
Expand Down
4 changes: 2 additions & 2 deletions Tree.h
Expand Up @@ -32,7 +32,7 @@

#import <Cocoa/Cocoa.h>
#import "Blob.h"
@protocol BufferedInputStream;
@class BufferedInputStream;
@class Node;

#define CURRENT_TREE_VERSION 10
Expand Down Expand Up @@ -64,7 +64,7 @@
NSMutableDictionary *nodes;
}
+ (NSString *)errorDomain;
- (id)initWithBufferedInputStream:(id <BufferedInputStream>)is error:(NSError **)error;
- (id)initWithBufferedInputStream:(BufferedInputStream *)is error:(NSError **)error;
- (NSArray *)childNodeNames;
- (Node *)childNodeWithName:(NSString *)name;
- (BOOL)containsNodeNamed:(NSString *)name;
Expand Down

0 comments on commit 846d391

Please sign in to comment.