Skip to content

Commit

Permalink
feat: support flashback table
Browse files Browse the repository at this point in the history
Patch provided by: mrhemingway <mrhemingway@163.com>
  • Loading branch information
mrdrivingduck authored and polardb-bot[bot] committed Jul 31, 2023
1 parent 2c59fe5 commit dbc715b
Show file tree
Hide file tree
Showing 120 changed files with 9,182 additions and 1,782 deletions.
4 changes: 2 additions & 2 deletions external/polar_monitor/polar_monitor_flashback_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ polar_stat_flashback_log_buf(PG_FUNCTION_ARGS)

ptr = polar_get_curr_flog_ptr(flog_instance->buf_ctl, &prev_ptr);
initalized_upto = polar_get_flog_buf_initalized_upto(flog_instance->buf_ctl);
polar_flog_get_keep_wal_lsn(flog_instance->buf_ctl, &keep_wal_lsn);
is_ready = polar_is_flog_buf_ready(flog_instance->buf_ctl);
keep_wal_lsn = flog_instance->buf_ctl->redo_lsn;
is_ready = POLAR_IS_FLOG_BUF_READY(flog_instance->buf_ctl);

values[i++] = LSNGetDatum(ptr);
values[i++] = LSNGetDatum(prev_ptr);
Expand Down
3 changes: 2 additions & 1 deletion polardb_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ then

if [[ $enable_flashback_log == "on" ]];
then
echo "polar_enable_flashback_log = on" >> $pg_bld_master_dir/postgresql.conf
echo "polar_enable_flashback_log = on
polar_enable_fast_recovery_area = on" >> $pg_bld_master_dir/postgresql.conf
fi

# echo "max_wal_size = 16GB
Expand Down
8 changes: 8 additions & 0 deletions src/backend/access/heap/hio.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/smgr.h"

/* POLAR */
#include "polar_flashback/polar_flashback.h"
#include "utils/guc.h"

static Buffer polar_relation_add_extra_blocks_and_return_last_buffer(Relation relation, BulkInsertState bistate);
Expand Down Expand Up @@ -674,6 +675,9 @@ polar_relation_add_extra_blocks_and_return_last_buffer(Relation relation, BulkIn
int index = 0;
char *bulk_buf_block = NULL;
BufferAccessStrategy strategy = NULL;
bool need_flog;

need_flog = polar_enable_fra(fra_instance);

if (bistate != NULL)
{
Expand Down Expand Up @@ -768,6 +772,10 @@ polar_relation_add_extra_blocks_and_return_last_buffer(Relation relation, BulkIn

PageInit(page, BufferGetPageSize(buffer), 0);

/* Insert the flashback log record for relation bulk extend */
if (need_flog)
polar_flog_rel_bulk_extend(flog_instance, buffer);

/*
* We mark all the new buffers dirty, but do nothing to write them
* out; they'll probably get used soon, and even if they are not, a
Expand Down
19 changes: 19 additions & 0 deletions src/backend/access/logindex/polar_logindex.c
Original file line number Diff line number Diff line change
Expand Up @@ -2360,3 +2360,22 @@ polar_logindex_update_promoted_info(logindex_snapshot_t logindex_snapshot, XLogR
info->old_rw_max_inserted_lsn = last_replayed_lsn;
info->old_rw_max_tid = logindex_snapshot->max_idx_table_id;
}

XLogRecPtr
polar_get_logindex_max_parsed_lsn(logindex_snapshot_t logindex_snapshot)
{
XLogRecPtr max_parsed_lsn = InvalidXLogRecPtr;

SpinLockAcquire(LOG_INDEX_SNAPSHOT_LOCK);
max_parsed_lsn = logindex_snapshot->max_parsed_lsn;
SpinLockRelease(LOG_INDEX_SNAPSHOT_LOCK);
return max_parsed_lsn;
}

void
polar_set_logindex_max_parsed_lsn(logindex_snapshot_t logindex_snapshot, XLogRecPtr lsn)
{
SpinLockAcquire(LOG_INDEX_SNAPSHOT_LOCK);
logindex_snapshot->max_parsed_lsn = lsn;
SpinLockRelease(LOG_INDEX_SNAPSHOT_LOCK);
}
2 changes: 1 addition & 1 deletion src/backend/access/logindex/polar_logindex_redo.c
Original file line number Diff line number Diff line change
Expand Up @@ -2610,7 +2610,7 @@ polar_logindex_find_first_fpi(polar_logindex_redo_ctl_t instance, XLogRecPtr sta
}
else
elog(WARNING, "The first WAL record of " POLAR_LOG_BUFFER_TAG_FORMAT ""
"from %lx to %lx is not a full page image", POLAR_LOG_BUFFER_TAG(tag),
" from %lx to %lx is not a full page image", POLAR_LOG_BUFFER_TAG(tag),
start_lsn, end_lsn);
}

Expand Down
2 changes: 1 addition & 1 deletion src/backend/access/logindex/polar_ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ polar_ringbuf_free_up(polar_ringbuf_t rbuf, size_t len, polar_interrupt_callback
CHECK_FOR_INTERRUPTS();

if (callback != NULL)
callback();
callback(rbuf);

pg_usleep(10);
continue;
Expand Down
40 changes: 40 additions & 0 deletions src/backend/access/transam/clog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1176,3 +1176,43 @@ polar_remove_clog_local_cache_file(void)
{
polar_slru_remove_local_cache_file(ClogCtl);
}

/* POLAR */

/*
* POLAR: Get the minimal segment no in clog
*/
int
polar_get_clog_min_seg_no(void)
{
int min_seg_no = INT_MAX;

SlruScanDirectory(ClogCtl, polar_slru_find_min_seg, &min_seg_no);
return min_seg_no;
}

bool
polar_xid_in_clog_dir(TransactionId xid, const char *clog_dir)
{
int pageno = TransactionIdToPage(xid);
SlruCtlData ctl;

StrNCpy(ctl.Dir, clog_dir, sizeof(ctl.Dir));
return polar_slru_page_physical_exists(&ctl, pageno);
}

XidStatus
polar_get_xid_status(TransactionId xid, const char *clog_dir)
{
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
char *byteptr;
XidStatus status;
PGAlignedBlock clog_page;

polar_physical_read_fra_slru(clog_dir, pageno, clog_page.data);
byteptr = clog_page.data + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
return status;
}
6 changes: 6 additions & 0 deletions src/backend/access/transam/parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
/* POLAR px */
#include "access/px_btbuild.h"

/* POLAR: flashback table */
#include "polar_flashback/polar_flashback_table.h"

/*
* We don't want to waste a lot of memory on an error queue which, most of
* the time, will process only a handful of small messages. However, it is
Expand Down Expand Up @@ -143,6 +146,9 @@ static const struct
{
"polar_px_bt_build_main", polar_px_bt_build_main
},
{
"polar_flashback_pages_woker_main", polar_flashback_pages_woker_main
}
/* POLAR End*/
};

Expand Down
115 changes: 106 additions & 9 deletions src/backend/access/transam/slru.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
/* POLAR end */

/* POLAR */
#include "polar_flashback/polar_fast_recovery_area.h"
#include "utils/guc.h"
#include "storage/polar_fd.h"

Expand Down Expand Up @@ -155,6 +156,7 @@ static void polar_slru_file_name_by_name(SlruCtl ctl, char *path, char *filename
static void polar_slru_file_dir(SlruCtl ctl, char *path);
static bool polar_slru_local_cache_read_page(SlruCtl ctl, int pageno, int slotno);
static bool polar_slru_local_cache_write_page(SlruCtl ctl, int pageno, int slotno);
static bool polar_slru_scan_dir_internal(SlruCtl ctl, SlruScanCallback callback, void *data, const char *path);

#define SlruFileName(a,b,c) polar_slru_file_name_by_seg(a,b,c)

Expand Down Expand Up @@ -1666,6 +1668,8 @@ restart:;
*
* NB: This does not touch the SLRU buffers themselves, callers have to ensure
* they either can't yet contain anything, or have already been cleaned out.
*
* POLAR: Add rename action for flashback table/database.
*/
static void
SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
Expand All @@ -1689,7 +1693,11 @@ SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
polar_slru_file_name_by_name(ctl, path, filename);
ereport(LOG,
(errmsg("removing file \"%s\"", path)));
polar_unlink(path);

if (polar_slru_seg_need_mv(fra_instance, ctl))
polar_mv_slru_seg_to_fra(fra_instance, filename, path);
else
polar_unlink(path);
}
}

Expand Down Expand Up @@ -1831,20 +1839,43 @@ SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
{
bool retval = false;
SlruShared shared = ctl->shared;
DIR *cldir;
struct dirent *clde;
int segno;
int segpage;
char path[MAXPGPATH];

/*
* POLAR: We add a local cache for slru files, so scan it first.
* Scan the shared storage while the return value is false.
*
* NB: If you want to scan the local cache and shared storage all files,
* the return value must be false.
*/
if (shared->polar_cache != NULL)
cldir = polar_allocate_dir(shared->polar_cache->dir_name);
else
{
snprintf(path, MAXPGPATH, "%s", shared->polar_cache->dir_name);
retval = polar_slru_scan_dir_internal(ctl, callback, data, path);
}

if (!retval)
{
polar_slru_file_dir(ctl, path);
cldir = polar_allocate_dir(path);
retval = polar_slru_scan_dir_internal(ctl, callback, data, path);
}

return retval;
}

/*
* POLAR: Slru scan directory internal function like SlruScanDirectory old version.
*/
static bool
polar_slru_scan_dir_internal(SlruCtl ctl, SlruScanCallback callback, void *data, const char *path)
{
bool retval = false;
DIR *cldir;
struct dirent *clde;
int segno;
int segpage;

cldir = polar_allocate_dir(path);
while ((clde = ReadDir(cldir, path)) != NULL)
{
size_t len;
Expand All @@ -1858,7 +1889,7 @@ SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
segpage = segno * SLRU_PAGES_PER_SEGMENT;

elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
ctl->Dir, clde->d_name);
path, clde->d_name);
retval = callback(ctl, clde->d_name, segpage, data);
if (retval)
break;
Expand Down Expand Up @@ -2169,3 +2200,69 @@ polar_slru_remove_local_cache_file(SlruCtl ctl)
if (shared && shared->polar_cache)
polar_local_cache_move_trash(shared->polar_cache->dir_name);
}

/*
* POLAR: SlruScanDirectory callback.
* This callback get minimal of all segments.
*/
bool
polar_slru_find_min_seg(SlruCtl ctl, char *filename, int segpage, void *data)
{
int seg_no;
int *min_seg_no = (int *) data;

seg_no = (int) strtol(filename, NULL, 16);
*min_seg_no = Min(seg_no, *min_seg_no);
return false; /* keep going */
}

/*
* POLAR: physical read fast recovery slru file.
*
* Like SlruPhysicalReadPage but we will report error when we can't find the file
* while SlruPhysicalReadPage just return a empty page.
*/
void
polar_physical_read_fra_slru(const char *slru_dir, int page_no, char *page)
{
SlruCtlData ctl;
int seg_no = page_no / SLRU_PAGES_PER_SEGMENT;
int rpageno = page_no % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd;

StrNCpy(ctl.Dir, slru_dir, sizeof(ctl.Dir));
SlruFileName(&ctl, path, seg_no);

fd = polar_open_transient_file(path, O_RDWR | PG_BINARY);

if (fd < 0)
/*no cover line*/
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", path)));

pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);

if (polar_pread(fd, page, BLCKSZ, offset) != BLCKSZ)
{
/*no cover begin*/
pgstat_report_wait_end();
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from file \"%s\": %m", path)));
/*no cover end*/
}

pgstat_report_wait_end();

if (CloseTransientFile(fd))
{
/*no cover begin*/
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close fast recovery area slru file %s: %m", path)));
/*no cover end*/
}
}
Loading

0 comments on commit dbc715b

Please sign in to comment.