# 使用timescaledb支持时序数据库

[timescaledb](https://docs.timescale.com/timescaledb/latest/)是一个开源的pg时序数据库插件,它的主要特性是:

1. 专用的表类型`超级表(hyper table)`和`分布式超级表(distributed hypertable)`用于专门处理时序数据
2. 专门针对时序数据的分表方案
3. 专门针对时序数据的冷数据压缩和存储方案
4. 连续聚合函数用于处理流数据
5. 自动过期方案
6. 自定义定时任务

这个插件经过索引优化和并行处理后查询性能很高,完全可以满足多数场景下对时序数据库的期待.

In [2]:
-- connection: postgres://postgres:postgres@localhost:5433/test

In [3]:
-- autocommit: true

switched autocommit mode to True

## 创建超级表

`timescaledb`通过专用的超级表类型保存和管理时间序列数据.而超级表需要通过不普通表转化得到,转换函数为

```sql
create_hypertable(
<原表名>.
<时间保存的列>,
[partitioning_column=>分区列],
[number_partitions=>分区数(int)],
[chunk_time_interval=>数据块保存的时间范围,默认1周(INTERVAL),
[create_default_indexes=>是否默认用时间列和分区列构造索引(boolean)],
[if_not_exists=>已经存在同名超表来时不创建并且不报错(boolean)],
[partitioning_func=>指定用于判断数据分区的函数名],
[associated_schema_name=>指定超表所在的schema名],
[associated_table_prefix=>内部超表块名称的前缀,默认值为'_hyper'(text)],
[migrate_data=>设置是否将要转化的原表中的数据迁移到新表,默认为`FALSE`(boolean)],
[time_partitioning_func=>指定将不兼容的主时间列值转换为兼容值的函数]
```

主要要注意的点有如下几个:

1. `chunk_time_interval`应该根据宿主机的内存大小来估算控制,一般来说我们应该控制所有活跃的存储块加起来的内存占用不超过总内存的25%.如果开始没有设置好,我们可以使用函数`set_chunk_time_interval(hypertable, chunk_time_interval)`重新设置每张表的块大小
2. 尽量保持原表没有约束,虽然超级表也一定程度上支持约束,但没有约束更加简单好维护
3. 尽量用空表构造超级表,如果原表中已经有数据了,迁移数据会阻塞表,尽量避免.


### 设置数据过期

时间序列数据库的一大特点是数据有时效性,我们通常需要为数据设置一个过期,超过这个时间范围的数据就删除以减小存储成本.

我们可以通过如下函数对数据过期时间进行设置:

```sql
add_retention_policy(
    <超表名>, 
    <超时时间>,
    [if_not_exists=>当已经设置时不重复设置且不报错(boolean)]
)
```

这个命令实际上是设置了一个定时任务,在到了时间后就会删除超过指定时间范围的数据块.如果要重置过期时间,只能先使用

```sql
remove_retention_policy(
    <超表名>,
    [if_not_exists=>当未设置时不重复删除且不报错(boolean)]
)
```
删除任务,然后再调用`add_retention_policy`重新设置.

如果我们设置过期时已经有数据了,为了避免删除过期数据不干净,可以通过函数

```sql
drop_chunks(
    <超表名>,
    <older_than>,
    [newer_than=>要删除的与之相比过新时间点(interval)],
    [verbose=>是否打印删除了的块名(boolean)]
)
```

来手动删除超过限期的数据.

## 优化超表性能

时间序列数据库的效率主要体现在3个方面:

1. 数据的批写入效率,一次可以写入越多的数据(吞吐量越高)效率越高
2. 数据的查询效率,一次复杂查询时间越短效率越高
3. 数据的保存效率,同样硬盘空间存的数据越多效率越高.

pg是给多少资源就能做多少事的数据库,在cpu和内存相同的情况下要优化超表的性能基本只有如下几个方面:

1. 通过数据分层分配存储介质提高io和查询效率
2. 通过压缩提高存储效率
3. 通过合理创建索引提高查询效率


### 数据分层

数据分层的最简单理解就是把热数据放在高速硬盘(ssd),把温数据放在低速硬盘(HDD).如何做到呢?就是利用pg的[TABLESPACE](http://postgres.cn/docs/12/sql-createtablespace.html).


我们应该将pg默认部署在ssd上,然后再在hdd上开辟一块空间给旧数据使用.我们使用的docker的standalone模式部署,可以很简单通过bind不同路径到容器内实现,比如

```yaml
version: "2.4"

x-log: &default-log
  options:
    max-size: "10m"
    max-file: "3"

services:
 timescaledb:
    build: hsz1273327/pg-allinone:0.0.1
    mem_limit: 2g
    restart: on-failure
    ports:
      - "5434:5432"
    environment:
      POSTGRES_PASSWORD: postgres
    volumes:
      - "/volume2/docker_deploy/storage/postgres/old_data:/old_data" #hdd
      - "./pgdata:/var/lib/postgresql/data" #ssd 
    logging:
      <<: *default-log
    command: ["-c", "max_connections=300"]
```

所以我们要在容器的`/old_data`创建tablespace

```SQL
CREATE TABLESPACE history
OWNER postgres
LOCATION '/old_data':
```

然后我们就可以使用`SELECT show_chunks(<超表>, older_than => INTERVAL '2 days')`这样的函数来查看有哪些符合要分层的存储块.然后用类似如下语句执行存储块的分层

```SQL
SELECT move_chunk(
  chunk => '_timescaledb_internal._hyper_1_4_chunk',
  destination_tablespace => 'history',
  index_destination_tablespace => 'history',
  reorder_index => '_timescaledb_internal._hyper_1_4_chunk_netdata_time_idx',
  verbose => TRUE
);
```

目前timescaledb并没有像数据过期一样给出定时数据分层的函数,不过我们可以通过定时任务自己写函数实现.

### 数据压缩

光是数据分层只是提高了ssd的利用效率而已,本质上只是做到了"好钢用在刀刃上",而数据压缩则可以通过牺牲一部分冷数据的查询效率换来空间利用率上的优势.

数据压缩分为两步

1. 启用数据压缩


    ```SQL
    ALTER TABLE metrics
    SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'tag',
            timescaledb.compress_orderby = 'device_id, time DESC')
        );
    ```
    
    其中`timescaledb.compress_segmentby = 'tag'`含义是使用字段`tag`进行数据分段,`timescaledb.compress_segmentby`的值可以是单列或者多列的组合,但必须保证列非空.而且必须是有限的组合(换句话说就是可以枚举的范围).`timescaledb.compress_orderby`则确定了压缩数据的保存顺序,默认下它会是超表的时间字段.经过压缩后的数据的索引都会被删除,而`timescaledb.compress_segmentby`和`timescaledb.compress_orderby`指定的列则会查询效率相对高些,因此这两个值的挑选很重要.
    
2. 执行定时压缩

    ```SQL
    SELECT add_compression_policy(
            'metrics',
            INTERVAL '3 days',
            if_not_exists => TRUE
        );
    ```
    
    类似数据过期,timescaledb也提供了专用的定时任务来设置定时压缩,同样的还有`remove_compression_policy(<超表>,if_not_exists => TRUE)`用于删除定时任务.以及`compress_chunk(<块>,if_not_compressed=>True);`和`decompress_chunk(<块>,if_compressed=>True)`用于手动压缩和解压.


### 创建索引

超表默认会为时间列和分区列设置索引,而其他列的索引我们可以用标准的SQL语句像一般的表一样创建.索引是会占用资源的,而对于时间序列来说时间就是最重要的索引,因此通常不太建议乱创建索引产生资源的浪费.但超表本身并不限制设置索引,我们可以正常的为text,jsonb类型的字段设置索引.

但直接设置索引如果原本表里已经有数据了可能会造成阻塞,我们可以在索引创建语句后面加上`WITH (timescaledb.transaction_per_chunk)`来按块设置索引以降低阻塞时间.

```sql
CREATE INDEX IF NOT EXISTS metrics_idxgin ON metrics USING GIN (data) WITH (timescaledb.transaction_per_chunk);
```

## 定时任务


+ 定义执行过程

```sql
CREATE OR REPLACE PROCEDURE user_defined_action(job_id int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
  RAISE NOTICE 'Executing action % with config %', job_id, config;
END
$$;
```

+ 设置任务

SELECT add_job('user_defined_action','1h');

## 一个完整的例子

我们以某只股票从一段时间内的日k线数据作为例子,比如我们认为一年内的数据算是分析时会用到的数据,热数据定义为10周内的数据,超出3个月的数据我们认为它意义不大.我们需要每隔4周的周六保存上这四周的数据到csv文件中.

表的列包括:

日期,股票代码,开盘价,收盘价,最高价,最低价,成交量,成交额,振幅,涨跌幅,涨跌额,流动股本,换手率.这些列.我们先构造表

### 使用python下载并处理数据

In [11]:
import akshare as ak
stock_zh_a_daily_qfq_df = ak.stock_zh_a_hist(symbol="601865", start_date="20100101", end_date="20210706", adjust="qfq")

In [12]:
stock_zh_a_daily_qfq_df

Unnamed: 0,日期,开盘,收盘,最高,最低,成交量,成交额,振幅,涨跌幅,涨跌额,换手率
0,2020-01-02,12.22,12.26,12.53,11.88,196174,244628268.0,5.45,2.85,0.34,13.08
1,2020-01-03,12.33,12.69,12.83,12.26,193695,248237527.0,4.65,3.51,0.43,12.91
2,2020-01-06,12.55,13.23,13.57,12.29,211957,283667696.0,10.09,4.26,0.54,14.13
3,2020-01-07,13.58,14.29,14.45,13.58,216339,310125728.0,6.58,8.01,1.06,14.42
4,2020-01-08,14.41,14.60,14.89,13.60,307620,443672704.0,9.03,2.17,0.31,20.51
...,...,...,...,...,...,...,...,...,...,...,...
360,2021-06-30,39.00,39.53,40.50,38.11,172259,680187328.0,5.98,-1.13,-0.45,3.84
361,2021-07-01,39.54,40.46,43.00,38.40,239915,968932176.0,11.64,2.35,0.93,5.35
362,2021-07-02,40.80,38.54,41.43,37.80,214496,835142656.0,8.97,-4.75,-1.92,4.79
363,2021-07-05,38.79,39.81,40.48,38.54,179305,709234576.0,5.03,3.30,1.27,4.00


处理数据

### 将数据写入数据库

### 自定义任务

## 查询

In [4]:
CREATE OR REPLACE PROCEDURE user_defined_action(job_id int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
  RAISE NOTICE 'Executing action % with config %', job_id, config;
END
$$;

In [5]:
SELECT add_job('user_defined_action','1 minute');

1 row(s) returned.


add_job
1000


In [6]:
SELECT * FROM timescaledb_information.jobs;

2 row(s) returned.


job_id,application_name,schedule_interval,max_runtime,max_retries,retry_period,proc_schema,proc_name,owner,scheduled,config,next_start,hypertable_schema,hypertable_name
1,Telemetry Reporter [1],"1 day, 0:00:00",0:01:40,-1,1:00:00,_timescaledb_internal,policy_telemetry,postgres,True,,2021-07-07 01:25:22.422257+00:00,,
1000,User-Defined Action [1000],0:01:00,0:00:00,-1,0:05:00,public,user_defined_action,postgres,True,,2021-07-06 10:48:42.552708+00:00,,


In [7]:
SELECT delete_job(1000);

1 row(s) returned.


delete_job


In [8]:
SELECT * FROM timescaledb_information.jobs;

1 row(s) returned.


job_id,application_name,schedule_interval,max_runtime,max_retries,retry_period,proc_schema,proc_name,owner,scheduled,config,next_start,hypertable_schema,hypertable_name
1,Telemetry Reporter [1],"1 day, 0:00:00",0:01:40,-1,1:00:00,_timescaledb_internal,policy_telemetry,postgres,True,,2021-07-07 01:25:22.422257+00:00,,
