
SQL to Scalding

Oscar Boykin edited this page Nov 8, 2016 · 1 revision

Table of Contents

  • Motivation
  • Create datasets
  • Simple Count
  • Count distinct
  • Count, Count distinct, Sum in one query
  • Where
  • Order by X, Y limit N
  • Union
  • Group and Aggregate
  • Join
  • Histogram, Ntile
  • TODO

Motivation

SQL is a popular language for data analytics. Scalding is a relative newcomer that is more powerful but also more complex. This document translates commonly used SQL idioms to Scalding's type-safe API, which is preferred over the fields-based API. The SQL examples use the Vertica dialect, which is based on PSQL and supports analytic functions.

The example datasets are deliberately trivial, so that it is easy to experiment in the REPL and inspect intermediate results to understand what each method does. More information on how to use the REPL is in Scalding REPL and Learning Scalding with Alice.

Prerequisites:

  • Elementary knowledge of Scala
  • Basic ability to decipher types in Scalding methods

Do not expect Scalding to be as intuitive as SQL. At the same time, it is not as hard as the large number of classes and methods in the Scalding docs may suggest.

To get a deeper understanding of monoids like QTree, see Learning Algebird Monoids with REPL.

Create datasets

SQL

CREATE TABLE test.allsales(
  state VARCHAR(20), 
  name VARCHAR(20), 
  sales INT
);

INSERT INTO test.allsales VALUES('CA', 'A', 60);
INSERT INTO test.allsales VALUES('CA', 'A', 20);
INSERT INTO test.allsales VALUES('VA', 'B', 15);
COMMIT;

pwagle=> select * from test.allsales;
 state | name | sales 
-------+------+-------
 CA    | A    |    60
 VA    | B    |    15
 CA    | A    |    20
(3 rows)

Scalding

scala> case class Sale(state: String, name: String, sale: Int)
defined class Sale

scala> val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
salesList: List[Sale] = List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15))

scala> val salesPipe = TypedPipe.from(salesList)
salesPipe: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

Simple Count

SQL

pwagle=> select count(1) from test.allsales;
 count 
-------
     3

Scalding

scala> salesPipe.groupAll.size.values.dump
3

Count distinct

SQL

pwagle=> select count(distinct state) from test.allsales;
 count 
-------
     2

Scalding

scala> salesPipe.map{x => x.state}.distinct.groupAll.size.values.dump
2

Count, Count distinct, Sum in one query

SQL

pwagle=> select count(1), count(distinct state), sum(sales) from test.allsales;
 count | count | sum 
-------+-------+-----
     3 |     2 |  95

Scalding

scala> salesPipe.map{x => (1, Set(x.state), x.sale) }.groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }.dump
(3,2,95)

The query above will have performance issues when the number of distinct states is large, because every distinct value is accumulated in a single Set. This can be solved in two ways:

  • Group by state first (TODO)
  • Using an approximate data structure like HyperLogLog (TODO)
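
The first option can be sketched as follows. This is a minimal illustration under stated assumptions, not a tuned implementation: it assumes the REPL session above, where salesPipe is already defined, and the names perState and totals are introduced only for this example. Because the data is reduced to one row per state first, each state contributes exactly 1 to the distinct count and no Set of states is built.

```scala
// Assumes the REPL session above: salesPipe is a TypedPipe[Sale].
// perState and totals are names introduced only for this sketch.

// Step 1: one row per state, holding that state's row count and sales total.
val perState = salesPipe
  .map { s => (s.state, (1L, s.sale.toLong)) }
  .sumByKey
  .toTypedPipe

// Step 2: every state now counts exactly once toward count(distinct state).
val totals = perState
  .map { case (_, (rows, sales)) => (rows, 1L, sales) }
  .groupAll
  .sum
  .values

totals.dump
```

The trade-off is an extra grouping step in exchange for keeping only small, fixed-size values per key.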

Also see Aggregation using Algebird Aggregators.

Where

SQL

select state, name, sales
from test.allsales
where
state = 'CA';

Scalding

salesPipe.filter(sale => sale.state == "CA").dump

scala> salesPipe.filter(sale => (sale.state == "CA")).dump
Sale(CA,A,60)
Sale(CA,A,20)

Order by X, Y limit N

SQL

select state, name, sales
from test.allsales
order by state, name
limit 1;

Scalding

scala> object SaleOrderingWithState extends Ordering[Sale] {
     |   def compare(a: Sale, b: Sale) = a.state compare b.state
     | }
defined module SaleOrderingWithState

scala> implicit val saleOrderingWithState = SaleOrderingWithState
saleOrderingWithState: SaleOrderingWithState.type = SaleOrderingWithState$@3c91b77c

scala> salesPipe.groupAll.sorted.values.dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)

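// Sort all rows, then keep the first one.
salesPipe.groupAll.sorted.take(1).values.dump

// Alternative: sortedTake(1) keeps only the smallest row and returns it wrapped in a Seq.
salesPipe.groupAll.sortedTake(1).values.dump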
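
Note that the ordering defined in this section compares rows by state only, while the SQL query orders by state and then name. A minimal sketch of an ordering that covers both columns is shown below. The name byStateThenName and the sample rows are hypothetical and exist only for this illustration; plain Scala collections are used so the ordering can be checked without running a job.

```scala
// Same shape as the Sale class defined earlier; skip this line when
// pasting into the REPL session above.
case class Sale(state: String, name: String, sale: Int)

// Hypothetical name: compares by state first, then by name, like
// "order by state, name".
val byStateThenName: Ordering[Sale] =
  Ordering.by[Sale, (String, String)](s => (s.state, s.name))

// Hypothetical sample rows, chosen so that the name column breaks a tie.
val rows = List(Sale("VA", "B", 15), Sale("CA", "B", 20), Sale("CA", "A", 60))

// Plain Scala collections are enough to check the ordering itself.
val firstRow = rows.sorted(byStateThenName).head
// firstRow == Sale("CA", "A", 60)

// In the REPL, the ordering can be passed explicitly so that it does not
// clash with the implicit ordering defined above:
//   salesPipe.groupAll.sortedTake(1)(byStateThenName).values.dump
```

Passing the ordering explicitly, rather than declaring a second implicit Ordering[Sale], avoids an ambiguous-implicit error in a session that already has one in scope.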

Union

SQL

select state, name, sales from test.allsales
UNION ALL
select state, name, sales from test.allsales2

Scalding

scala> val salesPipe1 = TypedPipe.from(salesList)
salesPipe1: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

scala> val salesPipe2 = TypedPipe.from(salesList)
salesPipe2: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

scala> (salesPipe1 ++ salesPipe2).dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)

Group and Aggregate

SQL

pwagle=> select state, count(1), count(distinct name), sum(sales) 
pwagle-> from test.allsales
pwagle-> group by state;
 state | count | count | sum 
-------+-------+-------+-----
 CA    |     2 |     1 |  80
 VA    |     1 |     1 |  15

Scalding

scala> salesPipe.map{x => (x.state, (1, Set(x.name), x.sale))}.sumByKey.dump
(CA,(2,Set(A),80))
(VA,(1,Set(B),15))

scala> salesPipe.map{x => (x.state, (1, Set(x.name), x.sale))}.sumByKey.mapValues{ case (count, set, sum) => (count, set.size, sum)}.dump
(CA,(2,1,80))
(VA,(1,1,15))
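
The same grouped query can also be written with Algebird Aggregators. The following is a sketch rather than the canonical form: it assumes the REPL session above (Sale and salesPipe already defined) and that Algebird is on the classpath, as it is in the Scalding REPL. The val names are introduced only for this example. Each aggregator describes one output column, and join combines them so they are computed together.

```scala
import com.twitter.algebird.Aggregator

// Assumes the REPL session above: Sale and salesPipe are already defined.
// The val names below are introduced only for this sketch.

// count(1): number of rows in the group.
val rowCount = Aggregator.size

// count(distinct name): collect names into a Set, then report its size.
val distinctNames = Aggregator.toSet[String]
  .composePrepare[Sale](_.name)
  .andThenPresent(_.size)

// sum(sales): add up the sale field with the Int monoid.
val totalSales = Aggregator.fromMonoid[Int].composePrepare[Sale](_.sale)

// join pairs aggregators so that they run together over the same rows.
val perStateStats = rowCount.join(distinctNames).join(totalSales)

val stats = salesPipe
  .groupBy(_.state)
  .aggregate(perStateStats)
  // join nests its results as ((count, names), sum); flatten to a triple.
  .mapValues { case ((count, names), sum) => (count, names, sum) }

stats.dump
```

For the sample data this should yield the same three values per state as the tuple-based version above, with the row count typed as Long instead of Int.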

Join

Scalding

scala> case class Table1Row(field1: String, val1: Int)
defined class Table1Row

scala> case class Table2Row(field2: String, val2: Int)
defined class Table2Row

scala> val table1List = List(Table1Row("a", 1), Table1Row("b", 2))
table1List: List[Table1Row] = List(Table1Row(a,1), Table1Row(b,2))

scala> val table1Pipe = TypedPipe.from(List(Table1Row("a", 1), Table1Row("b", 2)))
table1Pipe: com.twitter.scalding.typed.TypedPipe[Table1Row] = IterablePipe(List(Table1Row(a,1), Table1Row(b,2)))

scala> val table2Pipe = TypedPipe.from(List(Table2Row("b", 3), Table2Row("c", 4)))
table2Pipe: com.twitter.scalding.typed.TypedPipe[Table2Row] = IterablePipe(List(Table2Row(b,3), Table2Row(c,4)))

scala> val table1PipeGroup = table1Pipe.groupBy { table1Row => table1Row.field1 }
table1PipeGroup: com.twitter.scalding.typed.Grouped[String,Table1Row] = IdentityReduce(scala.math.Ordering$String$@464dfa23,com.twitter.scalding.typed.TypedPipeFactory@1f0ed511,None)

scala> val table2PipeGroup = table2Pipe.groupBy { table2Row => table2Row.field2 }
table2PipeGroup: com.twitter.scalding.typed.Grouped[String,Table2Row] = IdentityReduce(scala.math.Ordering$String$@464dfa23,com.twitter.scalding.typed.TypedPipeFactory@6d73d27c,None)

scala> val join = table1PipeGroup.join(table2PipeGroup)
join: com.twitter.scalding.typed.CoGrouped[String,(Table1Row, Table2Row)] = com.twitter.scalding.typed.CoGroupable$$anon$3@1c75af7a

scala> join.dump
(b,(Table1Row(b,2),Table2Row(b,3)))

scala> val leftJoin = table1PipeGroup.leftJoin(table2PipeGroup)
leftJoin: com.twitter.scalding.typed.CoGrouped[String,(Table1Row, Option[Table2Row])] = com.twitter.scalding.typed.CoGroupable$$anon$3@7c067391

scala> leftJoin.dump
(a,(Table1Row(a,1),None))
(b,(Table1Row(b,2),Some(Table2Row(b,3))))

scala> val outerJoin = table1PipeGroup.outerJoin(table2PipeGroup)
outerJoin: com.twitter.scalding.typed.CoGrouped[String,(Option[Table1Row], Option[Table2Row])] = com.twitter.scalding.typed.CoGroupable$$anon$3@3daae803

scala> outerJoin.dump
(a,(Some(Table1Row(a,1)),None))
(b,(Some(Table1Row(b,2)),Some(Table2Row(b,3))))
(c,(None,Some(Table2Row(c,4))))
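
Join results are nested (key, (left, right)) pairs, whereas a SQL join returns flat rows. A minimal sketch of flattening the left join output follows. The helper name flattenLeft is hypothetical, and the sample values are the left join output from above written out by hand so the function can be checked with plain Scala collections.

```scala
// Same shapes as the classes defined earlier; skip these two lines when
// pasting into the REPL session above.
case class Table1Row(field1: String, val1: Int)
case class Table2Row(field2: String, val2: Int)

// Hypothetical helper: turns one left-join value into a flat row in which a
// missing right side shows up as None, much like a SQL NULL.
val flattenLeft: ((Table1Row, Option[Table2Row])) => (String, Int, Option[Int]) = {
  case (left, right) => (left.field1, left.val1, right.map(_.val2))
}

// The two values that leftJoin produced above, written out by hand.
val joined = List(
  (Table1Row("a", 1), Option.empty[Table2Row]),
  (Table1Row("b", 2), Option(Table2Row("b", 3)))
)

val flatRows = joined.map(flattenLeft)
// flatRows == List(("a", 1, None), ("b", 2, Some(3)))

// In the REPL the same function can be applied to the join result:
//   leftJoin.values.map(flattenLeft).dump
```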

Histogram, Ntile

SQL

TODO

Scalding Histogram (fields-based API only)

val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
val p = inputTp.toPipe(('value)) 
val p1 = p.groupAll { group => group.histogram('value -> 'histogram) }
  .map('histogram -> ('min, 'q1, 'median, 'q3, 'max, 'mean)) {
     x: Histogram => (x.min, x.q1, x.median, x.q3, x.max, x.mean)
   }
val outputTp = p1.toTypedPipe[(Double, Double, Double, Double, Double, Double)](('min, 'q1, 'median, 'q3, 'max, 'mean))
outputTp.dump
(1.0,3.0,4.0,5.0,30.0,7.1)

Scalding QTree

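import com.twitter.algebird.{QTree, QTreeSemigroup}

val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
// The implicit semigroup must be in scope before calling sum; see the error in the transcript below.
implicit val qtSemigroup = new QTreeSemigroup[Long](6)
val v = inputTp.map {x => QTree(x)}.groupAll.sum.values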

scala> val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
inputTp: com.twitter.scalding.package.TypedPipe[Int] = IterablePipe(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))

scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
<console>:41: error: Cannot find Semigroup type class for com.twitter.algebird.QTree[Long]
       val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
                                                    ^

scala> implicit val qtSemigroup = new QTreeSemigroup[Long](6)
qtSemigroup: com.twitter.algebird.QTreeSemigroup[Long] = com.twitter.algebird.QTreeSemigroup@4e92c2ed

scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
v: com.twitter.scalding.typed.TypedPipe[com.twitter.algebird.QTree[Long]] = com.twitter.scalding.typed.TypedPipeFactory@7925e180

scala> v.map { q => (q.count, q.upperBound, q.lowerBound, q.quantileBounds(.5), q.quantileBounds(.95)) }.dump
(10,32.0,0.0,(4.0,5.0),(15.0,16.0))

TODO

Analytic / Window Functions (Rank, Ntile, Lag/Lead)

Running Total, Moving Average, Sessionization
