# "Principles of Real-Time Analytics Course"
> "Chapter 6 Querying"

- toc: false
- branch: master
- badges: true
- hide_binder_badge: true
- comments: false
- categories: [real-time, analytics, fastpages, jupyter]
- image: images/tb_Logo Navbar.png
- hide: false
- search_exclude: true


In [None]:
#collapse
#@title Mount your Google Drive to save and use local files
from google.colab import drive
drive.mount('/content/gdrive', force_remount=False)

%cd "/content/gdrive/My Drive/Colab Notebooks/Tinybird"

Mounted at /content/gdrive
/content/gdrive/My Drive/Colab Notebooks/Tinybird


In [None]:
#@title Install Tinybird CLI and your token
!pip install tinybird-cli -q -U

import os

if not os.path.isdir('./datasources'):
  !tb init

if not os.path.isfile('.tinyb'): 
  !tb auth

In [None]:
#@title Helper function
def write_text_to_file(filename, text):
  with open(filename, 'w') as f: f.write(text)

Notebook Resources

|Use         | Create     |
| :----------------- |:-------------|
|taxi.datasource|products_join_sku.datasource|
|events.datasource|ch_06_querying.pipe|
|products.datasource||

# Querying
Everything discussed so far is necessary but not sufficient: you only start delivering real value when you turn data into information. Here we connect the dots between all the previous sections.

The key is understanding what happens when you write an SQL query. We are not just talking about understanding an “EXPLAIN ANALYZE” but also all the pragmatic things that you are interested in: understanding joins, denormalization, when to group, when to filter, etc. The concepts covered here are fundamental for your day-to-day work.


## How Analytics Databases Work
The two most important things that analytics databases do to improve query times are:
1. Parallelization at all levels
 - CPU: split the load across several CPU cores, map-reduce style.
 - Vectorization: use SIMD instructions (such as the SSE extensions) to process more than one value at a time.
 - Machines: process data on several machines.
2. Approximate data structures: the best known is [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog), which lets you count the number of distinct items without using huge amounts of memory.



Analytics databases leverage these two characteristics to be as efficient as possible and do as many tricks as possible during the query planning stage.

Let’s see an example with a simple sum in a setup with several machines:

`SELECT sum(column) FROM table`

1. The sum is sent to several machines.
2. Each machine splits its part of the table into several chunks and spawns many threads.
3. Every thread computes the sum of several chunks.
4. Inside every thread, the core uses SSE instructions to add several numbers at once where possible.
5. Every machine collects the partial sums computed by its threads and sends the result to one machine.
6. That last machine adds up the partial sums and returns the final number.
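
The steps above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: the "machines" are plain slices of one list, `chunked`, `machine_sum` and `distributed_sum` are hypothetical helper names, and Python threads give neither real CPU parallelism nor SSE vectorization. The sketch shows the shape of the computation (partial sums that get merged), not its speed.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, chunk_size):
    """Yield consecutive slices of `values` with at most `chunk_size` items."""
    for start in range(0, len(values), chunk_size):
        yield values[start:start + chunk_size]


def machine_sum(rows, chunk_size=1_000, threads=4):
    """Steps 2 to 5: split into chunks, sum each chunk in a thread, merge."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        partial_sums = list(pool.map(sum, chunked(rows, chunk_size)))
    return sum(partial_sums)


def distributed_sum(table, machines=3):
    """Steps 1 and 6: shard the table, then add one partial sum per machine."""
    shard_size = max(1, -(-len(table) // machines))  # ceiling division
    return sum(machine_sum(shard) for shard in chunked(table, shard_size))


table = list(range(1, 100_001))
total = distributed_sum(table)
print(total)  # 5000050000
```

Because addition is associative, the partial sums can be merged in any order, which is what makes this kind of aggregation easy to spread over threads and machines.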

When writing queries in common transactional databases like PostgreSQL, minor inefficiencies in queries are generally not a big deal. However, when working with several billion rows, a single unnecessary operation per row can add up to a lot of time.

In the following sections, we share some of the techniques we use, and the things we take into account, to avoid inefficiencies and optimize query performance.


## Query just the Data you Need
There is no faster query than the one that reads no data. This seems obvious but it’s really important. If you need three columns, just get three columns and use indexes. The less data you read the faster the query will be. This is valid for any database or data system.


## Data Order is Key
Remember that memory access is faster when the data you need is stored close together.

If we try to execute a query over a table where the data we want is scattered all over the disk/memory then the computer is going to have a hard time getting all the data: 

- the OS will need to get a lot of pages from the disk into memory. Only part of the data will be useful so you will be reading more data than needed.
- memory, as we know, reaches its maximum throughput when you read data in batches, so you will not be using memory at full capacity.
- CPUs will be waiting for data.

So the key, and we will repeat this over and over again, is to have the data sorted so you get the max from disk, memory and CPU.
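
As a rough illustration of why sorted data helps, the sketch below compares summing a key range over shuffled rows with summing the same range over rows sorted by key. It is a simplification, not how any particular database stores data: `scan_unsorted` and `scan_sorted` are hypothetical helpers, and the binary search stands in for a real index over the sorting key.

```python
import random
from bisect import bisect_left, bisect_right


def scan_unsorted(rows, low, high):
    """Without a useful order, every row has to be inspected."""
    rows_read = len(rows)
    total = sum(value for key, value in rows if low <= key <= high)
    return total, rows_read


def scan_sorted(keys, values, low, high):
    """With rows sorted by key, binary search finds one contiguous range."""
    start = bisect_left(keys, low)
    end = bisect_right(keys, high)
    return sum(values[start:end]), end - start


rows = [(key, key * 2) for key in range(10_000)]
shuffled = rows[:]
random.Random(0).shuffle(shuffled)

keys = [key for key, _ in rows]
values = [value for _, value in rows]

print(scan_unsorted(shuffled, 100, 199))    # (29900, 10000)
print(scan_sorted(keys, values, 100, 199))  # (29900, 100)
```

Both calls return the same sum, but the sorted version touches 100 contiguous rows instead of all 10,000.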


## Lightweight and Indexed Operations First (filters, prefilters)
This is also obvious, but do leverage indexes as much as possible. When your filters are highly selective, try to use prefilters: prefiltering lets you discard a huge amount of data before fetching the rest of the table. Most databases apply this optimization by themselves, but they sometimes fail to, so it is worth writing the prefilter explicitly.


## Understanding Algorithmic Complexity
Not all queries use the same algorithms. This may seem obvious too, but it is very easy to forget.


### Do Less Complex Queries First
Taking algorithmic complexity into account, always run the less complex operations first so that you reduce the amount of data as much as possible, as early as possible: the less data that reaches the complex algorithm, the better.



To understand this, let’s look at a Tinybird example comparing simple aggregation vs group (with different cardinality) vs quantiles on the New York City Taxi Trip dataset.

A simple filter is cheap:

In [None]:
!tb sql --stats "SELECT sum(trip_distance) FROM taxi WHERE passenger_count = 4"

** Query took 0.04662201 seconds
** Rows read: 84,152,418
** Bytes read: 589.04 MB
----------------------
| sum(trip_distance) |
----------------------
| 5397081.2494365405 |
----------------------


It is even cheaper if the filter can use an index. If you check the taxi data source, you can see that the field `tpep_pickup_datetime` is included in the sorting key. This index drastically reduces the amount of data scanned (565,040 rows instead of 84,152,418 in the run below).

In [None]:
!tb sql --stats "SELECT sum(trip_distance) FROM taxi WHERE tpep_pickup_datetime BETWEEN '2019-01-08 00:00:00' AND '2019-01-10 00:00:00'"

** Query took 0.002169568 seconds
** Rows read: 565,040
** Bytes read: 4.52 MB
----------------------
| sum(trip_distance) |
----------------------
| 1350207.2199482508 |
----------------------


Grouping is more expensive:

In [None]:
!tb sql --stats "SELECT passenger_count, sum(trip_distance) FROM taxi GROUP BY passenger_count"

** Query took 0.092038628 seconds
** Rows read: 84,152,418
** Bytes read: 589.07 MB
----------------------------------------
| passenger_count | sum(trip_distance) |
----------------------------------------
|               0 |  4324380.059772689 |
|               7 | 1353.2999991159886 |
|               1 |  173897027.1861675 |
|               6 |  6165941.639370818 |
|               9 | 1178.8199969492853 |
|               2 |   40079166.5865672 |
|               5 |  10282229.68939986 |
|               8 | 1485.7999991793185 |
|               3 | 11034402.128915215 |
|               4 | 5397081.2494365405 |
----------------------------------------


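Grouping by a column with higher cardinality is usually more expensive still (although this particular run, which also has a `LIMIT 10`, was not slower):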

In [None]:
!tb sql --stats "SELECT pulocationid, sum(trip_distance) FROM taxi GROUP BY pulocationid LIMIT 10"

** Query took 0.072901086 seconds
** Rows read: 84,152,418
** Bytes read: 673.22 MB
-------------------------------------
| pulocationid | sum(trip_distance) |
-------------------------------------
|           55 |  21302.74000561051 |
|           69 |  33956.49999191426 |
|          211 | 1604516.2397562545 |
|          161 |  8128339.988580041 |
|          250 | 12550.219992961735 |
|          136 | 18966.369995670393 |
|           30 |  300.3599983230233 |
|          108 |  8479.139993175864 |
|          168 |  63242.28003138304 |
|          218 |   27640.2600096222 |
-------------------------------------


Aggregation functions vary in complexity:
 - `sum`, `avg`, `count` are cheap
 - `quantiles`, `uniq` are more expensive.

In [None]:
!tb sql --stats "SELECT sum(trip_distance) FROM taxi"

** Query took 0.027133523 seconds
** Rows read: 84,152,418
** Bytes read: 336.61 MB
----------------------
| sum(trip_distance) |
----------------------
| 251184246.45962504 |
----------------------


In [None]:
!tb sql --stats "SELECT avg(trip_distance) FROM taxi"

** Query took 0.022843246 seconds
** Rows read: 84,152,418
** Bytes read: 336.61 MB
----------------------
| avg(trip_distance) |
----------------------
|   2.98487259700161 |
----------------------


In [None]:
!tb sql --stats "SELECT count(trip_distance) FROM taxi"

** Query took 0.022469396 seconds
** Rows read: 84,152,418
** Bytes read: 336.61 MB
------------------------
| count(trip_distance) |
------------------------
|             84152418 |
------------------------


In [None]:
!tb sql --stats "SELECT quantiles(0.99)(trip_distance) FROM taxi"

** Query took 0.049346474 seconds
** Rows read: 84,152,418
** Bytes read: 336.61 MB
----------------------------------
| quantiles(0.99)(trip_distance) |
----------------------------------
| [19.22089933395386]            |
----------------------------------


In [None]:
!tb sql --stats "SELECT uniq(trip_distance) FROM taxi"

** Query took 0.057897964 seconds
** Rows read: 84,152,418
** Bytes read: 336.61 MB
-----------------------
| uniq(trip_distance) |
-----------------------
|                7317 |
-----------------------


In these runs, `sum` takes roughly half the time of `quantiles` or `uniq` while processing the same amount of data.

### Formula for the Time a Query Takes
To estimate the time a query is going to take, it is good to have a formula like this in mind:
```
data_size * compression_factor * main_memory_speed 
+ data_decompression_factor/cores
+ rows*K*simple_data_operations/cores
+ rows*K2*complex_data_operations/cores  
+ complex_data_operations*log(rows) 
```
The formula is not exact (things are more complex than that), but the important point is which factors the query time depends on the most:
- the scanned memory
- the time it takes a CPU to decompress the data / cores
- the number of rows
- the number of simple operations
- the number of complex operations (agg, joins).

The message is to use as little data as possible and the simplest possible operations.
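
To get a feel for how those factors interact, here is a loose reading of the formula as a Python function. Everything about it is an assumption made for illustration: `estimate_query_seconds` is a hypothetical helper, the default constants (memory bandwidth, decompression rate, cost per operation) are invented placeholders rather than measurements, and the row and byte counts are simply the "Rows read" and "Bytes read" figures reported by the `sum` queries earlier in this chapter.

```python
import math


def estimate_query_seconds(scanned_bytes, rows, simple_ops, complex_ops, cores,
                           memory_bytes_per_sec=10e9,     # placeholder bandwidth
                           decompress_bytes_per_sec=1e9,  # placeholder rate per core
                           simple_op_sec=1e-9,            # placeholder cost per simple op
                           complex_op_sec=20e-9):         # placeholder cost per complex op
    """Rough cost model; the result is only useful for relative comparisons."""
    scan = scanned_bytes / memory_bytes_per_sec
    decompress = scanned_bytes / decompress_bytes_per_sec / cores
    simple = rows * simple_ops * simple_op_sec / cores
    complex_part = rows * complex_ops * complex_op_sec / cores
    merge = complex_ops * complex_op_sec * math.log2(max(rows, 2))
    return scan + decompress + simple + complex_part + merge


full_scan = estimate_query_seconds(336.61e6, 84_152_418, simple_ops=1, complex_ops=0, cores=8)
indexed = estimate_query_seconds(4.52e6, 565_040, simple_ops=1, complex_ops=0, cores=8)
with_complex_op = estimate_query_seconds(336.61e6, 84_152_418, simple_ops=1, complex_ops=1, cores=8)

print(indexed < full_scan < with_complex_op)
```

The absolute values mean nothing, but the ordering matches the message above: scanning less data and using simpler operations both lower the estimate.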


## Use Joins Instead of Denormalizing
People generally say that joins are slow and in some places joins are forbidden. For example, some banks avoid joins as much as possible in their legacy databases to avoid overloading them.

However, sometimes you can’t avoid doing joins. If done in the right way they are not so slow. They may even sometimes be better because they simplify ingestion.



We talked previously about events and dimensions tables. The combination of those tables to achieve denormalization is one of the most common uses for joins:

`SELECT a, b, c, d.e, d.f FROM events JOIN dimension AS d USING a`

To avoid this kind of operation, one possibility is to do it at insertion time and put the result in another table (more on this in [Best Practices for Views](https://colab.research.google.com/github/AlisonJD/RTACourse/blob/main/07_Best_Practices_for_Views.ipynb)). This way, that final table contains the `e` and `f` columns (from the dimension table) and you don’t need to run an expensive join.

However, denormalization is not always possible; sometimes dimension values change over time and you can’t afford to update all the values in the main table.

### How You Should Do Joins

#### Tip 1: Join on a Single Column using Equality
Try to join using just one column and always use equality comparisons. This allows the database to use a hash join (or whatever algorithm the database of your choice uses for fast joins).

`SELECT a, b, c, d FROM events 
JOIN products USING a`

Hash tables in ClickHouse are optimised for these sorts of operations.
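
For readers who have not seen one, a hash join can be sketched as follows. This is a minimal, generic version written for illustration, not ClickHouse's implementation; `hash_join` is a hypothetical helper and the rows are plain dictionaries.

```python
def hash_join(events, dimensions, key):
    """Inner equi-join: build a hash table on one side, probe it with the other."""
    # Build phase: one hash table entry per distinct key of the (small) dimension side.
    build = {}
    for dim in dimensions:
        build.setdefault(dim[key], []).append(dim)

    # Probe phase: one hash lookup per event row.
    joined = []
    for event in events:
        for dim in build.get(event[key], []):
            joined.append({**event, **dim})
    return joined


events = [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
dimensions = [{"a": 1, "d": "x"}, {"a": 2, "d": "y"}]

print(hash_join(events, dimensions, "a"))
# [{'a': 1, 'b': 10, 'd': 'x'}, {'a': 2, 'b': 20, 'd': 'y'}]
```

The lookup only works for equality on a hashable key, which is why a single-column equality condition is the friendliest case for this algorithm.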


#### Tip 2: Join After Filtering
Bad:
`SELECT a, b, c, d FROM events 
JOIN dimensions USING a WHERE b > 10`

For example, using Tinybird's events and products datasets.

In [None]:
!tb sql --stats "SELECT date, product_id as sku, user_id, event \
FROM events JOIN products USING sku WHERE user_id > 800000" | head -n 9

** Query took 1.450686627 seconds
** Rows read: 3,047,365
** Bytes read: 155.92 MB
-----------------------------------------
date: 2020-08-17 23:32:47
sku: 6c4aed5e-1aaa-11eb-b677-acde48001122
user_id: 919740
event: remove_item_from_cart
-----------------------------------------



Better:
```
SELECT a, b, c, d 
FROM (SELECT a, b, c, d FROM events 
WHERE b > 10)
JOIN dimensions 
USING a
```

In [None]:
!tb sql --stats "SELECT date, product_id AS sku, user_id, event \
FROM (SELECT date, product_id, user_id, event FROM events WHERE user_id> 800000) \
JOIN products USING sku" | head -n 9

** Query took 1.924532513 seconds
** Rows read: 2,899,909
** Bytes read: 144.72 MB
-----------------------------------------
date: 2019-01-01 00:00:08
sku: 6c1a3ae2-1aaa-11eb-8595-acde48001122
user_id: 974346
event: add_item_to_cart
-----------------------------------------


Another example, this time using the `taxi` data:

In [None]:
!tb sql --stats "SELECT passenger_count, trip_distance, ratecodeid, pulocationid, dolocationid, borough as puborough \
FROM taxi \
ANY LEFT JOIN taxi__zone_lookup \
ON (pulocationid = locationid) \
WHERE borough = 'Queens'" | head -n 11

** Query took 0.010661189 seconds
** Rows read: 2,161,667
** Bytes read: 38.91 MB
--------------------
passenger_count: 1
trip_distance: 16.28
ratecodeid: 1
pulocationid: 132
dolocationid: 123
puborough: Queens
--------------------


In [None]:
!tb sql --stats "SELECT passenger_count, trip_distance, ratecodeid, pulocationid, dolocationid, borough as puborough \
FROM taxi \
ANY LEFT JOIN (SELECT locationid, borough FROM taxi__zone_lookup WHERE borough = 'Queens') \
ON (pulocationid = locationid)"  | head -n 11

** Query took 0.01251338 seconds
** Rows read: 2,358,633
** Bytes read: 42.46 MB
--------------------
passenger_count: 1
trip_distance: 1.21
ratecodeid: 1
pulocationid: 186
dolocationid: 161
puborough: 
--------------------


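Databases usually optimize this by themselves (hence these times may be very similar), but if the query is complex the query analyzer may not detect it, so it’s better to make these prefilters explicit. Note that the two taxi queries are not strictly equivalent: with a `LEFT JOIN`, filtering the right-hand table in a subquery keeps the unmatched taxi rows with an empty `borough`, as the last output shows.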
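
The effect of filtering before joining can be made visible by counting hash lookups. The sketch below is an illustration under simplifying assumptions: `join_then_filter` and `filter_then_join` are hypothetical helpers, the predicate only uses columns of the events side, and the join is an inner join on a single key `a`.

```python
def join_then_filter(events, dimensions, predicate):
    """Join every event first, then throw most of the joined rows away."""
    lookup = {dim["a"]: dim for dim in dimensions}
    probes = 0
    joined = []
    for event in events:
        probes += 1
        dim = lookup.get(event["a"])
        if dim is not None:
            joined.append({**event, **dim})
    return [row for row in joined if predicate(row)], probes


def filter_then_join(events, dimensions, predicate):
    """Filter the events first, so only the surviving rows reach the join."""
    lookup = {dim["a"]: dim for dim in dimensions}
    probes = 0
    joined = []
    for event in events:
        if not predicate(event):
            continue
        probes += 1
        dim = lookup.get(event["a"])
        if dim is not None:
            joined.append({**event, **dim})
    return joined, probes


events = [{"a": i % 5, "b": i} for i in range(100)]
dimensions = [{"a": k, "d": f"dim{k}"} for k in range(5)]


def is_recent(row):
    return row["b"] > 89


slow_rows, slow_probes = join_then_filter(events, dimensions, is_recent)
fast_rows, fast_probes = filter_then_join(events, dimensions, is_recent)

print(slow_rows == fast_rows, slow_probes, fast_probes)  # True 100 10
```

Both versions return the same rows, but the prefiltered one sends 10 rows into the join instead of 100.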

#### Tip 3: Collapse Columns Before Join
If you have to join by several columns, try to collapse them.

Bad:
```
SELECT a, b, c, d FROM events 
JOIN dimensions 
USING a, b
```

Better:
```
SELECT a, b, a_and_b_concatenated, c, d FROM events 
JOIN dimensions 
USING a_and_b_concatenated
```

Create a destination Data Source for the materialized view.

In [None]:
filename="datasources/taxi_filter_mv.datasource"
text='''
SCHEMA >
    `tpep_pickup_datetime` DateTime,
    `tpep_dropoff_datetime` DateTime,
    `passenger_count` Nullable(Int16),
    `trip_distance` Float32,
    `ratecodeid` Nullable(Int16),
    `pulocationid` Int32,
    `dolocationid` Int32,
    `key1` String,
    `key2` String

ENGINE MergeTree
ENGINE_SORTING_KEY tpep_pickup_datetime
'''

write_text_to_file(filename, text)

In [None]:
!tb push datasources/taxi_filter_mv.datasource

** Processing datasources/taxi_filter_mv.datasource
** Building dependencies
** Running taxi_filter_mv
** 'taxi_filter_mv' created
** Not pushing fixtures


In [None]:
filename = 'pipes/taxi_filtered.pipe'
text = '''
DESCRIPTION >
  get taxi data from 2 to 4 January 2019 and precalculate join keys from concatenated columns

NODE taxi_filter
DESCRIPTION >
    get taxi data from 2 to 4 January 2019 and precalculate join keys
SQL >
SELECT 
  tpep_pickup_datetime,
  tpep_dropoff_datetime,
  passenger_count,
  trip_distance,
  ratecodeid,
  pulocationid,
  dolocationid,
  concat(toString(pulocationid), toString(dolocationid), toString(passenger_count), toString(ratecodeid)) as key1,
  concat(toString(dolocationid), toString(pulocationid), toString(passenger_count), toString(ratecodeid)) as key2
FROM taxi
WHERE toDate(tpep_pickup_datetime) BETWEEN '2019-01-02' AND '2019-01-04'

TYPE materialized
DATASOURCE taxi_filter_mv
'''

write_text_to_file(filename, text)

In [None]:
!tb push 'pipes/taxi_filtered.pipe'  --populate --force

** Processing pipes/taxi_filtered.pipe
** Building dependencies
** Running taxi_filtered
** Materialized node 'taxi_filter' using the Data Source 'taxi_filter_mv'
** Populating job url https://api.tinybird.co/v0/jobs/914eab47-fa66-4d78-b416-506ba78b4e29
** 'taxi_filtered' created
** Not pushing fixtures


In [None]:
!tb sql --stats --format csv "SELECT passenger_count, trip_distance, ratecodeid, t1.key1, t1.key2, t2.key1, t2.key2 \
FROM taxi_filtered t1 \
JOIN  taxi_filtered t2 \
ON ( \
   t1.pulocationid = t2.dolocationid \
  and t2.pulocationid = t1.dolocationid \
  and t1.passenger_count = t2.passenger_count \
  and t1.ratecodeid = t2.ratecodeid \
)" | head -n 5

** Query took 1.894542963 seconds
** Rows read: 729,513
** Bytes read: 34.73 MB
"passenger_count","trip_distance","ratecodeid","key1","key2","t2.key1","t2.key2"
1,2.3,1,"14825511","25514811","25514811","14825511"


In [None]:
!tb sql --stats --format csv "SELECT passenger_count, trip_distance, ratecodeid, t1.key1, t1.key2, t2.key1, t2.key2 \
FROM taxi_filtered t1 \
JOIN  taxi_filtered t2 \
ON (t1.key1 = t2.key2) \
" | head -n 5

** Query took 1.458210793 seconds
** Rows read: 754,089
** Bytes read: 26.02 MB
"passenger_count","trip_distance","ratecodeid","key1","key2","t2.key1","t2.key2"
1,2.3,1,"14825511","25514811","25514811","14825511"
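
One caveat when collapsing columns by plain concatenation, as `key1` and `key2` do above: without a separator, different combinations of values can produce the same key, so the collapsed join may match rows that the multi-column join would not. The sketch below shows the effect in Python with made-up values; `key_without_separator` and `key_with_separator` are hypothetical helpers, and the separator `|` is an arbitrary choice that must not occur in the values themselves.

```python
def key_without_separator(*parts):
    """Mimics concat(toString(a), toString(b), ...)."""
    return "".join(str(part) for part in parts)


def key_with_separator(*parts, sep="|"):
    """Same idea, but keeps the boundaries between the original columns."""
    return sep.join(str(part) for part in parts)


# (pulocationid, dolocationid, passenger_count, ratecodeid), illustrative values
trip_a = (1, 23, 1, 1)
trip_b = (12, 3, 1, 1)

print(key_without_separator(*trip_a), key_without_separator(*trip_b))  # 12311 12311
print(key_with_separator(*trip_a), key_with_separator(*trip_b))        # 1|23|1|1 12|3|1|1
```

If exact equivalence with the multi-column join matters, a separator (or a hash of the tuple of columns) is a safer way to build the collapsed key.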


#### Tip 4: Use Hash Access Join

If your database supports hash access to dimensions tables, use it.

Bad:
```
SELECT a, b, c, d FROM events 
JOIN dimensions 
USING a 
```

Better: 
```
SELECT a, b, c, joinGet(dimensions, 'd', a) AS d FROM events
```

Create a join table with a Join engine. Join table data is always held in RAM, so only keep the columns you need.

In [None]:
filename="datasources/products_join_sku.datasource"
text='''
DESCRIPTION join engine added to the generated datasource
SCHEMA >
    `sku` String,
    `color` String,
    `section_id` Int16,
    `title` String

ENGINE "Join"
ENGINE_JOIN_STRICTNESS "ANY"
ENGINE_JOIN_TYPE "LEFT"
ENGINE_KEY_COLUMNS "sku"
'''

write_text_to_file(filename, text)

In [None]:
!tb push ./datasources/products_join_sku.datasource

** Processing ./datasources/products_join_sku.datasource
** Building dependencies
** Running products_join_sku
** 'products_join_sku' created
** Not pushing fixtures


In [None]:
!tb datasource append products_join_sku https://storage.googleapis.com/tinybird-assets/datasets/guides/products_1.csv

** 🥚 starting import process
** 🐥 done
** Appended 1200000 new rows
** Total rows in products_join_sku: None
** Data appended to Data Source 'products_join_sku' successfully!
** Data pushed to products_join_sku


In [None]:
!tb datasource append products_join_sku https://storage.googleapis.com/tinybird-assets/datasets/guides/products_2.csv

** 🥚 starting import process
** 🐥 done
** Appended 1241156 new rows
** Total rows in products_join_sku: None
** Data appended to Data Source 'products_join_sku' successfully!
** Data pushed to products_join_sku


In [None]:
!tb sql --stats "SELECT date, product_id, user_id, event, products.color \
FROM events ANY LEFT JOIN products ON product_id=sku" | head -n 10

** Query took 1.32154823 seconds
** Rows read: 3,227,589
** Bytes read: 213.74 MB
------------------------------------------------
date: 2019-05-24 15:38:48
product_id: 66c9ec22-1aaa-11eb-bac0-acde48001122
user_id: 699482
event: search
color: orchid2
------------------------------------------------


In [None]:
!tb sql --stats "SELECT date, product_id, user_id, event, joinGet(products_join_sku, 'color', product_id) AS color \
FROM events" | head -n 10

** Query took 0.00666804 seconds
** Rows read: 32,768
** Bytes read: 2.49 MB
------------------------------------------------
date: 2016-01-19 03:15:48
product_id: 6a3915a4-1aaa-11eb-8182-acde48001122
user_id: 103334
event: remove_item_from_cart
color: chartreuse4
------------------------------------------------


In ClickHouse, `joinGet` is often a faster way to join: it looks up values directly in a table that uses the Join engine.

Here's another example, using the `taxi` data.

In [None]:
!tb sql --stats "SELECT borough, pulocationid, tpep_pickup_datetime, tpep_dropoff_datetime \
FROM taxi ANY LEFT JOIN taxi__zone_lookup \
ON (locationid = pulocationid)" | head -n 10

** Query took 0.011364939 seconds
** Rows read: 3,144,241
** Bytes read: 37.73 MB
---------------------------------------------------------------------------
| borough   | pulocationid | tpep_pickup_datetime | tpep_dropoff_datetime |
---------------------------------------------------------------------------
| Manhattan |          234 | 2019-03-01 23:39:52  | 2019-03-02 00:11:33   |
| Manhattan |           79 | 2019-03-01 23:48:11  | 2019-03-02 00:01:27   |
| Manhattan |          211 | 2019-03-01 23:53:45  | 2019-03-02 00:21:40   |
| Manhattan |          141 | 2019-03-02 00:01:10  | 2019-03-02 00:01:10   |


In [None]:
!tb sql --stats "SELECT joinGet('taxi__zone_lookup', 'borough', pulocationid) as borough, pulocationid, tpep_pickup_datetime, tpep_dropoff_datetime \
FROM taxi" | head -n 10

** Query took 0.003973199 seconds
** Rows read: 131,010
** Bytes read: 1.57 MB
---------------------------------------------------------------------------
| borough   | pulocationid | tpep_pickup_datetime | tpep_dropoff_datetime |
---------------------------------------------------------------------------
| Manhattan |           87 | 2019-06-01 00:00:03  | 2019-06-01 00:10:35   |
| Manhattan |           79 | 2019-06-01 00:00:04  | 2019-06-01 00:20:19   |
| Manhattan |          137 | 2019-06-01 00:00:05  | 2019-06-01 00:15:21   |
| Manhattan |          107 | 2019-06-01 00:00:08  | 2019-06-01 00:08:42   |


#### Tip 5: Use Group Array Join

Use `groupArray` and `arrayJoin` to reduce the number of keys in a join and so speed it up.

```
ClickHouse
:) SET max_threads = 4

:) 

CREATE TABLE t_left
ENGINE = MergeTree
ORDER BY a AS
SELECT number AS a
FROM numbers(1000000)

:) 

SELECT *
FROM t_left
LIMIT 5

┌─a─┐
│ 0 │
│ 1 │
│ 2 │
│ 3 │
│ 4 │
└───┘

:) 

CREATE TABLE t_right
ENGINE = MergeTree
ORDER BY a AS
SELECT
    toUInt64(number / 100) AS a,
    number AS b
FROM numbers(1000000 * 100)

:) SELECT * FROM t_right LIMIT 5

SELECT *
FROM t_right
LIMIT 5

┌─a─┬─b─┐
│ 0 │ 0 │
│ 0 │ 1 │
│ 0 │ 2 │
│ 0 │ 3 │
│ 0 │ 4 │
└───┴───┘

:) SELECT '** regular join'

┌─'** regular join'─┐
│ ** regular join   │
└───────────────────┘

:) 

SELECT *
FROM t_left
ALL LEFT JOIN t_right USING (a)
FORMAT Null

0 rows in set. Elapsed: 1.127 sec. Processed 101.00 million rows, 1.61 GB (89.62 million rows/s., 1.43 GB/s.)

:) SELECT '** array join'

┌─'** array join'─┐
│ ** array join   │
└─────────────────┘

:) 

SELECT
    a,
    arrayJoin(b)
FROM t_left
ALL LEFT JOIN
(
    SELECT
        a,
        groupArray(b) AS b
    FROM t_right
    GROUP BY a
) AS __ USING (a)
FORMAT Null

0 rows in set. Elapsed: 0.754 sec. Processed 101.00 million rows, 1.61 GB (133.93 million rows/s., 2.13 GB/s.)
```
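
The idea behind the faster version can be mirrored in Python on a scaled-down copy of `t_left` and `t_right`. This is only a sketch of the data flow, with hypothetical helpers `group_array` and `array_join` standing in for the ClickHouse functions of similar name; it says nothing about how ClickHouse executes the query internally.

```python
from collections import defaultdict


def group_array(right_rows):
    """Like `SELECT a, groupArray(b) ... GROUP BY a`: one entry per key."""
    grouped = defaultdict(list)
    for a, b in right_rows:
        grouped[a].append(b)
    return grouped


def array_join(left_keys, grouped):
    """Like `arrayJoin(b)` after the join: expand each array back into rows."""
    for a in left_keys:
        for b in grouped.get(a, []):
            yield a, b


t_left = list(range(5))
t_right = [(n // 3, n) for n in range(15)]  # three rows per key

grouped = group_array(t_right)
rows = list(array_join(t_left, grouped))

print(len(t_right), len(grouped))  # 15 5
print(rows == t_right)             # True
```

The right-hand side of the join shrinks from 15 rows to 5 entries, one per key, and the arrays are only expanded back into rows after the lookup.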



Using the `taxi` data for a more complex query in Tinybird:

In [None]:
!tb sql --stats "SELECT \
  tpep_dropoff_datetime, \
  dolocationid, \
  b.dolocationid next_location, \
  b.tpep_pickup_datetime next_time \
FROM ( \
  SELECT * \
  FROM taxi \
  WHERE toDate(tpep_dropoff_datetime) IN ('2019-01-02', '2019-01-04') \
) a \
ALL LEFT JOIN ( \
  SELECT pulocationid, dolocationid, tpep_pickup_datetime \
  FROM taxi \
  WHERE toDate(tpep_pickup_datetime) IN ('2019-01-02', '2019-01-04') \
) b \
ON (a.dolocationid = b.pulocationid) \
WHERE tpep_dropoff_datetime BETWEEN next_time - INTERVAL 10 MINUTE AND next_time" | head -n 10

** Query took 0.146963303 seconds
** Rows read: 88,501,255
** Bytes read: 359.05 MB
------------------------------------------------------------------------------
| tpep_dropoff_datetime | dolocationid | next_location | next_time           |
------------------------------------------------------------------------------
| 2019-01-02 00:00:00   |          223 |           263 | 2019-01-02 00:06:42 |
| 2019-01-02 00:00:00   |          223 |             7 | 2019-01-02 00:09:56 |
| 2019-01-02 00:09:29   |          116 |           151 | 2019-01-02 00:17:41 |
| 2019-01-02 00:08:41   |          211 |            68 | 2019-01-02 00:14:48 |


In [None]:
!tb sql --stats "SELECT \
  tpep_dropoff_datetime, \
  dolocationid, \
  next_locations.1 as next_location, \
  next_locations.2 as next_time \
FROM ( \
  SELECT \
    tpep_dropoff_datetime, \
    dolocationid, \
    arrayJoin(b.next_locations) as next_locations \
  FROM ( \
    SELECT * \
    FROM taxi \
    WHERE toDate(tpep_dropoff_datetime) IN ('2019-01-02', '2019-01-04') \
  ) a \
  ALL LEFT JOIN ( \
    SELECT pulocationid, groupArray((dolocationid, tpep_pickup_datetime)) next_locations \
    FROM taxi \
    WHERE toDate(tpep_pickup_datetime) IN ('2019-01-02', '2019-01-04') \
    GROUP BY pulocationid \
  ) b \
  ON (a.dolocationid = b.pulocationid) \
) \
WHERE tpep_dropoff_datetime BETWEEN next_time - INTERVAL 10 MINUTE AND next_time" | head -n 10

** Query took 0.095593983 seconds
** Rows read: 88,275,968
** Bytes read: 358.15 MB
------------------------------------------------------------------------------
| tpep_dropoff_datetime | dolocationid | next_location | next_time           |
------------------------------------------------------------------------------
| 2019-01-02 00:00:00   |          223 |           263 | 2019-01-02 00:06:42 |
| 2019-01-02 00:00:00   |          223 |             7 | 2019-01-02 00:09:56 |
| 2019-01-02 00:09:29   |          116 |           151 | 2019-01-02 00:17:41 |
| 2019-01-02 00:08:41   |          211 |           244 | 2019-01-02 00:08:47 |


#### Tip 6: Test Grouping Before or After a Join
Sometimes joins work pretty well. Let’s say we have a high cardinality column and we need to group by it. Grouping is an expensive operation, especially with high cardinality, so if a join reduces the cardinality it is sometimes worth doing the join before the grouping.

Depending on the granularity, the grouping algorithm you are using and how your table is sorted, it may be the other way round and grouping before joining is faster. When this is not clear, the best approach is to test which performs better.


#### Tip 7: Use IN (subquery) Instead of JOIN for Filtering

Bad:
```
SELECT a, b, c, d FROM events 
JOIN dimensions 
USING a
```

Better:
```
SELECT a, b, c, d FROM events
WHERE a IN (SELECT a FROM dimensions)
```
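
This rewrite applies when the join is only used to keep the events that have a match and no column from the dimensions table is needed. The Python sketch below, with hypothetical helpers and made-up rows, shows the membership test and one difference in behavior worth keeping in mind: if the dimension side contains duplicate keys, a join repeats the matching events while `IN` does not. The join here is a deliberately naive nested loop, used only to compare results.

```python
def filter_with_in(events, dimension_keys):
    """Like `WHERE a IN (SELECT a FROM dimensions)`: a set membership test."""
    allowed = set(dimension_keys)
    return [event for event in events if event["a"] in allowed]


def filter_with_join(events, dimension_keys):
    """Like an inner join on `a`: one output row per matching pair."""
    return [event for event in events for key in dimension_keys if event["a"] == key]


events = [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}, {"a": 3, "b": "z"}]
dimension_keys = [1, 1, 3]  # note the duplicate key

print(filter_with_in(events, dimension_keys))
# [{'a': 1, 'b': 'x'}, {'a': 3, 'b': 'z'}]
print(len(filter_with_join(events, dimension_keys)))  # 3
```

With unique dimension keys both approaches return the same rows, and the `IN` version only has to keep a set of keys in memory instead of whole dimension rows.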


In [None]:
!tb sql --stats "SELECT date, product_id AS sku, user_id, event FROM events JOIN products USING sku" | head -n 9

** Query took 1.447204927 seconds
** Rows read: 3,227,589
** Bytes read: 169.62 MB
-----------------------------------------
date: 2020-05-30 13:05:06
sku: 6b2d0cc2-1aaa-11eb-b24d-acde48001122
user_id: 785598
event: view
-----------------------------------------


In [None]:
!tb sql --stats "SELECT date, product_id AS sku, user_id, event FROM events WHERE sku IN (SELECT sku FROM products)" | head -n 9

** Query took 1.066458252 seconds
** Rows read: 2,473,924
** Bytes read: 112.34 MB
-----------------------------------------
date: 2019-01-01 00:00:24
sku: 6763c806-1aaa-11eb-abbd-acde48001122
user_id: 523730
event: search
-----------------------------------------


#### Tip 8: Avoid Distributed Joins
It is better to have a copy of the data you want to join (the dimensions tables) on each machine.

## Parallelization Doesn't Always Help
Let's see how two queries respond differently to parallelization, using a ClickHouse table with 50 M rows and 500 000 distinct items. The first, a ranking query, is complex:

```
SELECT a, avg(b) c FROM parallel_stuff 
GROUP BY a 
ORDER BY c DESC 
LIMIT 10
```

The second query is simpler:

`SELECT avg(a) d, avg(b) c FROM parallel_stuff`


This is how the table was created:

```
ClickHouse
:) 

CREATE TABLE parallel_stuff
(
    `a` Int32,
    `b` Int32
)
ENGINE = MergeTree
ORDER BY a

:) 

INSERT INTO parallel_stuff SELECT
    rand() % 500000,
    rand() % 1000000
FROM numbers(50000000)
```

|max_threads|Complex query time (secs.)|Simpler query time (secs.)|
| :-------------:|-------------:| --------:|
| 1| 0.529|0.224|
| 2| 0.303|0.130|
| 3| 0.203|0.105|
| 4| 0.198|0.088|
| 5| 0.185|0.080|
| 6| 0.170|0.050|



For the ranking query the decrease in time is not linear: beyond three threads it barely improves (0.203 s with 3 threads, 0.170 s with 6), so adding more cores will not improve speed significantly.

For the simpler query, the time keeps dropping as threads are added (0.224 s with 1 thread, 0.050 s with 6), so adding more cores will increase the speed until memory bandwidth reaches its limit.

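That’s also why, in some use cases, it is worth using a more complex compression algorithm: when memory bandwidth is the limit, spare cores can spend time decompressing in exchange for moving less data.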
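
The speedups implied by the timing table can be worked out with a few lines of Python. The timings are copied from the table above; `speedups` is just a hypothetical helper that divides the single-thread time by the time measured with more threads.

```python
# max_threads -> (complex query seconds, simpler query seconds), from the table above
timings = {
    1: (0.529, 0.224),
    2: (0.303, 0.130),
    3: (0.203, 0.105),
    4: (0.198, 0.088),
    5: (0.185, 0.080),
    6: (0.170, 0.050),
}


def speedups(timings):
    """Speedup relative to one thread, rounded to two decimals."""
    base_complex, base_simple = timings[1]
    return {
        threads: (round(base_complex / complex_s, 2), round(base_simple / simple_s, 2))
        for threads, (complex_s, simple_s) in timings.items()
    }


for threads, (complex_x, simple_x) in speedups(timings).items():
    print(f"{threads} threads: complex {complex_x}x, simpler {simple_x}x")
```

With six threads the ranking query runs about 3.1 times faster than with one, while the simpler query runs about 4.5 times faster.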

## Use the Right Algorithm and the Best Function for the Job

OLAP databases generally use probabilistic methods for many functions and algorithms. Depending on the data structure used to store the data, some functions can provide approximations faster than exact results. The error ratio for these functions is usually specified in the database documentation, along with advice about when it is more or less appropriate to use them.

`uniq` vs `uniqExact` is a good example. Both do the same thing except for one detail: `uniq` is approximate and can return a result with a small error, whereas `uniqExact` returns the exact value. The downside of `uniqExact` is that it usually takes much longer to run (about seven times longer in the runs below).

In [None]:
!tb sql --stats "SELECT uniq(user_id) FROM events"

** Query took 0.119051723 seconds
** Rows read: 100,000,000
** Bytes read: 800 MB
-----------------
| uniq(user_id) |
-----------------
|       1001943 |
-----------------


In [None]:
!tb sql --stats "SELECT uniqExact(user_id) FROM events"

** Query took 0.874007401 seconds
** Rows read: 100,000,000
** Bytes read: 800 MB
----------------------
| uniqExact(user_id) |
----------------------
|            1000000 |
----------------------


Choosing the right algorithm means understanding the business case and deciding how much accuracy the final user is willing to trade for speed. Sometimes you really need exact values, but most of the time a good approximation is good enough. Note that you need to know when the error is low, because otherwise you return a fast but misleading result. Examples of approximate functions include `uniq`, `quantiles` and `topK`.
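
In the runs above, `uniq` returned 1,001,943 against an exact count of 1,000,000, an error of about 0.19%. To show where such an error comes from, here is a small approximate distinct counter in Python. It uses the K-minimum-values technique, chosen here because it fits in a few lines; it is not the algorithm behind ClickHouse's `uniq`, and `approx_distinct` and its parameter `k` are hypothetical names.

```python
import hashlib
import heapq


def approx_distinct(items, k=1024):
    """Estimate the number of distinct items while keeping only k hash values."""
    kept = []        # max-heap (values negated) holding the k smallest hashes
    members = set()  # the same hashes, for fast duplicate checks
    for item in items:
        digest = hashlib.sha1(str(item).encode()).digest()
        h = int.from_bytes(digest[:8], "big") / 2**64  # roughly uniform in [0, 1)
        if h in members:
            continue
        if len(kept) < k:
            heapq.heappush(kept, -h)
            members.add(h)
        elif h < -kept[0]:
            evicted = -heapq.heappushpop(kept, -h)
            members.discard(evicted)
            members.add(h)
    if len(kept) < k:
        return len(kept)  # fewer than k distinct values seen: the count is exact
    # If the k-th smallest of n uniform hashes is x, then n is roughly (k - 1) / x.
    return int((k - 1) / -kept[0])


items = [i % 50_000 for i in range(100_000)]  # 50,000 distinct values, each twice
exact = len(set(items))
estimate = approx_distinct(items)
relative_error = abs(estimate - exact) / exact

print(exact, estimate, f"{relative_error:.2%}")
```

The estimator stores at most `k` hash values however many items it sees, and its typical error shrinks as `k` grows. That is the same trade the text describes: less memory and time in exchange for a bounded error.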

## Key Points for Queries
 - Work with ordered data (not data that is scattered all around the memory).
 - Understand how your database accesses data and uses parallelization.
 - Query as little data as possible and as few bytes as possible; leverage indexes and filters.
 - All databases have mechanisms for understanding query complexity and how the query is executed. Understand how your database works so that you can optimise your queries.


## Pipe for Querying

In [None]:
filename = 'pipes/ch_06_querying.pipe'
text = '''
DESCRIPTION >
  the queries from the notebook 06_Querying

NODE simple_filter
DESCRIPTION >
    A simple filter is cheap
SQL >
    SELECT sum(trip_distance) 
    FROM taxi 
    WHERE passenger_count = 4

NODE filter_with_index
DESCRIPTION >
    Even cheaper if it uses some indexes
SQL > 
    SELECT sum(trip_distance) 
    FROM taxi 
    WHERE tpep_pickup_datetime 
    BETWEEN '2019-01-08 00:00:00' AND '2019-01-10 00:00:00'

NODE group
DESCRIPTION >
    A group is less cheap
SQL >
    SELECT passenger_count, sum(trip_distance) 
    FROM taxi 
    GROUP BY passenger_count

NODE group_more_cardinality
DESCRIPTION >
  Grouping by a higher-cardinality column is more expensive still
SQL >
    SELECT pulocationid, sum(trip_distance) 
    FROM taxi 
    GROUP BY pulocationid 

NODE sum
DESCRIPTION >
  Sum is cheap
SQL >
    SELECT sum(trip_distance)
    FROM taxi

NODE average
DESCRIPTION >
  Average is cheap
SQL >
    SELECT avg(trip_distance)
    FROM taxi

NODE count
DESCRIPTION >
  Count is cheap
SQL >
    SELECT count(trip_distance) 
    FROM taxi

NODE quantiles
DESCRIPTION >
  Quantiles are more expensive
SQL >
    SELECT quantiles(0.99)(trip_distance) 
    FROM taxi

NODE uniq
DESCRIPTION >
  Uniq is more expensive
SQL >
    SELECT uniq(trip_distance)
    FROM taxi

NODE single_record
DESCRIPTION >
  sample record from events
SQL >
    SELECT * 
    FROM events
    LIMIT 1

NODE filter_string
DESCRIPTION >
  filter on a string column
SQL >  
    SELECT count()
    FROM events
    WHERE position(event, 'item') > 0

NODE filter_integer
DESCRIPTION >
  filter on an integer column
SQL >  
    SELECT count()
    FROM events
    WHERE user_id > 500000

NODE join_before_filtering
DESCRIPTION >
  Expensive join
SQL >
    SELECT date, product_id as sku, user_id, event 
    FROM events JOIN products 
    USING sku 
    WHERE user_id > 900000

NODE join_after_filtering
DESCRIPTION >
  Cheaper join
SQL >
    SELECT date, product_id AS sku, user_id, event 
    FROM (SELECT date, product_id, user_id, event FROM events WHERE user_id>900000) 
    JOIN products 
    USING sku

NODE without_collapsed_columns
DESCRIPTION >
    using original columns
SQL >
    SELECT passenger_count, trip_distance, ratecodeid, t1.key1, t1.key2, t2.key1, t2.key2
    FROM taxi_filtered t1
    JOIN  taxi_filtered t2
    ON (
      t1.pulocationid = t2.dolocationid
      and t2.pulocationid = t1.dolocationid
      and t1.passenger_count = t2.passenger_count
      and t1.ratecodeid = t2.ratecodeid
    )

NODE collapse_columns
DESCRIPTION >
    concatenate columns before doing join
SQL >
    SELECT passenger_count, trip_distance, ratecodeid, t1.key1, t1.key2, t2.key1, t2.key2
    FROM taxi_filtered t1
    JOIN  taxi_filtered t2
    ON (t1.key1 = t2.key2)

NODE without_joinGet
DESCRIPTION >
  Without using hash join
SQL >
    SELECT date, product_id, user_id, event, products.color 
    FROM events 
    ANY LEFT JOIN products
    ON product_id=sku

NODE with_joinGet
DESCRIPTION >
  With hash join
SQL >
    SELECT date, product_id, user_id, event, joinGet(products_join_sku, 'color', product_id) 
    AS color 
    FROM events

NODE taxi_without_joinGet
DESCRIPTION >
  Without using hash join
SQL >
    SELECT borough, pulocationid, tpep_pickup_datetime, tpep_dropoff_datetime 
    FROM taxi ANY LEFT JOIN taxi__zone_lookup 
    ON (locationid = pulocationid)

NODE taxi_with_joinGet
DESCRIPTION >
  With hash join
SQL >
    SELECT joinGet('taxi__zone_lookup', 'borough', pulocationid) as borough, pulocationid, tpep_pickup_datetime, tpep_dropoff_datetime
    FROM taxi

NODE taxi_without_Group_Array
DESCRIPTION >
  Without using group array for the join
SQL >
    SELECT 
    tpep_dropoff_datetime, 
    dolocationid, 
    b.dolocationid next_location, 
    b.tpep_pickup_datetime next_time 
    FROM ( 
      SELECT * 
      FROM taxi 
      WHERE toDate(tpep_dropoff_datetime) IN ('2019-01-02', '2019-01-04') 
    ) a 
    ALL LEFT JOIN ( 
      SELECT pulocationid, dolocationid, tpep_pickup_datetime 
      FROM taxi 
      WHERE toDate(tpep_pickup_datetime) IN ('2019-01-02', '2019-01-04') 
    ) b 
    ON (a.dolocationid = b.pulocationid) 
    WHERE tpep_dropoff_datetime BETWEEN next_time - INTERVAL 10 MINUTE AND next_time

NODE taxi_with_Group_Array
DESCRIPTION >
  Using group array for the join
SQL >  
    SELECT 
      tpep_dropoff_datetime, 
      dolocationid, 
      next_locations.1 as next_location, 
      next_locations.2 as next_time 
    FROM ( 
      SELECT 
        tpep_dropoff_datetime, 
        dolocationid, 
        arrayJoin(b.next_locations) as next_locations 
      FROM ( 
        SELECT * 
        FROM taxi 
        WHERE toDate(tpep_dropoff_datetime) IN ('2019-01-02', '2019-01-04') 
      ) a 
      ALL LEFT JOIN ( 
        SELECT pulocationid, groupArray((dolocationid, tpep_pickup_datetime)) next_locations 
        FROM taxi 
        WHERE toDate(tpep_pickup_datetime) IN ('2019-01-02', '2019-01-04') 
        GROUP BY pulocationid 
      ) b 
      ON (a.dolocationid = b.pulocationid) 
    ) 
    WHERE tpep_dropoff_datetime BETWEEN next_time - INTERVAL 10 MINUTE AND next_time

NODE without_IN_(subquery)
DESCRIPTION >
    Without IN (subquery) 
SQL >
    SELECT date, product_id AS sku, user_id, event 
    FROM events 
    JOIN products 
    USING sku

NODE with_IN_(subquery)
DESCRIPTION >
    With IN (subquery) 
SQL >
    SELECT date, product_id AS sku, user_id, event 
    FROM events 
    WHERE sku IN (SELECT sku FROM products)

NODE with_uniq
DESCRIPTION >
    With uniq (approximate)
SQL >
    SELECT uniq(user_id) 
    FROM events

NODE with_uniqExact
DESCRIPTION >
    With uniqExact
SQL >
    SELECT uniqExact(user_id) 
    FROM events
'''

write_text_to_file(filename, text)

In [None]:
! tb push 'pipes/ch_06_querying.pipe' --force --no-check

** Processing pipes/ch_06_querying.pipe
** Building dependencies
** Running ch_06_querying 
** => Test endpoint at https://api.tinybird.co/v0/pipes/ch_06_querying.json
** 'ch_06_querying' created
** Not pushing fixtures
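
The push output gives the URL of the test endpoint. As a minimal sketch, the code below builds that URL and shows how it might be requested from Python with only the standard library. The host and path are taken from the output above; `endpoint_url` and `fetch` are hypothetical helpers, and passing the token in an `Authorization: Bearer` header assumes a token with read access to the pipe. The request itself is not executed here because it needs a real token.

```python
import json
import urllib.parse
import urllib.request

API_HOST = "https://api.tinybird.co"  # host taken from the push output above


def endpoint_url(pipe_name, **params):
    """Build the JSON endpoint URL for a pipe, with optional query parameters."""
    url = f"{API_HOST}/v0/pipes/{pipe_name}.json"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    return url


def fetch(pipe_name, token, **params):
    """Request the endpoint; `token` is assumed to have read access to the pipe."""
    request = urllib.request.Request(
        endpoint_url(pipe_name, **params),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


print(endpoint_url("ch_06_querying"))
```

Calling `fetch("ch_06_querying", token)` with a valid token would return the parsed JSON response for the pipe.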


## [Course Outline](https://colab.research.google.com/github/AlisonJD/RTACourse/blob/main/01_Getting_Started.ipynb)

|Previous Notebook       |Next Notebook|
| :----------------- |:-------------|
|5. [Data Storage Inside the Analytics Database](https://colab.research.google.com/github/AlisonJD/RTACourse/blob/main/05_Data_Storage_Inside_the_Analytics_Database.ipynb)|7. [Best Practices for Views](https://colab.research.google.com/github/AlisonJD/RTACourse/blob/main/07_Best_Practices_for_Views.ipynb)|