@@ -1806,6 +1806,12 @@ class Rdb_transaction {
1806
1806
*/
1807
1807
int64_t m_n_mysql_tables_in_use = 0 ;
1808
1808
1809
+ /*
1810
+ MariaDB's group commit: state set by rocksdb_commit_ordered() and consumed by rocksdb_commit()
1811
+ */
1812
+ bool commit_ordered_done;
1813
+ bool commit_ordered_res;
1814
+
1809
1815
/*
1810
1816
for distinction between rdb_transaction_impl and rdb_writebatch_impl
1811
1817
when using walk tx list
@@ -2431,6 +2437,8 @@ class Rdb_transaction_impl : public Rdb_transaction {
2431
2437
THDVAR (m_thd, write_ignore_missing_column_families);
2432
2438
m_is_two_phase = rocksdb_enable_2pc;
2433
2439
2440
+ commit_ordered_done= false ;
2441
+
2434
2442
/*
2435
2443
If m_rocksdb_reuse_tx is null this will create a new transaction object.
2436
2444
Otherwise it will reuse the existing one.
@@ -2643,6 +2651,7 @@ class Rdb_writebatch_impl : public Rdb_transaction {
2643
2651
bool is_tx_started () const override { return (m_batch != nullptr ); }
2644
2652
2645
2653
void start_tx () override {
2654
+ commit_ordered_done= false ; // Do we need this here?
2646
2655
reset ();
2647
2656
write_opts.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC);
2648
2657
write_opts.disableWAL = THDVAR (m_thd, write_disable_wal);
@@ -2831,8 +2840,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__)))
2831
2840
*/
2832
2841
static int rocksdb_prepare (handlerton* hton, THD* thd, bool prepare_tx)
2833
2842
{
2834
- // This is "ASYNC_COMMIT" feature which is only in webscalesql
2835
- bool async=false ;
2843
+ bool async=false ; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
2836
2844
2837
2845
Rdb_transaction *&tx = get_tx_from_thd (thd);
2838
2846
if (!tx->can_prepare ()) {
@@ -2842,7 +2850,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
2842
2850
(!my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
2843
2851
/* We were instructed to prepare the whole transaction, or
2844
2852
this is an SQL statement end and autocommit is on */
2845
- #ifdef MARIAROCKS_NOT_YET // disable prepare/commit
2853
+
2854
+ #ifdef MARIAROCKS_NOT_YET // Crash-safe slave does not work yet
2846
2855
std::vector<st_slave_gtid_info> slave_gtid_info;
2847
2856
my_core::thd_slave_gtid_info (thd, &slave_gtid_info);
2848
2857
for (const auto &it : slave_gtid_info) {
@@ -2852,31 +2861,50 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
2852
2861
#endif
2853
2862
2854
2863
if (tx->is_two_phase ()) {
2864
+
2865
+ /*
2866
+ MariaDB: the following branch is never taken.
2867
+ We always flush at Prepare and rely on RocksDB's internal Group Commit
2868
+ to do some grouping.
2869
+ */
2855
2870
if (thd->durability_property == HA_IGNORE_DURABILITY || async) {
2856
2871
tx->set_sync (false );
2857
2872
}
2873
+
2874
+ /*
2875
+ MariaDB: do not flush logs if we are running in a non-crash-safe mode.
2876
+ */
2877
+ if (!rocksdb_flush_log_at_trx_commit)
2878
+ tx->set_sync (false );
2879
+
2858
2880
XID xid;
2859
2881
thd_get_xid (thd, reinterpret_cast <MYSQL_XID *>(&xid));
2860
2882
if (!tx->prepare (rdb_xid_to_string (xid))) {
2861
2883
return HA_EXIT_FAILURE;
2862
2884
}
2863
- if (thd->durability_property == HA_IGNORE_DURABILITY )
2885
+
2886
+ /*
2887
+ MariaDB: our Group Commit implementation does not use the
2888
+ hton->flush_logs call (at least currently) so the following is not
2889
+ needed (TODO: will we need this for binlog rotation?)
2890
+ */
2864
2891
#ifdef MARIAROCKS_NOT_YET
2865
- (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER)) {
2892
+ if (thd->durability_property == HA_IGNORE_DURABILITY )
2893
+ (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER))
2866
2894
&&
2867
2895
THDVAR (thd, flush_log_at_trx_commit))
2868
2896
#endif
2869
- {
2870
2897
#ifdef MARIAROCKS_NOT_YET
2898
+ {
2871
2899
// MariaRocks: disable the
2872
2900
// "write/sync redo log before flushing binlog cache to file"
2873
2901
// feature. See a869c56d361bb44f46c0efeb11a8f03561676247
2874
2902
/* *
2875
2903
we set the log sequence as '1' just to trigger hton->flush_logs
2876
2904
*/
2877
2905
thd_store_lsn (thd, 1 , DB_TYPE_ROCKSDB);
2878
- #endif
2879
2906
}
2907
+ #endif
2880
2908
}
2881
2909
2882
2910
DEBUG_SYNC (thd, " rocksdb.prepared" );
@@ -3026,6 +3054,50 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
3026
3054
return count;
3027
3055
}
3028
3056
3057
+
3058
+ /*
3059
+ Handle a commit checkpoint request from server layer.
3060
+
3061
+ InnoDB does this:
3062
+ We put the request in a queue, so that we can notify upper layer about
3063
+ checkpoint complete when we have flushed the redo log.
3064
+ If we have already flushed all relevant redo log, we notify immediately.
3065
+
3066
+ MariaRocks just flushes everything right away ATM
3067
+ */
3068
+
3069
+ static void rocksdb_checkpoint_request (handlerton *hton,
3070
+ void *cookie)
3071
+ {
3072
+ const rocksdb::Status s= rdb->SyncWAL ();
3073
+ // TODO: what to do on error?
3074
+ if (s.ok ())
3075
+ {
3076
+ rocksdb_wal_group_syncs++;
3077
+ commit_checkpoint_notify_ha (hton, cookie);
3078
+ }
3079
+ }
3080
+
3081
+ /*
3082
+ @param all: TRUE - commit the transaction
3083
+ FALSE - SQL statement ended
3084
+ */
3085
+ static void rocksdb_commit_ordered (handlerton *hton, THD* thd, bool all)
3086
+ {
3087
+ // Same assert as InnoDB has
3088
+ DBUG_ASSERT (all || (!thd_test_options (thd, OPTION_NOT_AUTOCOMMIT |
3089
+ OPTION_BEGIN)));
3090
+ Rdb_transaction *&tx = get_tx_from_thd (thd);
3091
+
3092
+ tx->set_sync (false );
3093
+
3094
+ /* This will note the master position also */
3095
+ tx->commit_ordered_res = tx->commit ();
3096
+ tx->commit_ordered_done = true ;
3097
+
3098
+ }
3099
+
3100
+
3029
3101
static int rocksdb_commit (handlerton* hton, THD* thd, bool commit_tx)
3030
3102
{
3031
3103
DBUG_ENTER_FUNC ();
@@ -3045,6 +3117,16 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
3045
3117
if (tx != nullptr ) {
3046
3118
if (commit_tx || (!my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT |
3047
3119
OPTION_BEGIN))) {
3120
+ /*
3121
+ NOTE: this early-return path does not add anything to commit_latency_stats.
3122
+ TODO: confirm that skipping the latency statistics here is intended.
3123
+ */
3124
+ if (tx->commit_ordered_done )
3125
+ {
3126
+ thd_wakeup_subsequent_commits (thd, 0 );
3127
+ DBUG_RETURN ((tx->commit_ordered_res ? HA_ERR_INTERNAL_ERROR: 0 ));
3128
+ }
3129
+
3048
3130
/*
3049
3131
We get here
3050
3132
- For a COMMIT statement that finishes a multi-statement transaction
@@ -3053,6 +3135,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
3053
3135
if (tx->commit ()) {
3054
3136
DBUG_RETURN (HA_ERR_ROCKSDB_COMMIT_FAILED);
3055
3137
}
3138
+ thd_wakeup_subsequent_commits (thd, 0 );
3056
3139
} else {
3057
3140
/*
3058
3141
We get here when committing a statement within a transaction.
@@ -3076,6 +3159,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
3076
3159
DBUG_RETURN (HA_EXIT_SUCCESS);
3077
3160
}
3078
3161
3162
+
3079
3163
static int rocksdb_rollback (handlerton *const hton, THD *const thd,
3080
3164
bool rollback_tx) {
3081
3165
Rdb_transaction *&tx = get_tx_from_thd (thd);
@@ -3882,11 +3966,19 @@ static int rocksdb_init_func(void *const p) {
3882
3966
rocksdb_hton->state = SHOW_OPTION_YES;
3883
3967
rocksdb_hton->create = rocksdb_create_handler;
3884
3968
rocksdb_hton->close_connection = rocksdb_close_connection;
3969
+
3885
3970
rocksdb_hton->prepare = rocksdb_prepare;
3971
+ rocksdb_hton->prepare_ordered = NULL ; // Do not need it
3972
+
3886
3973
rocksdb_hton->commit_by_xid = rocksdb_commit_by_xid;
3887
3974
rocksdb_hton->rollback_by_xid = rocksdb_rollback_by_xid;
3888
3975
rocksdb_hton->recover = rocksdb_recover;
3976
+
3977
+ rocksdb_hton->commit_ordered = rocksdb_commit_ordered;
3889
3978
rocksdb_hton->commit = rocksdb_commit;
3979
+
3980
+ rocksdb_hton->commit_checkpoint_request = rocksdb_checkpoint_request;
3981
+
3890
3982
rocksdb_hton->rollback = rocksdb_rollback;
3891
3983
rocksdb_hton->show_status = rocksdb_show_status;
3892
3984
rocksdb_hton->start_consistent_snapshot =
0 commit comments