Skip to content

Commit

Permalink
Started the repozitory project. We have a schema and untested code th…
Browse files Browse the repository at this point in the history
…at implements the archive(obj) method.
  • Loading branch information
hathawsh committed Jul 10, 2011
0 parents commit e74b660
Show file tree
Hide file tree
Showing 9 changed files with 581 additions and 0 deletions.
17 changes: 17 additions & 0 deletions .project
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>repozitory</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
18 changes: 18 additions & 0 deletions .pydevproject
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?>

<pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python for repozitory</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.6</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/repozitory</path>
<path>/repozitory/eggs/SQLAlchemy-0.7.1-py2.7-linux-x86_64.egg</path>
<path>/repozitory/eggs/zope.dublincore-3.8.2-py2.7.egg</path>
<path>/repozitory/eggs/zope.interface-3.6.4-py2.7-linux-x86_64.egg</path>
<path>/repozitory/eggs/simplejson-2.1.6-py2.7-linux-x86_64.egg</path>
<path>/repozitory/eggs/zope.sqlalchemy-0.6.1-py2.7.egg</path>
<path>/repozitory/eggs/zope.schema-3.8.0-py2.7.egg</path>
<path>/repozitory/eggs/ZODB3-3.10.3-py2.7-linux-x86_64.egg</path>
<path>/repozitory/eggs/zope.component-3.10.0-py2.7.egg</path>
</pydev_pathproperty>
</pydev_project>
11 changes: 11 additions & 0 deletions buildout.cfg
@@ -0,0 +1,11 @@
[buildout]
develop = .
parts = test

[test]
recipe = zc.recipe.egg
eggs =
nose
coverage
repozitory
interpreter = py
1 change: 1 addition & 0 deletions repozitory/__init__.py
@@ -0,0 +1 @@
#
235 changes: 235 additions & 0 deletions repozitory/archive.py
@@ -0,0 +1,235 @@

from persistent import Persistent
from repozitory.interfaces import IAttachment
from repozitory.interfaces import IObjectContent
from repozitory.interfaces import IObjectIdentity
from repozitory.schema import ArchivedAttachment
from repozitory.schema import ArchivedBlob
from repozitory.schema import ArchivedBlobPart
from repozitory.schema import ArchivedChunk
from repozitory.schema import ArchivedClass
from repozitory.schema import ArchivedCurrent
from repozitory.schema import ArchivedObject
from repozitory.schema import ArchivedState
from repozitory.schema import Base
from sqlalchemy import func
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm.session import sessionmaker
from zope.component import getAdapter
from zope.sqlalchemy import ZopeTransactionExtension
import hashlib

_sessions = {} # {db_string: SQLAlchemy session}


class EngineParams(object):
def __init__(self, db_string, kwargs):
self.db_string = db_string
self.kwargs = kwargs


class Archive(Persistent):
"""An object archive that uses SQLAlchemy."""

_v_session = None
chunk_size = 1048576

def __init__(self, engine_params):
self.engine_params = engine_params

@property
def session(self):
"""Get the SQLAlchemy session."""
session = self._v_session
if session is None:
params = self.engine_params
db_string = params.db_string
session = _sessions.get(db_string)
if session is None:
engine = create_engine(db_string, **params.kwargs)
session = self._create_session(engine)
self._v_session = session
return session

def _create_session(self, engine):
Base.metadata.create_all(engine)
session = scoped_session(sessionmaker(
extension=ZopeTransactionExtension()))
session.configure(bind=engine)
return session

def archive(self, obj, klass=None):
"""Add a version to the archive of an object.
The object does not need to have been in the archive
previously. The object must either implement or be adaptable
to IObjectIdentity and IObjectContent.
Returns the new version number.
"""
if IObjectIdentity.providedBy(obj):
obj_id = obj
else:
obj_id = getAdapter(obj, IObjectIdentity)
docid = obj_id.docid

if IObjectContent.providedBy(obj):
obj_content = obj
else:
obj_content = getAdapter(obj, IObjectContent)

session = self.session
prev_version = None
arc_obj = (session.query(ArchivedObject)
.filter_by(docid=docid)
.first())
if arc_obj is None:
arc_obj = ArchivedObject(
docid=docid,
created=obj_content.created,
)
session.add(arc_obj)
else:
(prev_version,) = (
session.query(func.max(ArchivedState.version_num))
.filter_by(docid=docid)
.one())

if klass is None:
klass = type(obj)
class_id = self._prepare_class_id(klass)

arc_state = ArchivedState(
docid=docid,
version_num=(prev_version or 0) + 1,
class_id=class_id,
path=obj_id.path,
modified=obj_content.modified,
title=obj_content.title,
description=obj_content.description,
attrs=obj_content.attrs,
)
session.add(arc_state)

attachments = obj_content.attachments
if attachments:
for name, value in attachments.items():
self._attach(arc_state, name, value)

arc_current = (
session.query(ArchivedCurrent)
.filter_by(docid=docid)
.first())
if arc_current is None:
arc_current = ArchivedCurrent(
docid=docid,
version_num=arc_state.version_num,
)
else:
arc_current.version_num = arc_state.version_num
session.flush()
return arc_state.version_num

def _prepare_class_id(self, klass):
"""Add a class or reuse an existing class ID."""
session = self.session
module = klass.__module__
name = klass.__name__
cls = (session.query(ArchivedClass)
.filter_by(module=module, name=name)
.first())
if cls is None:
cls = ArchivedClass(module=module, name=name)
session.add(cls)
session.flush()
return cls.class_id

def _attach(self, arc_state, name, value):
"""Add a named attachment to an object state."""
if IAttachment.providedBy(value):
content_type = value.content_type
attrs = value.attrs
f = value.file
else:
content_type = None
attrs = None
f = value
if isinstance(f, basestring):
fn = f
f = open(fn, 'rb')
try:
blob_id = self._prepare_blob_id(f)
finally:
f.close()
else:
f.seek(0)
blob_id = self._prepare_blob_id(f)

session = self.session
att = ArchivedAttachment(
docid=arc_state.docid,
version_num=arc_state.version_num,
name=name,
content_type=content_type,
blob_id=blob_id,
attrs=attrs,
)
session.add(att)

def _prepare_blob_id(self, f):
"""Upload a blob or reuse an existing blob with the same data."""
# Compute the length and hashes of the blob data.
length = 0
md5_calc = hashlib.md5()
sha256_calc = hashlib.sha256()
while True:
data = f.read(self.chunk_size)
if not data:
break
length += len(data)
md5_calc.update(data)
sha256_calc.update(data)
md5 = md5_calc.hexdigest()
sha256 = sha256_calc.hexdigest()
f.seek(0)

session = self.session
arc_blob = (
session.query(ArchivedBlob)
.filter_by(length=length, md5=md5, sha256=sha256)
.first())
if arc_blob is not None:
return arc_blob.blob_id

arc_blob = ArchivedBlob(
chunk_count=0,
length=length,
md5=md5,
sha256=sha256,
)
session.add(arc_blob)
session.flush() # Assign arc_blob.blob_id
blob_id = arc_blob.blob_id

# Upload the data.
next_chunk_num = 0
while True:
data = f.read(self.chunk_size)
if not data:
break
arc_chunk = ArchivedChunk(data=data)
del data
session.add(arc_chunk)
session.flush() # Assign arc_chunk.chunk_id
part = ArchivedBlobPart(
blob_id=blob_id,
chunk_num=next_chunk_num,
chunk_id=arc_chunk.chunk_id,
)
session.add(part)
next_chunk_num += 1

arc_blob.chunk_count = next_chunk_num
session.flush()
return arc_blob.blob_id
56 changes: 56 additions & 0 deletions repozitory/interfaces.py
@@ -0,0 +1,56 @@

from zope.dublincore.interfaces import IDCDescriptiveProperties
from zope.interface import Attribute
from zope.interface import Interface
from zope.dublincore.interfaces import IDCTimes


class IObjectIdentity(Interface):
"""The docid and path of an object for version control."""
docid = Attribute("The docid of the object as an integer.")
path = Attribute("The path of the object as a Unicode string.")


class IObjectContent(IDCDescriptiveProperties, IDCTimes):
"""The content of an object for version control.
Note that the following attributes are required, as specified by
the base interfaces:
title
description
created
modified
"""

attrs = Attribute(
"""The attributes to store as a JSON-encodable dictionary.
May be None.""")

attachments = Attribute(
"""A map of attachments to include. May be None.
Each key is a unicode string and each value is a
filename, open file object (such as a StringIO), or an
object that provides IAttachment.
""")


class IAttachment(Interface):
"""The metadata and content of a versioned attachment."""

file = Attribute(
"Either a filename or an open file containing the attachment.")

content_type = Attribute(
"Optional: A MIME type string such as 'text/plain'")

attrs = Attribute(
"Optional: attributes to store as a JSON-encodable dictionary.")


class IContainerIdentity(Interface):
"""The ID and path of a container for version control."""
container_id = Attribute("The container_id of the object as an integer.")
path = Attribute("The path of the object as a Unicode string.")
50 changes: 50 additions & 0 deletions repozitory/jsontype.py
@@ -0,0 +1,50 @@

from sqlalchemy.types import TypeDecorator
from sqlalchemy.types import UnicodeText
import simplejson as json


class JSONType(TypeDecorator):
"""Holds a JSON-serialized object.
JSONType builds upon the UnicodeText type to apply Python's
``json.dumps()`` to incoming objects, and ``json.loads()`` on
the way out, allowing any JSON compatible object to be stored as
a Unicode text field.
Note: This column type is immutable, meaning SQLAlchemy
will not notice if something changes the content of values
read from columns of this type. Values should be replaced only.
"""

impl = UnicodeText

def bind_processor(self, dialect):
impl_processor = self.impl.bind_processor(dialect)
use_impl_processor = bool(impl_processor)

def process(value):
if value is not None:
value = json.dumps(value, separators=(',', ':'))
if not isinstance(value, unicode):
value = unicode(value, 'ascii')
if use_impl_processor:
value = impl_processor(value)
return value

return process

def result_processor(self, dialect, coltype):
impl_processor = self.impl.result_processor(dialect, coltype)
use_impl_processor = bool(impl_processor)

def process(value):
if use_impl_processor:
value = impl_processor(value) # pragma: no cover
if value:
value = json.loads(value)
else:
value = None
return value

return process

0 comments on commit e74b660

Please sign in to comment.