Build an index of the unique values in each column, keeping track of the recordset each value came from.

In [1]:
# Load the iDigBio snapshot (dated 20170130 per the file name) and count its records.
idb_df = (sqlContext.read
          .format("parquet")
          .load("/guoda/data/idigbio-20170130.parquet"))
idb_df.count()

75569035

In [5]:
# Show the full (nested) schema. The dwc:* terms summarized below are fields
# of the ``data`` struct column.
idb_df.printSchema()

root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- coreid: string (nullable = true)
 |    |-- dc:rights: string (nullable = true)
 |    |-- dcterms:accessRights: string (nullable = true)
 |    |-- dcterms:bibliographicCitation: string (nullable = true)
 |    |-- dcterms:language: string (nullable = true)
 |    |-- d

List of the columns that we want to summarize. These are the columns currently of interest to the DwC group. Many columns hold unique identifiers, free text, or already-normalized vocabularies, so summarizing every column does not make sense at this point. A future, more formal data product might do so.

NB: There is no `data.dwc:language`, `data.dwc:license`, `data.dwc:organismScope`, or type data in iDigBio (idb), so those columns are not included in the list below.

In [25]:
# Columns to summarize: the ``recordset`` identifier (used as the ``by``
# column when melting) followed by Darwin Core terms taken from the nested
# ``data`` struct.
columns = ["recordset"] + [
    "data." + term
    for term in (
        "dwc:basisOfRecord",
        "dwc:day",
        "dwc:disposition",
        "dwc:establishmentMeans",
        "dwc:geodeticDatum",
        "dwc:georeferenceVerificationStatus",
        "dwc:identificationQualifier",
        "dwc:identificationVerificationStatus",
        "dwc:lifeStage",
        "dwc:month",
        "dwc:nomenclaturalCode",
        "dwc:nomenclaturalStatus",
        "dwc:occurrenceStatus",
        "dwc:preparations",
        "dwc:reproductiveCondition",
        "dwc:sex",
        "dwc:typeStatus",
        "dwc:taxonRank",
        "dwc:taxonomicStatus",
    )
]

In [30]:
# Wide-to-long ("melt") transform adapted from code found online. Note this
# transform could also be done by manually typing out a bunch of unions, one
# for each column. For columns with highly repetitive values this is a
# reasonable all-at-once approach. For a more general solution, looping over
# the columns will result in much smaller data sizes and is more appropriate
# for a long-term data product.

from pyspark.sql.functions import array, col, explode, struct, lit


def to_long(df, by):
    """Melt ``df`` from wide to long format.

    Every column of ``df`` that is not listed in ``by`` becomes its own output
    row of the form ``(*by, key, val)``. Here ``key`` is the original column
    name and ``val`` is that column's value in the input row. The output
    therefore has (number of melted columns) rows per input row.

    Args:
        df: Spark DataFrame to reshape.
        by: Column name(s) kept as identifiers on every output row. May be a
            list, a tuple, or a single column-name string.

    Returns:
        A DataFrame with columns ``by + ["key", "val"]``.

    Raises:
        ValueError: if no columns are left to melt after excluding ``by``, or
            if the columns to melt do not all share the same Spark type.
    """
    # Normalize ``by`` so the ``by + [...]`` list concatenations below work for
    # a single name or any sequence, not just a list.
    by = [by] if isinstance(by, str) else list(by)

    # Non-identifier columns, as (name, type-string) pairs from df.dtypes.
    pairs = [(c, t) for (c, t) in df.dtypes if c not in by]
    if not pairs:
        raise ValueError("No columns left to melt after excluding %r" % (by,))
    cols, dtypes = zip(*pairs)

    # Spark arrays must be homogeneous, so every (key, val) struct placed in
    # the array below needs the same ``val`` type.
    distinct_types = set(dtypes)
    if len(distinct_types) != 1:
        raise ValueError(
            "All columns have to be of the same type, got %s"
            % sorted(distinct_types)
        )

    # Create and explode an array of (column_name, column_value) structs:
    # one output row per melted column per input row.
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])



In [31]:
# Melt the selected columns into (recordset, key, val) rows and peek at a few.
k_v_pairs = to_long(idb_df.select(*columns), by=["recordset"])
k_v_pairs.head(10)

[Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:basisOfRecord', val='PreservedSpecimen'),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:day', val='20'),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:disposition', val=None),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:establishmentMeans', val='native'),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:geodeticDatum', val=None),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:georeferenceVerificationStatus', val=None),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:identificationQualifier', val=None),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:identificationVerificationStatus', val=None),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:lifeStage', val='U'),
 Row(recordset='c778d536-e9d4-422e-8e34-1d74a01f4c48', key='dwc:month', val='9')]

In [32]:
# Total long-format rows: one per (input record, melted column) pair.
k_v_pairs.count()

1435811665

In [34]:
# Count how many records in each recordset carry each distinct value of each
# summarized column.
# The aggregate is small (549,228 rows in the recorded run), but computing it
# scans ~1.44B melted rows. It is cached so that head/count and the Parquet
# and CSV writes below reuse it instead of recomputing the groupBy each time.
index = (k_v_pairs
         .groupBy(k_v_pairs.recordset, k_v_pairs.key, k_v_pairs.val)
         .count()
         .cache()
         )
index.head(10)

[Row(recordset='d7b285d4-2643-45ee-9302-b0c3d51dda5c', key='dwc:identificationQualifier', val=None, count=260089),
 Row(recordset='eaa5f19e-ff6f-4d09-8b55-4a6810e77a6c', key='dwc:geodeticDatum', val='World Geodetic System 1984', count=238766),
 Row(recordset='eaa5f19e-ff6f-4d09-8b55-4a6810e77a6c', key='dwc:nomenclaturalStatus', val=None, count=257180),
 Row(recordset='0b17c21a-f7e2-4967-bdf8-60cf9b06c721', key='dwc:taxonomicStatus', val=None, count=1978136),
 Row(recordset='b6ec6203-09db-4d6e-8cba-ee4bebd2934c', key='dwc:geodeticDatum', val=None, count=79532),
 Row(recordset='91c5eec8-0cdc-4be2-9a99-a15ae5ec3edc', key='dwc:basisOfRecord', val='PreservedSpecimen', count=66480),
 Row(recordset='215eeaf0-0a88-409e-a75d-aec98b7c41eb', key='dwc:establishmentMeans', val=None, count=147801),
 Row(recordset='09b18522-5643-478f-86e9-d2e34440d43e', key='dwc:month', val='08', count=45883),
 Row(recordset='92dd8c8e-c048-4f0a-9b5d-2ee627d2f553', key='dwc:identificationVerificationStatus', val=None,

In [35]:
# Number of distinct (recordset, key, val) combinations in the index.
index.count()

549228

In [37]:
# Persist the index as Parquet.
index.write.format("parquet").save("/outputs/rs-col-val-index-20170130.parquet")

In [38]:
# Also export the index as header-less CSV through the
# ``com.databricks.spark.csv`` data source.
(index.write
      .option("header", "false")
      .format("com.databricks.spark.csv")
      .save("/outputs/rs-col-val-index-20170130.csv"))