# Distributed Runner

## Needed Operations (aka API)

1. Exists (hash)
2. Save File (src)
3. Copy File (hash, dst)
4. Save Tree (list_files)
5. Restore Tree (hash, dst)



In [38]:
from collections import namedtuple
from contextlib import contextmanager
from itertools import chain
import io
import os
import hashlib
import zlib

def read_streams_chunked(file_objects, block_size=4096):
    """Lazily yield chunks read from each of ``file_objects`` in order.

    Each stream is read with ``read(block_size)`` until it returns an empty
    chunk, then the next stream is started.

    Args:
        file_objects: Iterable of readable stream objects (text or binary).
        block_size: Maximum number of bytes/characters requested per read.

    Returns:
        An iterator over the non-empty chunks of all streams, concatenated.
    """
    def _chunks(f):
        # Stop on any empty read: binary streams return b'' at EOF and text
        # streams return ''. A fixed '' sentinel never ends a binary stream.
        while True:
            chunk = f.read(block_size)
            if not chunk:
                return
            yield chunk

    return chain.from_iterable(_chunks(f) for f in file_objects)

def obj_header(objtype, length):
    """Encode an object header: ``objtype``, a space, the decimal length, a NUL.

    For example ``obj_header('blob', 5)`` returns ``b'blob 5\\x00'``.
    """
    parts = (objtype, ' ', str(length), '\x00')
    return ''.join(parts).encode()

def write_compressed(fout, zlibobj, stream, block_size=4096, flush=True):
    """Compress ``stream`` through ``zlibobj`` and write the output to ``fout``.

    ``stream`` is read in ``block_size`` chunks until it returns an empty
    chunk. Passing ``flush=False`` leaves the compressor open, so a further
    stream can be appended to the same zlib stream. The last call for a given
    compressor must use ``flush=True`` to emit the remaining compressed bytes.

    Args:
        fout: Writable binary file object receiving compressed bytes.
        zlibobj: Compressor from ``zlib.compressobj()``; may be shared across
            several calls.
        stream: Readable binary stream to compress.
        block_size: Number of bytes requested per read.
        flush: Whether to finalize the compressor after this stream.
    """
    while True:
        chunk = stream.read(block_size)
        if not chunk:
            break
        # compress() may buffer input internally and return b'' for now.
        compressed_bytes = zlibobj.compress(chunk)
        if compressed_bytes:
            fout.write(compressed_bytes)

    if flush:
        compressed_bytes = zlibobj.flush()
        if compressed_bytes:
            fout.write(compressed_bytes)
            
def write_obj(store_path, sha, hstream, dstream):
    """Write a zlib-compressed object into the store at ``store_path``.

    The object is written to ``<store_path>/<sha[:2]>/<sha[2:]>``. The header
    stream is compressed first and the data stream second, through a single
    compressor, so the file holds one zlib stream of header + data.

    Args:
        store_path: Root directory of the object store.
        sha: Hex digest naming the object. It is used only for the path and is
            not re-verified against the stream contents.
        hstream: Readable binary stream holding the object header.
        dstream: Readable binary stream holding the object payload.
    """
    obj_dir = os.path.join(store_path, sha[:2])
    # The two-character fan-out directory may not exist yet in a fresh store.
    os.makedirs(obj_dir, exist_ok=True)
    path = os.path.join(obj_dir, sha[2:])
    zlib_obj = zlib.compressobj()

    with open(path, 'wb') as fout:
        write_compressed(fout, zlib_obj, hstream, flush=False)
        write_compressed(fout, zlib_obj, dstream, flush=True)
        
def compute_sha1_hash(obj):
    """Return the hex SHA-1 digest of ``obj``'s header bytes followed by its data.

    ``obj`` must provide ``header()`` and ``data()`` context managers that
    yield readable binary streams. Each stream is consumed in 4096-byte reads.
    """
    digest = hashlib.sha1()
    with obj.header() as header_stream, obj.data() as data_stream:
        # Header first, then payload: the digest covers their concatenation.
        for stream in (header_stream, data_stream):
            chunk = stream.read(4096)
            while chunk:
                digest.update(chunk)
                chunk = stream.read(4096)
    return digest.hexdigest()
        
class GitObject(object):
    """A content-addressed object backed by either a file path or a stream.

    The object's identity is the SHA-1 of its encoded header
    (``obj_header(objtype, len(self))``) followed by its raw data. The hash
    is computed lazily and cached.

    Args:
        objtype: Type string written into the header (e.g. ``'blob'``).
        data_path: Path to a file holding the payload. The file is opened in
            binary mode on each access.
        data_stream: Seekable stream holding the payload (expected to be
            binary). It is rewound to the start on each access. Exactly one
            of ``data_path`` and ``data_stream`` must be given.

    Raises:
        ValueError: If both or neither of ``data_path`` and ``data_stream``
            are given.
    """

    def __init__(self, objtype, data_path=None, data_stream=None):
        # Exactly one data source is allowed. Raise instead of assert so the
        # check survives ``python -O``.
        if data_path is not None and data_stream is not None:
            raise ValueError("Cannot provide both stream and path")
        if data_path is None and data_stream is None:
            raise ValueError("Must provide stream or path")

        self.objtype = objtype
        self.path = data_path
        self.stream = data_stream

        # Lazily filled caches. They are never invalidated, so later changes
        # to the underlying data are not reflected.
        self._hash = None
        self._len = None

    @property
    def hash(self):
        """Hex SHA-1 digest of header + data, computed once and cached."""
        if self._hash is None:
            # compute_sha1_hash opens header() and data() itself; opening them
            # here too would open the file / read the payload twice.
            self._hash = compute_sha1_hash(self)
        return self._hash

    def __len__(self):
        """Payload size in bytes (header excluded), computed once and cached."""
        if self._len is None:
            with self.data() as dstream:
                dstream.seek(0, os.SEEK_END)
                self._len = dstream.tell()
        return self._len

    @contextmanager
    def header(self):
        """Yield a BytesIO over the encoded header (type, space, length, NUL)."""
        yield io.BytesIO(obj_header(self.objtype, len(self)))

    @contextmanager
    def data(self):
        """Yield a stream positioned at the start of the payload.

        For path-backed objects, a fresh file handle is opened and closed on
        exit. For stream-backed objects, the shared stream is rewound and
        yielded; it is not closed.
        """
        if self.path is not None:
            with open(self.path, 'rb') as f:
                yield f
        else:
            self.stream.seek(0)
            yield self.stream

    def __repr__(self):
        with self.header() as hstream:
            # Drop the trailing NUL terminator so the repr is printable.
            header = hstream.read().decode().rstrip('\x00')
        return header + ' ' + self.hash
        
class BlobObject(GitObject):
    """GitObject whose type is fixed to ``'blob'``.

    Accepts the same data sources as GitObject: a file path or a stream.
    """

    def __init__(self, data_path=None, data_stream=None):
        super().__init__(
            'blob',
            data_path=data_path,
            data_stream=data_stream,
        )
    
    

In [40]:
# Build a blob from a local file and print it. The repr shows the header
# text followed by the object's SHA-1 hex digest.
obj = BlobObject(data_path='C:/Users/fille/source/repos/distributed-runner/diagrams.vsdx')
print(obj)

blob 30851  ad3ce99c0a2e8e83ccac518a8eaffc205384fa0d


In [6]:
# Sanity check: the size obtained by seeking to the end and calling tell()
# should match the length in the blob header printed above (30851).
with open('C:/Users/fille/source/repos/distributed-runner/diagrams.vsdx', 'rb') as f:
    f.seek(0, os.SEEK_END)
    print(f.tell())

30851


In [12]:
# Quick check that str.encode() produces bytes.
"abc".encode()

b'abc'

In [1]:
# Same flow, importing from a `cache` module (presumably this code saved as
# cache.py — confirm): hash the file, then compress and store the object
# under the current directory ('.').
# NOTE(review): the RAW/COMPRESSED/FLUSHED lines in the output come from
# debug prints in `cache`; the definitions above contain no such prints.
from cache import BlobObject, write_obj

obj = BlobObject(data_path='C:/Users/fille/source/repos/distributed-runner/diagrams.vsdx')
print(obj)

# Pass the object's own header/data streams as write_obj's inputs.
with obj.header() as h, obj.data() as d:
    write_obj('.', obj.hash, h, d)

blob 30851  ad3ce99c0a2e8e83ccac518a8eaffc205384fa0d
RAW 11
COMPRESSED 2
RAW 4096
RAW 4096
RAW 4096
RAW 4096
RAW 4096
COMPRESSED 16580
RAW 4096
RAW 4096
RAW 2179
FLUSHED 10857


In [7]:
# Inspect a binary file handle: confirm it is an io.IOBase and list its
# attributes, to see which stream methods are available.
import io 

with open('C:/Users/fille/source/repos/distributed-runner/diagrams.vsdx', 'rb') as f:
    print("INSTANCE", isinstance(f, io.IOBase))
    print("HAS", dir(f))

INSTANCE True
HAS ['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_checkClosed', '_checkReadable', '_checkSeekable', '_checkWritable', '_dealloc_warn', '_finalizing', 'close', 'closed', 'detach', 'fileno', 'flush', 'isatty', 'mode', 'name', 'peek', 'raw', 'read', 'read1', 'readable', 'readinto', 'readinto1', 'readline', 'readlines', 'seek', 'seekable', 'tell', 'truncate', 'writable', 'write', 'writelines']


In [9]:
# Check whether a StringIO exposes an attribute named `len` (output: False).
# NOTE(review): support for len() is signalled by `__len__`, not `len`, so
# this check does not answer "can I call len() on it".
s = io.StringIO('ABC')
print(hasattr(s, 'len'))

False


In [14]:
# Experiment: slice off everything up to and including the first NUL byte,
# presumably to separate a "<type> <len>" header from the data that follows.
x = b'\x23\x00\x25\x26'
x[x.find(b'\x00')+1:]

b'%&'

In [17]:
# Check that joined byte chunks decode back to a str.
b''.join([b'\x23', b'\x45']).decode()

'#E'

In [1]:
# Write a small in-memory payload as a 'blob' into the './repo' store via
# the `cache` module. The returned value is kept as `sha` (presumably the
# object's hash) for reading it back in the next cell.
# NOTE(review): `repo` is defined but the literal './repo' is passed below;
# presumably they are meant to be the same — consider passing `repo`.
from cache import read_object, write_object
import io

repo = './repo'

data = io.BytesIO("SOME DATA NEW DATA".encode())

sha = write_object('./repo', 'blob', data)

In [4]:

# Read the object back and print its type, its payload length, and the
# payload (iterating `obj` yields byte chunks, joined before decoding).
with read_object(repo, sha) as obj:
    print('TYPE:', "'%s'" % obj.type)
    print('LENGTH:', '%d bytes' % obj.length)
    print('DATA:', "'%s'" % b''.join([chunk for chunk in obj]).decode())
        

TYPE: 'blob'
LENGTH: 18 bytes
DATA: 'SOME DATA NEW DATA'


In [3]:
# Time write_tree() over a local build directory into the './repo' store.
# The "Total written" line in the output presumably comes from write_tree.
# NOTE(review): time.perf_counter() is the usual clock for measuring
# elapsed time; time.time() can jump if the wall clock is adjusted.
from cache import read_object, write_object, write_tree
import io, time

start = time.time()

repo = './repo'
work_dir = 'C:\\Users\\fille\\source\\secure\\trunk2\\trunk\\bin'
sha = write_tree(repo, work_dir)

print("It took %s seconds" % (time.time()-start))

Total written: 619.88 MB
It took 25.54059338569641 seconds


In [6]:
# Run write_tree() again on the same directory, reusing `repo` and `work_dir`
# from the previous cell, to compare against the first run's time.
start = time.time()
sha = write_tree(repo, work_dir)
print("It took %s seconds" % (time.time()-start))

It took 46.9803946018219 seconds


In [2]:
# Read back the tree object written above and print its type, size, and
# listing.
with read_object(repo, sha) as obj:
    print("TYPE: '%s'" % obj.type)
    print('LENGTH: %d bytes' % obj.length)
    # Join the raw byte chunks before decoding. Decoding chunk by chunk fails
    # when a multi-byte UTF-8 character is split across a chunk boundary.
    print('DATA:\n%s' % b''.join(obj).decode())

TYPE: 'tree'
LENGTH: 32274 bytes
DATA:
'100444' '0cf47a411f8adfe4f50c04efadc3e4bbb7b6be90' '.travis.yml'
'100444' '83a68ab0941a5cf600fb14ca7ab42ce1d35e280b' 'AUTHORS'
'100444' '468cb3fbec3f7c950711d5f9274250f141f31fcc' 'CHANGES.rst'
'100444' '904f1837cd3d3d6cf8a4a36c0e415b701f799413' 'CONTRIBUTING.rst'
'100444' 'e2bb85d0ba28f7aacbc45a90158b30c37d4c922a' 'LICENSE'
'100444' '4f28c3322b60052fdbbc6a19b9a070f3f1f17dc8' 'MAINTAINERS.rst'
'100444' '5d56245509a656cddb5e16360cd2d724d1d1abb4' 'MANIFEST.in'
'100444' '3fc3fcb5f6e111881d56447854cca004cf7dbbf4' 'README.rst'
'100444' '88b873982b3c3d0babfd341d4ee15483c3a64390' 'requirements_dev.txt'
'100444' '8cc605892a11fb791be1b2b3a240640448cba14d' 'setup.cfg'
'100444' 'd627c343de8b46c417d9a827dc29964f94e473b4' 'setup.py'
'100444' '50a02b14def8cfcecff0b70cf360338be0cd4a47' 'tox.ini'
'100444' 'e81728e7dfe60a633410e52ef72d9a35ada87b78' 'doc\.fresh-start'
'100444' '6ab1795b8e9369f66ec5f89e2868915e7d6d5fa5' 'doc\builders.rst'
'100444' 'f90a26a59944681ac

In [13]:
# Format a file mode as an octal string without the '0o' prefix.
# NOTE(review): `s` is presumably an os.stat() result from a cell not shown;
# the visible `s` is a StringIO, which has no st_mode — confirm.
oct(s.st_mode)[2:]

'100666'

In [15]:
# Parse the octal mode string back into an int (inverse of the cell above).
int('100666', 8)

33206

In [7]:
# NOTE(review): `pyre` is not installed (ModuleNotFoundError below) and the
# intended module is unclear — confirm the name or remove this cell.
import pyre

ModuleNotFoundError: No module named 'pyre'