Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Phillips committed Dec 10, 2014
2 parents 2a70644 + f88e5a7 commit d292261
Show file tree
Hide file tree
Showing 56 changed files with 1,577 additions and 559 deletions.
15 changes: 8 additions & 7 deletions perma_web/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ def run(port="0.0.0.0:8000"):
except ImportError:
local("python manage.py runserver %s" % port)

def run_ssl(port="0.0.0.0:8000"):
    """Launch the Django development server over SSL on the given host:port."""
    command = "python manage.py runsslserver %s" % port
    local(command)

def test(apps="perma mirroring"):
"""
Run perma tests. (For coverage, run `coverage report` after tests pass.)
Expand Down Expand Up @@ -139,6 +146,7 @@ def deploy_code(restart=True, branch='master'):
"""
Deploy code only. This is faster than the full deploy.
"""
run('find . -name "*.pyc" -exec rm -rf {} \;')
run("git pull origin %s" % branch)
if restart:
restart_server()
Expand Down Expand Up @@ -218,13 +226,6 @@ def shell():

### MIRRORING ###

def sync_mirror():
    """Pull every archived link down from the upstream server."""
    # Imported lazily so loading the fabfile doesn't require Django to be configured.
    from mirroring.tasks import sync_mirror as sync_mirror_task
    sync_mirror_task()

def generate_keys():
"""
Generate a keypair suitable for settings.py on the main server.
Expand Down
1 change: 1 addition & 0 deletions perma_web/fixtures/users.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
"model": "perma.LinkUser",
"fields": {
"is_active": true,
"is_staff": true,
"is_confirmed": true,
"last_login": "2013-07-23T18:55:41Z",
"groups": [
Expand Down
12 changes: 8 additions & 4 deletions perma_web/mirroring/management/commands/runmirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ def handle(self, *args, **options):

try:
print "Creating mirror database ..."
print "(If you get stuck on this step after a previous run, try `sudo service mysql restart`)"
main_database = settings.DATABASES['default']['NAME']
mirror_database = main_database+"_mirror"
mysql_credentials = [
"-u"+settings.DATABASES['default']['USER'],
"-p" + settings.DATABASES['default']['PASSWORD'],
]
empty_tables = ['perma_linkuser','perma_link','perma_asset']
empty_tables = ['perma_linkuser','perma_link','perma_asset', 'mirroring_updatequeue']
mysqldump_command = "mysqldump %(user)s %(password)s %%(options)s %(main_database)s %%(tables)s | mysql %(user)s %(password)s %(mirror_database)s" % {
'user':mysql_credentials[0], 'password':mysql_credentials[1], 'main_database':main_database, 'mirror_database':mirror_database,
}
Expand All @@ -108,6 +109,7 @@ def handle(self, *args, **options):
print "Launching main server ..."
main_server_env = dict(
DJANGO__DOWNSTREAM_SERVERS__0__address='http://%s:%s' % (mirror_server_address, mirror_server_port),
DJANGO__DOWNSTREAM_SERVERS__0__public_key=settings.GPG_PUBLIC_KEY,
DJANGO__MIRRORING_ENABLED='True',
DJANGO__CELERY_DEFAULT_QUEUE='runmirror_main_queue',
DJANGO__DIRECT_MEDIA_URL='http://%s:%s/media/' % (main_server_media, router_port),
Expand All @@ -116,7 +118,7 @@ def handle(self, *args, **options):
running_processes.append(subprocess.Popen(['python', 'manage.py', 'runserver', str(main_server_port)],
env=dict(os.environ, **main_server_env)))
running_processes.append(subprocess.Popen(
['celery', '-A', 'perma', 'worker', '--loglevel=info', '-Q', 'runmirror_main_queue', '--hostname=runmirror_main_queue'],
['celery', '-A', 'perma', 'worker', '--loglevel=info', '--queues=runmirror_main_queue', '--hostname=runmirror_main_queue', '--beat', '--concurrency=1'],
env=dict(os.environ, **main_server_env)))

print "Launching mirror server ..."
Expand All @@ -127,14 +129,13 @@ def handle(self, *args, **options):
DJANGO__MIRROR_SERVER='True',
DJANGO__UPSTREAM_SERVER__address='http://%s:%s' % (main_server_address, main_server_port),
DJANGO__UPSTREAM_SERVER__public_key=settings.GPG_PUBLIC_KEY,
#DJANGO__RUN_TASKS_ASYNC='False',
DJANGO__MEDIA_ROOT=temp_dir.name,
DJANGO__WARC_HOST='%s:%s' % (mirror_server_media, router_port),
)
running_processes.append(subprocess.Popen(['python', 'manage.py', 'runserver', str(mirror_server_port)],
env=dict(os.environ, **mirror_server_env)))
running_processes.append(subprocess.Popen(
['celery', '-A', 'perma', 'worker', '--loglevel=info', '-Q', 'runmirror_mirror_queue', '--hostname=runmirror_mirror_queue'],
['celery', '-A', 'perma', 'worker', '--loglevel=info', '--queues=runmirror_mirror_queue', '--hostname=runmirror_mirror_queue', '--beat', '--concurrency=1'],
env=dict(os.environ, **mirror_server_env)))

print "Syncing contents from %s to %s ..." % (settings.MEDIA_ROOT, temp_dir.name)
Expand All @@ -153,6 +154,9 @@ def handle(self, *args, **options):

site = server.Site(root)
reactor.listenTCP(router_port, site)

print "------------- Ready for requests -----------------"

reactor.run()

finally:
Expand Down
11 changes: 9 additions & 2 deletions perma_web/mirroring/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ def get_user(request):
user_info = request.COOKIES.get(settings.MIRROR_COOKIE_NAME)
if user_info:
try:
user_info = read_signed_message(user_info)
# Here we'll check if the user_info cookie is signed by the upstream server.
# Since the upstream server could be *this* server on a different domain, we check for our
# own public key if we have no upstream server configured.
upstream_key = settings.UPSTREAM_SERVER['public_key'] if settings.UPSTREAM_SERVER else settings.GPG_PUBLIC_KEY
user_info = read_signed_message(user_info, upstream_key, max_age=request.session.get_expiry_age())
user = FakeLinkUser.init_from_serialized_user(user_info)
except Exception, e:
print "Error loading mirror user:", e
Expand All @@ -64,7 +68,10 @@ def get_user(request):

class MirrorAuthenticationMiddleware(AuthenticationMiddleware):
    """
    Authentication middleware for mirror mode: when mirroring is enabled,
    resolve the request user from the signed mirror cookie; otherwise defer
    to Django's standard session-based authentication.
    """
    def process_request(self, request):
        if not settings.MIRRORING_ENABLED:
            # Mirroring is off -- fall back to stock session auth.
            super(MirrorAuthenticationMiddleware, self).process_request(request)
            return
        # Resolve the user lazily, on first attribute access.
        request.user = SimpleLazyObject(lambda: get_user(request))


### forwarding ###
Expand Down
36 changes: 36 additions & 0 deletions perma_web/mirroring/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):
    # South schema migration: creates/drops the mirroring_updatequeue table,
    # which buffers serialized model changes for replay on mirror servers.

    def forwards(self, orm):
        """Create the 'UpdateQueue' table."""
        # Adding model 'UpdateQueue'
        db.create_table(u'mirroring_updatequeue', (
            (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('action', self.gf('django.db.models.fields.CharField')(default='update', max_length=10)),
            ('json', self.gf('django.db.models.fields.TextField')()),
            ('sent', self.gf('django.db.models.fields.BooleanField')(default=False)),
        ))
        db.send_create_signal(u'mirroring', ['UpdateQueue'])


    def backwards(self, orm):
        """Drop the 'UpdateQueue' table."""
        # Deleting model 'UpdateQueue'
        db.delete_table(u'mirroring_updatequeue')


    # Frozen ORM description of the models this migration operates on.
    models = {
        u'mirroring.updatequeue': {
            'Meta': {'ordering': "['pk']", 'object_name': 'UpdateQueue'},
            'action': ('django.db.models.fields.CharField', [], {'default': "'update'", 'max_length': '10'}),
            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'json': ('django.db.models.fields.TextField', [], {}),
            'sent': ('django.db.models.fields.BooleanField', [], {'default': 'False'})
        }
    }

    complete_apps = ['mirroring']
Empty file.
97 changes: 96 additions & 1 deletion perma_web/mirroring/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
from django.conf import settings
from django.core import serializers
from django.core.files.storage import default_storage
from django.db import models
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.db import transaction
from django.dispatch import receiver

from perma.models import LinkUser
from perma.storage_backends import file_saved
from perma.utils import run_task


class FakeLinkUser(LinkUser):
Expand All @@ -23,4 +32,90 @@ def save(self, *args, **kwargs):
raise NotImplementedError("FakeLinkUser should never be saved.")

def delete(self, *args, **kwargs):
raise NotImplementedError("FakeLinkUser should never be deleted.")
raise NotImplementedError("FakeLinkUser should never be deleted.")


class UpdateQueue(models.Model):
action = models.CharField(max_length=10, default='update', choices=(('update','update'),('delete','delete')))
json = models.TextField()
sent = models.BooleanField(default=False)

@classmethod
def init_from_instance(self, instance, **kwargs):
return UpdateQueue(json=serializers.serialize("json", [instance], fields=instance.mirror_fields), **kwargs)

class Meta:
ordering = ['pk']

@classmethod
@transaction.atomic
def import_updates(cls, updates):
"""
Import a list of updates.
Each update should be a dict with keys {'pk', 'json', 'action'}.
"""
# first save updates to our downstream copy of the UpdateQueue
model_updates = []
for update in updates:
print "IMPORTING", update['json']
UpdateQueue(**update).save()
decoded_instance = serializers.deserialize("json", update['json']).next().object

# Keep track of the actual instances we need to save, but don't save them yet --
# we don't want to run updates that are just going to be superceded by the following update
# (which happens frequently with common database access patterns).
last_instance = model_updates[-1] if model_updates else None
this_instance = {'action':update['action'], 'instance':decoded_instance}
if last_instance and type(last_instance['instance']) == type(decoded_instance) and last_instance['instance'].pk == decoded_instance.pk:
# this instance is the same as the last one -- overwrite the last one
model_updates[-1] = this_instance
else:
# this instance is different from the last one -- append
model_updates.append(this_instance)

# actually apply all updates
for update in model_updates:
if update['action'] == 'update':
update['instance'].save()
elif update['action'] == 'delete':
update['instance'].delete()


### this is in models for now because it's annoying to put it in signals.py and resolve circular imports with models.py
### in Django 1.8 we can avoid that issue by putting this in signals.py and importing it from ready()
### https://docs.djangoproject.com/en/dev/topics/signals/

if settings.DOWNSTREAM_SERVERS:
def queue_update(instance, action='update'):
# Only send model classes with a mirror_fields setting.
# Set _no_downstream_update on individual instances to disable sending of trivial updates.
if not hasattr(instance, 'mirror_fields') or getattr(instance, '_no_downstream_update', False):
return

from .tasks import send_updates
update = UpdateQueue.init_from_instance(instance=instance, action=action)
update.save()
print "MAIN: Created %s %s; queueing send." % (action, update.pk)
run_task(send_updates, options={'countdown': 2})

# add all useful database updates to UpdateQueue
@receiver(post_save)
def model_update(sender, instance, **kwargs):
queue_update(instance)

@receiver(post_delete)
def model_delete(sender, instance, **kwargs):
queue_update(instance)

@receiver(m2m_changed)
def model_m2m_changed(sender, instance, **kwargs):
queue_update(instance)

@receiver(file_saved)
def broadcast_file_update(sender, **kwargs):
print "Got save message", sender, kwargs
if kwargs['instance'] == default_storage._wrapped:
from .tasks import trigger_media_sync

print "Saving."
run_task(trigger_media_sync, paths=[kwargs['path']])
Loading

0 comments on commit d292261

Please sign in to comment.