Skip to content
This repository has been archived by the owner on Mar 30, 2022. It is now read-only.

Commit

Permalink
automatically start grobid service unless a url was specified
Browse files Browse the repository at this point in the history
  • Loading branch information
de-code committed Oct 20, 2017
1 parent d05b8be commit 489b723
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
/sciencebeam.egg-info
/.eggs
/.vscode
/venv
/logs
/.temp

*.pyc
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include requirements.txt
152 changes: 126 additions & 26 deletions sciencebeam/examples/grobid_service_pdf_to_xml.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from __future__ import absolute_import

import argparse
import os
from os.path import splitext
import subprocess
import errno
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
Expand All @@ -14,17 +18,70 @@
)
from sciencebeam.transformers.xslt import xslt_transformer_from_file

def get_logger():
    """Return the logger scoped to this module."""
    return logging.getLogger(__name__)

def create_fn_api_runner():
    """Instantiate Beam's portable FnApiRunner (imported lazily)."""
    from apache_beam.runners.portability.fn_api_runner import FnApiRunner

    return FnApiRunner()

def run(argv=None):
"""Main entry point; defines and runs the tfidf pipeline."""
def configure_pipeline(p, opt):
    """Wire the PDF-to-XML conversion steps into pipeline ``p`` using ``opt``."""
    # collection of (filename, content) tuples for every matched input file
    files = p | ReadFileNamesAndContent(opt.input)

    # convert the PDF content to XML via the Grobid service
    # (grobid_service accepts either the content or (filename, content) tuples)
    converted = files | beam.Map(grobid_service(
        opt.grobid_url, opt.grobid_action, start_service=opt.start_grobid_service
    ))

    if opt.xslt_path:
        converted = converted | MapValues(xslt_transformer_from_file(opt.xslt_path))

    # rename the key (filename) from .pdf to the output suffix to match the new content
    converted = converted | MapKeys(lambda k: splitext(k)[0] + opt.output_suffix)

    # write the files, using the key as the filename
    converted = converted | WriteToFile()

def get_cloud_project():
    """Return the default Google Cloud project configured via gcloud.

    Raises:
        Exception: if gcloud is not installed, or no default project is set.
    """
    cmd = [
        'gcloud', '-q', 'config', 'list', 'project',
        '--format=value(core.project)'
    ]
    with open(os.devnull, 'w') as null_out:
        try:
            project = subprocess.check_output(cmd, stderr=null_out).strip()
        except OSError as e:
            # ENOENT means the gcloud executable itself was not found
            if e.errno != errno.ENOENT:
                raise
            raise Exception(
                'gcloud is not installed. The Google Cloud SDK is '
                'necessary to communicate with the Cloud ML service. '
                'Please install and set up gcloud.'
            )
    if not project:
        raise Exception(
            '--cloud specified but no Google Cloud Platform '
            'project found.\n'
            'Please specify your project name with the --project '
            'flag or set a default project: '
            'gcloud config set project YOUR_PROJECT_NAME'
        )
    return project

def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input file pattern to process.')
help='Input file pattern to process.'
)
parser.add_argument(
'--output_path',
required=False,
help='Output directory to write results to.'
)
parser.add_argument(
'--output-suffix',
required=False,
Expand All @@ -33,7 +90,7 @@ def run(argv=None):
parser.add_argument(
'--grobid-url',
required=False,
default='http://localhost:8080',
default=None,
help='Base URL to the Grobid service')
parser.add_argument(
'--grobid-action',
Expand All @@ -47,43 +104,86 @@ def run(argv=None):
parser.add_argument(
'--runner',
required=False,
default='FnApiRunner',
default=None,
help='Runner.')
known_args, pipeline_args = parser.parse_known_args(argv)
parser.add_argument(
'--cloud',
default=False,
action='store_true'
)
parser.add_argument(
'--project',
type=str,
help='The cloud project name to be used for running this pipeline'
)
parser.add_argument(
'--num_workers',
default=10,
type=int,
help='The number of workers.'
)
# parsed_args, other_args = parser.parse_known_args(argv)
parsed_args = parser.parse_args(argv)

if not parsed_args.grobid_url:
parsed_args.grobid_url = 'http://localhost:8080/api'
parsed_args.start_grobid_service = True
else:
parsed_args.start_grobid_service = False

if not parsed_args.output_path:
parsed_args.output_path = os.path.dirname(parsed_args.input.replace('/*/', '/'))
if parsed_args.num_workers:
parsed_args.autoscaling_algorithm = 'NONE'
parsed_args.max_num_workers = parsed_args.num_workers
parsed_args.setup_file = './setup.py'

if parsed_args.cloud:
# Flags which need to be set for cloud runs.
default_values = {
'project':
get_cloud_project(),
'temp_location':
os.path.join(os.path.dirname(parsed_args.output_path), 'temp'),
'runner':
'DataflowRunner',
'save_main_session':
True,
}
else:
# Flags which need to be set for local runs.
default_values = {
'runner': 'FnApiRunner',
}

get_logger().info('default_values: %s', default_values)
for kk, vv in default_values.iteritems():
if kk not in parsed_args or not vars(parsed_args)[kk]:
vars(parsed_args)[kk] = vv
get_logger().info('parsed_args: %s', parsed_args)

return parsed_args

def run(argv=None):
    """Main entry point; defines and runs the tfidf pipeline."""
    args = parse_args(argv)

    # save_main_session is required because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level)
    pipeline_options = PipelineOptions.from_dictionary(vars(args))
    pipeline_options.view_as(SetupOptions).save_main_session = True

    runner = args.runner
    if runner == 'FnApiRunner':
        runner = create_fn_api_runner()

    # the pipeline executes (and is waited on) when the context manager exits
    with beam.Pipeline(runner, options=pipeline_options) as p:
        configure_pipeline(p, args)


if __name__ == '__main__':
    # basicConfig both attaches a root handler and sets the root level,
    # so the previous redundant `import logging` (already imported at the
    # top of the module) and the extra setLevel(INFO) call are unnecessary
    logging.basicConfig(level='INFO')

    run()
19 changes: 15 additions & 4 deletions sciencebeam/transformers/grobid_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@
import requests
import six

NAME = __name__
from sciencebeam.transformers.grobid_service_wrapper import (
GrobidServiceWrapper
)

PROCESS_HEADER_DOCUMENT_PATH = '/processHeaderDocument'

def grobid_service(base_url, path):
service_wrapper = GrobidServiceWrapper()

def get_logger():
    """Return the logger scoped to this module."""
    return logging.getLogger(__name__)

def start_service_if_not_running():
    # delegate to the shared module-level wrapper, which manages
    # the Grobid service subprocess (no-op if it is already running)
    service_wrapper.start_service_if_not_running()

def grobid_service(base_url, path, start_service=True):
url = base_url + path

def do_grobid_service(x):
logger = logging.getLogger(NAME)
if start_service:
start_service_if_not_running()
filename = x[0] if isinstance(x, tuple) else 'unknown.pdf'
content = x[1] if isinstance(x, tuple) else x
logger.info('processing: %s (%d) - %s', filename, len(content), url)
get_logger().info('processing: %s (%d) - %s', filename, len(content), url)
response = requests.post(url,
files={'input': (filename, six.StringIO(content))},
)
Expand Down
126 changes: 126 additions & 0 deletions sciencebeam/transformers/grobid_service_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
from threading import Thread
from functools import partial
import shlex
import subprocess
from subprocess import PIPE
import atexit
import os
from zipfile import ZipFile
from shutil import rmtree
from urllib import URLopener

from sciencebeam.utils.io_utils import makedirs
from sciencebeam.utils.zip_utils import extract_all_with_executable_permission

def get_logger():
    """Return the logger scoped to this module."""
    return logging.getLogger(__name__)

def iter_read_lines(reader):
    """Yield lines from ``reader`` one at a time until EOF (empty readline)."""
    line = reader.readline()
    while line:
        yield line
        line = reader.readline()

def stream_lines_to_logger(lines, logger, prefix=''):
    """Log every non-blank line (stripped) at INFO level with an optional prefix."""
    for raw_line in lines:
        stripped = raw_line.strip()
        if not stripped:
            continue
        logger.info('%s%s', prefix, stripped)

class GrobidServiceWrapper(object):
    """Downloads, unpacks and manages a locally running Grobid service process."""

    def __init__(self):
        # handle to the running Grobid subprocess (None until started)
        self.grobid_service_instance = None
        # all artefacts are cached under the repository-level .temp directory
        temp_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../.temp'))
        self.grobid_service_target_directory = os.path.join(temp_dir, 'grobid-service')
        self.grobid_service_zip_filename = os.path.join(temp_dir, 'grobid-service.zip')
        self.grobid_service_zip_url = (
            'https://storage.googleapis.com/elife-ml/artefacts/grobid-service.zip'
        )

    def stop_service_if_running(self):
        """Kill the Grobid subprocess if one was started (registered via atexit)."""
        if self.grobid_service_instance is not None:
            get_logger().info('stopping instance: %s', self.grobid_service_instance)
            self.grobid_service_instance.kill()

    def download__grobid_service_zip_if_not_exist(self):
        """Download the Grobid service zip unless it is already cached locally."""
        if not os.path.isfile(self.grobid_service_zip_filename):
            get_logger().info(
                'downloading %s to %s',
                self.grobid_service_zip_url,
                self.grobid_service_zip_filename
            )

            makedirs(os.path.dirname(self.grobid_service_zip_filename), exists_ok=True)

            # download to a '.part' file first, then rename, so an interrupted
            # download is never mistaken for a complete zip on the next run
            temp_zip_filename = self.grobid_service_zip_filename + '.part'
            if os.path.isfile(temp_zip_filename):
                os.remove(temp_zip_filename)
            URLopener().retrieve(self.grobid_service_zip_url, temp_zip_filename)
            os.rename(temp_zip_filename, self.grobid_service_zip_filename)

    def unzip_grobid_service_zip_if_target_directory_does_not_exist(self):
        """Extract the (downloaded) service zip into the target directory if missing."""
        if not os.path.isdir(self.grobid_service_target_directory):
            self.download__grobid_service_zip_if_not_exist()
            get_logger().info(
                'unzipping %s to %s',
                self.grobid_service_zip_filename,
                self.grobid_service_target_directory
            )
            # extract into a '.part' directory first, then rename, so a
            # partially extracted tree is never used as the target
            temp_target_directory = self.grobid_service_target_directory + '.part'
            if os.path.isdir(temp_target_directory):
                rmtree(temp_target_directory)

            with ZipFile(self.grobid_service_zip_filename, 'r') as zf:
                extract_all_with_executable_permission(zf, temp_target_directory)
            # the zip may contain a single top-level 'grobid-service' folder;
            # if so promote it, otherwise use the extracted tree directly
            sub_dir = os.path.join(temp_target_directory, 'grobid-service')
            if os.path.isdir(sub_dir):
                os.rename(sub_dir, self.grobid_service_target_directory)
                rmtree(temp_target_directory)
            else:
                os.rename(temp_target_directory, self.grobid_service_target_directory)

    def start_service_if_not_running(self):
        """Start the Grobid service subprocess unless one is already running.

        Blocks until the embedded Jetty server reports it has started, then
        keeps streaming the service output to the logger on a daemon thread.

        Raises:
            RuntimeError: if the service output reports an error during startup.
        """
        get_logger().info('grobid_service_instance: %s', self.grobid_service_instance)
        if self.grobid_service_instance is None:
            self.unzip_grobid_service_zip_if_target_directory_does_not_exist()
            grobid_service_home = os.path.abspath(self.grobid_service_target_directory)
            cwd = grobid_service_home + '/bin'
            grobid_service_home_jar_dir = grobid_service_home + '/lib'
            command_line = 'java -cp "{}/*" org.grobid.service.main.GrobidServiceApplication'.format(
                grobid_service_home_jar_dir
            )
            args = shlex.split(command_line)
            get_logger().info('command_line: %s', command_line)
            get_logger().info('args: %s', args)
            # stderr is merged into stdout so a single reader sees all output
            self.grobid_service_instance = subprocess.Popen(
                args, cwd=cwd, stdout=PIPE, stderr=subprocess.STDOUT
            )
            if self.grobid_service_instance is None:
                raise RuntimeError('failed to start grobid service')
            # ensure the subprocess is killed when this process exits
            atexit.register(self.stop_service_if_running)
            pstdout = self.grobid_service_instance.stdout
            out_prefix = 'stdout: '
            # read synchronously until the server signals success or failure
            # NOTE(review): if the process exits before reporting success,
            # readline() returns '' forever and this loop spins — consider
            # also checking poll(); confirm against observed service output
            while True:
                line = pstdout.readline().strip()
                if line:
                    get_logger().info('%s%s', out_prefix, line)
                    if 'jetty.server.Server: Started' in line:
                        get_logger().info('grobid service started successfully')
                        break
                    if 'ERROR' in line or 'Error' in line:
                        raise RuntimeError('failed to start grobid service due to {}'.format(line))
            # keep forwarding the remaining service output without blocking the caller
            t = Thread(target=partial(
                stream_lines_to_logger,
                lines=iter_read_lines(pstdout),
                logger=get_logger(),
                prefix=out_prefix
            ))
            t.daemon = True
            t.start()

if __name__ == '__main__':
    logging.basicConfig(level='INFO')

    # manual smoke test: download/unpack if needed and start a local Grobid service
    GrobidServiceWrapper().start_service_if_not_running()
Empty file added sciencebeam/utils/__init__.py
Empty file.
16 changes: 16 additions & 0 deletions sciencebeam/utils/io_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os
import errno

def makedirs(path, exists_ok=False):
    """Create a directory (and any missing parents), on Python 2 and 3.

    Args:
        path: the directory path to create.
        exists_ok: if True, an already existing directory is not an error.

    Raises:
        OSError: if creation fails, or the path already exists and
            exists_ok is False.
    """
    try:
        # Python 3: the native keyword is spelled 'exist_ok' — the previous
        # 'exists_ok=exists_ok' spelling always raised TypeError and silently
        # fell through to the Python 2 emulation below
        os.makedirs(path, exist_ok=exists_ok)
    except TypeError:
        # Python 2: os.makedirs has no exist_ok parameter; emulate it
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path) and exists_ok:
                pass
            else:
                raise
Loading

0 comments on commit 489b723

Please sign in to comment.