Not able to use WRITE_DISPOSITION property from Spark job #43

Closed
MubbleAnkur opened this issue Jan 19, 2017 · 7 comments

Comments

@MubbleAnkur

MubbleAnkur commented Jan 19, 2017

I am trying to write data to a BigQuery table; the requirement is to overwrite the existing data in the table. For that purpose I am trying to use the property
BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY to override the default value, which is WRITE_APPEND, but the job still appends the data to the table.

I am using Apache Spark with Scala and running the job in the Cloud Dataproc cluster environment.

Here is the piece of code I am trying:

BigQueryConfiguration.configureBigQueryOutput(conf, projectId, outputDataSetId, outputTableId, outputTableSchema)
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_TRUNCATE")
conf.set("mapreduce.job.outputformat.class", classOf[BigQueryOutputFormat[_,_]].getName)
rdd.map(data => (null, data)).saveAsNewAPIHadoopDataset(conf)
@MubbleAnkur changed the title to Couldn't be able to use WRITE_DISPOSITION property from spark job on Jan 19, 2017
@dennishuo
Contributor

dennishuo commented Feb 20, 2017

The OUTPUT_TABLE_WRITE_DISPOSITION_KEY was introduced as part of the addition of the IndirectBigQueryOutputFormat, and only applies if you're using the IndirectBigQueryOutputFormat.

It doesn't work to plumb it into the older BigQueryOutputFormat because in that flow, data is committed into the destination table in each commitTask call, meaning separate worker tasks independently commit their portion of the whole job, and thus require WRITE_APPEND.

The newer IndirectBigQueryOutputFormat works by first buffering all the data into GCS, and then on commitJob it actually loads the data from GCS into BigQuery, which is why it's able to support the WRITE_DISPOSITION. For large jobs it's better to use the newer IndirectBigQueryOutputFormat anyway, because it only requires one BigQuery "load" job per Hadoop/Spark job, as opposed to one BigQuery job per task. To configure the IndirectBigQueryOutputFormat you should use com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration.

We'll open a tracking issue internally for improving the documentation for all this.

@MubbleAnkur
Author

MubbleAnkur commented Mar 8, 2017

@dennishuo Thanks for the solution, but could you explain it with an example?

@manojpatil05

manojpatil05 commented Mar 24, 2017

@MubbleAnkur, @dennishuo Following up on this thread: I have the same requirement and did not find any documentation around it.

I tried to change the configuration:

conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[,]].getName)

And it threw an exception:

Exception in thread "main" java.io.IOException: Must supply value for configuration settings: mapred.bq.output.gcs.fileformat, mapred.bq.output.gcs.outputformatclass

Looking for documentation on fileformat and outputformatclass.

@dennishuo
Contributor

dennishuo commented Mar 24, 2017

Ah sorry, it looks like a piece of documentation for this fell through the cracks during development. (And sorry I missed the ping a couple of weeks ago; I've been traveling for various conferences, so it's been easy to miss some emails.) I'll file an issue internally to get the examples updated, but in the meantime I'll inline a complete WordCount example that uses both the InputFormat and the new indirect OutputFormat. The most relevant lines are:

BigQueryOutputConfiguration.configure(
    conf,
    outputQualifiedTableId,
    outputSchema,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    TextOutputFormat.class);

. . .

job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

And in its entirety:

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * An example Hadoop WordCount program that counts the number of times a word appears in a BigQuery
 * table column.
 */
public class WordCount {

  /** The configuration key used to specify the BigQuery field name ("column name"). */
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY = "mapred.bq.samples.wordcount.word.key";
  /**
   * Default value for the configuration entry specified by WORDCOUNT_WORD_FIELDNAME_KEY. Examples:
   * 'word' in publicdata:samples.shakespeare or 'repository_name' in
   * publicdata:samples.github_timeline.
   */
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * This is the mapper for our WordCount job. For input, it consumes a LongWritable and JsonObject
   * as the key and value. These correspond to a row identifier and a JSON representation of the row's
   * values/columns. For output, it produces Text and a LongWritable as the key and value. These
   * correspond to the word and a count for the number of times it has occurred.
   */
  public static class Map extends Mapper<LongWritable, JsonObject, Text, LongWritable> {

    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        context.write(word, ONE);
      }
    }
  }

  /**
   * This is the reducer for our WordCount job. For input, it consumes the Text and LongWritable
   * that the mapper produced. For output, it produces a JsonObject and NullWritable. The JsonObject
   * represents the data that will be loaded into BigQuery.
   */
  public static class Reduce extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters with the format:
   * <code>[ProjectId] [QualifiedInputTableId] [InputTableFieldName] [QualifiedOutputTableId]
   * [GcsOutputPath]</code>
   *
   * <p><strong>ProjectId</strong> - Project under which to issue the BigQuery operations. Also
   * serves as the default project for table IDs which don't explicitly specify a project for the
   * table.
   *
   * <p><strong>QualifiedInputTableId</strong> - Input table ID of the form <code>
   * (Optional ProjectId):[DatasetId].[TableId]</code>
   *
   * <p><strong>InputTableFieldName</strong> - Name of the field to count in the input table, e.g.
   * 'word' in publicdata:samples.shakespeare or 'repository_name' in
   * publicdata:samples.github_timeline.
   *
   * <p><strong>QualifiedOutputTableId</strong> - Output table ID of the form <code>
   * (Optional ProjectId):[DatasetId].[TableId]</code>
   *
   * <p><strong>GcsOutputPath</strong> - The output path to store temporary GCS data, e.g. <code>
   * gs://bucket/dir/</code>
   *
   * @param args a String[] containing projectId, qualifiedInputTableId, inputTableFieldName,
   *     qualifiedOutputTableId, and gcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop
    // framework. This example won't cover the specifics, but will recognize several standard
    // command line arguments, enabling applications to easily specify a namenode, a
    // ResourceManager, additional configuration resources etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs which don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g. "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary GCS data, e.g. "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input and output.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer by specifying a
    // class in that Jar. This is required if the jar is being passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(GsonBigQueryInputFormat.class);
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, make sure to clean up the Google Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
  }
}

@manojpatil05

manojpatil05 commented Mar 27, 2017

@dennishuo: It would be great if you could provide a Spark/Scala example, because here the conf object is of the Configuration class, but in Spark it is derived from SparkSession.sparkContext.hadoopConfiguration, which does not write any data to the Google BigQuery table when using saveAsNewAPIHadoopDataset(conf) with the new set of configuration options mentioned in your example.

@manojpatil05

manojpatil05 commented Mar 27, 2017

It worked with a different definition of the configuration:

BigQueryOutputConfiguration.configure(
    conf,
    projectId,
    outputDatasetId,
    outputTableId,
    outputSchema,
    Temp_Gcs_path,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

. . .

conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[,]].getName)

Thank you for your help @dennishuo
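
For completeness, a rough sketch of how the surrounding Spark pieces fit together (untested as written here; the schema is built the same way as in the Java WordCount example above, spark is an existing SparkSession, and rdd is assumed to hold JsonObject rows as in the original post):

import scala.collection.JavaConverters._
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration

// In Spark the Hadoop Configuration comes from the SparkContext.
val conf = spark.sparkContext.hadoopConfiguration
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Output schema, mirroring the Java example above.
val outputSchema = new TableSchema().setFields(
  List(
    new TableFieldSchema().setName("Word").setType("STRING"),
    new TableFieldSchema().setName("Count").setType("INTEGER")).asJava)

// The original goal of this issue: with the indirect output format this
// disposition is honored, so the table is truncated instead of appended to.
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_TRUNCATE")

// ... then call BigQueryOutputConfiguration.configure with outputSchema and set
// mapreduce.job.outputformat.class, exactly as in the snippet above ...

rdd.map(row => (null, row)).saveAsNewAPIHadoopDataset(conf)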

@manojpatil05
Copy link

manojpatil05 commented Mar 28, 2017

@dennishuo: Though it worked, the processing time is very long (executed on Google Cloud Dataproc).

It creates about 200 tasks, and for each task the following gets executed:

17/03/28 13:06:58 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from default credential.
17/03/28 13:06:58 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from given credential.
17/03/28 13:07:07 INFO com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputFormat: Delegating functionality to 'TextOutputFormat'

Sometimes, some tasks fail with a java.net.SocketTimeoutException: Read timed out error.

Is it something to do with the configuration? I am testing it on small data (about 5-6 lines).
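
One possible mitigation, purely as a suggestion that is not verified in this thread: saveAsNewAPIHadoopDataset runs one output task per RDD partition, and the logs above show each task setting up its own BigQuery client and output writer, so for a dataset of only a few lines it may help to shrink the partition count before writing:

// Collapse a tiny RDD into a single partition so only one output task
// (and one staged output file) is produced; only sensible for small data.
rdd.map(row => (null, row))
  .coalesce(1)
  .saveAsNewAPIHadoopDataset(conf)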

@medb changed the title from Couldn't be able to use WRITE_DISPOSITION property from spark job to Not able to use WRITE_DISPOSITION property from Spark job on Aug 21, 2019
@medb closed this as completed on Aug 21, 2019