Skip to content

2. Native Avro Support

Weiqing Yang edited this page Jun 2, 2017 · 1 revision

SHC support different data formats like Avro, Jason, etc. The use case below shows how spark supports Avro. User can persist the Avro record into HBase directly. Internally, the Avro schema is converted to a native Spark Catalyst data type automatically. Note that both key-value parts in an HBase table can be defined in Avro format.

  1. Define catalog for the schema mapping:
def catalog = s"""{
                     |"table":{"namespace":"default", "name":"Avrotable"},
                      |"rowkey":"key",
                      |"columns":{
                      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                      |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
                      |}
                      |}""".stripMargin

catalog is a schema for a HBase table named Avrotable. row key as key and one column col1. The rowkey also has to be defined in details as a column (col0), which has a specific cf (rowkey).

  1. Prepare the Data:
 object AvroHBaseRecord {
   val schemaString =
     s"""{"namespace": "example.avro",
         |   "type": "record",      "name": "User",
         |    "fields": [
         |        {"name": "name", "type": "string"},
         |        {"name": "favorite_number",  "type": ["int", "null"]},
         |        {"name": "favorite_color", "type": ["string", "null"]},
         |        {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
         |        {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
         |      ]    }""".stripMargin

   val avroSchema: Schema = {
     val p = new Schema.Parser
     p.parse(schemaString)
   }

   def apply(i: Int): AvroHBaseRecord = {
     val user = new GenericData.Record(avroSchema);
     user.put("name", s"name${"%03d".format(i)}")
     user.put("favorite_number", i)
     user.put("favorite_color", s"color${"%03d".format(i)}")
     val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
     favoriteArray.add(s"number${i}")
     favoriteArray.add(s"number${i+1}")
     user.put("favorite_array", favoriteArray)
     import collection.JavaConverters._
     val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
     user.put("favorite_map", favoriteMap)
     val avroByte = AvroSedes.serialize(user, avroSchema)
     AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
   }
 }

 val data = (0 to 255).map { i =>
    AvroHBaseRecord(i)
 }

schemaString is defined first, then it is parsed to get avroSchema. avroSchema is used to generate AvroHBaseRecord. data prepared by users is a local Scala collection which has 256 AvroHBaseRecord objects.

  1. Save DataFrame:
 sc.parallelize(data).toDF.write.options(
     Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
     .format("org.apache.spark.sql.execution.datasources.hbase")
     .save()

Given a data frame with specified schema catalog, above will create an HBase table with 5 regions and save the data frame inside.

  1. Load the DataFrame
def avroCatalog = s"""{
            |"table":{"namespace":"default", "name":"avrotable"},
            |"rowkey":"key",
            |"columns":{
              |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
              |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
            |}
          |}""".stripMargin

 def withCatalog(cat: String): DataFrame = {
     sqlContext
         .read
         .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()
 }
 val df = withCatalog(catalog)

In withCatalog function, read returns a DataFrameReader that can be used to read data in as a DataFrame. The option function adds input options for the underlying data source to the DataFrameReader. There are two options: one is to set avroSchema as AvroHBaseRecord.schemaString, and one is to set HBaseTableCatalog.tableCatalog as avroCatalog. The load() function loads input in as a DataFrame. The date frame df returned by withCatalog function could be used to access the HBase table.

  1. SQL Query
 df.registerTempTable("avrotable")
 val c = sqlContext.sql("select count(1) from avrotable").

After loading df DataFrame, users can query data. registerTempTable registers df DataFrame as a temporary table using the table name avrotable. sqlContext.sql function allows the user to execute SQL queries.

Clone this wiki locally