In [15]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Dataset, Row, DataFrame, Column, DataFrameWriter, SaveMode}
import org.apache.spark.sql.functions.{concat, lit,concat_ws}

// Obtain the notebook's Spark session (getOrCreate returns an existing one if
// the kernel already started it).
// NOTE(review): "spark.some.config.option" / "some-value" look like placeholder
// values; confirm whether any real configuration was intended here.
val spark: SparkSession = SparkSession.builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Session implicits: enables the $"column" syntax used below, plus
// RDD/Seq -> DataFrame/Dataset conversions.
import spark.implicits._

spark = org.apache.spark.sql.SparkSession@9e5c7ec


In [None]:
## Helper Functions

In [2]:
/** Collapses a multi-valued column into a single delimited string column.
  *
  * Spark's CSV writer cannot serialise array columns, so multi-valued fields
  * are joined into one string before export. `concat_ws` skips null elements.
  *
  * @param c   column to flatten (presumably array<string>; confirm against the schema)
  * @param sep separator placed between elements; defaults to "|"
  * @return a string column holding the joined elements
  */
def asArrayDelimited(c: Column, sep: String = "|"): Column = concat_ws(sep, c)

asArrayDelimited: (c: org.apache.spark.sql.Column)org.apache.spark.sql.Column


In [17]:
/** Adds a `saveAsCsv` shortcut to any `DataFrameWriter`.
  *
  * Output goes through Spark's "csv" data source and always replaces whatever
  * already exists at the destination (SaveMode.Overwrite), so cells can be re-run.
  */
implicit class MyPimpedDataFrameWriter[T](dfw: DataFrameWriter[T]) {

  /** Writes the data as delimited text to `path`, overwriting existing output.
    *
    * @param path      destination directory
    * @param delimiter field separator; defaults to a tab
    */
  def saveAsCsv(path: String, delimiter: String = "\t"): Unit =
    dfw
      .mode(SaveMode.Overwrite)
      .format("csv")
      .option("delimiter", delimiter)
      .save(path)
}

defined class MyPimpedDataFrameWriter


In [None]:
## Convert vertex JSON to tab-delimited CSV for Neo4j import

In [18]:
val dfCompanyNames = spark.read.json("/abr/vertices/company-names/*.txt")

// Keep the vertex id, its value and the graphs it belongs to; `graphs` is
// "|"-joined so it fits in a flat CSV field, then the export directory is overwritten.
dfCompanyNames
  .select($"id", $"data.value", $"meta.graphs")
  .withColumn("graphs", asArrayDelimited($"graphs"))
  .write
  .saveAsCsv("/abr/vertices.csv/company-names/")

dfCompanyNames = [data: struct<value: string>, id: string ... 1 more field]


[data: struct<value: string>, id: string ... 1 more field]

In [19]:
val dfBusinessNames = spark.read.json("/abr/vertices/business-names/*.txt")

// Same projection as the company-name vertices: id, value and "|"-joined graphs.
dfBusinessNames
  .select($"id", $"data.value", $"meta.graphs")
  .withColumn("graphs", asArrayDelimited($"graphs"))
  .write
  .saveAsCsv("/abr/vertices.csv/business-names/")

dfBusinessNames = [data: struct<value: string>, id: string ... 1 more field]


[data: struct<value: string>, id: string ... 1 more field]

In [20]:
val dfABN = spark.read.json("/abr/vertices/{abn-abr,asic-abn-without-an-abr}/*.txt")

// The brace alternation in the glob reads two source directories into one ABN
// vertex set. Nested `data.*` fields become top-level columns named by their
// leaf (e.g. `abnStatus`), and `graphs` is "|"-joined for the flat export.
dfABN
  .select(
    $"id",
    $"data.abn",
    $"data.abnStatus",
    $"data.entityType",
    $"data.entityTypeDescription",
    $"data.gstStatus",
    $"data.gstStatusFrom",
    $"data.value",
    $"meta.graphs"
  )
  .withColumn("graphs", asArrayDelimited($"graphs"))
  .write
  .saveAsCsv("/abr/vertices.csv/abn/")

dfABN = [data: struct<abn: string, abnStatus: string ... 6 more fields>, id: string ... 1 more field]


[data: struct<abn: string, abnStatus: string ... 6 more fields>, id: string ... 1 more field]

In [21]:
val dfACN = spark.read.json("/abr/vertices/acn-abr-and-company-names/*.txt")

// ACN vertices: id, value and the "|"-joined list of graphs.
dfACN
  .select($"id", $"data.value", $"meta.graphs")
  .withColumn("graphs", asArrayDelimited($"graphs"))
  .write
  .saveAsCsv("/abr/vertices.csv/acn/")

dfACN = [data: struct<value: string>, id: string ... 1 more field]


[data: struct<value: string>, id: string ... 1 more field]

In [None]:
val dfLegalEntity = spark.read.json("/abr/vertices/legal-entity/*.txt")

// Both `graphs` and `givenNames` are multi-valued (presumably arrays; confirm
// against the JSON schema), so each is collapsed to a "|"-joined string in turn
// before the flat export.
Seq("graphs", "givenNames")
  .foldLeft(
    dfLegalEntity.select(
      $"id",
      $"data.address.postCode",
      $"data.address.state",
      $"data.familyName",
      $"data.givenNames",
      $"data.title",
      $"data.type",
      $"meta.graphs"
    )
  )((frame, name) => frame.withColumn(name, asArrayDelimited(frame(name))))
  .write
  .saveAsCsv("/abr/vertices.csv/legal-entity/")

In [None]:
val dfMainEntity = spark.read.json("/abr/vertices/main-entity/*.txt")

// Main-entity vertices: id, address postcode/state, name, type and the
// "|"-joined graphs. Written under /abr/vertices.csv/ like every other vertex
// export (the path previously lacked the '/' before "main-entity").
dfMainEntity
    .select("id",
            "data.address.postCode",
            "data.address.state",
            "data.nonIndividualName",
            "data.type",
            "meta.graphs"
           )
    .withColumn("graphs",asArrayDelimited($"graphs"))
    .write
    .saveAsCsv("/abr/vertices.csv/main-entity/")

In [11]:
val dfOtherEntity = spark.read.json("/abr/vertices/other-entity/*.txt")

// Other-entity vertices: id, name, type and the "|"-joined graphs.
// Uses saveAsCsv like the sibling exports so the output is tab-delimited and
// re-running the cell overwrites instead of failing on an existing directory.
dfOtherEntity
    .select("id",
            "data.nonIndividualName",
            "data.type",
            "meta.graphs"
           )
    .withColumn("graphs",asArrayDelimited($"graphs"))
    .write
    .saveAsCsv("/abr/vertices.csv/other-entity/")

dfOtherEntity = [data: struct<nonIndividualName: string, type: string>, id: string ... 1 more field]


lastException: Throwable = null


[data: struct<nonIndividualName: string, type: string>, id: string ... 1 more field]