

Apache Spark Quickstart

Below is a brief example using Apache Spark to load, query, and modify a real data set in Apache Kudu.

Start the Kudu Quickstart Environment

See the Apache Kudu quickstart documentation to set up and run the Kudu quickstart environment.

Install Spark

Install Apache Spark on your host machine by following the Apache Spark installation documentation.

If you are on a Mac, you can install it with Homebrew:
brew install apache-spark

Download the data

To practice some typical operations with Kudu and Spark, we’ll use the San Francisco MTA GPS dataset. This dataset contains raw location data transmitted periodically from sensors installed on the buses in the SF MTA’s fleet.

  1. Download the sample data.

    The SF MTA’s site is often a bit slow, so we’ve mirrored a sample CSV file from the dataset at

    The original dataset uses DOS-style line endings, so we’ll convert it to UNIX-style during the upload process using tr.

    There is also a missing line break after the header, so we add it using sed.

    gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \
      sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv
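The effect of the tr and sed steps can be sanity-checked on a tiny synthetic sample. This is a minimal sketch: the header and record values below are made up for illustration; only the PREDICTABLE column name and the file's quirks (DOS line endings, missing line break after the header) come from the description above.

```shell
# Build a sample that mimics the raw file: a header ending in PREDICTABLE
# fused directly onto the first record, with a DOS-style CRLF line ending.
# (Column names and values here are invented for this sketch.)
printf 'REV,REPORT_TIME,VEHICLE_TAG,SPEED,PREDICTABLE5411,01/01/2013 00:00:01,5411,0.0,1\r\n' > sample_raw.csv

# Same pipeline as above: strip carriage returns with tr, then restore the
# header/record boundary after PREDICTABLE with sed.
tr -d '\r' < sample_raw.csv | sed 's/PREDICTABLE/PREDICTABLE\n/g' > sample_fixed.csv

# The header and the record now sit on separate UNIX-style lines.
cat sample_fixed.csv
```

Note that `\n` in the sed replacement is a GNU sed extension; on BSD/macOS sed you would embed a literal escaped newline instead.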

Run the spark-shell with kudu-spark

Run the spark-shell with the kudu-spark package:

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
The examples below assume you are in the spark-shell with the kudu-spark package.
The examples below use :paste to support multiline input. As noted in the spark-shell, press CTRL + D after pasting the code to run it.

Load and prepare the CSV data

  1. Read the CSV data.

    Read the plain text data into a Spark DataFrame and print the interpreted schema.

    val sfmta_raw = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("sfmtaAVLRawData01012013.csv")
    sfmta_raw.createOrReplaceTempView("sfmta_raw")
    sfmta_raw.printSchema()
    spark.sql("SELECT count(*) FROM sfmta_raw").show()
    spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show()
  2. Prepare the data to load into Kudu.

    To prepare the data we will:

    • Convert the REPORT_TIME from a string into a timestamp.

    • Mark the primary key columns, REPORT_TIME and VEHICLE_TAG, as non-nullable.

      import org.apache.spark.sql.types._
      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.functions.to_timestamp

      def setNotNull(df: DataFrame, columns: Seq[String]) : DataFrame = {
        val schema = df.schema
        // Modify the [[StructField]] for the specified columns.
        val newSchema = StructType( {
          case StructField(c, t, _, m) if columns.contains(c) =>
            StructField(c, t, nullable = false, m)
          case y: StructField => y
        })
        // Apply the new schema to the DataFrame.
        df.sqlContext.createDataFrame(df.rdd, newSchema)
      }

      val sftmta_time = sfmta_raw
        .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss"))
      val sftmta_prep = setNotNull(sftmta_time, Seq("REPORT_TIME", "VEHICLE_TAG"))
      sftmta_prep.createOrReplaceTempView("sftmta_prep")
      spark.sql("SELECT count(*) FROM sftmta_prep").show()
      spark.sql("SELECT * FROM sftmta_prep LIMIT 5").show()

Load and prepare the Kudu table

  1. Create a new Kudu table

    Create a Kudu table with 3 replicas and 4 hash partitions using the schema defined by the sftmta_prep DataFrame.

    import collection.JavaConverters._
    import org.apache.kudu.client._
    import org.apache.kudu.spark.kudu._
    val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext)
    // Delete the table if it already exists.
    if(kuduContext.tableExists("sfmta_kudu")) {
      kuduContext.deleteTable("sfmta_kudu")
    }
    kuduContext.createTable("sfmta_kudu", sftmta_prep.schema,
      /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"),
      new CreateTableOptions()
        .setNumReplicas(3)
        .addHashPartitions(List("VEHICLE_TAG").asJava, 4))
    The table is deleted and recreated if it already exists.
  2. Load the Kudu table.

    Insert the prepared data into the Kudu table using the kuduContext.

    kuduContext.insertRows(sftmta_prep, "sfmta_kudu")
    // Create a DataFrame that points to the Kudu table we want to query.
    val sfmta_kudu = spark.read
      .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251")
      .option("kudu.table", "sfmta_kudu")
      // We need to use leader_only because Kudu on Docker currently doesn't
      // support Snapshot scans due to `--use_hybrid_clock=false`.
      .option("kudu.scanLocality", "leader_only")
      .format("kudu").load
    sfmta_kudu.createOrReplaceTempView("sfmta_kudu")
    spark.sql("SELECT count(*) FROM sfmta_kudu").show()
    spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show()

Read and Modify Data

Now that the data is stored in Kudu, you can run queries against it. The following query finds the data point containing the highest recorded vehicle speed.

spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show()

The output should look something like this:

| report_time | vehicle_tag | longitude          | latitude          | speed             | heading |
|-------------|-------------|--------------------|-------------------|-------------------|---------|
| 1357022342  | 5411        | -122.3968811035156 | 37.76665878295898 | 68.33300018310547 | 82      |

With a quick Google search we can see that this bus was traveling east on 16th Street at 68 MPH. At first glance, this seems unlikely to be true. Perhaps we do some research and find that this bus’s sensor equipment was broken, so we decide to remove the data. With Kudu this is easy to correct using Spark:

spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
kuduContext.deleteRows(toDelete, "sfmta_kudu")
spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()

Next steps

The above example showed how to load, query, and mutate a static dataset with Spark and Kudu. The real power of Kudu, however, is the ability to ingest and mutate data in a streaming fashion.

As an exercise to learn the Kudu programmatic APIs, try implementing a program that uses the SFMTA XML data feed to ingest this same dataset in real time into the Kudu table.
