In [4]:
import os.path
import json
import uuid

namespace = uuid.UUID('90181196-fecf-5082-a4c1-411d4f314cda')
empty_uuid = uuid.uuid5(namespace, "")

def parse_spark(names):
    from pyspark.mllib.common import _py2java, _java2py
    parser = sc._jvm.org.globalnames.parser.spark.Parser()
    result = parser.parse(_py2java(sc, names), False, True)
    return _java2py(sc, result)

names = sc.parallelize(["Homo sapiens Linnaeus 1758",
                        "Salinator solida (Martens, 1878)",
                        "Taraxacum officinale F. H. Wigg."])
parse_spark(names).map(lambda x: json.loads(x)["canonical_name"]["value"]).collect()

['Homo sapiens', 'Salinator solida', 'Taraxacum officinale']

In [5]:
csvs_dir = 'csvs'

In [3]:
%%bash
mkdir -p mysql-export
mysql -B -h $MYSQL_HOST -u$MYSQL_USER --password=$MYSQL_PASS $MYSQL_DB \
    -e "SELECT id, \
               REPLACE(REPLACE(REPLACE(name, '\r\n', ' '), '\n', ' '), '\t', ' ') as name, \
               REPLACE(REPLACE(REPLACE(normalized, '\r\n', ' '), '\n', ' '), '\t', ' ') as normalized, \
               canonical_form_id, \
               has_words, \
               uuid, \
               has_groups, \
               surrogate \
        FROM name_strings \
        -- where id in (38990431, 78688931, 38618228, 83329994, 38618226, 83329995, 38618227, 83329996, 38619074, 81729304) \
        -- LIMIT 1000000" 1> mysql-export/name_strings.tsv



In [17]:
df = spark.read.csv(os.path.join("mysql-export", "name_strings.tsv"), header=True, quote="", sep="\t")

names = df.rdd.map(lambda x: x["name"])

parsed_names_json = parse_spark(names) \
  .map(lambda result: json.loads(result))

In [6]:
%%time

output_dir_canonicals = os.path.join(csvs_dir, "name_strings")

! rm -rf $output_dir_canonicals

print(output_dir_canonicals)

def extract_name_strings_fields(result_json):
    name_string_id = result_json["name_string_id"]
    verbatim = result_json["verbatim"]
    if result_json["parsed"]:
        canonical_name = result_json["canonical_name"]["value"]
        canonical_name_uuid = result_json["canonical_name"]["id"]
    else:
        canonical_name = ""
        canonical_name_uuid = str(empty_uuid)
    return "\t".join([name_string_id, verbatim, canonical_name_uuid, canonical_name])

canonical_names = parsed_names_json \
    .map(extract_name_strings_fields) \
    .distinct() \
    .saveAsTextFile(output_dir_canonicals)

csvs/name_strings
CPU times: user 200 ms, sys: 12 ms, total: 212 ms
Wall time: 17min 56s


In [12]:
%%time

word_targets = ["author_word", "genus", "uninomial", "year", 
                "approximate_year", "infrageneric_epithet", "specific_epithet"]

def extract_word_indexes(parsed_name_json, word_target):
    verbatim = parsed_name_json["verbatim"]
    name_string_id = parsed_name_json["name_string_id"]
    words_pos = filter(lambda j: j[0] == word_target, parsed_name_json["positions"])
    words = map(lambda word_pos: verbatim[word_pos[1]:word_pos[2]], words_pos)
    return map(lambda word: "\t".join([name_string_id, word]), words)

for word_target in word_targets:
    print("processing:", word_target)
    output_dir_word_target = os.path.join(csvs_dir, "word_indexes", word_target)
    
    ! rm -rf $output_dir_word_target
    
    parsed_names_json \
      .filter(lambda parsed_name: "positions" in parsed_name) \
      .flatMap(lambda parsed_name: extract_word_indexes(parsed_name, word_target)) \
      .distinct() \
      .saveAsTextFile(output_dir_word_target)

processing: author_word
processing: genus
processing: uninomial
processing: year
processing: approximate_year
processing: infrageneric_epithet
processing: specific_epithet
CPU times: user 1.33 s, sys: 256 ms, total: 1.58 s
Wall time: 2h 11min 20s


In [84]:
%%bash
mkdir -p mysql-export
mysql -B -h $MYSQL_HOST -u$MYSQL_USER --password=$MYSQL_PASS $MYSQL_DB \
    -e "SELECT \
        nsi.data_source_id , \
        ns.id , \
        REPLACE( \
          REPLACE( \
            REPLACE(name , '\r\n' , ' ') , \
            '\n' , \
            ' ' \
          ) , \
          '\t' , \
          ' ' \
        ) AS name , \
        REPLACE( \
          REPLACE( \
            REPLACE(url , '\r\n' , ' ') , \
            '\n' , \
            ' ' \
          ) , \
          '\t' , \
          ' ' \
        ) AS url \
      FROM name_string_indices AS nsi \
      JOIN name_strings AS ns ON nsi.name_string_id = ns.id \
      -- LIMIT 1000" 1> mysql-export/name_string_indices.tsv

bash: line 24: unexpected EOF while looking for matching `''
bash: line 28: syntax error: unexpected end of file


In [12]:
df = spark.read.csv(os.path.join("mysql-export", "name_string_indices.tsv"), header=True, sep='\t')

names = df.rdd.map(lambda x: x["name"])

output_dir_name_string_indices = os.path.join(csvs_dir, "name_string_indices")

! rm -rf $output_dir_name_string_indices

def name_string_index_result(value):
    result_json, name_string_row = value
    name_string_id = result_json["name_string_id"]
    url = name_string_row["url"].replace("\t", " ")
    return "\t".join([name_string_row["data_source_id"], name_string_id, url])

parsed_names_json = parse_spark(names) \
  .map(lambda result: json.loads(result)) \
  .zip(df.rdd) \
  .map(name_string_index_result)

parsed_names_json.saveAsTextFile(output_dir_name_string_indices)