In [None]:
// Evaluate the SparkContext handle so the cell output displays it. `sc` is not defined
// anywhere in this notebook, so it is presumably injected by the kernel — TODO confirm.
sc

## BigQuery code snippets


In [None]:
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

In [None]:
// Hadoop configuration shared by every cell below, taken from the SparkContext.
// NOTE(review): @transient presumably keeps this handle out of serialized closures
// captured from notebook cells — confirm it is still required by the kernel in use.
@transient val conf = sc.hadoopConfiguration


In [None]:


// Source table, plus the project id and bucket looked up in the Hadoop configuration.
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")

// Print both resolved values as one tuple for a quick sanity check.
println((projectId, bucket))

In [None]:

// Store the project id and bucket under the connector's configuration keys,
// then register the table the job will read from.
Seq(
  BigQueryConfiguration.PROJECT_ID_KEY -> projectId,
  BigQueryConfiguration.GCS_BUCKET_KEY -> bucket
).foreach { case (key, value) => conf.set(key, value) }
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)



In [None]:
// Destination table inside the current project.
val outputTableId = s"$projectId:wordcount_dataset.wordcount_output"
// Temporary GCS staging path for the job output; deleted upon completion of the job.
val outputGcsPath = s"gs://$bucket/hadoop/tmp/bigquery/wordcountoutput"

// Configure the output side of the job. The schema argument is null so that
// BigQuery auto-detects the output schema.
BigQueryOutputConfiguration.configure(
  conf,
  outputTableId,
  null,
  outputGcsPath,
  BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
  classOf[TextOutputFormat[_, _]])

In [None]:
// Make IndirectBigQueryOutputFormat the job's output format by class name.
conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_, _]].getName)

In [None]:



// Optional: truncate the table before writing output to allow multiple runs.
// Currently disabled — uncomment the block below to enable it.
/*
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
         "WRITE_TRUNCATE") */


In [None]:

/** Converts one BigQuery row into a `(word, count)` pair.
  *
  * Reads the `word` field as a lower-cased string and the `word_count` field as a long.
  *
  * @param record JSON row to read from
  * @return the lower-cased word together with its count
  */
def convertToTuple(record: JsonObject) : (String, Long) =
  (record.get("word").getAsString.toLowerCase, record.get("word_count").getAsLong)

/** Converts a `(word, count)` pair into the JSON row written to the output table.
  *
  * The resulting object carries two properties: `word` and `word_count`.
  *
  * @param pair the word and its aggregated count
  * @return a new JsonObject holding both values
  */
def convertToJson(pair: (String, Long)) : JsonObject = {
  val (word, count) = pair
  val jsonObject = new JsonObject()
  jsonObject.addProperty("word", word)
  jsonObject.addProperty("word_count", count)
  // Optional timestamp column, disabled:
  // jsonObject.addProperty("ts", System.currentTimeMillis)
  jsonObject
}

In [None]:
// Read the configured input table as an RDD of (LongWritable, JsonObject) pairs.
val tableData = sc.newAPIHadoopRDD(
    conf,
    fClass = classOf[GsonBigQueryInputFormat],
    kClass = classOf[LongWritable],
    vClass = classOf[JsonObject])

// Turn every row into a (word, count) pair and sum the counts per word.
val wordCounts = (tableData
    .map { case (_, record) => convertToTuple(record) }
    .reduceByKey(_ + _))

// Display 10 results whose count exceeds 100.
// The filter runs before take(10): filtering after the take only thinned out
// whichever 10 rows came back first, so fewer than 10 (possibly none) were shown.
wordCounts.filter { case (_, count) => count > 100 }.take(10).foreach(println)



In [None]:
// Print the 10 words with the highest counts (sorted on the negated count, i.e. descending).
wordCounts.sortBy { case (_, count) => -count }.take(10).foreach(println)

In [None]:
// Keep only the words whose total count is above 26000.
val small = wordCounts.filter { case (_, count) => count > 26000 }

In [None]:
// Number of words that passed the threshold; count() is a Spark action and runs a job.
small.count()

In [None]:
// Write the filtered counts to the configured BigQuery output table.
// IndirectBigQueryOutputFormat discards keys, so every row is paired with a null key.
(small
    .map { wordAndCount => (null, convertToJson(wordAndCount)) }
    .saveAsNewAPIHadoopDataset(conf))