Skip to content
This repository has been archived by the owner on Nov 3, 2021. It is now read-only.

Commit

Permalink
Sites release dashboard (bug 623359). Snuck in unit test for dumbo run.
Browse files Browse the repository at this point in the history
Channel handling for version select box still missing.
  • Loading branch information
x1B committed Feb 3, 2011
1 parent 99fadd5 commit 3e4f53d
Show file tree
Hide file tree
Showing 18 changed files with 322 additions and 227 deletions.
81 changes: 5 additions & 76 deletions apps/website_issues/management/commands/generate_sites.py
@@ -1,26 +1,9 @@
import os, os.path, sys, subprocess, pipes
import bz2
from shutil import rmtree
from optparse import make_option
from tempfile import mkdtemp

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError

from dumbo.util import system
from website_issues.mapreduce.normalize_to_tsv import normalize_unix
from website_issues.mapreduce import generate_sites

from settings import path


def system(args, more_env=None, stdout=sys.stdout, stderr=sys.stderr):
    """Run a shell command line and return its exit status.

    args     -- list of command fragments; they are joined with single
                spaces and handed to the shell, so pipes and redirections
                inside the fragments work. Callers must quote fragments
                themselves.
    more_env -- optional mapping of extra environment variables, applied on
                top of a copy of os.environ.
    stdout   -- file object the child's standard output is connected to.
    stderr   -- file object the child's standard error is connected to; the
                "Calling: ..." trace line is written here as well.

    Returns the child's exit code. If the child was killed by a signal the
    value is negative (see subprocess.Popen.returncode) instead of being
    reported as 0.
    """
    command = " ".join(args)
    stderr.write("Calling: %s\n" % command)
    # Push the trace line out before the child starts writing to the same
    # stream, so the output stays in chronological order.
    stderr.flush()
    env = os.environ.copy()
    if more_env:
        env.update(more_env)
    process = subprocess.Popen(command, shell=True, env=env,
                               stdout=stdout, stderr=stderr)
    # Let subprocess reap the child. Dividing the raw waitpid() status by 256
    # throws away the signal number, so a killed child looked successful.
    return process.wait()

class Command(BaseCommand):
"""
Expand All @@ -46,66 +29,12 @@ class Command(BaseCommand):
help='Do not load results into the sites database.'),
make_option('--clean',
action='store_true',
dest='do_clean',
dest='only_clean',
default=False,
help='Clean work/output files and exit.'),
)

def handle(self, *args, **options):
    """Run the site generation with the parsed command-line options.

    The actual work is done by website_issues.mapreduce.generate_sites;
    this command only forwards the "source", "skip_load" and "only_clean"
    options to it and returns its result.
    """
    # The pipeline used to be spelled out inline here as well, which meant
    # it ran once locally and then again inside generate_sites().
    return generate_sites(options["source"],
                          options["skip_load"],
                          options["only_clean"])
82 changes: 82 additions & 0 deletions apps/website_issues/mapreduce/__init__.py
@@ -0,0 +1,82 @@
import os, os.path, sys, subprocess, pipes
import bz2
from shutil import rmtree
from tempfile import mkdtemp

from dumbo.util import system

from django.conf import settings
from settings import path

from website_issues.mapreduce.normalize_to_tsv import normalize_unix

def _system(args, more_env={}):
print >> sys.stderr, 'Calling:', " ".join(args)
env = os.environ.copy()
env.update(more_env)
process = subprocess.Popen(" ".join(args), shell=True,
env=env,
stdout=sys.stdout,
stderr=sys.stderr)
return os.waitpid(process.pid, 0)[1] / 256


def generate_sites(source, skip_load=False, only_clean=False):
    """Generate the sites data from an opinions TSV export.

    source     -- path of the input file. None selects "opinions.tsv.bz2"
                  inside settings.TSV_EXPORT_DIR. A ".bz2" file is
                  decompressed into the work directory first.
    skip_load  -- when true, stop after exporting and do not load the result
                  into the sites database.
    only_clean -- when true, remove the work/output directory and return
                  without generating anything.

    The work/output directory is "sites" inside settings.TSV_EXPORT_DIR.
    Raises Exception if the input file does not exist.
    """
    dest_dir = os.path.join(settings.TSV_EXPORT_DIR, "sites")
    if only_clean:
        print("Removing output at %s" % dest_dir)
        # Nothing to create first: only remove what is actually there.
        if os.path.exists(dest_dir):
            rmtree(dest_dir)
        return
    if not os.path.exists(dest_dir):
        os.mkdir(dest_dir)
    print("Using work/output directory: %s" % dest_dir)

    if source is None:
        source = os.path.join(settings.TSV_EXPORT_DIR, 'opinions.tsv.bz2')
    if not os.path.exists(source):
        raise Exception("Missing input file: %s" % source)
    if source.endswith(".bz2"):
        print("Decompressing %s" % source)
        # we need to decompress the file to disk for dumbo to work with it
        outname = os.path.join(dest_dir,
                               os.path.basename(source[:-len(".bz2")]))
        infile = bz2.BZ2File(source)
        try:
            # The payload is copied verbatim, hence binary mode.
            with open(outname, "wb") as outfile:
                for line in infile:
                    outfile.write(line)
        finally:
            infile.close()
        source = outname

    mapreduce_dir = path("apps/website_issues/mapreduce")
    dumbo_job_file = os.path.join(mapreduce_dir, "job.py")
    show_counters = os.path.join(mapreduce_dir, "show_counters.py")

    # Remove the result of a previous run before dumbo writes a new one.
    dest = os.path.join(dest_dir, "clustered_comments.tsv.coded")
    if os.path.exists(dest):
        os.remove(dest)

    print("Generating site from %s using dumbo (unix backend)." % source)

    q = pipes.quote
    # The environment is handed to the child directly, not through the
    # shell, so the value must not be shell-quoted.
    python_env = {"PYTHONPATH": ":".join(sys.path)}
    # NOTE(review): the status returned here is the one of the last command
    # in the pipe (show_counters.py), and it is not checked.
    _system(["dumbo start", q(dumbo_job_file),
             "-input", q(source), "-output", q(dest),
             "2>&1 | python", q(show_counters)],
            more_env=python_env)

    print("Exporting result to %s" % dest_dir)
    with open(dest, "r") as result:
        normalize_unix(result, dest_dir)

    if skip_load:
        return

    # "dbshell --database" expects the alias used as key in
    # settings.DATABASES, not the NAME of the database behind it.
    db_alias = "website_issues"
    database = settings.DATABASES[db_alias]["NAME"]
    print("Loading results into sites database: %s" % database)
    sql_load = os.path.join(mapreduce_dir, "load.sql")
    # Prefix every INFILE path in load.sql with the work directory.
    sql_filter = """sed "s/INFILE '/INFILE '%s\\//g" """ % \
        q(dest_dir).replace('/', '\\/')
    _system(["cat", q(sql_load),
             "|", sql_filter,
             "| python ./manage.py dbshell --database=%s" % db_alias],
            more_env=python_env)
    # NOTE(review): the export is repeated after the load, as before;
    # confirm whether this second pass is needed.
    with open(dest, "r") as result:
        normalize_unix(result, dest_dir)
25 changes: 19 additions & 6 deletions apps/website_issues/mapreduce/job.py
@@ -1,19 +1,32 @@
import dumbo
from dumbo.lib import identitymapper, identityreducer

# Prepare the job environment. This is like manage.py, but for dumbo jobs.
# The PYTHONPATH for the app needs to be set in the environment.
try:
    # Prefer the local settings override, fall back to the stock settings.
    try:
        import settings_local as settings
    except ImportError:
        import settings
except ImportError:
    import sys
    sys.stderr.write("Dumbo job cannot find settings. Is the path OK?\n")
    # Terminate the line so that whatever is printed next does not end up
    # glued to the path listing.
    sys.stderr.write("PYTHONPATH: %r\n" % sys.path)
    sys.exit(1)
from django.core.management import setup_environ
setup_environ(settings)

from website_issues.mapreduce import tasks


"""Dumbo job that can be executed on a hadoop cluster."""


def runner(job):
    """Register the iterations of the sites job on the given dumbo job.

    Six mapper/reducer pairs are added, in the order in which they have to
    run. Each pair must be registered exactly once.
    """
    job.additer(tasks.SiteSummaryMapper, tasks.CommentClusteringReducer)
    job.additer(identitymapper, tasks.ClusterIdReducer)
    job.additer(identitymapper, tasks.SummarySizeReducer)
    job.additer(identitymapper, tasks.SummaryIdReducer)
    job.additer(identitymapper, tasks.DenormalizingReducer)
    job.additer(identitymapper, identityreducer)


if __name__ == "__main__":
    dumbo.main(runner)
10 changes: 7 additions & 3 deletions apps/website_issues/mapreduce/tasks.py
Expand Up @@ -4,6 +4,7 @@

from website_issues.utils import normalize_url

from input import OPINION_PRAISE, OPINION_ISSUE, OPINION_BROKEN

"""Map/Reduce clustering for sites.
Expand Down Expand Up @@ -45,11 +46,14 @@ def __init__(self):
self.comments_out = self.counters["comments used"]

def __call__(self, data):
supported_types = set([OPINION_BROKEN.short,
OPINION_ISSUE.short,
OPINION_PRAISE.short])
for key, value in recombined(data):
self.comments_in += 1
m_id, ts, type, product, version, os, locale, \
manufacturer, device, url, message = value.split('\t', 10)
if url == "" or type not in ("praise", "issue"): continue
if url == "" or type not in supported_types: continue
app = '<%s>' % product
site = normalize_url(url)
out_keys = cartesian((version,), (site,), (app, os, None), (type,))
Expand Down Expand Up @@ -197,8 +201,8 @@ def __call__(self, key, values_gen):
values = list(values_gen)
s_sad_size, s_happy_size = 0, 0
for type, _, s_size, _, _, _, _, _, _, _ in values:
if type == "praise": s_happy_size = s_size
elif type == "issue": s_sad_size = s_size
if type == OPINION_PRAISE.short: s_happy_size = s_size
elif type == OPINION_ISSUE.short: s_sad_size = s_size
for type, s_id, s_size, c_id, c_type, c_size, m_refid, m_id, message, \
score in values:
yield \
Expand Down
27 changes: 27 additions & 0 deletions apps/website_issues/mapreduce/tests.py
Expand Up @@ -10,6 +10,7 @@
from django.core.management import call_command

from website_issues.mapreduce import tasks
from website_issues.mapreduce import generate_sites


TEST_FILE = "lib/website_issues/test_opinions.tsv"
Expand Down Expand Up @@ -142,3 +143,29 @@ def _denormalized(self):
def test_denormalizing_reducer(self):
    """self._denormalized() has to produce exactly 222 entries."""
    eq_(len(self._denormalized()), 222)


class TestJob(object):
    """Exercises generate_sites() on the test opinions file."""

    # So we can unit-test dumbo command invocation, here is a monkey patch for
    # http://code.google.com/p/python-nose/issues/detail?id=290
    from nose.ext.dtcompat import _SpoofOut

    class SpoofFile(_SpoofOut):
        """Spoofed output stream that also answers fileno().

        Presumably needed because the job hands sys.stdout/sys.stderr to a
        subprocess, which asks them for a file descriptor.
        """

        def fileno(self):
            return 0

    class SpoofContext(object):
        """Swaps sys.stdout and sys.stderr for SpoofFile wrappers.

        The real streams are remembered on entry and put back on exit.
        """

        def __enter__(self):
            self._true_stdout, self._true_stderr = sys.stdout, sys.stderr
            sys.stdout = TestJob.SpoofFile(self._true_stdout)
            sys.stderr = TestJob.SpoofFile(self._true_stderr)

        def __exit__(self, exc_type, exc_value, exc_tb):
            sys.stdout, sys.stderr = self._true_stdout, self._true_stderr

    def test_generate_job(self):
        """Generate twice without loading, then remove the output again."""
        with TestJob.SpoofContext():
            for _ in range(2):
                generate_sites(TEST_FILE, skip_load=True)
            generate_sites(TEST_FILE, skip_load=True, only_clean=True)
@@ -0,0 +1,5 @@
{# Renders one filter link. The link gets the "selected" class when the
   form's cleaned value for `name` equals `value`; `title` becomes the
   tooltip of the link when it is given. #}
{% macro button(form, name, value='', label='', title='', classes='') %}
  {% set sel = (form.cleaned_data[name] == value) %}
  <a class="{{ 'selected' if sel else ''}} {{ classes }}"{% if title %} title="{{ title }}"{% endif %}
     href="{{ sites_url(form,**{name: value}) }}">{{ label }}</a>
{% endmacro %}
Expand Up @@ -36,4 +36,4 @@ <h2 title="{{ site.domain }}">
{% endfor %}
</ul>

{% include "website_issues/paginate_sites.html" %}
{% include "website_issues/includes/paginate_sites.html" %}
@@ -0,0 +1,68 @@
{% from 'website_issues/includes/macros.html' import button %}

<div class="col left">
<form method="get" action="{{ url('website_issues') }}" id="filters" class="filters segments block">

<div class="choice">
<h3>{{ _('Product') }}</h3>
<div>
{{ products_block(products, product) }}
{{ versions_block(versions, version) }}
</ul>
</div>
</div>

{% if request.channel != "release" %}
<div class="choice" id="filter_type">
<h3>{{ _('Type of Feedback') }}</h3>
<div>
<ul>
<li>{{ button(form, 'sentiment', '', label=_('All', 'sites_search_all_feedback'), title=_('All feedback')) }}</li>
<li>{{ button(form, 'sentiment', 'happy', label=_('Praise'), title=_('Praise only')) }}</li>
<li>{{ button(form, 'sentiment', 'sad', label=_('Issues'), title=_('Issues only')) }}</li>
</ul>
</div>
</div>
{% endif %}

<div class="choice" id="filter_platform">
<h3>{{ _('Platforms') }}</h3>
<div>
<ul>
<li>{{ button(form, 'os', '', label=_('All', 'sites_search_all_platforms'), title=_('All platforms')) }}</li>
{% for os in oses %}
<li>{{ button(form, 'os', os.short, label=os.short, title=os.pretty) }}</li>
{% endfor %}
</ul>
</div>
</div>

{# TODO add minimum feedback count choice
<div class="choice">
<h3>Min Related</h3>

<div>
<ul>
<!--
--><li><a class="selected" href="search.html" title="No minimum">Any</a></li><!--
--><li><a class="" href="search.html" title="25+ messages">25</a></li><!--
--><li><a class="" href="search.html" title="50+ messages">50</a></li><!--
--><li><a class="" href="search.html" title="100+ messages">100</a></li><!--
--><li><a class="" href="search.html" title="250+ messages">250</a></li>
</ul>
</div>
</div>
#}

</form>
</div><!--
--><div class="col middle wide">
<div id="themes" class="block">
{% if not site %}
{% include "website_issues/includes/sites_list.html" %}
{% else %}
{% include "website_issues/includes/single_site.html" %}
{% endif %}
</div>
</div>
Expand Up @@ -46,7 +46,7 @@ <h3>{{ _('No results found!') }}</h3>
{{ count }} messages
{% endtrans %}
</a></li>
{% if site.positive is none %}
{% if request.channel == 'beta' and site.positive is none %}
{% with praise_perc=(site.praise_count/site.size*100)|int %}
<li><a href="{{ sites_url(form, sentiment='happy', url=site.url) }}">{{
_('{percentage}% praise')|f(percentage=praise_perc) }}</a></li>
Expand Down Expand Up @@ -79,4 +79,4 @@ <h3>{{ _('No results found!') }}</h3>
{% endfor %}
</ul>

{% include "website_issues/paginate_sites.html" %}
{% include "website_issues/includes/paginate_sites.html" %}

0 comments on commit 3e4f53d

Please sign in to comment.