In [1]:
import json

from pyspark.sql.functions import abs, col, explode, length, hash, udf, when, date_format
from pyspark.sql.types import *

In [2]:
# Load the iDigBio snapshot (2019-04-26) from its parquet export.
idb_df = (sqlContext.read
          .format("parquet")
          .load("/guoda/data/idigbio-20190426T013311.parquet"))

In [3]:
# Expose dwc:dynamicProperties as `props_str` and keep only rows where it is
# longer than two characters (drops empty strings and a bare "{}").
idb_dyn = (
    idb_df
    .withColumn("props_str", col("data.dwc:dynamicProperties"))
    .filter(length(col("props_str")) > 2)
)

In [4]:
# Number of records whose dwc:dynamicProperties string is longer than two characters.
idb_dyn.count()

10374395

In [5]:
# Peek at the raw JSON strings without truncating long values.
idb_dyn.select("props_str").show(n=10, truncate=False)

+---------------------------------------------+
|props_str                                    |
+---------------------------------------------+
|{"life stage":"adult"}                       |
|{"sex":"Female"}                             |
|{"measurements":"ecotone area" }             |
|{ "solr_long_lat": "-71.303074,44.061185" }  |
|{ "solr_long_lat": "-71.319924,44.041766" }  |
|{ "solr_long_lat": "-71.304611,44.059277" }  |
|{ "solr_long_lat": "-71.3000108,44.0602311" }|
|{ "solr_long_lat": "-71.297795,44.062155" }  |
|{"measurements":"SV 71, Tail 175" }          |
|{ "solr_long_lat": "-109.74341,34.91267" }   |
+---------------------------------------------+
only showing top 10 rows



We need a UDF that returns a `MapType(StringType(), StringType())` (a string-to-string map), because `explode` can expand a map column into one (key, value) row per entry.

In [6]:
def json_to_map(s):
    """Parse a dynamicProperties string into a flat ``{str: str}`` dict.

    The result backs a ``MapType(StringType(), StringType())`` UDF, so it must
    always be a dict:

    * Unparseable input (invalid JSON, ``None``, a non-string) yields ``{}``.
    * Valid JSON that is not an object (a list, number, bare string, ...)
      also yields ``{}``, since there are no key/value pairs to extract.
    * Non-string values are re-serialized as JSON text (``5`` -> ``"5"``,
      ``true`` -> ``"true"``, nested objects/lists -> their JSON), so the map
      really is string:string. JSON ``null`` stays ``None``.

    Nested objects are not flattened into dotted keys yet.

    :param s: the raw JSON text (may be ``None``).
    :returns: a dict mapping each top-level key to a string or ``None``.
    """
    try:
        parsed = json.loads(s)
    except (TypeError, ValueError, RecursionError):
        # TypeError: s is None / not str-like; ValueError covers
        # json.JSONDecodeError; RecursionError: pathologically nested input.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return {}
    if not isinstance(parsed, dict):
        return {}
    return {
        key: value if value is None or isinstance(value, str)
        else json.dumps(value, ensure_ascii=False)
        for key, value in parsed.items()
    }
    
# Register the parser as a Spark UDF; the declared map<string,string> return
# type is what lets `explode` turn each entry into a (key, value) row.
json_to_map_udf = udf(json_to_map, returnType=MapType(StringType(), StringType()))

# Quick local check on a multi-key dynamicProperties value.
print(json_to_map(
    '{ "solr_long_lat": "-5.87403,30.49728", '
    '"related_record_types": "PreservedSpecimen|PreservedSpecimen", '
    '"related_record_links": "YPM-IP-530950|YPM-IP-530951" }'
))



{'solr_long_lat': '-5.87403,30.49728', 'related_record_types': 'PreservedSpecimen|PreservedSpecimen', 'related_record_links': 'YPM-IP-530950|YPM-IP-530951'}


In [7]:
# Parse every props_str into a map column alongside the original record.
idb_map = idb_dyn.withColumn("props_map", json_to_map_udf("props_str"))
idb_map.select("props_map").show(10, truncate=False)

+--------------------------------------------+
|props_map                                   |
+--------------------------------------------+
|Map(life stage -> adult)                    |
|Map(sex -> Female)                          |
|Map(measurements -> ecotone area)           |
|Map(solr_long_lat -> -71.303074,44.061185)  |
|Map(solr_long_lat -> -71.319924,44.041766)  |
|Map(solr_long_lat -> -71.304611,44.059277)  |
|Map(solr_long_lat -> -71.3000108,44.0602311)|
|Map(solr_long_lat -> -71.297795,44.062155)  |
|Map(measurements -> SV 71, Tail 175)        |
|Map(solr_long_lat -> -109.74341,34.91267)   |
+--------------------------------------------+
only showing top 10 rows



In [18]:
# One row per (record, dynamicProperties entry): explode turns each map entry
# into its own (key, value) row. Unlike explode_outer, explode emits no row for
# an empty or null map, so unparseable records drop out here.
idb_triples = idb_map.select(
    "uuid",
    "recordset",
    "institutioncode",
    explode("props_map").alias("key", "value"),
)
idb_triples.cache()

DataFrame[uuid: string, recordset: string, institutioncode: string, key: string, value: string]

In [19]:
# Total number of exploded (record, key, value) triples.
idb_triples.count()

31170327

In [20]:
# Sample of the triples table, values shown in full.
idb_triples.show(n=20, truncate=False)

+------------------------------------+------------------------------------+---------------+--------------------+-------------------------+
|uuid                                |recordset                           |institutioncode|key                 |value                    |
+------------------------------------+------------------------------------+---------------+--------------------+-------------------------+
|44c2a8fa-5a06-44de-83e8-d29146031cfe|271a9ce9-c6d3-4b63-a722-cb0adc48863f|mcz            |life stage          |adult                    |
|eb6c2256-73ae-4c52-95c2-cfb3292ed994|271a9ce9-c6d3-4b63-a722-cb0adc48863f|mcz            |sex                 |Female                   |
|abeb01a0-c691-4262-924c-667d74187021|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |ecotone area             |
|626b8a8e-8109-421b-b09d-8725c4e6cd79|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.303074,44.061185     |
|951adfcd-c4e4-4173-ab08-e3

In [11]:
# Most frequent dynamicProperties keys (top 1000), collected to pandas for display.
(idb_triples
 .groupBy("key")
 .count()
 .orderBy(col("count").desc())
 .limit(1000)
 .toPandas())

Unnamed: 0,key,count
0,recordtype,2946917
1,centroid,2946917
2,created,2946917
3,gbifid,2463714
4,determinations,2382962
5,subdepartment,2323969
6,gbifissue,1267532
7,registrationcode,1039765
8,donorname,905538
9,collectionkind,799900


In [21]:
# Which institutions contribute the most triples (top 1000), as a pandas table.
(idb_triples
 .groupBy("institutioncode")
 .count()
 .orderBy(col("count").desc())
 .limit(1000)
 .toPandas())

Unnamed: 0,institutioncode,count
0,nhmuk,25313796
1,ypm,1271259
2,mcz,1193852
3,lacm,569006
4,sio,560147
5,wfvz,404292
6,omnh,288517
7,uwbm,239439
8,ummz,236954
9,ku,207037


In [12]:
# How many triples use the NSF_TCN key?
idb_triples.where(col("key") == "NSF_TCN").count()

77024

In [22]:
# Break the NSF_TCN values down by institution, most common first.
(idb_triples
 .where(col("key") == "NSF_TCN")
 .groupBy("institutioncode", "value")
 .count()
 .orderBy(col("count").desc())
 .limit(1000)
 .toPandas())

Unnamed: 0,institutioncode,value,count
0,ypm,WIS,61477
1,ypm,FIC,15500
2,ypm,FIC WIS,47


Next, write the triples out, then join them back to the main DataFrame for some summaries.

In [None]:
# Persist the exploded triples so a later session can reload them instead of
# re-parsing every record. Off by default because it writes every triple
# (~31M rows in the run above); set WRITE_TRIPLES = True to enable it.
# The output name matches the 20190426T013311 snapshot read at the top.
WRITE_TRIPLES = False
TRIPLES_PATH = "/tmp/idigbio-20190426T013311-json-triples.parquet"
if WRITE_TRIPLES:
    idb_triples.write.parquet(TRIPLES_PATH)

How much more information might we be able to find in records whose dynamicProperties are not JSON-parsable?

In [None]:
# Average length of the dynamicProperties keys across all triples.
# DataFrame has no .avg() method (that lives on GroupedData), so aggregate
# over the whole frame via an empty groupBy().
# NOTE(review): this does not yet answer the non-JSON question posed above.
(idb_triples
 .select(length(col("key")).alias("len_key"))
 .groupBy()
 .avg("len_key")
 .show()
)

In [None]:
# Attach the triples back onto the full records for summaries. Joining on the
# column name (instead of idb_dyn["uuid"] == idb_triples["uuid"]) keeps a single
# `uuid` column, and selecting only key/value from the right side avoids
# duplicated recordset/institutioncode columns with ambiguous names.
# This only defines the join; no Spark action runs in this cell.
joined = idb_dyn.join(
    idb_triples.select("uuid", "key", "value"),
    on="uuid",
    how="inner",
)


In [16]:
# Disabled: the last recorded run of this cell ended in a KeyboardInterrupt.
#joined.show(3, truncate=False)

KeyboardInterrupt: 

In [None]:
# Disabled along with the show() above; a full count of the join is presumably
# at least as costly as that interrupted show - confirm before enabling.
#joined.count()

Who provides the most additional information in valid JSON?

What is the ratio of JSON to non-JSON text? How much more do we have to work on?