Merge pull request #4 from Gathondu/develop
Develop
Gathondu committed Jun 28, 2017
2 parents 37a4447 + 7ad9879 commit eb9fd54
Showing 10 changed files with 261 additions and 186 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
*.pyc
/data/
24 changes: 16 additions & 8 deletions README.md
@@ -23,13 +23,14 @@ Change directory into the package `$ cd healthtools_ke`
Install the dependencies by running `$ pip install -r requirements.txt`

You can set the required environment variables like so:
```
$ export MORPH_AWS_REGION=<aws_region>
$ export MORPH_AWS_ACCESS_KEY_ID= <aws_access_key_id>
$ export MORPH_AWS_SECRET_KEY= <aws_secret_key>
$ export ES_HOST= <elastic_search_host_endpoint> # Do not set this if you would like to use elastic search locally on your machine
$ export WEBHOOK_URL= <slack_webhook_url> # Do not set this if you don't want to post error messages on Slack
```

$ export MORPH_AWS_REGION=<aws_region>
$ export MORPH_AWS_ACCESS_KEY_ID=<aws_access_key_id>
$ export MORPH_AWS_SECRET_KEY=<aws_secret_key>
$ export MORPH_S3_BUCKET=<s3_bucket_name> # If not set, data will be archived locally in the project's folder in a folder called data
$ export MORPH_ES_HOST=<elastic_search_host_endpoint> # Do not set this if you would like to use elastic search locally on your machine
$ export MORPH_WEBHOOK_URL=<slack_webhook_url> # Do not set this if you don't want to post error messages on Slack
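
A quick, optional pre-flight check (an illustrative sketch, not part of the scraper) to confirm the variables above are visible to Python before you run anything:

```
import os

# Sketch: warn about any required variable that is unset.
# MORPH_S3_BUCKET, MORPH_ES_HOST and MORPH_WEBHOOK_URL are optional (see the notes above).
required = ["MORPH_AWS_REGION", "MORPH_AWS_ACCESS_KEY_ID", "MORPH_AWS_SECRET_KEY"]
missing = [name for name in required if not os.getenv(name)]
if missing:
    print "Missing environment variables: {}".format(", ".join(missing))
```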

**If you want to use elasticsearch locally on your machine, use the following instructions to set it up**

For Linux and Windows users, follow the instructions at this [link](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html)
@@ -38,12 +39,19 @@ For Mac users, run `brew install elasticsearch` in your terminal

**If you want to post messages on Slack**

Set up `Incoming Webhooks` [here](https://slack.com/signin?redir=%2Fservices%2Fnew%2Fincoming-webhook) and set the `WEBHOOK_URL` environment variable globally
Set up `Incoming Webhooks` [here](https://slack.com/signin?redir=%2Fservices%2Fnew%2Fincoming-webhook) and set the `MORPH_WEBHOOK_URL` environment variable globally
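
To confirm the webhook works, you can post a test message to it. This is an illustrative sketch (not part of the codebase), using the same `requests` + JSON payload pattern the scraper's `print_error()` uses:

```
import json
import os
import requests

# Sketch: send a test message to the Slack incoming webhook configured above.
webhook_url = os.getenv("MORPH_WEBHOOK_URL")
if webhook_url:
    requests.post(
        webhook_url,
        data=json.dumps({"text": "healthtools_ke scraper: webhook test"}),
        headers={"Content-Type": "application/json"}
    )
```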

If you set up elasticsearch locally, run it with `$ elasticsearch`

You can now run the scrapers with `$ python scraper.py` (it might take a while)

**FOR DEVELOPMENT PURPOSES**

Set **SMALL_BATCH** and **SMALL_BATCH_HF** (for health facilities) in the config file; these limit the scrapers to the number of pages or records you want scraped instead of the entire sites.

Use `$ python scraper.py small_batch` to run the scrapers in this mode
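
Under the hood, the base scraper checks `sys.argv` for the `small_batch` flag and caps the number of pages at `SMALL_BATCH` from the config (see `healthtools/scrapers/base_scraper.py` in the diff below). A minimal sketch of that behaviour:

```
import sys
from healthtools.config import SMALL_BATCH

# Sketch of the development-mode switch used by the base scraper:
# `python scraper.py small_batch` limits scraping to SMALL_BATCH pages,
# otherwise the total is read from the site's pagination.
small_batch = "small_batch" in sys.argv
if small_batch:
    num_pages_to_scrape = SMALL_BATCH
```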


## Running the tests
_**make sure that if you use elasticsearch locally, it is running**_
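
An illustrative way to confirm a local Elasticsearch is reachable before running the tests, using the host and port defaults from `healthtools/config.py`:

```
import requests
from healthtools.config import ES

# Sketch: ping the local Elasticsearch HTTP endpoint before running the tests.
try:
    response = requests.get("http://{}:{}".format(ES["host"], ES["port"]))
    print "Elasticsearch is up (status {})".format(response.status_code)
except requests.ConnectionError:
    print "Elasticsearch is not running on {}:{}".format(ES["host"], ES["port"])
```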
14 changes: 8 additions & 6 deletions healthtools/config.py
@@ -12,20 +12,22 @@
"aws_access_key_id": os.getenv("MORPH_AWS_ACCESS_KEY"),
"aws_secret_access_key": os.getenv("MORPH_AWS_SECRET_KEY"),
"region_name": os.getenv("MORPH_AWS_REGION", "eu-west-1"),
"s3_bucket": os.getenv("S3_BUCKET", "cfa-healthtools-ke")
"s3_bucket": os.getenv("MORPH_S3_BUCKET", None)
}

ES = {
"host": os.getenv("ES_HOST", None),
"host": os.getenv("MORPH_ES_HOST", "127.0.0.1"),
"port": os.getenv("MORPH_ES_PORT", "9200"),
"index": "healthtools"
}

SLACK = {
"url": os.getenv("WEBHOOK_URL")
"url": os.getenv("MORPH_WEBHOOK_URL")
}

TEST_DIR = os.getcwd() + "/healthtools/tests"

SLACK = {
"url": os.getenv("WEBHOOK_URL")
}
SMALL_BATCH = 5 # Number of pages from the clinical officers, doctors and foreign doctors sites scraped in development mode
SMALL_BATCH_HF = 100 # Number of records scraped from the health-facilities sites in development mode

DATA_DIR = os.getcwd() + "/data/"
197 changes: 129 additions & 68 deletions healthtools/scrapers/base_scraper.py
@@ -4,12 +4,14 @@
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from serializer import JSONSerializerPython2
from healthtools.config import AWS, ES, SLACK
from healthtools.config import AWS, ES, SLACK, SMALL_BATCH, DATA_DIR
import requests
import boto3
import re
import json
import hashlib
import sys
import os


class Scraper(object):
@@ -25,36 +27,45 @@ def __init__(self):
"aws_access_key_id": AWS["aws_access_key_id"],
"aws_secret_access_key": AWS["aws_secret_access_key"],
"region_name": AWS["region_name"]
})
# set up authentication credentials
awsauth = AWS4Auth(AWS["aws_access_key_id"], AWS["aws_secret_access_key"], AWS["region_name"], 'es')
# client host for aws elastic search service
if ES['host']:
self.es_client = Elasticsearch(
hosts=ES['host'],
port=443,
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
serializer=JSONSerializerPython2()
})
self.small_batch = True if "small_batch" in sys.argv else False
try:
# client host for aws elastic search service
if "aws" in ES["host"]:
# set up authentication credentials
awsauth = AWS4Auth(AWS["aws_access_key_id"], AWS["aws_secret_access_key"], AWS["region_name"], "es")
self.es_client = Elasticsearch(
hosts=ES["host"],
port=ES["port"],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
serializer=JSONSerializerPython2()
)
else:
self.es_client = Elasticsearch('127.0.0.1')
else:
self.es_client = Elasticsearch("{}:{}".format(ES["host"], ES["port"]))
except Exception as err:
self.print_error("[{}] - ERROR: Invalid Parameters For ES Client".format(
datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
# if saving locally, create the relevant directories
if not AWS["s3_bucket"] and not os.path.exists(DATA_DIR):
os.mkdir(DATA_DIR)
os.mkdir(DATA_DIR + "archive")
os.mkdir(DATA_DIR + "test")

def scrape_site(self):
'''
Scrape the whole site
'''
print "[{0}] ".format(re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__))
print "[{0}] Started Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print "[{0}] Started Scraper.".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

all_results = []
delete_batch = []
skipped_pages = 0

self.get_total_number_of_pages()

for page_num in range(1, self.num_pages_to_scrape + 1):
url = self.site_url.format(page_num)
try:
@@ -73,7 +84,7 @@ def scrape_site(self):
self.print_error("ERROR: scrape_site() - source: {} - page: {} - {}".format(url, page_num, err))
continue
print "[{0}] - Scraper completed. {1} documents retrieved.".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), len(all_results))
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), len(all_results)/2) # don't count indexing data

if all_results:
all_results_json = json.dumps(all_results)
@@ -84,11 +95,15 @@ def scrape_site(self):
self.archive_data(all_results_json)

# store delete operations for next scrape
delete_file = StringIO(delete_batch)
self.s3.upload_fileobj(
delete_file, "cfa-healthtools-ke",
self.delete_file)
print "[{0}] - Completed Scraper.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
if AWS["s3_bucket"]:
delete_file = StringIO(delete_batch)
self.s3.upload_fileobj(
delete_file, AWS["s3_bucket"],
self.delete_file)
else:
with open(self.delete_file, "w") as delete:
json.dump(delete_batch, delete)
print "[{0}] - Completed Scraper.".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

return all_results

@@ -98,7 +113,7 @@ def scrape_page(self, page_url):
'''
try:
soup = self.make_soup(page_url)
table = soup.find('table', {"class": "zebra"}).find("tbody")
table = soup.find("table", {"class": "zebra"}).find("tbody")
rows = table.find_all("tr")

entries = []
@@ -118,8 +133,8 @@
delete_batch.append({
"delete":
{
"_index": ES['index'],
"_type": meta['index']['_type'],
"_index": ES["index"],
"_type": meta["index"]["_type"],
"_id": entry["id"]
}})
self.document_id += 1
@@ -138,7 +153,7 @@ def upload_data(self, payload):
'''
try:
# bulk index the data and use refresh to ensure that our data will be immediately available
response = self.es_client.bulk(index=ES['index'], body=payload, refresh=True)
response = self.es_client.bulk(index=ES["index"], body=payload, refresh=True)
return response
except Exception as err:
self.print_error("ERROR - upload_data() - {} - {}".format(type(self).__name__, str(err)))
@@ -148,25 +163,37 @@ def archive_data(self, payload):
Upload scraped data to AWS S3, or archive it locally if no S3 bucket is set
'''
try:
old_etag = self.s3.get_object(
Bucket="cfa-healthtools-ke", Key=self.s3_key)["ETag"]
new_etag = hashlib.md5(payload.encode('utf-8')).hexdigest()
if eval(old_etag) != new_etag:
file_obj = StringIO(payload.encode('utf-8'))
self.s3.upload_fileobj(file_obj,
"cfa-healthtools-ke", self.s3_key)
date = datetime.today().strftime("%Y%m%d")
if AWS["s3_bucket"]:
old_etag = self.s3.get_object(
Bucket=AWS["s3_bucket"], Key=self.s3_key)["ETag"]
new_etag = hashlib.md5(payload.encode("utf-8")).hexdigest()
if eval(old_etag) != new_etag:
file_obj = StringIO(payload.encode("utf-8"))
self.s3.upload_fileobj(file_obj,
AWS["s3_bucket"], self.s3_key)

# archive historical data
date = datetime.today().strftime('%Y%m%d')
self.s3.copy_object(Bucket="cfa-healthtools-ke",
CopySource="cfa-healthtools-ke/" + self.s3_key,
Key=self.s3_historical_record_key.format(
date))
print "[{0}] - Archived data has been updated.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
return
# archive historical data
self.s3.copy_object(Bucket=AWS["s3_bucket"],
CopySource="{}/".format(AWS["s3_bucket"]) + self.s3_key,
Key=self.s3_historical_record_key.format(
date))
print "[{0}] - Archived data has been updated.".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
return
else:
print "[{0}] - Data Scraped does not differ from archived data.".format(
datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
else:
print "[{0}] - Data Scraped does not differ from archived data.".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# if this is a test, prefix the key with the local data directory
if "test" in self.s3_key:
self.s3_key = DATA_DIR + self.s3_key
# archive to local dir
with open(self.s3_key, "w") as data:
json.dump(payload, data)
# archive historical data to local dir
with open(self.s3_historical_record_key.format(date), "w") as history:
json.dump(payload, history)
print "[{0}] - Archived data has been updated.".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

except Exception as err:
self.print_error("ERROR - archive_data() - {} - {}".format(self.s3_key, str(err)))
@@ -177,40 +204,48 @@ def delete_elasticsearch_docs(self):
'''
try:
# get the type to use with the index depending on the calling method
if 'clinical' in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = 'clinical-officers'
elif 'doctors' in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = 'doctors'
if "clinical" in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = "clinical-officers"
elif "doctors" in re.sub(r"(\w)([A-Z])", r"\1 \2", type(self).__name__).lower():
_type = "doctors"
else:
_type = 'health-facilities'
_type = "health-facilities"
# get documents to be deleted
delete_docs = self.s3.get_object(
Bucket="cfa-healthtools-ke",
Key=self.delete_file)['Body'].read()
if AWS["s3_bucket"]:
delete_docs = self.s3.get_object(
Bucket=AWS["s3_bucket"],
Key=self.delete_file)["Body"].read()
else:
if os.path.exists(self.delete_file):
with open(self.delete_file) as delete:
delete_docs = json.load(delete)
else:
self.print_error("ERROR - delete_elasticsearch_docs() - no delete file present")
return
# delete
try:
response = self.es_client.bulk(index=ES['index'], body=delete_docs, refresh=True)
response = self.es_client.bulk(index=ES["index"], body=delete_docs, refresh=True)
except:
# in case records are saved in CloudSearch's format, reformat them for Elasticsearch deletion
delete_records = []
for record in json.loads(delete_docs):
try:
delete_records.append({
"delete": {
"_index": ES['index'],
"_index": ES["index"],
"_type": _type,
"_id": record['delete']["_id"]
"_id": record["delete"]["_id"]
}
})
except:
delete_records.append({
"delete": {
"_index": ES['index'],
"_index": ES["index"],
"_type": _type,
"_id": record["id"]
}
})
response = self.es_client.bulk(index=ES['index'], body=delete_records)
response = self.es_client.bulk(index=ES["index"], body=delete_records)
return response
except Exception as err:
if "NoSuchKey" in err:
@@ -223,11 +258,15 @@ def get_total_number_of_pages(self):
Get the total number of pages to be scraped
'''
try:
soup = self.make_soup(self.site_url.format(1)) # get first page
text = soup.find("div", {"id": "tnt_pagination"}).getText()
# regex for how the total number of pages appears on the page
pattern = re.compile("(\d+) pages?")
self.num_pages_to_scrape = int(pattern.search(text).group(1))
# in development mode, limit the number of pages to SMALL_BATCH
if self.small_batch:
self.num_pages_to_scrape = SMALL_BATCH
else:
soup = self.make_soup(self.site_url.format(1)) # get first page
text = soup.find("div", {"id": "tnt_pagination"}).getText()
# regex for how the total number of pages appears on the page
pattern = re.compile("(\d+) pages?")
self.num_pages_to_scrape = int(pattern.search(text).group(1))
except Exception as err:
self.print_error("ERROR: **get_total_page_numbers()** - url: {} - err: {}".format(self.site_url, str(err)))
return
@@ -261,14 +300,36 @@ def print_error(self, message):
print error messages in the terminal
if a Slack webhook is set up, post the errors to Slack
"""
print('[{0}] - '.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + message)
print("[{0}] - ".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + message)
response = None
if SLACK['url']:
if SLACK["url"]:
message = message.split("-")
response = requests.post(
SLACK['url'],
SLACK["url"],
data=json.dumps(
{"text": "```{}```".format(message)}
{"attachments": [{
"author_name": "{}".format(message[2]),
"color": "danger",
"pretext": "[SCRAPER] New Alert for{}:{}".format(message[2], message[1]),
"fields": [
{
"title": "Message",
"value": "{}".format(message[3]),
"short": False
},
{
"title": "Time",
"value": "{}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
"short": True
},
{
"title": "Severity",
"value": "{}".format(message[3].split(":")[1]),
"short": True
}
]
}]}
),
headers={'Content-Type': 'application/json'}
headers={"Content-Type": "application/json"}
)
return response
