Add support for specifying a time range and dumping to DB
Make the OONI tasks cleaner
hellais committed Jun 4, 2015
1 parent 086bdbb commit e314a97
Showing 3 changed files with 134 additions and 32 deletions.
153 changes: 125 additions & 28 deletions pipeline/batch/spark_apps.py
@@ -5,55 +5,152 @@
import logging

import luigi
import luigi.postgres
from luigi.task import ExternalTask
from luigi.contrib.spark import PySparkTask

from invoke.config import Config

from pipeline.helpers.util import get_luigi_target, json_dumps
from pipeline.helpers.util import json_loads, get_date_interval
from pipeline.helpers.util import get_luigi_target
from pipeline.helpers.util import get_imported_dates


config = Config(runtime_path="invoke.yaml")
logger = logging.getLogger('ooni-pipeline')


class CountInterestingReports(PySparkTask):
class FindInterestingReports(PySparkTask):
driver_memory = '2g'
executor_memory = '3g'
py_packages = ["pipeline"]

files = luigi.Parameter()
date = luigi.DateParameter()
src = luigi.Parameter()
dst = luigi.Parameter()

test_name = "http_requests_test"
software_name = "ooniprobe"

def input(self):
input_path = os.path.join(self.src, "reports-sanitised", "streams", self.files)
return get_luigi_target(input_path)
return get_luigi_target(os.path.join(self.src, "%s.json" % self.date))

def output(self):
output_path = os.path.join(self.src,
"analysis",
"http_requests_test-%s-interesting-count.json" % self.files)
output_path = os.path.join(self.dst,
"{software_name}-{test_name}"
"-interesting-{date}.json".format(
date=self.date,
test_name=self.test_name,
software_name=self.software_name))
return get_luigi_target(output_path)

def main(self, sc, *args):
df = sc.jsonFile(self.input().path)
http_requests = df.filter("test_name = 'http_requests_test' AND record_type = 'entry'")
interestings = http_requests.filter("body_length_match = false OR headers_match = false").groupBy("report_id")

with self.output.open('w') as out_file:
for interesting in interestings.count().collect():
data = json_dumps({
"report_id": interesting.report_id,
"count": interesting.count
})
out_file.write(data)
out_file.write("\n")


def run(files="2013-12-25", src="s3n://ooni-public/", worker_processes=16):
logger.info("Running CountInterestingReports for %s on %s" % (files, src))
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.jsonFile(self.input().path)
report_entries = df.filter("test_name = '{test_name}'"
" AND record_type = 'entry'".format(
test_name=self.test_name))
interestings = self.find_interesting(report_entries)

out_file = self.output().open('w')
for interesting in interestings.toJSON().collect():
out_file.write(interesting)
out_file.write("\n")
out_file.close()

def find_interesting(self, report_entries):
        raise NotImplementedError("You must implement a find_interesting method")


class HTTPRequestsInterestingFind(FindInterestingReports):
test_name = "http_requests_test"

def find_interesting(self, report_entries):
return report_entries.filter("body_length_match = false"
" OR headers_match = false")


class InterestingToDB(luigi.postgres.CopyToTable):
src = luigi.Parameter()
date = luigi.DateParameter()
dst = luigi.Parameter()

host = str(config.postgres.host)
database = str(config.postgres.database)
user = str(config.postgres.username)
password = str(config.postgres.password)
table = 'spark_results'

columns = [
("report_id", "TEXT"),
("report_filename", "TEXT"),
("input", "TEXT"),
]

finder = FindInterestingReports

def requires(self):
f = self.finder(src=self.src, date=self.date, dst=self.dst)
logger.info("Running the finder %s" % f)
return f

def rows(self):
with self.input().open('r') as in_file:
for line in in_file:
record = json_loads(line.decode('utf-8', 'ignore').strip('\n'))
logger.info("Adding to DB %s" % (record["report_id"]))
yield self.serialize(record)

    def serialize(self, record):
        return [record["report_id"], record["report_filename"],
                record["input"]]


class HTTPRequestsToDB(InterestingToDB):
table = 'http_requests_interesting'

columns = [
("report_id", "TEXT"),
("report_filename", "TEXT"),
("input", "TEXT")
]
finder = HTTPRequestsInterestingFind

def serialize(self, record):
return [record["report_id"], record["report_filename"],
record["input"]]


class SparkResultsToDatabase(ExternalTask):
src = luigi.Parameter()
date = luigi.DateParameter()
dst = luigi.Parameter()

def run(self):
logger.info("Running HTTPRequestsToDB for date %s" % self.date)
yield HTTPRequestsToDB(src=self.src, date=self.date, dst=self.dst)


def run(date_interval, src="s3n://ooni-public/reports-sanitised/streams/",
dst="s3n://ooni-public/processed/", worker_processes=16):

sch = luigi.scheduler.CentralPlannerScheduler()
w = luigi.worker.Worker(scheduler=sch,
worker_processes=worker_processes)
task = CountInterestingReports(src=src, files=files)
w.add(task)
w = luigi.worker.Worker(
scheduler=sch, worker_processes=worker_processes)

imported_dates = get_imported_dates(
src, aws_access_key_id=config.aws.access_key_id,
aws_secret_access_key=config.aws.secret_access_key)

interval = get_date_interval(date_interval)
for date in interval:
if str(date) not in imported_dates:
continue

logger.info("Running CountInterestingReports for %s on %s to %s" %
(date, src, dst))
task = SparkResultsToDatabase(src=src, date=date, dst=dst)
w.add(task)

w.run()
w.stop()
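
The refactoring above splits the Spark job (FindInterestingReports) from the Postgres load (InterestingToDB), so adding a new test type should only require a small finder/loader pair. A minimal sketch of what that could look like for a hypothetical DNS consistency check; the class names, the `inconsistent` column, and the table name are illustrative assumptions, not part of this commit:

class DNSConsistencyInterestingFind(FindInterestingReports):
    # Hypothetical finder: keep dns_consistency entries flagged as inconsistent.
    test_name = "dns_consistency"

    def find_interesting(self, report_entries):
        # Column name is assumed; adjust to the sanitised report schema.
        return report_entries.filter("inconsistent = true")


class DNSConsistencyToDB(InterestingToDB):
    # Hypothetical loader: reuse the base columns, copy into a dedicated table.
    table = 'dns_consistency_interesting'
    finder = DNSConsistencyInterestingFind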
7 changes: 5 additions & 2 deletions pipeline/helpers/util.py
@@ -100,8 +100,11 @@ def get_imported_dates(directory, aws_access_key_id=None,
aws_secret_access_key=aws_secret_access_key,
recursive=False)
dates = []
for date_directory in walker(directory):
dates.append(date_directory.split("/")[-2])
for listing in walker(directory):
if listing.endswith(".json"):
dates.append(listing.split("/")[-1].replace(".json", ""))
else:
dates.append(listing.split("/")[-2])
return dates
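
The branch added here lets get_imported_dates handle both layouts of the sanitised-streams bucket: a flat YYYY-MM-DD.json object and the older per-date directory. An illustrative trace with made-up listings (the real paths come from the S3 walker, which isn't shown in this hunk):

listings = [
    "s3n://ooni-public/reports-sanitised/streams/2013-12-25.json",  # flat stream file
    "s3n://ooni-public/reports-sanitised/streams/2013-12-26/",      # per-date directory
]
dates = []
for listing in listings:
    if listing.endswith(".json"):
        dates.append(listing.split("/")[-1].replace(".json", ""))
    else:
        dates.append(listing.split("/")[-2])
print(dates)  # ['2013-12-25', '2013-12-26']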


6 changes: 4 additions & 2 deletions tasks.py
@@ -10,6 +10,7 @@
config = Config(runtime_path="invoke.yaml")
logger = setup_pipeline_logging(config)

os.environ["PYTHONPATH"] = os.environ.get("PYTHONPATH") if os.environ.get("PYTHONPATH") else ""
os.environ["PYTHONPATH"] = ":".join(os.environ["PYTHONPATH"].split(":") + [config.core.ooni_pipeline_path])

def _create_cfg_files():
@@ -220,12 +221,13 @@ def spark_submit(ctx, script,


@task
def spark_apps(ctx, files="2013-12-25.json", src="s3n://ooni-public/", workers=16):
def spark_apps(ctx, date_interval, src="s3n://ooni-public/reports-sanitised/streams/",
dst="s3n://ooni-public/processed/", workers=16):
timer = Timer()
timer.start()
from pipeline.batch import spark_apps
logger.info("Running spark apps")
spark_apps.run(files=files, src=src, worker_processes=workers)
spark_apps.run(date_interval=date_interval, src=src, dst=dst, worker_processes=workers)
logger.info("spark_submit runtime: %s" % timer.stop())


