In [1]:
diff = sqlContext.read.parquet("/guoda/outputs/idigbio-20180414T023309-20180519T023311.parquet")

In [2]:
diff.count()

193482

In [4]:
diff.printSchema()

root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- coreid: string (nullable = true)
 |    |-- dc:rights: string (nullable = true)
 |    |-- dcterms:accessRights: string (nullable = true)
 |    |-- dcterms:bibliographicCitation: string (nullable = true)
 |    |-- dcterms:language: string (nullable = true)
 |    |-- d

This structure can't be written out as a CSV because of the nested data struct, so it needs to be flattened first. Dropping to SQL syntax lets us use string manipulation to construct the query.

In [5]:
sqlContext.registerDataFrameAsTable(diff, "diff")

In [20]:
(spark.sql("select collector, data.id, data.`dwc:year` as data_dwc_year from diff").show(3))

+--------------------+--------------------+-------------+
|           collector|                  id|data_dwc_year|
+--------------------+--------------------+-------------+
|         m. a. faust|http://n2t.net/ar...|         2009|
|                null|URI:catalog:ROM:B...|         null|
|collector(s): ken...|http://arctos.dat...|         1981|
+--------------------+--------------------+-------------+
only showing top 3 rows



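So, to refer to a value inside a struct column, use a dot; to escape the : in a field name, wrap the name in backticks. To do both at once, backtick only the field name and leave the struct column name unquoted, as in the data.`dwc:year` reference in the query above.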

Now we should be ok to iterate over columns and construct a list of things to select.

Hey, there are examples! https://docs.databricks.com/spark/latest/spark-sql/complex-types.html None of them quite fits this case.

Also, consider selectExpr() instead of full SQL.

In [43]:
for s in diff.schema:
    print(s.dataType)

StringType
StringType
StringType
StringType
StringType
StringType
StringType
StringType
StringType
StringType
StringType
StringType
FloatType
StringType
StringType
StringType
StructType(List(StructField(coreid,StringType,true),StructField(dc:rights,StringType,true),StructField(dcterms:accessRights,StringType,true),StructField(dcterms:bibliographicCitation,StringType,true),StructField(dcterms:language,StringType,true),StructField(dcterms:license,StringType,true),StructField(dcterms:modified,StringType,true),StructField(dcterms:references,StringType,true),StructField(dcterms:rights,StringType,true),StructField(dcterms:rightsHolder,StringType,true),StructField(dcterms:source,StringType,true),StructField(dcterms:type,StringType,true),StructField(dwc:VerbatimEventDate,StringType,true),StructField(dwc:acceptedNameUsage,StringType,true),StructField(dwc:accessRights,StringType,true),StructField(dwc:associatedMedia,StringType,true),StructField(dwc:associatedOccurrences,StringType,true),StructFi

In [57]:
# Only deals with one layer of structs; could be made recursive if we ever end
# up with multiple levels of nesting

def easy_name(s):
    """Remove special characters in column names"""
    return s.replace(':', '_')

def escape_name(s):
    """Escape special characters in column names"""
    return "`{0}`".format(s)


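from pyspark.sql.types import StructType

selects = []
for s in diff.schema:
    name = str(s.name)
    # isinstance avoids matching types that merely contain a struct,
    # such as an array of structs, which cannot be iterated for fields
    if isinstance(s.dataType, StructType):
        struct_name = name
        for f in s.dataType:
            field_name = str(f.name)
            # e.g. data.`dwc:year` as data_dwc_year
            selects.append("{0}.{1} as {0}_{2}".format(easy_name(struct_name),
                                                       escape_name(field_name),
                                                       easy_name(field_name)))
    else:
        selects.append("{0} as {1}".format(escape_name(name), easy_name(name)))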

for s in selects:
    print(s)
        
        

`barcodevalue` as barcodevalue
`basisofrecord` as basisofrecord
`bed` as bed
`canonicalname` as canonicalname
`catalognumber` as catalognumber
`class` as class
`collectioncode` as collectioncode
`collectionid` as collectionid
`collectionname` as collectionname
`collector` as collector
`commonname` as commonname
`continent` as continent
`coordinateuncertainty` as coordinateuncertainty
`country` as country
`countrycode` as countrycode
`county` as county
data.`coreid` as data_coreid
data.`dc:rights` as data_dc_rights
data.`dcterms:accessRights` as data_dcterms_accessRights
data.`dcterms:bibliographicCitation` as data_dcterms_bibliographicCitation
data.`dcterms:language` as data_dcterms_language
data.`dcterms:license` as data_dcterms_license
data.`dcterms:modified` as data_dcterms_modified
data.`dcterms:references` as data_dcterms_references
data.`dcterms:rights` as data_dcterms_rights
data.`dcterms:rightsHolder` as data_dcterms_rightsHolder
data.`dcterms:source` as data_dcterms_source
dat
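
The loop above handles a single layer of structs. If deeper nesting ever shows up, a recursive version might look like the sketch below. It is only a sketch under a few assumptions: it works on the plain dict form of a schema (the shape that StructType.jsonValue() returns), the function name flatten_selects and the two-level example schema are made up for illustration, and it backticks every path component separately, which this notebook has not tested against Spark.

```python
def flatten_selects(schema_json, path=(), out=None):
    """Build 'path as alias' select expressions for every leaf field.

    schema_json is the dict form of a struct type, i.e.
    {"type": "struct", "fields": [{"name": ..., "type": ...}, ...]}.
    Arrays and maps are treated as leaves and are not expanded.
    """
    if out is None:
        out = []
    for field in schema_json["fields"]:
        parts = path + (field["name"],)
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            # Nested struct: recurse with the extended path
            flatten_selects(ftype, parts, out)
        else:
            # Leaf: escape each path component, replace ':' in the alias
            expr = ".".join("`{0}`".format(p) for p in parts)
            alias = "_".join(p.replace(":", "_") for p in parts)
            out.append("{0} as {1}".format(expr, alias))
    return out


# Hypothetical schema with two levels of nesting (not the real diff schema)
example = {
    "type": "struct",
    "fields": [
        {"name": "collector", "type": "string"},
        {"name": "data", "type": {
            "type": "struct",
            "fields": [
                {"name": "dwc:year", "type": "string"},
                {"name": "extra", "type": {
                    "type": "struct",
                    "fields": [{"name": "dcterms:source", "type": "string"}],
                }},
            ],
        }},
    ],
}

for s in flatten_selects(example):
    print(s)
```

With a real dataframe, the input would presumably be diff.schema.jsonValue(), and the resulting list could be passed to selectExpr() in the same way as selects.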

In [33]:
# A cool approach found online, but explode doesn't handle structs, only arrays and maps
#from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
#long = (diff
#   .withColumn("id", monotonically_increasing_id())
#   .select("id", explode("data").alias("col"))
#   .select("id", "col.*"))
#long.show(10, truncate=False)

Now we have a list of things that we can select directly to flatten our dataframe.

We have to turn on column-name case sensitivity to deal with the multiple capitalizations of verbatimEventDate. See https://redmine.idigbio.org/issues/2760

In [65]:
sqlContext.sql('set spark.sql.caseSensitive=true')

DataFrame[key: string, value: string]
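
To see which flattened names actually need case sensitivity, one option is to group the aliases by their lowercased form and keep the groups with more than one member. This is a rough sketch: case_collisions is a made-up helper, and the names in the example list are illustrative stand-ins rather than output from this dataframe.

```python
from collections import defaultdict


def case_collisions(names):
    """Return {lowercased name: [original spellings]} for names that
    differ only by capitalization."""
    groups = defaultdict(list)
    for n in names:
        groups[n.lower()].append(n)
    return {k: v for k, v in groups.items() if len(v) > 1}


# Illustrative aliases; the second spelling is an assumption, not taken
# from the schema printed above
names = [
    "data_dwc_VerbatimEventDate",
    "data_dwc_verbatimEventDate",
    "data_dwc_year",
]
collisions = case_collisions(names)
print(collisions)
```

An empty result would suggest the caseSensitive setting is not needed for that set of columns.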

In [67]:
flat = diff.selectExpr(selects)

In [68]:
flat.show(1, truncate=False)

+------------+-----------------+----+-------------+-------------+-----------+--------------+------------+--------------+-----------+----------+-------------+---------------------+-------------+-----------+------+-----------+--------------+-------------------------+----------------------------------+---------------------+--------------------+---------------------+-----------------------+-------------------+-------------------------+-------------------+-----------------+--------------------------+--------------------------+---------------------+------------------------+------------------------------+----------------------------+-----------------------------+----------------------------+-----------------------+----------------------+------------+-----------------+----------------------+--------------+---------------+-----------------------+---------------------+------------------+----------------------------+--------------------------------------+----------------+--------------------+----

In [69]:
flat.printSchema()

root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data_coreid: string (nullable = true)
 |-- data_dc_rights: string (nullable = true)
 |-- data_dcterms_accessRights: string (nullable = true)
 |-- data_dcterms_bibliographicCitation: string (nullable = true)
 |-- data_dcterms_language: string (nullable = true)
 |-- data_dcterms_license: string (nullable = t

Now this should be writable as a CSV.

In [70]:
(flat
 .write
 .format("com.databricks.spark.csv")
 .mode("overwrite")
 .option("header", "false")
 .save("/tmp/test_ee_export.csv")
)

We do need the header, though. A separate file is acceptable for now, so let's see what this produces. getmerge and concatenation during publishing can combine the two.



In [75]:
(flat
 .limit(1)
 .coalesce(1)
 .write
 .format("com.databricks.spark.csv")
 .mode("overwrite")
 .option("header", "true")
 .save("/tmp/test_ee_export_one_line.csv")
)
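
The concatenation step during publishing could be as small as the sketch below. It assumes both outputs have already been pulled down to single local files (for example with hdfs dfs -getmerge); prepend_header and its arguments are hypothetical names. Because the one-line export holds the header plus one data row, only its first line is copied.

```python
import shutil


def prepend_header(header_csv, data_csv, out_csv):
    """Write the first line of header_csv, then all of data_csv, to out_csv."""
    with open(header_csv, "rb") as h, \
         open(data_csv, "rb") as d, \
         open(out_csv, "wb") as out:
        header = h.readline()
        # Make sure the header ends with a newline before appending data
        if not header.endswith(b"\n"):
            header += b"\n"
        out.write(header)
        # Stream the data file so large exports are not loaded into memory
        shutil.copyfileobj(d, out)
```

This relies on the header being a single physical line, which holds as long as no column name contains a newline.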