diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..321dde2f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.DS_Store + diff --git a/src/acquisition/wiki/__init__.py b/src/acquisition/wiki/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/acquisition/wiki/create_wiki_meta_table.sql b/src/acquisition/wiki/create_wiki_meta_table.sql new file mode 100644 index 000000000..c4e1e6ffb --- /dev/null +++ b/src/acquisition/wiki/create_wiki_meta_table.sql @@ -0,0 +1,18 @@ +CREATE TABLE `wiki_meta`( + `id` INT(11) NOT NULL PRIMARY KEY AUTO_INCREMENT, + `datetime` DATETIME NOT NULL , + `date` date NOT NULL , + `epiweek` INT(11) NOT NULL , + `total` INT(11) NOT NULL , + UNIQUE KEY `datetime` (`datetime`) +); + +# Add a column `language` to the wiki_meta table +ALTER TABLE `wiki_meta` +ADD `language` CHAR(2) NOT NULL DEFAULT 'en'; + +# Another step is to update the UNIQUE KEY +ALTER TABLE `wiki_meta` + DROP INDEX `datetime`, + ADD UNIQUE KEY `datetime` (`datetime`, `language`); + diff --git a/src/acquisition/wiki/create_wiki_raw_table.sql b/src/acquisition/wiki/create_wiki_raw_table.sql new file mode 100644 index 000000000..557155e41 --- /dev/null +++ b/src/acquisition/wiki/create_wiki_raw_table.sql @@ -0,0 +1,13 @@ +CREATE TABLE `wiki_raw` ( + `id` int(11) NOT NULL PRIMARY KEY AUTO_INCREMENT, + `name` varchar(64) NOT NULL, + `hash` char(32) NOT NULL, + `status` int(11) NOT NULL DEFAULT '0', + `size` int(11) DEFAULT NULL, + `datetime` datetime DEFAULT NULL, + `worker` varchar(256) DEFAULT NULL, + `elapsed` float DEFAULT NULL, + `data` varchar(2048) DEFAULT NULL, + UNIQUE KEY `name` (`name`), + KEY `status` (`status`) +); \ No newline at end of file diff --git a/src/acquisition/wiki/create_wiki_table.sql b/src/acquisition/wiki/create_wiki_table.sql new file mode 100644 index 000000000..9b64e9b97 --- /dev/null +++ b/src/acquisition/wiki/create_wiki_table.sql @@ -0,0 +1,18 @@ +CREATE TABLE `wiki`( + `id` INT(11) NOT NULL PRIMARY KEY AUTO_INCREMENT, + `datetime` DATETIME NOT NULL , + `article` VARCHAR(64) NOT NULL , + `count` INT(11) NOT NULL , + UNIQUE KEY `datetime` (`datetime`, `article`), + KEY `datetime_2` (`datetime`), + KEY `article` (`article`) +); + +# Add a column `language` to the wiki table +ALTER TABLE `wiki` +ADD `language` CHAR(2) NOT NULL DEFAULT 'en'; + +# Another step is to update the UNIQUE KEY +ALTER TABLE `wiki` + DROP INDEX `datetime`, + ADD UNIQUE KEY `datetime` (`datetime`, `article`, `language`); diff --git a/src/acquisition/wiki/wiki_download.py b/src/acquisition/wiki/wiki_download.py index 7c66e56a9..64e252345 100644 --- a/src/acquisition/wiki/wiki_download.py +++ b/src/acquisition/wiki/wiki_download.py @@ -46,12 +46,15 @@ import json import subprocess import time +import os +from sys import platform + +from wiki_util import Articles VERSION = 10 MASTER_URL = 'https://delphi.midas.cs.cmu.edu/~automation/public/wiki/master.php' - def text(data_string): return str(data_string.decode('utf-8')) @@ -68,64 +71,79 @@ def get_hmac_sha256(key, msg): return hmac.new(key_bytes, msg_bytes, hashlib.sha256).hexdigest() +def extract_article_counts(filename, language, articles, debug_mode): + """ + Support multiple languages ('en' | 'es' | 'pt') + Running time optimized to O(M), which means only need to scan the whole file once + :param filename: + :param language: Different languages such as 'en', 'es', and 'pt' + :param articles: + :param debug_mode: + :return: + """ + counts = {} + articles_set = set(map(lambda x: x.lower(), 
articles)) + total = 0 + with open(filename, "r", encoding="utf8") as f: + for line in f: + content = line.strip().split() + if len(content) != 4: + print('unexpected article format: {0}'.format(line)) + continue + article_title = content[1].lower() + article_count = int(content[2]) + if content[0] == language: + total += article_count + if content[0] == language and article_title in articles_set: + if(debug_mode): + print("Find article {0}: {1}".format(article_title, line)) + counts[article_title] = article_count + if debug_mode: + print("Total number of counts for language {0} is {1}".format(language, total)) + counts['total'] = total + return counts + + +def extract_article_counts_orig(articles, debug_mode): + """ + The original method which extracts article counts by shell command grep (only support en articles). + As it is difficult to deal with other languages (utf-8 encoding), we choose to use python read files. + Another things is that it is slower to go over the whole file once and once again, the time complexity is O(NM), + where N is the number of articles and M is the lines in the file + In our new implementation extract_article_counts(), the time complexity is O(M), and it can cope with utf8 encoding + :param articles: + :param debug_mode: + :return: + """ + counts = {} + for article in articles: + if debug_mode: + print(' %s' % (article)) + out = text( + subprocess.check_output('LC_ALL=C grep -a -i "^en %s " raw2 | cat' % (article.lower()), shell=True)).strip() + count = 0 + if len(out) > 0: + for line in out.split('\n'): + fields = line.split() + if len(fields) != 4: + print('unexpected article format: [%s]' % (line)) + else: + count += int(fields[2]) + # print ' %4d %s'%(count, article) + counts[article.lower()] = count + if debug_mode: + print(' %d' % (count)) + print('getting total count...') + out = text(subprocess.check_output( + 'cat raw2 | LC_ALL=C grep -a -i "^en " | cut -d" " -f 3 | awk \'{s+=$1} END {printf "%.0f", s}\'', shell=True)) + total = int(out) + if debug_mode: + print(total) + counts['total'] = total + return counts + + def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, debug_mode=False): - articles = [ - 'Influenza_B_virus', - 'Influenza_A_virus', - 'Human_flu', - 'Influenzavirus_C', - 'Oseltamivir', - 'Influenza', - 'Influenzavirus_A', - 'Influenza_A_virus_subtype_H1N1', - 'Zanamivir', - 'Influenza-like_illness', - 'Common_cold', - 'Sore_throat', - 'Flu_season', - 'Chills', - 'Fever', - 'Influenza_A_virus_subtype_H2N2', - 'Swine_influenza', - 'Shivering', - 'Canine_influenza', - 'Influenza_A_virus_subtype_H3N2', - 'Neuraminidase_inhibitor', - 'Influenza_pandemic', - 'Viral_pneumonia', - 'Influenza_prevention', - 'Influenza_A_virus_subtype_H1N2', - 'Rhinorrhea', - 'Orthomyxoviridae', - 'Nasal_congestion', - 'Gastroenteritis', - 'Rimantadine', - 'Paracetamol', - 'Amantadine', - 'Viral_neuraminidase', - 'Headache', - 'Influenza_vaccine', - 'Vomiting', - 'Cough', - 'Influenza_A_virus_subtype_H5N1', - 'Nausea', - 'Avian_influenza', - 'Influenza_A_virus_subtype_H7N9', - 'Influenza_A_virus_subtype_H10N7', - 'Influenza_A_virus_subtype_H9N2', - 'Hemagglutinin_(influenza)', - 'Influenza_A_virus_subtype_H7N7', - 'Fatigue_(medical)', - 'Myalgia', - 'Influenza_A_virus_subtype_H7N3', - 'Malaise', - 'Equine_influenza', - 'Cat_flu', - 'Influenza_A_virus_subtype_H3N8', - 'Antiviral_drugs', - 'Influenza_A_virus_subtype_H7N2', - ] - articles = sorted(articles) worker = text(subprocess.check_output("echo `whoami`@`hostname`", 
shell=True)).strip() print('this is [%s]'%(worker)) @@ -151,7 +169,20 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d return else: raise Exception('server response code (get) was %d'%(code)) - job = json.loads(text(req.readlines()[0])) + # Make the code compatible with mac os system + if platform == "darwin": + job_content = text(req.readlines()[1]) + else: + job_content = text(req.readlines()[0]) + if job_content == 'no jobs': + print('no jobs available') + if download_limit is None and job_limit is None: + time.sleep(60) + continue + else: + print('nothing to do, exiting') + return + job = json.loads(job_content) print('received job [%d|%s]'%(job['id'], job['name'])) # updated parsing for pageviews - maybe use a regex in the future #year, month = int(job['name'][11:15]), int(job['name'][15:17]) @@ -161,7 +192,9 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d print('downloading file [%s]...'%(url)) subprocess.check_call('curl -s %s > raw.gz'%(url), shell=True) print('checking file size...') - size = int(text(subprocess.check_output('ls -l raw.gz | cut -d" " -f 5', shell=True))) + # Make the code cross-platfrom, so use python to get the size of the file + # size = int(text(subprocess.check_output('ls -l raw.gz | cut -d" " -f 5', shell=True))) + size = os.stat("raw.gz").st_size if debug_mode: print(size) total_download += size @@ -180,29 +213,18 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d #subprocess.check_call('rm raw', shell=True) subprocess.check_call('mv raw raw2', shell=True) print('extracting article counts...') + + # Use python to read the file and extract counts, if you want to use the original shell method, please use counts = {} - for article in articles: + for language in Articles.available_languages: + lang2articles = {'en': Articles.en_articles, 'es': Articles.es_articles, 'pt': Articles.pt_articles} + articles = lang2articles[language] + articles = sorted(articles) if debug_mode: - print(' %s'%(article)) - out = text(subprocess.check_output('LC_ALL=C grep -a -i "^en %s " raw2 | cat'%(article.lower()), shell=True)).strip() - count = 0 - if len(out) > 0: - for line in out.split('\n'): - fields = line.split() - if len(fields) != 4: - print('unexpected article format: [%s]'%(line)) - else: - count += int(fields[2]) - #print ' %4d %s'%(count, article) - counts[article.lower()] = count - if debug_mode: - print(' %d'%(count)) - print('getting total count...') - out = text(subprocess.check_output('cat raw2 | LC_ALL=C grep -a -i "^en " | cut -d" " -f 3 | awk \'{s+=$1} END {printf "%.0f", s}\'', shell=True)) - total = int(out) - if debug_mode: - print(total) - counts['total'] = total + print("Language is {0} and target articles are {1}".format(language, articles)) + temp_counts = extract_article_counts("raw2", language, articles, debug_mode) + counts[language] = temp_counts + if not debug_mode: print('deleting files...') subprocess.check_call('rm raw2', shell=True) diff --git a/src/acquisition/wiki/wiki_extract.py b/src/acquisition/wiki/wiki_extract.py index 846c58c9e..19665cdea 100644 --- a/src/acquisition/wiki/wiki_extract.py +++ b/src/acquisition/wiki/wiki_extract.py @@ -1,4 +1,4 @@ -""" +""" =============== === Purpose === =============== @@ -12,36 +12,36 @@ === Changelog === ================= -2017-02-23 - * secrets and minor cleanup -2016-08-14 - * use pageviews instead of pagecounts-raw - * default job limit from 1000 to 100 +2017-02-23 + * secrets and minor cleanup 
+2016-08-14 + * use pageviews instead of pagecounts-raw + * default job limit from 1000 to 100 2015-08-11 - + Store total and other metadata in `wiki_meta` + + Store total and other metadata in `wiki_meta` 2015-05-21 - * Original version -""" + * Original version +""" -# standard library +# standard library from datetime import datetime, timedelta -import json - -# third party -import mysql.connector - -# first party -import delphi.operations.secrets as secrets - +import json + +# third party +import mysql.connector + +# first party +import delphi.operations.secrets as secrets + def floor_timestamp(timestamp): return datetime(timestamp.year, timestamp.month, timestamp.day, timestamp.hour) - - + + def ceil_timestamp(timestamp): return floor_timestamp(timestamp) + timedelta(hours=1) - - + + def round_timestamp(timestamp): before = floor_timestamp(timestamp) after = ceil_timestamp(timestamp) @@ -49,18 +49,18 @@ def round_timestamp(timestamp): return before else: return after - - + + def get_timestamp(name): # new parsing for pageviews compared to pagecounts - maybe switch to regex in the future #return datetime(int(name[11:15]), int(name[15:17]), int(name[17:19]), int(name[20:22]), int(name[22:24]), int(name[24:26])) return datetime(int(name[10:14]), int(name[14:16]), int(name[16:18]), int(name[19:21]), int(name[21:23]), int(name[23:25])) - - + + def run(job_limit=100): - # connect to the database - u, p = secrets.db.epi - cnx = mysql.connector.connect(user=u, password=p, database='epidata') + # connect to the database + u, p = secrets.db.epi + cnx = mysql.connector.connect(user=u, password=p, database='epidata') cur = cnx.cursor() # find jobs that are queued for extraction @@ -71,14 +71,17 @@ def run(job_limit=100): print('Processing data from %d jobs'%(len(jobs))) # get the counts from the json object and insert into (or update) the database - for (id, name, data) in jobs: + # Notice that data_collect contains data with different languages + for (id, name, data_collect) in jobs: print('processing job [%d|%s]...'%(id, name)) timestamp = round_timestamp(get_timestamp(name)) - for article in sorted(data.keys()): - count = data[article] - cur.execute('INSERT INTO `wiki` (`datetime`, `article`, `count`) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE `count` = `count` + %s', (str(timestamp), article, count, count)) - if article == 'total': - cur.execute('INSERT INTO `wiki_meta` (`datetime`, `date`, `epiweek`, `total`) VALUES (%s, date(%s), yearweek(%s, 6), %s) ON DUPLICATE KEY UPDATE `total` = `total` + %s', (str(timestamp), str(timestamp), str(timestamp), count, count)) + for language in data_collect.keys(): + data = data_collect[language] + for article in sorted(data.keys()): + count = data[article] + cur.execute('INSERT INTO `wiki` (`datetime`, `article`, `count`, `language`) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE `count` = `count` + %s', (str(timestamp), article, count, language, count)) + if article == 'total': + cur.execute('INSERT INTO `wiki_meta` (`datetime`, `date`, `epiweek`, `total`, `language`) VALUES (%s, date(%s), yearweek(%s, 6), %s, %s) ON DUPLICATE KEY UPDATE `total` = `total` + %s', (str(timestamp), str(timestamp), str(timestamp), count, language, count)) # update the job cur.execute('UPDATE `wiki_raw` SET `status` = 3 WHERE `id` = %s', (id,)) @@ -86,7 +89,7 @@ def run(job_limit=100): cur.close() cnx.commit() cnx.close() - - + + if __name__ == '__main__': run() diff --git a/src/acquisition/wiki/wiki_update.py b/src/acquisition/wiki/wiki_update.py index 27d9eb0d5..411544810 
100644 --- a/src/acquisition/wiki/wiki_update.py +++ b/src/acquisition/wiki/wiki_update.py @@ -49,6 +49,10 @@ def round_timestamp(timestamp): def get_timestamp(name): + # If the program is cold start (there are no previous names in the table, and the name will be None) + if name is None: + curr = datetime.now() + return datetime(curr.year, curr.month, curr.day, curr.hour, curr.minute, curr.second) # new parsing for pageviews compared to pagecounts - maybe switch to regex in the future #return datetime(int(name[11:15]), int(name[15:17]), int(name[17:19]), int(name[20:22]), int(name[22:24]), int(name[24:26])) return datetime(int(name[10:14]), int(name[14:16]), int(name[16:18]), int(name[19:21]), int(name[21:23]), int(name[23:25])) @@ -95,7 +99,7 @@ def run(): # find access logs newer than the most recent job new_logs = {} for (hash, name) in manifest: - if name > max_name: + if max_name is None or name > max_name: new_logs[name] = hash print(' New job: %s [%s]'%(name, hash)) print('Found %d new job(s)'%(len(new_logs))) diff --git a/src/acquisition/wiki/wiki_util.py b/src/acquisition/wiki/wiki_util.py new file mode 100644 index 000000000..ed3c743bc --- /dev/null +++ b/src/acquisition/wiki/wiki_util.py @@ -0,0 +1,159 @@ + + + +class Articles: + + # Notice that all languages must be two chars, because that `language` column in table `wiki` is CHAR(2) + available_languages = ['en', 'es', 'pt'] + + en_articles_flu = [ + 'Influenza_B_virus', + 'Influenza_A_virus', + 'Human_flu', + 'Influenzavirus_C', + 'Oseltamivir', + 'Influenza', + 'Influenzavirus_A', + 'Influenza_A_virus_subtype_H1N1', + 'Zanamivir', + 'Influenza-like_illness', + 'Common_cold', + 'Sore_throat', + 'Flu_season', + 'Chills', + 'Fever', + 'Influenza_A_virus_subtype_H2N2', + 'Swine_influenza', + 'Shivering', + 'Canine_influenza', + 'Influenza_A_virus_subtype_H3N2', + 'Neuraminidase_inhibitor', + 'Influenza_pandemic', + 'Viral_pneumonia', + 'Influenza_prevention', + 'Influenza_A_virus_subtype_H1N2', + 'Rhinorrhea', + 'Orthomyxoviridae', + 'Nasal_congestion', + 'Gastroenteritis', + 'Rimantadine', + 'Paracetamol', + 'Amantadine', + 'Viral_neuraminidase', + 'Headache', + 'Influenza_vaccine', + 'Vomiting', + 'Cough', + 'Influenza_A_virus_subtype_H5N1', + 'Nausea', + 'Avian_influenza', + 'Influenza_A_virus_subtype_H7N9', + 'Influenza_A_virus_subtype_H10N7', + 'Influenza_A_virus_subtype_H9N2', + 'Hemagglutinin_(influenza)', + 'Influenza_A_virus_subtype_H7N7', + 'Fatigue_(medical)', + 'Myalgia', + 'Influenza_A_virus_subtype_H7N3', + 'Malaise', + 'Equine_influenza', + 'Cat_flu', + 'Influenza_A_virus_subtype_H3N8', + 'Antiviral_drugs', + 'Influenza_A_virus_subtype_H7N2', + ] + + en_articles_noro = [ + 'Norovirus', + 'Diarrhea', + 'Dehydration', + 'Gastroenteritis', + 'Vomiting', + 'Abdominal_pain', + 'Nausea', + 'Foodborne_illness', + 'Rotavirus', + 'Fecal–oral_route', + 'Intravenous_therapy', + 'Oral_rehydration_therapy', + 'Shellfish', + 'Caliciviridae', + 'Leaky_scanning', + ] + + en_articles_dengue = [ + 'Dengue_fever', + 'Dengue_virus', + 'Aedes', + 'Aedes_aegypti', + 'Dengue_vaccine', + 'Mosquito', + 'Mosquito-borne_disease', + 'Blood_transfusion', + 'Paracetamol', + 'Fever', + 'Headache', + 'Rhinitis', + 'Flavivirus', + 'Exanthem', + 'Myalgia', + 'Arthralgia', + 'Thrombocytopenia', + 'Hematuria', + 'Nosebleed', + 'Petechia', + 'Nausea', + 'Vomiting', + 'Diarrhea', + ] + + en_articles = list(set(en_articles_flu + en_articles_noro + en_articles_dengue)) + + es_articles = [ + 'Dengue', + 'Virus_dengue', + 'Aedes', + 'Aedes_aegypti', + 
'Culicidae', + 'Transfusión_de_sangre', + 'Paracetamol', + 'Fiebre', + 'Cefalea', + 'Coriza', + 'Flavivirus', + 'Exantema', + 'Mosquito', + 'Mialgia', + 'Artralgia', + 'Trombocitopenia', + 'Hematuria', + 'Epistaxis', + 'Petequia', + 'Náusea', + 'Vómito', + 'Diarrea', + ] + + pt_articles = [ + 'Dengue', + 'Vírus_da_dengue', + 'Aedes', + 'Aedes_aegypti', + 'Culicidae', + 'Transfusão_de_sangue', + 'Paracetamol', + 'Febre', + 'Cefaleia', + 'Coriza', + 'Flavivírus', + 'Exantema', + 'Mialgia', + 'Artralgia', + 'Trombocitopenia', + 'Hematúria', + 'Epistaxe', + 'Petéquia', + 'Náusea', + 'Vômito', + 'Diarreia', + ] diff --git a/src/client/delphi_epidata.py b/src/client/delphi_epidata.py index be0645e83..6a45afbad 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -168,7 +168,7 @@ def twitter(auth, locations, dates=None, epiweeks=None): # Fetch Wikipedia access data @staticmethod - def wiki(articles, dates=None, epiweeks=None, hours=None): + def wiki(articles, dates=None, epiweeks=None, hours=None, language='en'): """Fetch Wikipedia access data.""" # Check parameters if articles is None: @@ -179,6 +179,7 @@ def wiki(articles, dates=None, epiweeks=None, hours=None): params = { 'source': 'wiki', 'articles': Epidata._list(articles), + 'language': language, } if dates is not None: params['dates'] = Epidata._list(dates) diff --git a/src/server/api.php b/src/server/api.php index 0a7ce7f7e..903fcc0bf 100644 --- a/src/server/api.php +++ b/src/server/api.php @@ -272,15 +272,18 @@ function get_twitter($locations, $dates, $resolution) { // queries the `wiki` table // $articles (required): array of article titles +// $language (required): specify the language of articles we want to retrieve // $dates (required): array of date or epiweek values/ranges // $resolution (required): either 'daily' or 'weekly' // $hours (optional): array of hour values/ranges // if present, $hours determines which counts are used within each day; otherwise all counts are used // for example, if hours=[4], then only the 4 AM (UTC) stream is returned -function get_wiki($articles, $dates, $resolution, $hours) { +function get_wiki($articles, $language, $dates, $resolution, $hours) { // basic query info // in a few rare instances (~6 total), `total` is unreasonably high; something glitched somewhere, just ignore it - $table = '`wiki` w JOIN (SELECT * FROM `wiki_meta` WHERE `total` < 100000000) m ON m.`datetime` = w.`datetime`'; + // $table = '`wiki` w JOIN (SELECT * FROM `wiki_meta` WHERE `total` < 100000000) m ON m.`datetime` = w.`datetime`'; + // We select rows by language and then the problem is converted to the original one, and the rest of code can be same + $table = "( SELECT * FROM `wiki` WHERE `language` = '$language' ) w JOIN (SELECT * FROM `wiki_meta` WHERE `total` < 100000000 AND `language` = '$language' ) m ON m.`datetime` = w.`datetime`"; // build the date filter and set field names $fields_string = array('article'); $fields_int = array('count', 'total', 'hour'); @@ -745,9 +748,10 @@ function meta_delphi() { } } } else if($source === 'wiki') { - if(require_all($data, array('articles'))) { + if(require_all($data, array('articles', 'language'))) { // parse the request $articles = extract_values($_REQUEST['articles'], 'str'); + $language = $_REQUEST['language']; if(require_any($data, array('dates', 'epiweeks'))) { if(isset($_REQUEST['dates'])) { $resolution = 'daily'; @@ -758,7 +762,7 @@ function meta_delphi() { } $hours = isset($_REQUEST['hours']) ? 
extract_values($_REQUEST['hours'], 'int') : null; // get the data - $epidata = get_wiki($articles, $dates, $resolution, $hours); + $epidata = get_wiki($articles, $language, $dates, $resolution, $hours); store_result($data, $epidata); } }
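
For reference, extract_article_counts() in wiki_download.py parses the decompressed pageviews file as space-separated lines of the form "<domain code> <article title> <count> <bytes>"; every line whose first field matches the requested language code is added to 'total', and titles found in the article list are recorded individually. A minimal sketch of exercising it against a temporary file — the sample lines are invented, and it assumes src/acquisition/wiki is on PYTHONPATH so wiki_download and wiki_util import cleanly:

    import tempfile

    from wiki_util import Articles
    from wiki_download import extract_article_counts

    # Three invented pageviews lines in the "<domain> <title> <count> <bytes>" format
    # parsed by extract_article_counts(); real files come from the Wikimedia dumps.
    sample = (
        "en Influenza 42 0\n"
        "en Headache 7 0\n"
        "es Dengue 5 0\n"
    )

    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf8") as f:
        f.write(sample)
        path = f.name

    counts = extract_article_counts(path, "en", Articles.en_articles, debug_mode=False)
    print(counts["influenza"])  # 42
    print(counts["total"])      # 49 -- every 'en' line counts toward the total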
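
With the per-language split, the counts object that wiki_download.py reports back (and that wiki_extract.py later walks as data_collect) is no longer a flat article-to-count map; it nests one map per language code, each carrying its own 'total'. A sketch of the new shape, with invented article counts for illustration:

    counts = {
        'en': {
            'influenza': 1523,      # titles are lower-cased by extract_article_counts()
            'common_cold': 310,
            'total': 8841724,       # sum of every 'en' pageview count in the hourly file
        },
        'es': {
            'dengue': 97,
            'total': 512331,
        },
        'pt': {
            'dengue': 64,
            'total': 401842,
        },
    }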
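
On the client side, the new language argument to Epidata.wiki() defaults to 'en', so existing callers keep their current behaviour; Spanish or Portuguese counts are requested explicitly. A usage sketch against the Python client — the epiweek value is arbitrary, and it assumes delphi_epidata.py is importable in the usual way:

    from delphi_epidata import Epidata

    # Same as before this change: English articles, language defaults to 'en'
    res_en = Epidata.wiki(['influenza'], epiweeks=[201601])

    # Spanish dengue-related articles, matching wiki_util.Articles.es_articles
    res_es = Epidata.wiki(['dengue'], epiweeks=[201601], language='es')

    print(res_en['result'], res_es['result'])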