# A List of Specimens For all Phenoscape Taxonomy



In [20]:
from pyspark.sql.functions import col, isnull, lower, split, sum
from pyspark.sql.types import IntegerType

In [21]:
# Load CSV after placing it on HDFS
ps_tax = (sqlContext
           .read
           .option("header", "true")
           .csv("/home/mjcollin/queryResults_phenoscape_taxonomy.csv")
           .cache()
           )

In [22]:
print(ps_tax.count())
ps_tax.printSchema()
ps_tax.show(3)

6727
root
 |-- matrix_taxon: string (nullable = true)
 |-- vto: string (nullable = true)
 |-- vto_label: string (nullable = true)

+--------------------+--------------------+--------------------+
|        matrix_taxon|                 vto|           vto_label|
+--------------------+--------------------+--------------------+
|Scyliorhinus retifer|http://purl.oboli...|Scyliorhinus retifer|
|     Mustelus laevis|http://purl.oboli...|   Mustelus mustelus|
| Cynocephalus volans|http://purl.oboli...| Cynocephalus volans|
+--------------------+--------------------+--------------------+
only showing top 3 rows



Check to see how distinct things are by vto_label

In [23]:
(ps_tax
.groupBy(col("vto_label"))
.count()
.orderBy(col("count"), ascending=False)
).show(10, truncate=False)

+-------------------------+-----+
|vto_label                |count|
+-------------------------+-----+
|Kassina senegalensis     |10   |
|Plicofollis argyropleuron|9    |
|Netuma thalassina        |9    |
|Chrysichthys auratus     |6    |
|Eoherpeton watsoni       |5    |
|Brustiarius solidus      |5    |
|Whatcheeria deltae       |5    |
|Panderichthys rhombolepis|5    |
|Greererpeton burkemorani |5    |
|Crassigyrinus scoticus   |5    |
+-------------------------+-----+
only showing top 10 rows



We need to trim this a bit to avoid multiple joined rows with a distinct. Also, set up the columns to make the join a bit shorter. Note that indexing past the lenght of an array conveniently results in a NULL, not an error.

In [24]:
ps_tax_split = (ps_tax
.select(col("vto"),
        lower(split(col("vto_label"), " ")[0]).alias("tok1"),
        lower(split(col("vto_label"), " ")[1]).alias("tok2"))
.distinct()
)
print(ps_tax_split.count())
ps_tax_split.show(3)

5918
+--------------------+-----------+------------+
|                 vto|       tok1|        tok2|
+--------------------+-----------+------------+
|http://purl.oboli...|squaliforma|    squalina|
|http://purl.oboli...|   neoarius|    graeffei|
|http://purl.oboli...|    cobitis|takatsuensis|
+--------------------+-----------+------------+
only showing top 3 rows



Join things up

In [None]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-20171209T023310.parquet")

In [None]:
ps_specimen_list = (ps_tax_split
                   .join(idb_df.select(col("uuid"), col("family"), col("genus"), col("specificepithet"), col("geopoint")), 
                        (col("tok1") == col("genus")) & (col("tok2") == col("specificepithet"))
                         | (col("tok1") == col("family")),
                         "left"
                        )
                    .cache()
                   )
#ps_specimen_list.show(3)

In [None]:
print(ps_specimen_list.count())

How many specimens did we get by taxon?

In [None]:
taxa_counts = (ps_specimen_list
 .groupBy(col("genus"), col("specificepithet"))
 .count()
 .orderBy(col("count"), ascending=False)
 ).toPandas()
#.show(100, truncate=False)

In [None]:
%matplotlib inline
taxa_counts.plot(y="count")

What is present in Phenoscape that didn't join in iDigBio?

In [None]:
unmatched = (ps_specimen_list
.filter(isnull(col("uuid")))
.select(col("tok1"), col("tok2"))
.orderBy(col("tok1"))
)
print(unmatched.count())
unmatched.show(100, truncate=False)

In [None]:
(ps_specimen_list
.select(col("uuid"), col("vto"),
        col("family"), col("genus"), col("specificepithet"), 
        col("geopoint.lat").alias("lat"),
        col("geopoint.lon").alias("lon"))
.repartition(1)
.write
.mode('overwrite')
.csv("/tmp/pheno_specimen.csv", header=True)
)