### Imports

In [11]:
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.functions._

### Create a SQL Context

In [12]:
// SQL entry point built on the notebook's SparkContext `sc`
// (Spark 1.4 predates SparkSession and SQLContext.getOrCreate).
val sqlContext: SQLContext = new SQLContext(sc)

### Define a UDF, because Spark 1.4 does not have all of the functions we need
It performs a simple string concatenation.

In [13]:
// Null-safe string concatenation UDF (stand-in for a built-in concat, which
// Spark 1.4 lacks). Spark passes SQL NULL into a String-typed UDF as `null`,
// and Scala's `null + "x"` yields the text "nullx" (e.g. region "US-null").
// To match SQL concat semantics, a null in either argument gives a null result;
// `null` (not Option) is returned because that is how a Spark UDF emits SQL NULL.
val concat = udf((s1: String, s2: String) =>
  if (s1 == null || s2 == null) null else s1 + s2)

### Create DataFrames on the stores and receipts_by_store_date tables

In [7]:
// Loads one table of the "retail" keyspace through the
// spark-cassandra-connector data source.
def retailTable(table: String) = sqlContext.read.
      format("org.apache.spark.sql.cassandra").
      options(Map("keyspace" -> "retail", "table" -> table)).
      load()

// Store metadata (provides store_id and state).
val stores_df = retailTable("stores")

// Receipt totals keyed by store and date (provides store_id and receipt_total).
val receipts_by_store_date_df = retailTable("receipts_by_store_date")

### Compute sales_by_state
1. join receipts_by_store_date to stores
2. group by state
3. sum receipt_total
4. do a select to add the dummy column, compute the region, rename columns, and round the totals to two decimal places

In [8]:
// Per-state receipt totals shaped for the retail.sales_by_state table.
// The sum is given an explicit alias rather than referring to the generated
// column name "SUM(receipt_total)", whose spelling/case varies between Spark
// versions and is resolved case-sensitively under SQLContext defaults.
val sales_by_state_df = receipts_by_store_date_df.
      join(stores_df, stores_df("store_id") === receipts_by_store_date_df("store_id")).
      groupBy(stores_df("state")).
      agg(sum("receipt_total") alias "receipts_sum").
      select(
        lit("dummy") alias "dummy",
        col("state"),
        concat(lit("US-"), col("state")) alias "region",
        // Decimal(10,2) rounds to cents; NOTE(review): values above 99,999,999.99 would overflow to null.
        col("receipts_sum") cast "Decimal(10,2)" alias "receipts_total")

In [9]:
// Print the first 10 rows of the aggregated DataFrame.
sales_by_state_df.show(10)

+-----+-----+------+--------------+
|dummy|state|region|receipts_total|
+-----+-----+------+--------------+
|dummy|   MS| US-MS|    5940679.25|
|dummy|   MT| US-MT|    1916823.06|
|dummy|   TN| US-TN|   10767153.95|
|dummy|   NC| US-NC|   12888588.82|
|dummy|   ND| US-ND|    2901100.69|
|dummy|   NH| US-NH|    1957237.77|
|dummy|   AL| US-AL|    5797108.70|
|dummy|   NJ| US-NJ|   11580738.27|
|dummy|   TX| US-TX|   25657021.59|
|dummy|   NM| US-NM|    2988450.47|
+-----+-----+------+--------------+



### Save it to sales_by_state. SaveMode.Overwrite truncates the table before writing

In [14]:
// Write the aggregated rows to retail.sales_by_state, replacing its contents.
// SaveMode.Overwrite makes the connector truncate the table first; newer
// spark-cassandra-connector releases refuse to truncate unless
// "confirm.truncate" is set, so it is passed explicitly.
// NOTE(review): confirm against the connector version deployed with this notebook.
sales_by_state_df.write.
      format("org.apache.spark.sql.cassandra").
      options(Map("keyspace" -> "retail",
                  "table" -> "sales_by_state",
                  "confirm.truncate" -> "true")).
      mode(SaveMode.Overwrite).
      save()

In [15]:
%%cql select * from retail.sales_by_state limit 5

dummy,receipts_total,state,region
dummy,29802833.74,FL,US-FL
dummy,25657021.59,TX,US-TX
dummy,20938106.75,PA,US-PA
dummy,19994663.81,CA,US-CA
dummy,16736304.37,NY,US-NY
