Merge pull request #2 from Gathondu/develop
Develop
Gathondu committed Jun 23, 2017
2 parents a7d25a4 + fb16d0b commit b963494
Showing 16 changed files with 356 additions and 207 deletions.
18 changes: 17 additions & 1 deletion README.md
@@ -27,11 +27,27 @@ You can set the required environment variables like so
$ export MORPH_AWS_REGION=<aws_region>
$ export MORPH_AWS_ACCESS_KEY_ID=<aws_access_key_id>
$ export MORPH_AWS_SECRET_KEY=<aws_secret_key>
$ export ES_HOST=<elastic_search_host_endpoint> # Do not set this if you would like to use Elasticsearch locally on your machine
$ export WEBHOOK_URL=<slack_webhook_url> # Do not set this if you don't want to post error messages on Slack
```
**If you want to use Elasticsearch locally on your machine, use the following instructions to set it up**

For Linux and Windows users, follow the instructions at this [link](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html)

For Mac users, run `brew install elasticsearch` in your terminal

**If you want to post error messages on Slack**

Set up `Incoming Webhooks` [here](https://slack.com/signin?redir=%2Fservices%2Fnew%2Fincoming-webhook) and set the `WEBHOOK_URL` environment variable
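
To confirm the webhook works before running the scrapers, you can send a quick test message. This is a minimal sketch that mirrors the message format of the scrapers' `print_error` helper; it assumes `WEBHOOK_URL` is already exported:

```python
import json
import os

import requests

# Post a test message to the configured Slack webhook (no-op if WEBHOOK_URL is unset).
webhook_url = os.getenv("WEBHOOK_URL")
if webhook_url:
    response = requests.post(
        webhook_url,
        data=json.dumps({"text": "healthtools scraper: webhook test"}),
        headers={"Content-Type": "application/json"},
    )
    print(response.status_code)  # Slack returns 200 on success
```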

If you set up Elasticsearch locally, start it by running `$ elasticsearch`

You can now run the scrapers with `$ python scraper.py` (it might take a while, and you might need to change the endpoints in `config.py` if you don't have authorization for them)

## Running the tests
_**If you use Elasticsearch locally, make sure it's running**_

Use nosetests to run tests (with stdout) like this:
```$ nosetests --nocapture```

17 changes: 17 additions & 0 deletions circle.yml
@@ -0,0 +1,17 @@
machine:
  python:
    version: 2.7.5
  java:
    version: openjdk8
dependencies:
  pre:
    - pip install -r requirements.txt
  post:
    - wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.2.tar.gz
    - tar -xzf elasticsearch-5.4.2.tar.gz
    - elasticsearch-5.4.2/bin/elasticsearch: {background: true}
    - sleep 10 && wget --waitretry=5 --retry-connrefused -v http://127.0.0.1:9200/

test:
  override:
    - nosetests --nocapture
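
On CI this starts Elasticsearch 5.4.2 in the background and polls port 9200 (the `sleep 10 && wget --waitretry` step) so the node is up before nosetests run. A rough local equivalent of that readiness check, sketched in Python (the helper name and timeout are assumptions, not part of the repo):

```python
import time

import requests

def wait_for_elasticsearch(url="http://127.0.0.1:9200/", timeout=60):
    """Poll the local Elasticsearch HTTP port until it answers, like the wget retry on CI."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(url, timeout=5).status_code == 200:
                return True
        except requests.exceptions.ConnectionError:
            pass  # node not up yet
        time.sleep(2)
    return False
```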
25 changes: 14 additions & 11 deletions healthtools/config.py
@@ -2,23 +2,26 @@

# sites to be scraped
SITES = {
'DOCTORS': 'http://medicalboard.co.ke/online-services/retention/?currpage={}',
'FOREIGN_DOCTORS': 'http://medicalboard.co.ke/online-services/foreign-doctors-license-register/?currpage={}',
'CLINICAL_OFFICERS': 'http://clinicalofficerscouncil.org/online-services/retention/?currpage={}',
'TOKEN_URL': 'http://api.kmhfl.health.go.ke/o/token/'
"DOCTORS": "http://medicalboard.co.ke/online-services/retention/?currpage={}",
"FOREIGN_DOCTORS": "http://medicalboard.co.ke/online-services/foreign-doctors-license-register/?currpage={}",
"CLINICAL_OFFICERS": "http://clinicalofficerscouncil.org/online-services/retention/?currpage={}",
"TOKEN_URL": "http://api.kmhfl.health.go.ke/o/token/"
}

AWS = {
"aws_access_key_id": os.getenv("MORPH_AWS_ACCESS_KEY"),
"aws_secret_access_key": os.getenv("MORPH_AWS_SECRET_KEY"),
"region_name": 'eu-west-1',
# Doctors document endpoint
"cloudsearch_doctors_endpoint": "http://doc-cfa-healthtools-ke-doctors-m34xee6byjmzcgzmovevkjpffy.eu-west-1.cloudsearch.amazonaws.com/",
# Clinical document endpoint
"cloudsearch_cos_endpoint": "http://doc-cfa-healthtools-ke-cos-nhxtw3w5goufkzram4er7sciz4.eu-west-1.cloudsearch.amazonaws.com/",
# Health facilities endpoint
"cloudsearch_health_faciities_endpoint":"https://doc-health-facilities-ke-65ftd7ksxazyatw5fiv5uyaiqi.eu-west-1.cloudsearch.amazonaws.com",
"region_name": os.getenv("MORPH_AWS_REGION", "eu-west-1"),
"s3_bucket": os.getenv("S3_BUCKET", "cfa-healthtools-ke")
}

ES = {
"host": os.getenv("ES_HOST", None),
"index": "healthtools"
}

SLACK = {
"url": os.getenv("WEBHOOK_URL")
}

TEST_DIR = os.getcwd() + "/healthtools/tests"
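
With this config the scrapers talk to a hosted cluster whenever `ES_HOST` is set and fall back to a local node otherwise (the full client setup, including AWS request signing, is in `base_scraper.py` below). A simplified sketch of that selection, leaving out the signing:

```python
from elasticsearch import Elasticsearch

from healthtools.config import ES

# Hosted endpoint (e.g. AWS Elasticsearch Service) when ES_HOST is set,
# otherwise the local node started with `$ elasticsearch`.
# The scraper itself also signs requests with AWS4Auth; omitted here for brevity.
if ES["host"]:
    es_client = Elasticsearch(hosts=ES["host"], port=443, use_ssl=True)
else:
    es_client = Elasticsearch("127.0.0.1")

print(es_client.ping())  # True once the chosen node is reachable
```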
157 changes: 118 additions & 39 deletions healthtools/scrapers/base_scraper.py
@@ -1,7 +1,10 @@
from bs4 import BeautifulSoup
from cStringIO import StringIO
from datetime import datetime
from healthtools.config import AWS
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from serializer import JSONSerializerPython2
from healthtools.config import AWS, ES, SLACK
import requests
import boto3
import re
@@ -14,28 +17,43 @@ def __init__(self):
self.num_pages_to_scrape = None
self.site_url = None
self.fields = None
self.cloudsearch = None
self.s3_key = None
self.document_id = 0 # id for each entry, to be incremented
self.delete_file = None # contains docs to be deleted after scrape
self.s3_historical_record_key = None # s3 historical_record key
self.s3 = boto3.client("s3", **{
"aws_access_key_id": AWS["aws_access_key_id"],
"aws_secret_access_key": AWS["aws_secret_access_key"],
"region_name": 'eu-west-1',
"region_name": AWS["region_name"]
})
# set up authentication credentials
awsauth = AWS4Auth(AWS["aws_access_key_id"], AWS["aws_secret_access_key"], AWS["region_name"], 'es')
# client host for aws elastic search service
if ES['host']:
self.es_client = Elasticsearch(
hosts=ES['host'],
port=443,
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
serializer=JSONSerializerPython2()
)
else:
self.es_client = Elasticsearch('127.0.0.1')

def scrape_site(self):
'''
Scrape the whole site
'''
self.get_total_number_of_pages()
print "[{0}] ".format(re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__))
print "[{0}] Started Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

all_results = []
delete_batch = []
skipped_pages = 0

print "[{0}] ".format(re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__))
print "{{{0}}} - Started Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.get_total_number_of_pages()

for page_num in range(1, self.num_pages_to_scrape + 1):
url = self.site_url.format(page_num)
try:
@@ -45,31 +63,31 @@ def scrape_site(self):
print "There's something wrong with the site. Proceeding to the next scraper."
return

entries = scraped_page[0]
delete_docs = scraped_page[1]
entries, delete_docs = scraped_page

all_results.extend(entries)
delete_batch.extend(delete_docs)
except Exception as err:
skipped_pages += 1
print "ERROR: scrape_site() - source: {} - page: {} - {}".format(url, page_num, err)
self.print_error("ERROR: scrape_site() - source: {} - page: {} - {}".format(url, page_num, err))
continue
print "{{{0}}} - Scraper completed. {1} documents retrieved.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),len(all_results))
print "[{0}] - Scraper completed. {1} documents retrieved.".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), len(all_results))

if all_results:
all_results_json = json.dumps(all_results)
delete_batch = json.dumps(delete_batch)

self.delete_cloudsearch_docs()
self.upload_data(all_results_json)
self.delete_elasticsearch_docs()
self.upload_data(all_results)
self.archive_data(all_results_json)

# store delete operations for next scrape
delete_file = StringIO(delete_batch)
self.s3.upload_fileobj(
delete_file, "cfa-healthtools-ke",
self.delete_file)
print "{{{0}}} - Completed Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print "[{0}] - Completed Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

return all_results

@@ -92,31 +110,37 @@ def scrape_page(self, page_url):
columns.append(self.document_id)

entry = dict(zip(self.fields, columns))
entry = self.format_for_cloudsearch(entry)
meta, entry = self.format_for_elasticsearch(entry)
entries.append(meta)
entries.append(entry)

delete_batch.append({"type": "delete", "id": entry["id"]})
delete_batch.append({
"delete":
{
"_index": ES['index'],
"_type": meta['index']['_type'],
"_id": entry["id"]
}})
self.document_id += 1
return entries, delete_batch
except Exception as err:
if self.retries >= 5:
print "ERROR: Failed to scrape data from page {} -- {}".format(page_url, str(err))
self.print_error("ERROR: Failed to scrape data from page {} -- {}".format(page_url, str(err)))
return err
else:
self.retries += 1
self.scrape_page(page_url)

def upload_data(self, payload):
'''
Upload data to AWS Cloud Search
Upload data to Elastic Search
'''
try:
response = self.cloudsearch.upload_documents(
documents=payload, contentType="application/json"
)
# bulk index the data and use refresh to ensure that our data will be immediately available
response = self.es_client.bulk(index=ES['index'], body=payload, refresh=True)
return response
except Exception as err:
print "ERROR - upload_data() - {} - {}".format(type(self).__name__, str(err))
self.print_error("ERROR - upload_data() - {} - {}".format(type(self).__name__, str(err)))

def archive_data(self, payload):
'''
@@ -137,34 +161,63 @@ def archive_data(self, payload):
CopySource="cfa-healthtools-ke/" + self.s3_key,
Key=self.s3_historical_record_key.format(
date))
print "{{{0}}} - Archived data has been updated.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print "[{0}] - Archived data has been updated.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
return
else:
print "{{{0}}} - Data Scraped does not differ from archived data.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print "[{0}] - Data Scraped does not differ from archived data.".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

except Exception as err:
print "ERROR - archive_data() - {} - {}".format(self.s3_key, str(err))
self.print_error("ERROR - archive_data() - {} - {}".format(self.s3_key, str(err)))

def delete_cloudsearch_docs(self):
def delete_elasticsearch_docs(self):
'''
Delete documents that were uploaded to cloudsearch in the last scrape
Delete documents that were uploaded to elasticsearch in the last scrape
'''
try:
# get the type to use with the index depending on the calling method
if 'clinical' in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = 'clinical-officers'
elif 'doctors' in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = 'doctors'
else:
_type = 'health-facilities'

# get documents to be deleted
delete_docs = self.s3.get_object(
Bucket="cfa-healthtools-ke",
Key=self.delete_file)['Body'].read()

# delete
response = self.cloudsearch.upload_documents(
documents=delete_docs, contentType="application/json"
)
try:
response = self.es_client.bulk(index=ES['index'], body=delete_docs, refresh=True)
except:
# in case records are saved in cloudsearch's format, reformat for elasticsearch deletion
delete_records = []
for record in json.loads(delete_docs):
try:
delete_records.append({
"delete": {
"_index": ES['index'],
"_type": _type,
"_id": record['delete']["_id"]
}
})
except:
delete_records.append({
"delete": {
"_index": ES['index'],
"_type": _type,
"_id": record["id"]
}
})
response = self.es_client.bulk(index=ES['index'], body=delete_records)
return response
except Exception as err:
if "NoSuchKey" in err:
print "ERROR - delete_cloudsearch_docs() - no delete file present"
self.print_error("ERROR - delete_elasticsearch_docs() - no delete file present")
return
print "ERROR - delete_cloudsearch_docs() - {} - {}".format(type(self).__name__, str(err))
self.print_error("ERROR - delete_elasticsearch_docs() - {} - {}".format(type(self).__name__, str(err)))

def get_total_number_of_pages(self):
'''
@@ -177,8 +230,7 @@ def get_total_number_of_pages(self):
pattern = re.compile("(\d+) pages?")
self.num_pages_to_scrape = int(pattern.search(text).group(1))
except Exception as err:
print "ERROR: **get_total_page_numbers()** - url: {} - err: {}".\
format(self.site_url, str(err))
self.print_error("ERROR: **get_total_page_numbers()** - url: {} - err: {}".format(self.site_url, str(err)))
return

def make_soup(self, url):
@@ -189,8 +241,35 @@ def make_soup(self, url):
soup = BeautifulSoup(response.content, "html.parser")
return soup

def format_for_cloudsearch(self, entry):
'''
Format entry into cloudsearch ready document
'''
return {"id": entry["id"], "type": "add", "fields": entry}
def format_for_elasticsearch(self, entry):
"""
Format entry into elasticsearch ready document
:param entry: the data to be formatted
:return: dictionaries of the entry's metadata and the formatted entry
"""
# all bulk data need meta data describing the data
meta_dict = {
"index": {
"_index": "index",
"_type": "type",
"_id": "id"
}
}
return meta_dict, entry

def print_error(self, message):
"""
print error messages in the terminal
if slack webhook is set up post the errors to slack
"""
print('[{0}] - '.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + message)
response = None
if SLACK['url']:
response = requests.post(
SLACK['url'],
data=json.dumps(
{"text": "```{}```".format(message)}
),
headers={'Content-Type': 'application/json'}
)
return response
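
The switch from CloudSearch batches to the Elasticsearch bulk API is the heart of this change: every document is now preceded by an action/metadata line, which is why `format_for_elasticsearch` returns a `(meta, entry)` pair and `scrape_page` appends both to `entries`. A rough sketch of the payload `upload_data` ends up sending (field names here are illustrative, not the scrapers' actual columns):

```python
# Alternating action/metadata and document lines, as the bulk API expects.
payload = [
    {"index": {"_index": "healthtools", "_type": "doctors", "_id": 0}},
    {"id": 0, "name": "Jane Doe", "reg_no": "A1234"},
    {"index": {"_index": "healthtools", "_type": "doctors", "_id": 1}},
    {"id": 1, "name": "John Doe", "reg_no": "A5678"},
]
# es_client.bulk(index=ES["index"], body=payload, refresh=True)
```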
