# **Sommelier**


 <font size="3"> 
    
***We are thrilled to present a new approach to retrieving a small but significant subset of data: Sommelier Sampling.***

# **INTRODUCTION:**

 
 <font size="3"> 
Before we get hands-on with the development, some technical background is needed. With the rise of Big Data and analytics, the demand for optimizing the way data is handled grows every day. Sampling is a powerful but also feared technique for approximating query answers, because estimating its error is still an open issue for the community. Fortunately, researchers have published studies that demonstrate its usability, together with rules and calculations for the expected error.  
<br/>
<br/>
More specifically, the background is divided into two parts: the Microsoft and the BlinkDB solutions, both state of the art in query sampling. We also mention the research that has already been done on the topic in Spark, although so far we have only found implementations based on RDDs, and none of them open source. 


## QuickR: Lazily Approximating Complex AdHoc Queries in Big Data Clusters 


 <font size="3"> 
    
https://www.microsoft.com/en-us/research/wp-content/uploads/2016/06/quickr-2.pdf 

In 2016, Microsoft presented a system called QuickR that approximates the answers to complex queries by injecting samplers on the fly, without requiring pre-sampled data. Using a cost-based optimizer, it generates query plans with appropriate sample operators at the most suitable locations, and it also provides an accuracy analysis to ensure that no small group is missed. The results showed that QuickR substantially improves performance on a large fraction of the TPC-DS benchmark queries, using 2x fewer cluster resources and with a mean error of only 10% in aggregations.  

How did they accomplish that? 

By analyzing the queries that the user submits and taking into account the statistics of the tables, they decide whether it is better to process only part of the data and which sampling technique gives the most accurate answer. They use three types of samplers (which we summarize later with examples): 

- Uniform 

- Universe 

- Distinct 

They also implement logical optimization rules that push the sample operator through projections and joins (into the left input, the right input, both, or neither) so that less data is retrieved in the early stages of the query, and they run an accuracy analysis of the result to satisfy the user's requirements.  

An analysis of the approximability of big-data queries shows: 

1. Distribution of queries over input datasets is heavy-tailed. Individual queries use many columns and a diverse set of columns such that the additional storage space required to store samples can be prohibitively large. 

2. Queries typically have aggregation operators, large support, and output << input, so they are approximable. 

3. Several factors hinder approximability: queries use a diverse set of columns requiring extensive stratification. Many queries join large input relations. 

4. Queries are deep, involving multiple effective passes over data, including network shuffles. 

------
## Experience with Approximating Queries in Microsoft’s Production Big-Data Clusters 

 <font size="3"> 
    
    
https://www.microsoft.com/en-us/research/uploads/prod/2019/04/p698-kandula.pdf  

Three years after the paper that introduced QuickR, its error approximation and its push-down rules, Microsoft published its overall experience (over tens of production clusters) with this type of query-time sampling. As a reminder, the two main steps of this technique are:  

1. Users add sampling operators to the query 

2. The query optimizer transforms the plan, pushing the operators down while ensuring that the accuracy stays intact  


 <font size="3"> 
Their findings are valuable because they confirm that this solution is useful in production environments. Here we provide a summary of the conclusions; the charts and full results can be found in the linked paper.  

- All three samplers are used evenly 

- One third of the queries use multiple sample operators.  



<img src="images/microsoft.png" width="400" height="600">

 <font size="3"> 

- The median processing rate for samplers is over 100MBps. Over 95% of samplers process input at over 100Mbps (low processing rate means very little data!) 

- A significant majority of the samplers have a negligible memory footprint 

- The distinct sampler has a more complex implementation (it has to track groups with a large number of rows) 

- About 30% of the samplers pick less than 1% of their input. Many samplers also pick 10% (the setting recommended in the manual they provided)  

- Fewer than 10% of the samplers use a probability assignment above 0.25 

- 80% of universe samplers operate over a single column 

- For 90% of distinct samplers, the input column set has between 1 and 6 columns 

- Among the jobs that use samplers, only 2% are never re-executed, and over 80% recur at least 100 times each.  

- The results on the TPC-H benchmark showed that: 

    - Roughly 8 of the 22 queries remain unsampled 

    - Queries 5, 7, 8 and 9 substantially reduce their processing cost  

------
## BlinkDB 
 <font size="3"> 
    
https://sameeragarwal.github.io/blinkdb_eurosys13.pdf 

BlinkDB proposes and implements an approximate query engine for running interactive SQL queries on large volumes of data. It supports ad-hoc queries with error and response time constraints. 

BlinkDB implements a multi-dimensional sampling strategy that builds and maintains a variety of samples. It also implements a run-time dynamic sample selection strategy that uses parts of a sample to estimate query selectivity and chooses the best samples for satisfying the query constraints.  

It handles a variety of queries with diverse error and time constraints: for example, it answers queries over 17 TB of data in 2 seconds with 90-98% accuracy. 

</font>

<img src="images/blinkdb.png" width="400" height="600">
<center>BlinkDB’s Implementation Stack </center>
<br/>
<br/>
 <font size="3"> 


To store differently stratified samples, a-priori storage techniques typically use sample storage of 1x to 10x the size of the input. In one benchmark, the smallest input set used by 20% of the queries is 3PB. Hence, even assuming that input popularity can be predicted perfectly, covering 20% of the queries requires between 3PB and 30PB of a-priori samples. Such a large sample set is already a substantial fraction of the total input size (120PB).  
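The storage figures above follow from simple arithmetic, taking the 3PB input and the 1x to 10x overhead as given:

$$
[1,\ 10] \times 3\,\text{PB} = [3,\ 30]\,\text{PB}, \qquad \frac{3\,\text{PB}}{120\,\text{PB}} = 2.5\%, \qquad \frac{30\,\text{PB}}{120\,\text{PB}} = 25\%
$$

So, in the worst case, the samples alone would take up a quarter of the total input.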

------
# **UNDERSTANDING DATA**:

 <font size="3"> 
In this notebook we present a few examples to better understand the different types of samples and when each of them should be used. <br> We create synthetic datasets to show how evenly distributed data and skewed data affect query results when sampling techniques are used. We will see that the cardinality of each group is key. 


## Sampling types

 <font size="3"> 
The sampling type to use for a query depends on the data. Evenly distributed data allows us to use a uniform sample. Skewed data usually means that we need to `groupBy` the data so that we keep at least some rows from every group. We use a universe sample when two large tables are joined on a shared key; in our example, the two tables have the same size and are “row aligned”. 
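The rules of thumb above can be condensed into a small decision function. This is only a sketch: the names (`SamplerKind`, `QueryStats`, `chooseSampler`) are hypothetical, and the test "some group would expect fewer than `minRowsPerGroup` sampled rows" is our assumption, chosen to mirror the `count*0.01 >= 3` condition used in the distinct-sample example of this notebook.

```scala
// Hypothetical heuristic for choosing a sampler from simple table statistics.
// Names and thresholds are illustrative assumptions, not Spark or QuickR APIs.
object SamplerChoice {
  sealed trait SamplerKind
  case object Uniform extends SamplerKind
  case object Distinct extends SamplerKind
  case object Universe extends SamplerKind

  final case class QueryStats(
      joinsTwoLargeTables: Boolean, // two large inputs joined on a shared key
      groupCounts: Seq[Long],       // number of rows in each GROUP BY group
      samplingRate: Double,         // fraction of rows we would like to keep
      minRowsPerGroup: Int          // rows we want to keep from every group
  )

  def chooseSampler(s: QueryStats): SamplerKind =
    if (s.joinsTwoLargeTables) Universe
    // Some group would expect too few sampled rows: stratify instead.
    else if (s.groupCounts.exists(n => n * s.samplingRate < s.minRowsPerGroup)) Distinct
    else Uniform
}
```

For instance, `SamplerChoice.chooseSampler(SamplerChoice.QueryStats(false, Seq(99768L, 56L), 0.01, 3))` returns `Distinct`, because a 1% sample of a 56-row city would expect fewer than one row.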

 <font size="3"> 
Imagine we have two tables in our system: <br/>
<br/>

```sql
CREATE TABLE user (
    user_id BIGINT PRIMARY KEY,
    city_id BIGINT
)
```
<br/>
    
```sql
CREATE TABLE cities (
    city_id BIGINT PRIMARY KEY,
    name TEXT,
    cash INT
)
```
  
<br/>
    
`user_id` and `city_id` are the identifiers of the two tables, while `cash` is the salary for that city. In our tests we join the tables and sum the cash per city, to show how the data distribution affects each sampling technique and why one sample type is better than another depending on the case. 
    

```scala
case class User(user_id: Int, city_id: Long)
case class Cities(city_id: Long, name: String, cash: Int = 100)
// Random city id, uniformly distributed in 1..5
def randomizeCity: Long = scala.util.Random.nextInt(5).toLong + 1
```

```
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1582552171248)
SparkSession available as 'spark'

defined class User
defined class Cities
randomizeCity: Long
```


------
## **Uniform sample**

 <font size="3"> 
Also known as Simple Random Sampling(SRS), it is assumed that the population is independent and identically distributed (i.i.d). The sample size required to reach a prespecified precision is based on the dispersion variance of the population and the survey precision required. Then the sample units are chosen from the population independently with equal probability, and inferences are conducted using the sample. I.e. We will use this kind of sample when every row from a table has the same probability of being selected. 
</font>
<br/>
<br/>


<img src="images/uniform-sample.png" width="400" height="600">
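In code, a uniform sample is a Bernoulli filter followed by scaling: every row is kept with the same probability p, and each kept row stands for 1/p rows of the population. This is why the query in this notebook multiplies `cash` by 100 for a 1% sample. The plain-Scala sketch below needs no Spark; `UniformSample` and `estimateSum` are hypothetical names. To our understanding, Spark's `TABLESAMPLE (x PERCENT)` also keeps each row independently, so its sample size is only approximately x%.

```scala
import scala.util.Random

// Sketch of a uniform (Bernoulli) sample with Horvitz-Thompson scaling.
object UniformSample {
  def estimateSum(values: Seq[Long], p: Double, rng: Random): Double = {
    require(p > 0.0 && p <= 1.0, "p must be in (0, 1]")
    values.iterator
      .filter(_ => rng.nextDouble() < p) // keep each row with probability p
      .map(_.toDouble / p)               // each kept row stands for 1/p rows
      .sum
  }
}
```

With 100,000 rows of `cash = 100` and `p = 0.01`, the estimate lands close to the true total of 10,000,000; with `p = 1.0` every row is kept and the result is exact.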

```scala
// 100,000 users spread evenly over 5 cities; every city has the default cash of 100
(1 to 100000).map(x => User(x, randomizeCity)).toDS.createOrReplaceTempView("users")
(1 to 5).map(x => Cities(x, "a")).toDS.createOrReplaceTempView("cities")
```

```scala
val query = spark.sql(s"""
                SELECT users.city_id,
                       sum(cities.cash) AS res
                FROM users
                JOIN cities
                    ON users.city_id == cities.city_id
                GROUP BY users.city_id""")
```

```
query: org.apache.spark.sql.DataFrame = [city_id: bigint, res: bigint]
```


```scala
// Uniform 1% sample of users; each sampled row is scaled by 1/0.01 = 100
val uniform_sample = spark.sql(s"""
                SELECT users.city_id,
                       sum(cities.cash*100) AS res_sample
                FROM users TABLESAMPLE(1 PERCENT)
                JOIN cities
                    ON users.city_id == cities.city_id
                GROUP BY users.city_id""")
```

```
uniform_sample: org.apache.spark.sql.DataFrame = [city_id: bigint, res_sample: bigint]
```


```scala
query.show()
```

```
+-------+-------+
|city_id|    res|
+-------+-------+
|      5|1994000|
|      1|2016700|
|      3|1979600|
|      2|1986000|
|      4|2023700|
+-------+-------+
```



```scala
uniform_sample.show()
```

```
+-------+----------+
|city_id|res_sample|
+-------+----------+
|      5|   1850000|
|      1|   2090000|
|      3|   2330000|
|      2|   1870000|
|      4|   1980000|
+-------+----------+
```



```scala
query.createOrReplaceTempView("query")
uniform_sample.createOrReplaceTempView("sample")
```

```scala
// Relative error (%) of the sampled SUM for each city
val error = spark.sql(s"""
            SELECT query.city_id,
                   abs(res - res_sample)/res*100 AS error
            FROM query
            JOIN sample
                ON query.city_id == sample.city_id
""")
```

```
error: org.apache.spark.sql.DataFrame = [city_id: bigint, error: double]
```


```scala
error.show()
```

```
+-------+-----------------+
|city_id|            error|
+-------+-----------------+
|      5|7.221664994984955|
|      1|3.634650666931125|
|      3|17.70054556476056|
|      2|5.840886203423968|
|      4|2.159410979888323|
+-------+-----------------+
```



------
## **Distinct sample**

 <font size="3"> 
The uniform sample is simple but it has some issues that limit it from being used widely. Queries with group-by such as `SELECT X, SUM(Y) GROUP BY X` can miss groups in the answer, especially those corresponding to values of X that have low support. For such queries, we must use a different kind of sample, such as the distinct sampler which intuitively guarantees that at least a certain number of rows pass per distinct combination of values of a given column set. The distinct sample also helps when aggregates have high skew. When we have skewed data, few rows can contain high values (e.g revenue of a company) in a way that, for a given query that aggregates such values, these rows are crucial to obtain approximate results and hence they should have a higher probability of being selected. 
</font>
<br/>
<br/>
    
    
<img src="images/distinct-sample.png" width="400" height="600">
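A distinct sampler can be implemented in a single pass. The sketch below is a simplified version in the spirit of the QuickR description, not its actual implementation: the first `delta` rows of each group always pass with weight 1, and the remaining rows pass with probability `p` and weight `1/p`. Rows that pass deterministically are counted exactly and the others are scaled, so the weighted SUM per group stays unbiased while small groups are never lost. `DistinctSample`, `delta` and `p` are illustrative names.

```scala
import scala.collection.mutable
import scala.util.Random

// Simplified single-pass distinct sampler (illustrative, not QuickR's code).
object DistinctSample {
  final case class Weighted[K](key: K, value: Double, weight: Double)

  def sample[K](rows: Seq[(K, Double)], delta: Int, p: Double, rng: Random): Seq[Weighted[K]] = {
    val seen = mutable.Map.empty[K, Int].withDefaultValue(0) // rows seen so far per group
    rows.flatMap { case (k, v) =>
      val n = seen(k)
      seen(k) = n + 1
      if (n < delta) Some(Weighted(k, v, 1.0))                     // guarantee per-group coverage
      else if (rng.nextDouble() < p) Some(Weighted(k, v, 1.0 / p)) // thin out frequent groups
      else None
    }
  }

  // Weighted SUM per group: each kept row contributes value * weight.
  def sumByKey[K](s: Seq[Weighted[K]]): Map[K, Double] =
    s.groupBy(_.key).map { case (k, ws) => k -> ws.map(w => w.value * w.weight).sum }
}
```

With a skewed input similar to the one built in this notebook and `delta = 100`, every city with at most 100 rows is summed exactly, and only the huge city is estimated.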

```scala
// 300 users spread over cities 1-5, plus 99,700 users all living in city 5 (skew)
val users_data_1 = (1 to 300).map(x => User(x, randomizeCity)).toDS
val users_data_2 = (301 to 100000).map(x => User(x, 5)).toDS

users_data_1.union(users_data_2).createOrReplaceTempView("users")
(1 to 6).map(x => Cities(x, "a")).toDS.createOrReplaceTempView("cities")
```

```
users_data_1: org.apache.spark.sql.Dataset[User] = [user_id: int, city_id: bigint]
users_data_2: org.apache.spark.sql.Dataset[User] = [user_id: int, city_id: bigint]
```


```scala
val query_skewed = spark.sql(s"""
                SELECT users.city_id,
                       sum(cities.cash) AS res
                FROM users
                JOIN cities
                    ON users.city_id == cities.city_id
                GROUP BY users.city_id""")
```

```
query_skewed: org.apache.spark.sql.DataFrame = [city_id: bigint, res: bigint]
```


```scala
query_skewed.show()
```

```
+-------+-------+
|city_id|    res|
+-------+-------+
|      5|9976800|
|      1|   5600|
|      3|   6500|
|      2|   5900|
|      4|   5200|
+-------+-------+
```

<br/> 
 <font size="4"> 
    - What happens if we take a uniform sample of the users table now that it is <b>skewed</b>?
</font>
<br/> 
<br/> 

```scala
val uniform_sample_skewed = spark.sql(s"""
                SELECT users.city_id,
                       sum(cities.cash*100) AS res_sample
                FROM users TABLESAMPLE(1 PERCENT)
                JOIN cities
                    ON users.city_id == cities.city_id
                GROUP BY users.city_id""")
```

```
uniform_sample_skewed: org.apache.spark.sql.DataFrame = [city_id: bigint, res_sample: bigint]
```


```scala
uniform_sample_skewed.show()
```

```
+-------+----------+
|city_id|res_sample|
+-------+----------+
|      5|   9830000|
|      3|     10000|
|      4|     10000|
+-------+----------+
```



<br/> 
 <font size="4"> 
    - Some groups are <b>missing</b> from the result! Moreover, the small groups that remain have a <b>high error</b>.
</font>
<br/> 
<br/> 

```scala
query_skewed.createOrReplaceTempView("query")
uniform_sample_skewed.createOrReplaceTempView("sample")
```

```scala
val error = spark.sql(s"""
            SELECT query.city_id,
                   abs(res - res_sample)/res*100 AS error
            FROM query
            JOIN sample
                ON query.city_id == sample.city_id
""")
```

```
error: org.apache.spark.sql.DataFrame = [city_id: bigint, error: double]
```


```scala
error.show()
```

```
+-------+------------------+
|city_id|             error|
+-------+------------------+
|      5|1.4714136797369899|
|      3| 53.84615384615385|
|      4|  92.3076923076923|
+-------+------------------+
```



<br/> 
 <font size="4"> 
    - Now we take the groups in the data into account: we <b>groupBy</b> city and give each city its own sampling probability, so that at least some rows are taken from every group.
</font>
<br/> 
<br/>

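```scala
// Per-city sampling probability p: 1% if that still leaves at least 3 rows,
// otherwise keep every row of the city (p = 1.0)
val stratified = spark.sql(s"""
                SELECT *,
                       IF((count*0.01) >= 3, 0.01, cast(1.0 as double)) AS p
                FROM
                (
                    SELECT city_id, COUNT(*) AS count
                    FROM users
                    GROUP BY city_id
                )""")
```

```
stratified: org.apache.spark.sql.DataFrame = [city_id: bigint, count: bigint ... 1 more field]
```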


```scala
stratified.createOrReplaceTempView("stratified")
```

```scala
// Keep each user with the probability p assigned to its city
val users_sample = spark.sql(s"""
                SELECT user_id, users.city_id, p
                FROM users
                JOIN stratified
                    ON stratified.city_id == users.city_id
                WHERE RAND() <= p""")
```

```
users_sample: org.apache.spark.sql.DataFrame = [user_id: int, city_id: bigint ... 1 more field]
```


```scala
users_sample.createOrReplaceTempView("users_sample")
```

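The resulting sample size is approximate: each row is kept independently with its city's probability `p`, and small cities are kept entirely, so the fraction is close to, but not exactly, 1%.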

```scala
// Fraction of all users that ended up in the sample
val sample_size = users_sample.count.toDouble/users_data_1.union(users_data_2).count
```

```
sample_size: Double = 0.01235
```
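The observed fraction can be checked against its expected value. Each row of city $g$ is kept independently with probability $p_g$, so the expected sample size is $\sum_g n_g p_g$. Using the per-city row counts implied by the exact `query_skewed` output (each row contributes 100 to `res`, so cities 1 to 4 have 56, 59, 65 and 52 rows, and city 5 has 99,768), cities 1 to 4 fall below the threshold of the stratification query ($0.01\,n_g < 3$) and keep all their rows ($p_g = 1$), while city 5 is sampled at $p_5 = 0.01$:

$$
\mathbb{E}\big[|S|\big] = \sum_g n_g\, p_g = (56 + 59 + 65 + 52)\cdot 1 + 99768 \cdot 0.01 = 232 + 997.68 = 1229.68
$$

$$
\frac{\mathbb{E}\big[|S|\big]}{N} = \frac{1229.68}{100000} \approx 0.0123
$$

which is close to the observed 0.01235.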


```scala
// Scale every sampled row by 1/p to estimate the SUM per city
val distinct_sample = spark.sql(s"""
                SELECT users_sample.city_id,
                       sum(cities.cash / p) AS res_sample
                FROM users_sample
                JOIN cities
                    ON users_sample.city_id == cities.city_id
                GROUP BY users_sample.city_id""")
```

```
distinct_sample: org.apache.spark.sql.DataFrame = [city_id: bigint, res_sample: double]
```


```scala
distinct_sample.show()
```

```
+-------+----------+
|city_id|res_sample|
+-------+----------+
|      5|   1.003E7|
|      1|    5600.0|
|      3|    6500.0|
|      2|    5900.0|
|      4|    5200.0|
+-------+----------+
```



<br/> 
 <font size="4"> 
    - As you can see, the result is now very <b>good</b>: all the groups are represented and the output error is small.
</font>
<br/> 
<br/>

```scala
distinct_sample.createOrReplaceTempView("sample")
```

```scala
val error = spark.sql(s"""
            SELECT query.city_id,
                   abs(res - res_sample)/res*100 AS error
            FROM query
            JOIN sample
                ON query.city_id == sample.city_id
""")
```

```
error: org.apache.spark.sql.DataFrame = [city_id: bigint, error: double]
```


```scala
error.show()
```

```
+-------+------------------+
|city_id|             error|
+-------+------------------+
|      5|0.5332371100954214|
|      1|               0.0|
|      3|               0.0|
|      2|               0.0|
|      4|               0.0|
+-------+------------------+
```



------
## **Universe sample**

<font size="3"> 
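When two large tables are joined on a shared key, uniformly sampling both join inputs is not useful: a joined row survives only if both of its input rows are sampled, so a rate p on each side keeps only about a p² fraction of the join output. Distinct sampling both inputs has limited gains if the join keys have many columns and hence many distinct values. The universe sampler makes it possible to sample the inputs of a join: it picks a p fraction of the values of the join columns and keeps, in both tables, exactly the rows whose key falls in that subset (in our example, two tables of the same size). For example, keep all rows whose key modulo 500 is 0. 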

</font> 

<br/>
<br/>
<img src="images/universe-sample.png" width="400" height="600">
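The key property of the universe sampler is that the decision depends only on the join key, so both sides of the join keep exactly the same subset of key values. The notebook cells that follow do this with `city_id % 500 == 0`, which works because the keys are consecutive integers; the sketch below hashes the key instead (with `MurmurHash3.stringHash` from the Scala standard library), which is our assumption for keys without such structure. `UniverseSample`, `keep` and `estimateJoinSum` are hypothetical names.

```scala
import scala.util.hashing.MurmurHash3

// Sketch of a universe sampler applied to both inputs of a join.
object UniverseSample {
  private val Buckets = 10000

  // Same decision for a key on both sides of the join: keep it iff its hash
  // falls in the first p fraction of the buckets.
  def keep(key: Long, p: Double): Boolean =
    Math.floorMod(MurmurHash3.stringHash(key.toString), Buckets) < math.round(p * Buckets)

  // users: (user_id, city_id); cities: city_id -> cash.
  // Sample both inputs with the same key predicate, join them, scale by 1/p.
  def estimateJoinSum(users: Seq[(Long, Long)], cities: Map[Long, Long], p: Double): Double = {
    val sampledUsers = users.filter { case (_, cityId) => keep(cityId, p) }
    val sampledCities = cities.filter { case (cityId, _) => keep(cityId, p) }
    val joinedCash = sampledUsers.flatMap { case (_, cityId) => sampledCities.get(cityId) }
    joinedCash.sum.toDouble / p
  }
}
```

Because a matching user and city are always kept or dropped together, the join of the two samples contains about a p fraction of the full join, instead of the p² fraction left by two independent uniform samples.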

```scala
// 500,000 users, each in its own city: the two tables share exactly the same keys
(1 to 500000).map(x => User(x, x)).toDF().createOrReplaceTempView("users")
(1 to 500000).map(x => Cities(x, "a")).toDF().createOrReplaceTempView("cities")
```


```scala
val query = spark.sql(s"""
                    SELECT users.city_id, sum(cities.cash) AS res
                    FROM users
                    JOIN cities
                        ON users.city_id == cities.city_id
                    GROUP BY users.city_id""")
```

```
query: org.apache.spark.sql.DataFrame = [city_id: bigint, res: bigint]
```


```scala
// Uniform 1% sample applied after the join: the join itself still processes every row
val sampling_after_join = spark.sql(s"""
                    SELECT city_id,
                           sum(cash) AS res_sample
                    FROM (
                        SELECT users.city_id, cities.cash
                        FROM users
                        JOIN cities
                            ON users.city_id == cities.city_id
                    ) TABLESAMPLE (1 PERCENT)
                    GROUP BY city_id
                    """)
```

```
sampling_after_join: org.apache.spark.sql.DataFrame = [city_id: bigint, res_sample: bigint]
```


```scala
%%time
query.show(5)
```

```
+-------+---+
|city_id|res|
+-------+---+
|     26|100|
|     29|100|
|    474|100|
|    964|100|
|   1677|100|
+-------+---+
only showing top 5 rows

Time: 10.25024151802063 seconds.
```



```scala
%%time
sampling_after_join.show(5)
```

```
+-------+----------+
|city_id|res_sample|
+-------+----------+
|  17043|       100|
|  29824|       100|
|  37261|       100|
|  38510|       100|
|  75411|       100|
+-------+----------+
only showing top 5 rows

Time: 9.633416175842285 seconds.
```



```scala
// Universe sample: keep the same 1/500 of the keys (city_id % 500 == 0) in both tables before joining
val users_sample = spark.sql(s"""SELECT * FROM users WHERE city_id % 500 == 0""")
users_sample.createOrReplaceTempView("users_sample")

val cities_sample = spark.sql(s"""SELECT * FROM cities WHERE city_id % 500 == 0""")
cities_sample.createOrReplaceTempView("cities_sample")

val universe_sample = spark.sql(s"""
                    SELECT users_sample.city_id,
                           sum(cities_sample.cash) AS res_sample
                    FROM users_sample
                    JOIN cities_sample
                        ON users_sample.city_id == cities_sample.city_id
                    GROUP BY users_sample.city_id""")
```

```
users_sample: org.apache.spark.sql.DataFrame = [user_id: int, city_id: bigint]
cities_sample: org.apache.spark.sql.DataFrame = [city_id: bigint, name: string ... 1 more field]
universe_sample: org.apache.spark.sql.DataFrame = [city_id: bigint, res_sample: bigint]
```


```scala
%%time
universe_sample.show(5)
```

```
+-------+----------+
|city_id|res_sample|
+-------+----------+
| 265000|       100|
| 294500|       100|
| 350000|       100|
| 372000|       100|
| 419000|       100|
+-------+----------+
only showing top 5 rows

Time: 1.7970502376556396 seconds.
```



<font size="3"> 
As you can see, the universe sample processes the join notably faster (about 1.8 seconds, versus about 10.3 seconds for the full query and 9.6 seconds when sampling after the join), and the per-city results are the same, because the cash is the same for every city. If the column followed a normal distribution instead, we would not expect the result to change significantly either. 

------
# **PROPOSAL:**

 <font size="3"> 

Right now, Spark's sample implementation does not allow the Sample logical operator to be pushed down the logical plan tree. The next figure shows a simple logical plan tree: 
</font>
<img src="images/wopushdown.png" width="300" height="600">
 
 <font size="3"> 
We want to add push-down support for the sample operator. In this case, the sampler would be applied to the largest table (user).
</font>
 
<img src="images/pushdowned.png" width="300" height="600">
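The intended rewrite can be modeled on a toy plan tree. The classes below are not Spark's Catalyst operators but a hypothetical minimal model. The rule moves a `Sample` below a `Project`, which does not change which rows exist, and into the larger input of a join. Moving it into one join input is only valid under an assumption we state explicitly: the larger input is the foreign-key side (here `user.city_id` referencing `cities.city_id`), so each join output row comes from exactly one sampled row and is kept with probability `fraction`. QuickR's rules cover more cases (for example, sampling both sides) and check accuracy along the way.

```scala
// Toy logical plan, NOT Spark's Catalyst classes. All names are hypothetical.
object SamplePushDown {
  sealed trait Plan { def rows: Long }
  final case class Scan(table: String, rows: Long) extends Plan
  final case class Project(cols: Seq[String], child: Plan) extends Plan {
    def rows: Long = child.rows
  }
  final case class Join(left: Plan, right: Plan, key: String) extends Plan {
    def rows: Long = math.max(left.rows, right.rows) // crude estimate for a key/foreign-key join
  }
  final case class Sample(fraction: Double, child: Plan) extends Plan {
    def rows: Long = (child.rows * fraction).toLong
  }

  def pushDownSample(plan: Plan): Plan = plan match {
    // A projection does not change which rows exist, so sample before projecting.
    case Sample(f, Project(cols, child)) => Project(cols, pushDownSample(Sample(f, child)))
    // Assumption: the larger input is the foreign-key side, so sampling it
    // keeps each join output row with probability f.
    case Sample(f, Join(l, r, k)) =>
      if (l.rows >= r.rows) Join(pushDownSample(Sample(f, l)), r, k)
      else Join(l, pushDownSample(Sample(f, r)), k)
    case other => other
  }
}
```

Applied to `Sample(0.01, Project(..., Join(user, cities)))`, the rule produces `Project(..., Join(Sample(0.01, user), cities))`, which corresponds to the second figure.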


 <font size="3"> 
   
So far, we are discussing two possible implementations. We could directly implement the Microsoft approach, but we are also considering a different approach, which we call Sommelier, that makes things easier for the user. 
    
    
- The Microsoft way 

The user needs to take the weight of the rows into account when the sample is performed: if the query involves a COUNT or a SUM, the result must be weighted according to the sampling rate. 
<br/>
    <br/>
```sql
    SELECT SUM(cities.cash)*weight
    FROM ( 
        SELECT * 
        FROM user, cities 
        WHERE user.city_id == cities.city_id 
    ) 
    TABLESAMPLE (1% TOLERANCE) as weight  
    GROUP BY (cities.city_id) 
    -- Could also be TABLESAMPLE (1 PERCENT) as weight 
```
   <br/>

- Sommelier 

To make things easier for the user, in the Sommelier approach the user only needs to specify the maximum error allowed for the whole query when samples are used.

```sql
    SELECT SUM(cities.cash)
    FROM ( 
        SELECT * 
        FROM user, cities
        WHERE user.city_id == cities.city_id 
    )  
    GROUP BY (cities.city_id) 
    TABLESAMPLE (1% TOLERANCE) 
```
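As a rough illustration of what the parser has to recognize, the sketch below extracts the argument of a `TABLESAMPLE (...)` clause, covering the existing ROWS and PERCENT forms and the proposed TOLERANCE form. A real implementation would extend Spark's ANTLR grammar rather than use a regular expression; `TableSampleClause`, `SampleSpec` and the rule that a `%` sign turns the tolerance into a fraction are our assumptions.

```scala
// Hypothetical recognizer for TABLESAMPLE (<number> [%] ROWS | PERCENT | TOLERANCE).
object TableSampleClause {
  sealed trait SampleSpec
  final case class Rows(n: Long) extends SampleSpec
  final case class Percent(fraction: Double) extends SampleSpec
  final case class Tolerance(maxRelativeError: Double) extends SampleSpec

  // Case-insensitive; group 1 = number, group 2 = optional "%", group 3 = unit.
  private val Clause =
    """(?i)TABLESAMPLE\s*\(\s*([0-9]+(?:\.[0-9]+)?)\s*(%?)\s*(ROWS|PERCENT|TOLERANCE)\s*\)""".r

  def parse(sql: String): Option[SampleSpec] =
    Clause.findFirstMatchIn(sql).map { m =>
      val number = BigDecimal(m.group(1))
      m.group(3).toUpperCase match {
        case "ROWS"    => Rows(number.toLong)
        case "PERCENT" => Percent((number / 100).toDouble)
        // Assumption: "1% TOLERANCE" means 0.01; without "%" the number is already a fraction.
        case _ => Tolerance((if (m.group(2) == "%") number / 100 else number).toDouble)
      }
    }
}
```

For the Sommelier query above, `TableSampleClause.parse("... TABLESAMPLE (1% TOLERANCE)")` returns `Some(Tolerance(0.01))`, the signal that would trigger the rest of the pipeline.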

------
# **OVERALL SCHEMA:**

 <font size="3"> 
Since this project aims to modify and/or extend the current implementation of Spark SQL, we want to give an intuition of the overall schema. Our first intention is to develop outside the Spark code base, since `SparkSessionExtensions` exists. This class was introduced in Spark 2.2; it is pluggable and extensible, and its basic function is to let the user add customized extensions to the Catalyst query optimizer. The supported customizations include: 

- Custom Parser 

- Custom Analysis Rules (Analyzer Rules and Check Analysis Rules) 

- Custom Logical Rules (Optimizer Rules) 

- Custom Spark Strategies (Planning Strategies) 

- External Catalog Listeners 
    

 


<img src="images/spark-custom.png" width="700" height="600">

 <font size="3"> 
We propose an extension of the following components: 

1. **Parser.**

First, we need to extend the query syntax so that users can try our new way of sampling. As seen in the previous sections, this can take the form of a new weight column (which may not need to be detected in the parser, but rather as a Catalyst expression) or of a change to the TABLESAMPLE operator.  

Currently, TABLESAMPLE only accepts ROWS and PERCENT arguments (besides BUCKET x OUT OF y), so we intend to add a TOLERANCE clause. Detecting this new clause is required to trigger the subsequent steps. 

2. **New logical operators.**

To support different types of sampling, we need to represent them in the logical plan, so that they can be detected and optimized efficiently. The uniform sampler is already present in the tree as the basic `Sample` operator, which selects rows at random.  

Spark would choose between the universe and the distinct samplers depending on the characteristics of the underlying tables and the type of query the user writes (for example, the distinct sampler when the data is more skewed). 

3. **Logical Optimization Rules.**

Rules for the new operators would improve sampling performance by pushing the sample operator down to earlier stages of the query, closer to the table scans. We have to be able to detect the presence of distinct, universe and uniform samples and to control the logical plan tree.  

4. **Error Estimation.**

At each step of the process, we have to check that the user's tolerance is still satisfied, which means that a set of statistics about the estimated error must be computed. These would include attributes such as selectivity and cardinality.  

<img src="images/modification.png" width="700" height="600">
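The tolerance check of the error-estimation step could start from the standard Horvitz-Thompson estimator for a Bernoulli sample. The sketch below estimates a SUM, computes a confidence-interval half-width from the unbiased variance estimate $\sum_{i \in S} \frac{1-p}{p^2} x_i^2$, and compares the relative half-width with the user's tolerance. It also inverts the same bound for a COUNT over $n$ rows, $z\sqrt{(1-p)/(np)} \le \varepsilon$, which gives the smallest rate $p = 1/(1 + n\varepsilon^2/z^2)$. The normal approximation, $z = 1.96$ (about 95% confidence) and all names are our assumptions, not part of the proposal.

```scala
// Hypothetical tolerance check for a SUM estimated from a Bernoulli sample with rate p.
object ErrorEstimate {
  /** Estimated SUM and the half-width of its (normal-approximation) confidence interval. */
  def sumWithHalfWidth(sampled: Seq[Double], p: Double, z: Double = 1.96): (Double, Double) = {
    val estimate = sampled.sum / p
    // Unbiased variance estimate of the HT sum: sum over sampled rows of (1-p)/p^2 * x^2
    val variance = sampled.map(x => (1 - p) / (p * p) * x * x).sum
    (estimate, z * math.sqrt(variance))
  }

  /** True if the relative half-width stays within the user's TOLERANCE. */
  def withinTolerance(sampled: Seq[Double], p: Double, tolerance: Double, z: Double = 1.96): Boolean = {
    val (est, half) = sumWithHalfWidth(sampled, p, z)
    est != 0 && half / math.abs(est) <= tolerance
  }

  /** Smallest p such that a COUNT over n rows satisfies z * sqrt((1-p)/(n*p)) <= tolerance. */
  def minRateForCount(n: Long, tolerance: Double, z: Double = 1.96): Double =
    1.0 / (1.0 + n * tolerance * tolerance / (z * z))
}
```

For example, with $n = 100{,}000$ rows and a 1% tolerance, `minRateForCount` gives $p \approx 0.278$, which illustrates how tight tolerances quickly make sampling less attractive.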