## Creation of the data graphs and records graph for the preston-amazon data set

In [29]:
# Verify that a Spark Context is running: evaluating `sc` as the last
# expression of the cell displays the context object
sc

In [30]:
# Load the processed core table of the preston-amazon data set from Parquet
# (relative path)
spark_df = spark.read.parquet("./preston-amazon/data-processed/core.parquet")

In [31]:
# Total number of records in the core table
spark_df.count()

3858

In [32]:
# List the available columns; the column names are full term URIs
spark_df.columns

['http://rs.tdwg.org/dwc/terms/datasetID',
 'http://rs.tdwg.org/dwc/terms/specificEpithet',
 'http://rs.tdwg.org/dwc/terms/order',
 'http://rs.tdwg.org/dwc/terms/taxonID',
 'http://rs.tdwg.org/dwc/terms/country',
 'http://plazi.org/terms/1.0/basionymYear',
 'http://gbif.org/dwc/terms/1.0/canonicalName',
 'undefined0',
 'http://rs.tdwg.org/dwc/terms/basisOfRecord',
 'http://plazi.org/terms/1.0/combinationYear',
 'http://plazi.org/terms/1.0/basionymAuthors',
 'http://rs.tdwg.org/dwc/terms/scientificName',
 'http://rs.tdwg.org/dwc/terms/decimalLatitude',
 'http://rs.tdwg.org/dwc/terms/eventDate',
 'http://rs.tdwg.org/dwc/terms/waterBody',
 'http://rs.tdwg.org/dwc/terms/acceptedNameUsageID',
 'http://rs.tdwg.org/dwc/terms/locationID',
 'http://rs.tdwg.org/dwc/terms/taxonRank',
 'http://rs.tdwg.org/dwc/terms/institutionCode',
 'http://rs.tdwg.org/dwc/terms/phylum',
 'http://purl.org/dc/terms/references',
 'http://rs.tdwg.org/dwc/terms/originalNameUsageID',
 'http://rs.tdwg.org/dwc/terms/ind

### The fields selected to be used as index are:
1) `http://rs.tdwg.org/dwc/terms/scientificName`<br>
2) `http://rs.tdwg.org/dwc/terms/decimalLatitude`<br>
3) `http://rs.tdwg.org/dwc/terms/decimalLongitude`<br>
4) `http://rs.tdwg.org/dwc/terms/eventDate`<br>
5) `http://rs.tdwg.org/dwc/terms/locality`<br>
### NOTES:
- waterBody and locality are highly correlated, but locality seems to have more variability and is closer to the verbatim value
- None of the other fields seems useful for uniquely identifying the records, because:
    - They are derived from the selected fields
    - They are created in events after the collection, or by people other than the collectors.

In [33]:
# Peek at the first three scientific names; the backticks escape the dots in
# the URI-style column name
spark_df.select('`http://rs.tdwg.org/dwc/terms/scientificName`').show(3)

+-------------------------------------------+
|http://rs.tdwg.org/dwc/terms/scientificName|
+-------------------------------------------+
|                       Acestrorhynchus g...|
|                       Charax sp. “Madeira”|
|                       Ageneiosus inermi...|
+-------------------------------------------+
only showing top 3 rows



In [34]:
import pandas as pd
import networkx as nx
from pyxdameraulevenshtein import normalized_damerau_levenshtein_distance

## Creation of the Data Graphs

### Scientific Name

In [65]:
# Distinct scientific names, flattened from single-column rows into a plain
# Python list. Null names are dropped, as is done for the other index fields,
# because a null cannot be used as a graph node or compared as a string.
sn_list = (
    spark_df
    .select("`http://rs.tdwg.org/dwc/terms/scientificName`")
    .na.drop(subset=["`http://rs.tdwg.org/dwc/terms/scientificName`"])
    .distinct()
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
print(len(sn_list))

708


In [44]:
# Declare the (undirected) data graph of scientific names; it starts empty
sn_G = nx.Graph()

In [45]:
# Add the nodes to the graph: one node per distinct scientific name
for name in sn_list:
    sn_G.add_node(name)

In [46]:
%%time
# Add the edges to the graph: connect every pair of scientific names whose
# similarity (1 - normalized Damerau-Levenshtein distance) is above 0.6, and
# store that similarity as an edge attribute.
for pos, left in enumerate(sn_list):
    # Pair each name only with the names that follow it, so every unordered
    # pair is scored exactly once and no entry is compared with itself.
    for right in sn_list[pos + 1:]:
        sim = 1 - normalized_damerau_levenshtein_distance(left, right)
        if sim > 0.6:
            sn_G.add_edge(left, right, similarity=sim)

CPU times: user 13.7 s, sys: 0 ns, total: 13.7 s
Wall time: 13.7 s


### Decimal Latitude

In [66]:
# Distinct, non-null decimalLatitude values, flattened from single-column rows
# into a plain Python list
lat_list = (
    spark_df
    .select("`http://rs.tdwg.org/dwc/terms/decimalLatitude`")
    .na.drop(subset=["`http://rs.tdwg.org/dwc/terms/decimalLatitude`"])
    .distinct()
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
print(len(lat_list))

65


In [67]:
# Declare the (undirected) data graph of decimal latitudes; it starts empty
lat_G = nx.Graph()

In [68]:
# Add the nodes to the graph: falsy entries are skipped and each remaining
# latitude is stored under its string form
for lat_value in lat_list:
    if lat_value:
        lat_G.add_node(str(lat_value))

In [69]:
%%time
# Add the edges to the graph: connect every pair of latitudes whose similarity
# (1 - normalized Damerau-Levenshtein distance) is above 0.6, and store that
# similarity as an edge attribute.
for pos, left in enumerate(lat_list):
    # Pair each value only with the values that follow it, so every unordered
    # pair is scored exactly once and no entry is compared with itself.
    for right in lat_list[pos + 1:]:
        sim = 1 - normalized_damerau_levenshtein_distance(left, right)
        if sim > 0.6:
            lat_G.add_edge(left, right, similarity=sim)

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 8.17 ms


### Decimal Longitude

In [70]:
# Distinct, non-null decimalLongitude values, flattened from single-column
# rows into a plain Python list
lon_list = (
    spark_df
    .select("`http://rs.tdwg.org/dwc/terms/decimalLongitude`")
    .na.drop(subset=["`http://rs.tdwg.org/dwc/terms/decimalLongitude`"])
    .distinct()
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
print(len(lon_list))

65


In [71]:
# Declare the (undirected) data graph of decimal longitudes; it starts empty
lon_G = nx.Graph()

In [72]:
# Add the nodes to the graph: one node per distinct longitude value
for lon_value in lon_list:
    lon_G.add_node(lon_value)

In [73]:
%%time
# Add the edges to the graph: connect every pair of longitudes whose similarity
# (1 - normalized Damerau-Levenshtein distance) is above 0.6, and store that
# similarity as an edge attribute.
for pos, left in enumerate(lon_list):
    # Pair each value only with the values that follow it, so every unordered
    # pair is scored exactly once and no entry is compared with itself.
    for right in lon_list[pos + 1:]:
        sim = 1 - normalized_damerau_levenshtein_distance(left, right)
        if sim > 0.6:
            lon_G.add_edge(left, right, similarity=sim)

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 8.39 ms


### Event Date

In [74]:
# Distinct, non-null eventDate values, flattened from single-column rows into
# a plain Python list
ed_list = (
    spark_df
    .select("`http://rs.tdwg.org/dwc/terms/eventDate`")
    .na.drop(subset=["`http://rs.tdwg.org/dwc/terms/eventDate`"])
    .distinct()
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
print(len(ed_list))

64


In [75]:
# Declare the (undirected) data graph of event dates; it starts empty
ed_G = nx.Graph()

In [76]:
# Add the nodes to the graph: one node per distinct event date
for date_value in ed_list:
    ed_G.add_node(date_value)

In [77]:
%%time
# Add the edges to the graph: connect every pair of event dates whose
# similarity (1 - normalized Damerau-Levenshtein distance) is above 0.6, and
# store that similarity as an edge attribute.
for pos, left in enumerate(ed_list):
    # Pair each value only with the values that follow it, so every unordered
    # pair is scored exactly once and no entry is compared with itself.
    for right in ed_list[pos + 1:]:
        sim = 1 - normalized_damerau_levenshtein_distance(left, right)
        if sim > 0.6:
            ed_G.add_edge(left, right, similarity=sim)

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 10.2 ms


### Locality

In [78]:
# Distinct, non-null locality values, flattened from single-column rows into
# a plain Python list
loc_list = (
    spark_df
    .select("`http://rs.tdwg.org/dwc/terms/locality`")
    .na.drop(subset=["`http://rs.tdwg.org/dwc/terms/locality`"])
    .distinct()
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
print(len(loc_list))

30


In [79]:
# Declare the (undirected) data graph of localities; it starts empty
loc_G = nx.Graph()

In [80]:
# Add the nodes to the graph: one node per distinct locality
for locality in loc_list:
    loc_G.add_node(locality)

In [81]:
%%time
# Add the edges to the graph: connect every pair of localities whose similarity
# (1 - normalized Damerau-Levenshtein distance) is above 0.6, and store that
# similarity as an edge attribute.
for pos, left in enumerate(loc_list):
    # Pair each value only with the values that follow it, so every unordered
    # pair is scored exactly once and no entry is compared with itself.
    for right in loc_list[pos + 1:]:
        sim = 1 - normalized_damerau_levenshtein_distance(left, right)
        if sim > 0.6:
            loc_G.add_edge(left, right, similarity=sim)

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 13.4 ms


## Creation of the Records Graph

In [85]:
# Project the five index fields and rename them, in the same order, to short
# aliases; the row count is printed as a sanity check
spark_df_5 = (
    spark_df
    .select(
        '`http://rs.tdwg.org/dwc/terms/scientificName`',
        '`http://rs.tdwg.org/dwc/terms/decimalLatitude`',
        '`http://rs.tdwg.org/dwc/terms/decimalLongitude`',
        '`http://rs.tdwg.org/dwc/terms/eventDate`',
        '`http://rs.tdwg.org/dwc/terms/locality`',
    )
    .toDF('s_name', 'latitude', 'longitude', 'date', 'locality')
)
print(spark_df_5.count())

3858


In [86]:
# Preview the first five rows of the five-field projection
spark_df_5.show(5)

+--------------------+---------+---------+--------+--------------------+
|              s_name| latitude|longitude|    date|            locality|
+--------------------+---------+---------+--------+--------------------+
|Acestrorhynchus g...|-11.47791|-67.99922|19/06/01|       Manuripi lake|
|Charax sp. “Madeira”|  -12.578|-64.14555|28/07/02|  Itenez-Blanco lake|
|Ageneiosus inermi...|-13.04162|-63.78932|26/07/02|Itenez-Blanco riv...|
|Cyphocharax spilu...|-13.05622|-62.75549|11/08/02|         Itenez lake|
|Moenkhausia dichr...|-13.17601|-63.25462|17/07/02|Itenez-San Antoni...|
+--------------------+---------+---------+--------+--------------------+
only showing top 5 rows



In [90]:
# Number of potential duplicates: group the records by all five index fields,
# keep the groups that occur more than once, and add up their sizes
import pyspark.sql.functions as f

(
    spark_df_5
    .groupBy(spark_df_5.columns)
    .count()
    .where(f.col('count') > 1)
    .select(f.sum('count'))
    .show()
)

+----------+
|sum(count)|
+----------+
|       831|
+----------+



In [96]:
# Index-field combinations shared by more than one record, together with how
# many records share them, sorted by scientific name
df_dup = (
    spark_df_5
    .groupBy(spark_df_5.columns)
    .count()
    .where(f.col('count') > 1)
    .orderBy(f.asc("s_name"))
)

In [99]:
# Show the first ten duplicated combinations without truncating cell values
df_dup.show(n=10, truncate=False)

+-----------------------------------+--------+---------+----+--------+-----+
|s_name                             |latitude|longitude|date|locality|count|
+-----------------------------------+--------+---------+----+--------+-----+
|Abascantus grandis Becker 1977     |null    |null     |null|null    |2    |
|Abascantus lobatus Stal 1864       |null    |null     |null|null    |2    |
|Ablaptus amazonus Stal 1864        |null    |null     |null|null    |2    |
|Ablaptus varicornis Walker 1867    |null    |null     |null|null    |2    |
|Adoxoplatys willineri Kormilev 1949|null    |null     |null|null    |2    |
|Agaclitus dromedarius Stal 1864    |null    |null     |null|null    |2    |
|Agaclitus fallenii Stal 1864       |null    |null     |null|null    |2    |
|Agrilus (Agrilus) caquetai         |null    |null     |null|null    |2    |
|Agrilus (Agrilus) florae           |null    |null     |null|null    |2    |
|Agroecus griseus Dallas 1851       |null    |null     |null|null    |2    |

In [110]:
# Inspect the full source rows behind one of the duplicated combinations
spark_df.filter((f.col('`http://rs.tdwg.org/dwc/terms/scientificName`') == "Abascantus grandis Becker 1977")).toPandas()

Unnamed: 0,http://rs.tdwg.org/dwc/terms/datasetID,http://rs.tdwg.org/dwc/terms/specificEpithet,http://rs.tdwg.org/dwc/terms/order,http://rs.tdwg.org/dwc/terms/taxonID,http://rs.tdwg.org/dwc/terms/country,http://plazi.org/terms/1.0/basionymYear,http://gbif.org/dwc/terms/1.0/canonicalName,undefined0,http://rs.tdwg.org/dwc/terms/basisOfRecord,http://plazi.org/terms/1.0/combinationYear,...,http://rs.tdwg.org/dwc/terms/family,http://rs.tdwg.org/dwc/terms/dynamicProperties,http://plazi.org/terms/1.0/verbatimScientificName,http://rs.tdwg.org/dwc/terms/eventRemarks,http://rs.tdwg.org/dwc/terms/class,http://rs.tdwg.org/dwc/terms/occurrenceID,http://rs.tdwg.org/dwc/terms/nomenclaturalStatus,http://rs.tdwg.org/dwc/terms/genus,http://purl.org/dc/terms/rightsHolder,http://www.w3.org/ns/prov#wasDerivedFrom
0,,,Hemiptera,F72087F4FFC2FFDB3F9400F381273A3C.taxon,,,Abascantus grandis,,,1977,...,Hemiptera (awaiting allocation),,"Abascantus grandis Becker, 1977",,Insecta,,,Abascantus,,hash://sha256/c61c2622391ae5b8fabe7003c3228934...
1,,,Hemiptera,F72087F4FFC2FFDB3F9400F381273A3C.taxon,,,Abascantus grandis,,,1977,...,Hemiptera (awaiting allocation),,"Abascantus grandis Becker, 1977",,Insecta,,,Abascantus,,hash://sha256/4256fd83db9270d2236776bc4bd45e22...


In [111]:
# Inspect the full source rows for a second duplicated scientific name
spark_df.filter((f.col('`http://rs.tdwg.org/dwc/terms/scientificName`') == "Abascantus lobatus Stal 1864")).toPandas()

Unnamed: 0,http://rs.tdwg.org/dwc/terms/datasetID,http://rs.tdwg.org/dwc/terms/specificEpithet,http://rs.tdwg.org/dwc/terms/order,http://rs.tdwg.org/dwc/terms/taxonID,http://rs.tdwg.org/dwc/terms/country,http://plazi.org/terms/1.0/basionymYear,http://gbif.org/dwc/terms/1.0/canonicalName,undefined0,http://rs.tdwg.org/dwc/terms/basisOfRecord,http://plazi.org/terms/1.0/combinationYear,...,http://rs.tdwg.org/dwc/terms/family,http://rs.tdwg.org/dwc/terms/dynamicProperties,http://plazi.org/terms/1.0/verbatimScientificName,http://rs.tdwg.org/dwc/terms/eventRemarks,http://rs.tdwg.org/dwc/terms/class,http://rs.tdwg.org/dwc/terms/occurrenceID,http://rs.tdwg.org/dwc/terms/nomenclaturalStatus,http://rs.tdwg.org/dwc/terms/genus,http://purl.org/dc/terms/rightsHolder,http://www.w3.org/ns/prov#wasDerivedFrom
0,,,Hemiptera,F72087F4FFC2FFDB3F94012786C73AC4.taxon,,,Abascantus lobatus,,,1864,...,Hemiptera (awaiting allocation),,"Abascantus lobatus Stål, 1864",,Insecta,,,Abascantus,,hash://sha256/c61c2622391ae5b8fabe7003c3228934...
1,,,Hemiptera,F72087F4FFC2FFDB3F94012786C73AC4.taxon,,,Abascantus lobatus,,,1864,...,Hemiptera (awaiting allocation),,"Abascantus lobatus Stål, 1864",,Insecta,,,Abascantus,,hash://sha256/4256fd83db9270d2236776bc4bd45e22...


In [112]:
# Same filter as the previous cell, printed as plain text so that every column
# is listed (wrapped across lines) instead of being elided from the table
print(spark_df.filter((f.col('`http://rs.tdwg.org/dwc/terms/scientificName`') == "Abascantus lobatus Stal 1864")).toPandas())

  http://rs.tdwg.org/dwc/terms/datasetID  \
0                                   None   
1                                   None   

  http://rs.tdwg.org/dwc/terms/specificEpithet  \
0                                         None   
1                                         None   

  http://rs.tdwg.org/dwc/terms/order    http://rs.tdwg.org/dwc/terms/taxonID  \
0                          Hemiptera  F72087F4FFC2FFDB3F94012786C73AC4.taxon   
1                          Hemiptera  F72087F4FFC2FFDB3F94012786C73AC4.taxon   

  http://rs.tdwg.org/dwc/terms/country  \
0                                 None   
1                                 None   

  http://plazi.org/terms/1.0/basionymYear  \
0                                    None   
1                                    None   

  http://gbif.org/dwc/terms/1.0/canonicalName undefined0  \
0                          Abascantus lobatus       None   
1                          Abascantus lobatus       None   

  http://rs.tdwg.org/dwc/terms/