## Setup

1. Create a Spark session
2. Add the `iceberg-runtime` jar to the session

In [1]:
// step 1: make sure the notebook's SparkSession (`spark`) has been created before later cells use it
spark

Waiting for a Spark session to start...

In [2]:
%AddJar file:///home/user/iceberg-runtime-0.1.3.jar

Starting download from file:///home/user/iceberg-runtime-0.1.3.jar
Finished download of iceberg-runtime-0.1.3.jar


# Drop and re-create an Iceberg table in HDFS

[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)

In [3]:
import org.apache.hadoop.fs.Path
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil

val path = "hdfs:/tmp/tables/job_metrics_tmp"

{ // keep the helpers below local to this block so they (conf, etc.) cannot be captured by closures

    // existing table whose schema and partition spec are copied into the temp table
    val sourceTable = "default.job_metrics"
    val hadoopConf = spark.sessionState.newHadoopConf()
    val tableLocation = new Path(path)

    // start from a clean slate: recursively delete any previous copy of the temp table
    tableLocation.getFileSystem(hadoopConf).delete(tableLocation, true /* recursive */)

    val hadoopTables = new HadoopTables(hadoopConf)

    // let the Spark helpers derive the Iceberg schema and partition spec from the source table
    val icebergSchema = SparkSchemaUtil.schemaForTable(spark, sourceTable)
    val partitionSpec = SparkSchemaUtil.specForTable(spark, sourceTable)

    hadoopTables.create(icebergSchema, partitionSpec, path)

    // load the new table back from its location and display its schema
    hadoopTables.load(path).schema
}


path = hdfs:/tmp/tables/job_metrics_tmp


table {
  1: event_utc_ms: optional long
  2: hostname: optional string
  3: jobflow: optional string
  4: job_name: optional string
  5: application_type: optional string
  6: record_id: optional string
  7: record_type: optional string
  8: user: optional string
  9: submit_time: optional long
  10: start_time: optional long
  11: finish_time: optional long
  12: run_host: optional string
  13: submit_host: optional string
  14: status: optional string
  15: cluster_id: optional string
  16: cluster_name: optional string
  17: queue: optional string
  18: genie_job_name: optional string
  19: genie_job_id: optional string
  20: job_uuid: optional string
  21: counters: optional string
  22: properties: optional string
  23: dateint: optional int
  24: hour: optional int
  25: batchid: optional string
}

# Load the source table's partitions as a DataFrame

In [4]:
import org.apache.iceberg.spark.SparkTableUtil

// one row per partition of default.job_metrics (partition values, location uri, format), limited
// to partitions whose format mentions parquet or avro
// NOTE(review): presumably these are the formats SparkTableUtil.listPartition can read — confirm
val partitions = {
    val formatCol = $"format"
    SparkTableUtil.partitionDF(spark, "default.job_metrics")
                  .where(formatCol.contains("parquet") or formatCol.contains("avro"))
}

display(partitions.limit(10))

partition,uri,format
"{dateint -> 20170316, hour -> 0, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 1, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 2, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 3, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 4, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 5, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 6, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 7, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 8, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
"{dateint -> 20170316, hour -> 9, batchid -> merged_1}",s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe


partitions = [partition: map<string,string>, uri: string ... 1 more field]


[partition: map<string,string>, uri: string ... 1 more field]

# List partition files, load their metrics, and append them to the table

* [Table API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Table.html)
* [Append API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/AppendFiles.html)

In [5]:
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.hadoop.conf.Configuration

{ // bind the table location to a block-local val so the task closure captures only this String,
  // not the notebook's top-level state, and stays in sync with `path` from the create cell

    val tableLocation = path

    partitions.repartition(100).flatMap { row =>

        // list the partition and read Parquet footers to get metrics
        // (row fields: 0 = partition values, 1 = uri, 2 = format)
        SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))

    }.repartition(10) // avoid lots of manifests that would be merged later
     .mapPartitions { files =>

        // a task that received no files has nothing to append: skip opening the table and
        // committing an empty append
        if (files.hasNext) {
            // open the table and append the files from this partition
            // NOTE(review): new Configuration() presumably picks up only the executor's default
            // Hadoop config, not settings made on the Spark session — confirm for your cluster
            val tables = new HadoopTables(new Configuration())
            val table = tables.load(tableLocation)

            // fast appends will create a manifest for the new files
            val append = table.newFastAppend()

            files.foreach { file =>
                append.appendFile(file.toDataFile(table.spec))
            }

            // commit the new files; each task commits independently, so a retried or speculative
            // task would append its files a second time
            append.commit()
        }

        // nothing is returned; the job is run only for its commits
        Seq.empty[String].iterator

    }.count
}




0

# Inspect the results

[Snapshot API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Snapshot.html)

In [6]:
// reopen the table on the driver, this time with the session's Hadoop configuration
val tables = {
    val driverConf = spark.sessionState.newHadoopConf()
    new HadoopTables(driverConf)
}
val table = tables.load(path)

// show the table's current snapshot (its string form lists the manifests it references)
table.currentSnapshot()

tables = org.apache.iceberg.hadoop.HadoopTables@1782cb95
table = hdfs:/tmp/tables/job_metrics_tmp


BaseSnapshot{id=1515605124481, timestamp_ms=1515605127199, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/695d8ab7-961c-4cef-94d7-367db5d8f7de-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/725154b3-92bd-4d00-9420-34a2866f2876-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/266e6040-d8ff-4713-92cb-0d806c7a3baf-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/3b0e9c88-03b0-4032-bf70-f9af43e00034-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/0747127e-895e-492e-b07e-a54627ee5534-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/db055992-1bf1-4fe7-a851-1eff0a05af55-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/1d5b7cb9-85bd-4088-ad26-a4e9562ad181-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/36db4143-8720-4060-9a8d-d17fa7dcf46f-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/46a079c3-8654-4ed5-9466-088320bda559-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/f239498c-7386-4f31-8421-518105ffbf6a-m0.avro]}

In [7]:
import scala.collection.JavaConverters._

// count the data files the current snapshot reports as added
// NOTE(review): the append job above made several independent fast-append commits, so the current
// snapshot is whichever committed last; confirm whether addedFiles covers only that commit or every
// file appended above
{
    val snapshot = table.currentSnapshot()
    snapshot.addedFiles().asScala.size
}

7087

In [8]:
{
    // commit an append that adds no files: unlike the fast appends above, this merge commit
    // combines the existing manifests into a single manifest
    val mergeAppend = table.newAppend()
    mergeAppend.commit()
}

In [9]:
// after the merge commit, the current snapshot is expected to reference a single manifest
table.currentSnapshot()

BaseSnapshot{id=1515605215920, timestamp_ms=1515605220253, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/213364b0-d97f-49bf-9126-7273b9784cfb-m0.avro]}