Skip to content

Commit

Permalink
refs #921. Adds configuration for automatic Twitter account handling.…
Browse files Browse the repository at this point in the history
… Handles account info in harvest messages.
  • Loading branch information
Justin Littman committed May 30, 2018
1 parent d0a6578 commit 3df8932
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 12 deletions.
33 changes: 29 additions & 4 deletions sfm/message_consumer/sfm_ui_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from sfmutils.consumer import BaseConsumer
from sfmutils.harvester import CODE_UNKNOWN_ERROR
from sfmutils.harvester import CODE_UNKNOWN_ERROR, CODE_UID_NOT_FOUND, CODE_TOKEN_NOT_FOUND, CODE_TOKEN_UNAUTHORIZED, \
CODE_UID_UNAUTHORIZED, CODE_TOKEN_SUSPENDED, CODE_UID_SUSPENDED
from ui.models import User, Harvest, Collection, Seed, Warc, Export, HarvestStat
from ui.jobs import collection_stop
from ui.utils import get_email_addresses_for_collection_set, get_site_url
Expand Down Expand Up @@ -40,7 +41,8 @@ def on_message(self):
elif self.routing_key == "harvest.start.web":
self._on_web_harvest_start_message()
else:
log.warn("Unexpected message with routing key %s: %s", self.routing_key, json.dumps(self.message, indent=4))
log.warn("Unexpected message with routing key %s: %s", self.routing_key,
json.dumps(self.message, indent=4))
except Exception, e:
log.exception(e)
raise e
Expand Down Expand Up @@ -102,6 +104,29 @@ def _on_harvest_status_message(self):
except ObjectDoesNotExist:
log.error("Seed model object with seed_id %s not found to update uid to %s", seed_id, uid)

# Deactivate seeds based on harvest warnings and the collection's harvest options
harvest_options = json.loads(harvest.collection.harvest_options)
for warning_msg in self.message.get("warnings", []):
log.info(warning_msg)
if warning_msg.get('seed_id'):
history_note = None
if warning_msg['code'] in (CODE_UID_NOT_FOUND, CODE_TOKEN_NOT_FOUND) and harvest_options.get(
'deactivate_not_found_seeds'):
history_note = "Account deleted or not found."
elif warning_msg['code'] in (CODE_UID_UNAUTHORIZED, CODE_TOKEN_UNAUTHORIZED) and harvest_options.get(
'deactivate_unauthorized_seeds'):
history_note = "Account protected."
elif warning_msg['code'] in (CODE_UID_SUSPENDED, CODE_TOKEN_SUSPENDED) and harvest_options.get(
'deactivate_suspended_seeds'):
history_note = "Account suspended."
if history_note:
seed = Seed.objects.get(seed_id=warning_msg['seed_id'])
if seed.is_active:
log.debug("Turning seed %s off: %s", (seed.token or seed.uid), history_note)
seed.is_active = False
seed.history_note = history_note
seed.save()

# Update stats
if self.message["status"] != Harvest.FAILURE:
day_stats = self.message.get("stats", {})
Expand Down Expand Up @@ -129,8 +154,8 @@ def _on_harvest_status_message(self):

# Send email if completed and failed or has messages
if harvest.status == Harvest.FAILURE or (
harvest.status in (Harvest.SUCCESS, Harvest.PAUSED) and (
harvest.infos or harvest.warnings or harvest.errors)):
harvest.status in (Harvest.SUCCESS, Harvest.PAUSED) and (
harvest.infos or harvest.warnings or harvest.errors)):

# Get emails for group members
receiver_emails = get_email_addresses_for_collection_set(harvest.collection.collection_set,
Expand Down
46 changes: 44 additions & 2 deletions sfm/message_consumer/test_sfm_ui_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def setUp(self):
harvest_type="test_type", name="test_collection",
harvest_options=json.dumps({}))
stream_collection = Collection.objects.create(collection_set=collection_set, credential=credential,
harvest_type=Collection.TWITTER_SAMPLE, name="test_stream_collection",
harvest_options=json.dumps({}), is_on=True)
harvest_type=Collection.TWITTER_SAMPLE,
name="test_stream_collection",
harvest_options=json.dumps({}), is_on=True)

self.assertTrue(stream_collection.is_on)
Seed.objects.create(collection=collection, uid="131866249@N02", seed_id='1')
Seed.objects.create(collection=collection, token="library_of_congress", seed_id='2')
Expand Down Expand Up @@ -156,6 +158,46 @@ def test_harvest_status_on_message(self):
self.assertEqual("f0c3c5ef7031", harvest.host)
self.assertEqual("39", harvest.instance)

# Now change the harvest options and check that only the not-found seed is deactivated.
# "deactivate_not_found_seeds": self.cleaned_data["deleted_accounts_option"],
# "deactivate_unauthorized_seeds": self.cleaned_data["protected_accounts_options"],
# "deactivate_suspended_seeds": self.cleaned_data["suspended_accounts_option"]
collection = Collection.objects.get(name="test_collection")
# Make sure both seeds are on.
seed_ids = []
for seed in collection.seeds.all():
self.assertTrue(seed.is_active)
seed_ids.append(seed.seed_id)
collection.harvest_options = json.dumps({
"deactivate_not_found_seeds": True,
"deactivate_unauthorized_seeds": False,
"deactivate_suspended_seeds": False
})
collection.save()

self.consumer.message = {
"id": "test:1",
"status": Harvest.SUCCESS,
"date_started": "2015-07-28T11:18:36.640044",
"date_ended": "2015-07-28T11:18:42.539470",
"warnings": [
{"code": "token_unauthorized", "message": "This token is unauthorized.", "seed_id": seed_ids[0]},
{"code": "token_not_found", "message": "This token is not found.", "seed_id": seed_ids[1]},
],
"service": "Twitter Harvester",
"host": "f0c3c5ef7031",
"instance": "39",
}
# Trigger on_message
self.consumer.on_message()

unauthorized_seed = Seed.objects.get(seed_id=seed_ids[0])
self.assertTrue(unauthorized_seed.is_active)

not_found_seed = Seed.objects.get(seed_id=seed_ids[1])
self.assertFalse(not_found_seed.is_active)


@patch("message_consumer.sfm_ui_consumer.collection_stop")
def test_harvest_status_stream_failed_on_message(self, mock_collection_stop):
self.consumer.routing_key = "harvest.status.twitter.twitter_sample"
Expand Down
28 changes: 22 additions & 6 deletions sfm/ui/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,19 @@ class CollectionTwitterUserTimelineForm(BaseCollectionForm):
media_option = forms.BooleanField(initial=False, required=False, label=TWITTER_MEDIA_LABEL)
web_resources_option = forms.BooleanField(initial=False, required=False, label=TWITTER_WEB_RESOURCES_LABEL)
user_images_option = forms.BooleanField(initial=False, required=False, label=USER_PROFILE_LABEL)
deleted_accounts_option = forms.BooleanField(initial=False, required=False, label="Automatically delete seeds "
"for deleted / not found "
"accounts.")
suspended_accounts_option = forms.BooleanField(initial=False, required=False, label="Automatically delete seeds "
"for suspended accounts.")
protected_accounts_options = forms.BooleanField(initial=False, required=False, label="Automatically delete seeds "
"for protected accounts.")

def __init__(self, *args, **kwargs):
super(CollectionTwitterUserTimelineForm, self).__init__(*args, **kwargs)
self.helper.layout[0][4].extend(('incremental', 'media_option', 'user_images_option', 'web_resources_option'))
self.helper.layout[0][4].extend(('incremental', 'media_option', 'user_images_option', 'web_resources_option',
'deleted_accounts_option', 'suspended_accounts_option',
'protected_accounts_options'))

if self.instance and self.instance.harvest_options:
harvest_options = json.loads(self.instance.harvest_options)
Expand All @@ -190,6 +199,12 @@ def __init__(self, *args, **kwargs):
self.fields['web_resources_option'].initial = harvest_options["web_resources"]
if "user_images" in harvest_options:
self.fields['user_images_option'].initial = harvest_options["user_images"]
if "deactivate_not_found_seeds" in harvest_options:
self.fields['deleted_accounts_option'].initial = harvest_options["deactivate_not_found_seeds"]
if "deactivate_unauthorized_seeds" in harvest_options:
self.fields['protected_accounts_options'].initial = harvest_options["deactivate_unauthorized_seeds"]
if "deactivate_suspended_seeds" in harvest_options:
self.fields['suspended_accounts_option'].initial = harvest_options["deactivate_suspended_seeds"]

def save(self, commit=True):
m = super(CollectionTwitterUserTimelineForm, self).save(commit=False)
Expand All @@ -198,7 +213,10 @@ def save(self, commit=True):
"incremental": self.cleaned_data["incremental"],
"media": self.cleaned_data["media_option"],
"web_resources": self.cleaned_data["web_resources_option"],
"user_images": self.cleaned_data["user_images_option"]
"user_images": self.cleaned_data["user_images_option"],
"deactivate_not_found_seeds": self.cleaned_data["deleted_accounts_option"],
"deactivate_unauthorized_seeds": self.cleaned_data["protected_accounts_options"],
"deactivate_suspended_seeds": self.cleaned_data["suspended_accounts_option"]
}
m.harvest_options = json.dumps(harvest_options, sort_keys=True)
m.save()
Expand Down Expand Up @@ -390,7 +408,6 @@ def __init__(self, *args, **kwargs):
super(CollectionWeiboSearchForm, self).__init__(*args, **kwargs)
self.helper.layout[0][4].extend(('image_sizes', 'incremental', 'web_resources_option'))


if self.instance and self.instance.harvest_options:
harvest_options = json.loads(self.instance.harvest_options)
if "incremental" in harvest_options:
Expand Down Expand Up @@ -591,7 +608,7 @@ def __init__(self, *args, **kwargs):
token = json.loads(self.instance.token)
# This except handling is for converting over old query tokens
except ValueError:
token = { 'query': self.instance.token }
token = {'query': self.instance.token}
if 'query' in token:
self.fields['query'].initial = token['query']
if 'geocode' in token:
Expand Down Expand Up @@ -626,7 +643,6 @@ def save(self, commit=True):
return m



class SeedWeiboSearchForm(BaseSeedForm):
class Meta(BaseSeedForm.Meta):
fields = ['token']
Expand Down Expand Up @@ -832,7 +848,7 @@ def clean_tokens(self):
numtoken.append(clean_t)
elif clean_t and not clean_t.isdigit():
strtoken.append(clean_t)
finaltokens.append(clean_t+"\n")
finaltokens.append(clean_t + "\n")
if seed_type == 'token' and numtoken:
raise ValidationError(
'Screen names may not be numeric. Please correct the following seeds: ' + ', '.join(numtoken) + '.')
Expand Down

0 comments on commit 3df8932

Please sign in to comment.