Skip to content

Commit

Permalink
IN() complete for joins and jstore - w/ non-relational IN() clause
Browse files Browse the repository at this point in the history
  • Loading branch information
sammy committed Oct 17, 2010
1 parent 2c4593c commit 7a37fc8
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 93 deletions.
35 changes: 35 additions & 0 deletions bash_functions.sh
Expand Up @@ -294,6 +294,7 @@ function order_by_test() {
$CLI SELECT id,name,salary,division FROM employee WHERE division BETWEEN 22 AND 77 ORDER BY name DESC LIMIT 4
}


function order_by_test_joins() {
echo SELECT division.name,division.location,subdivision.name,worker.name,worker.salary FROM division,subdivision,worker WHERE division.id = subdivision.division AND division.id = worker.division AND division.id BETWEEN 11 AND 33 ORDER BY worker.salary
$CLI SELECT division.name,division.location,subdivision.name,worker.name,worker.salary FROM division,subdivision,worker WHERE division.id = subdivision.division AND division.id = worker.division AND division.id BETWEEN 11 AND 33 ORDER BY worker.salary
Expand All @@ -312,6 +313,12 @@ function istore_customer_hobby_order_by_denorm_to_many_lists() {
$CLI LRANGE employee_ordered_hobby_list:4 0 -1
}

function orderbyer() {
order_by_test
order_by_test_joins
istore_customer_hobby_order_by_denorm_to_many_lists
}

function in_test_cust_id() {
echo SELECT \* FROM customer WHERE id IN "(1,2,3,4)"
$CLI SELECT \* FROM customer WHERE id IN "(1,2,3,4)"
Expand All @@ -331,6 +338,31 @@ function in_test_cust_hobby() {
$CLI SELECT \* FROM customer WHERE hobby IN "(LRANGE list_index_customer_hobby 0 2)" ORDER BY name
}

function in_test_join_nonrelational() {
echo SELECT division.id,division.name,division.location,external.name,external.salary FROM division,external WHERE division.id=external.division AND division.id IN "(44,55,33,11,22)"
$CLI SELECT division.id,division.name,division.location,external.name,external.salary FROM division,external WHERE division.id=external.division AND division.id IN "(44,55,33,11,22)"
echo DEL L_IND_div_id
$CLI DEL L_IND_div_id
echo LPUSH L_IND_div_id 22
$CLI LPUSH L_IND_div_id 22
echo LPUSH L_IND_div_id 11
$CLI LPUSH L_IND_div_id 11
echo LPUSH L_IND_div_id 33
$CLI LPUSH L_IND_div_id 33
echo LPUSH L_IND_div_id 55
$CLI LPUSH L_IND_div_id 55
echo LPUSH L_IND_div_id 44
$CLI LPUSH L_IND_div_id 44
echo SELECT division.id,division.name,division.location,external.name,external.salary FROM division,external WHERE division.id=external.division AND division.id IN "(LRANGE L_IND_div_id 0 -1)"
$CLI SELECT division.id,division.name,division.location,external.name,external.salary FROM division,external WHERE division.id=external.division AND division.id IN "(LRANGE L_IND_div_id 0 -1)"
}

function in_tester() {
in_test_cust_id
in_test_cust_hobby
in_test_join_nonrelational
}

function joiner() {
echo "JOINS"
echo
Expand Down Expand Up @@ -676,6 +708,9 @@ function all_tests() {
create_table_as_select_customer
create_table_as_select_join_worker_health

orderbyer
in_tester

test_fk_joins

secondary_range_query_test
Expand Down
146 changes: 91 additions & 55 deletions join.c
Expand Up @@ -629,7 +629,8 @@ void joinGeneric(redisClient *c,
int obt,
int obc,
bool asc,
int lim) {
int lim,
list *inl) {
Order_by = (obt != -1);
Order_by_col_val = NULL;

Expand Down Expand Up @@ -684,45 +685,75 @@ void joinGeneric(redisClient *c,
rset[i] = createValSetObject();
}

join_add_cols_t jac; /* these dont change in the loop below */
jac.qcols = qcols;
jac.j_tbls = j_tbls;
jac.j_cols = j_cols;
jac.jind_ncols = jind_ncols;
jac.j_ind_len = j_ind_len;
jac.jbtr = jbtr;
join_add_cols_t jc; /* these dont change in the loop below */
jc.qcols = qcols;
jc.j_tbls = j_tbls;
jc.j_cols = j_cols;
jc.jind_ncols = jind_ncols;
jc.j_ind_len = j_ind_len;
jc.jbtr = jbtr;

for (int i = 0; i < n_ind; i++) { // iterate indices
for (int i = 0; i < n_ind; i++) { /* iterate indices */
btEntry *be, *nbe;
j_ind_len[i] = 0;
jac.index = i;
jc.index = i;

jac.itable = Index[server.dbid][j_indxs[i]].table;
jac.o = lookupKeyRead(c->db, Tbl[server.dbid][jac.itable].name);
jc.itable = Index[server.dbid][j_indxs[i]].table;
jc.o = lookupKeyRead(c->db, Tbl[server.dbid][jc.itable].name);
jc.virt = Index[server.dbid][j_indxs[i]].virt;
robj *ind = Index[server.dbid][j_indxs[i]].obj;
jac.virt = Index[server.dbid][j_indxs[i]].virt;
robj *bt = jac.virt ? jac.o : lookupKey(c->db, ind);
btStreamIterator *bi = btGetRangeIterator(bt, low, high, jac.virt);
while ((be = btRangeNext(bi, 1)) != NULL) { // iterate btree
if (jac.virt) {
jac.jk = be->key;
jac.val = be->val;
joinAddColsFromInd(&jac, rset, obt, obc);

if (low) { /* RANGE QUERY */
robj *bt = jc.virt ? jc.o : lookupKey(c->db, ind);
btStreamIterator *bi = btGetRangeIterator(bt, low, high, jc.virt);
while ((be = btRangeNext(bi, 1)) != NULL) {
if (jc.virt) {
jc.jk = be->key;
jc.val = be->val;
joinAddColsFromInd(&jc, rset, obt, obc);
} else {
jc.jk = be->key;
robj *val = be->val;
btStreamIterator *nbi = btGetFullRangeIterator(val, 0, 0);
while ((nbe = btRangeNext(nbi, 1)) != NULL) {
jc.val = nbe->key;
joinAddColsFromInd(&jc, rset, obt, obc);
}
btReleaseRangeIterator(nbi);
}
}
btReleaseRangeIterator(bi);
} else { /* IN () QUERY */
listNode *ln;
listIter *li = listGetIterator(inl, AL_START_HEAD);
if (jc.virt) {
bool pktype = Tbl[server.dbid][jc.itable].col_type[0];
while((ln = listNext(li)) != NULL) {
jc.jk = ln->value;
jc.val = btFindVal(jc.o, jc.jk, pktype);
if (jc.val) joinAddColsFromInd(&jc, rset, obt, obc);
}
} else {
jac.jk = be->key;
robj *val = be->val;
btStreamIterator *nbi = btGetFullRangeIterator(val, 0, 0);
while ((nbe = btRangeNext(nbi, 1)) != NULL) { // iterate NodeBT
jac.val = nbe->key;
joinAddColsFromInd(&jac, rset, obt, obc);
int ind_col = (int)Index[server.dbid][j_indxs[i]].column;
bool fktype = Tbl[server.dbid][jc.itable].col_type[ind_col];
btStreamIterator *nbi;
robj *ibt = lookupKey(c->db, ind);
while((ln = listNext(li)) != NULL) {
jc.jk = ln->value;
robj *val = btIndFindVal(ibt->ptr, jc.jk, fktype);
if (val) {
nbi = btGetFullRangeIterator(val, 0, 0);
while ((nbe = btRangeNext(nbi, 1)) != NULL) {
jc.val = nbe->key;
joinAddColsFromInd(&jc, rset, obt, obc);
}
btReleaseRangeIterator(nbi);
}
}
btReleaseRangeIterator(nbi);
}
listReleaseIterator(li);
}
btReleaseRangeIterator(bi);
}
decrRefCount(low);
decrRefCount(high);

/* cant join if one table had ZERO rows */
bool one_empty = 0;
Expand Down Expand Up @@ -833,30 +864,10 @@ void joinGeneric(redisClient *c,
}
}

void legacyJoinCommand(redisClient *c) {
int j_indxs[MAX_JOIN_INDXS];
int j_tbls [MAX_JOIN_INDXS];
int j_cols [MAX_JOIN_INDXS];
int n_ind = parseIndexedColumnListOrReply(c, c->argv[1]->ptr, j_indxs);
if (!n_ind) {
addReply(c, shared.joinindexedcolumnlisterror);
return;
}
int qcols = multiColCheckOrReply(c, c->argv[2]->ptr, j_tbls, j_cols);
if (!qcols) {
addReply(c, shared.joincolumnlisterror);
return;
}
RANGE_CHECK_OR_REPLY(c->argv[3]->ptr,)

joinGeneric(c, NULL, j_indxs, j_tbls, j_cols, n_ind, qcols, low, high,
-1, 0, 0, NULL, /* STORE args */
-1, -1, 1, -1); /* ORDER BY args */
}

void jstoreCommit(redisClient *c,
int sto,
robj *range,
robj *low,
robj *high,
robj *nname,
int j_indxs[MAX_JOIN_INDXS],
int j_tbls [MAX_JOIN_INDXS],
Expand All @@ -866,7 +877,8 @@ void jstoreCommit(redisClient *c,
int obt,
int obc,
bool asc,
int lim) {
int lim,
list *inl) {
robj *argv[STORAGE_MAX_ARGC + 1];
struct redisClient *fc = rsql_createFakeClient();
fc->argv = argv;
Expand All @@ -887,7 +899,6 @@ void jstoreCommit(redisClient *c,
}
if (sub_pk) nargc--;
}
RANGE_CHECK_OR_REPLY(range->ptr,)

if (!StorageCommands[sto].argc) { /* INSERT -> create table first */
fc->argv[1] = cloneRobj(nname);
Expand All @@ -898,7 +909,7 @@ void jstoreCommit(redisClient *c,
}

joinGeneric(c, fc, j_indxs, j_tbls, j_cols, n_ind, qcols, low, high, sto,
sub_pk, nargc, nname, obt, obc, asc, lim);
sub_pk, nargc, nname, obt, obc, asc, lim, inl);

freeFakeClient(fc);
}
Expand All @@ -918,3 +929,28 @@ void freeValSetObject(robj *o) {
//RL4 "freeValSetObject: %p", o->ptr);
dictRelease((dict*) o->ptr);
}

#if 0
/* LEGACY CODE - the ROOTS */
void legacyJoinCommand(redisClient *c) {
int j_indxs[MAX_JOIN_INDXS];
int j_tbls [MAX_JOIN_INDXS];
int j_cols [MAX_JOIN_INDXS];
int n_ind = parseIndexedColumnListOrReply(c, c->argv[1]->ptr, j_indxs);
if (!n_ind) {
addReply(c, shared.joinindexedcolumnlisterror);
return;
}
int qcols = multiColCheckOrReply(c, c->argv[2]->ptr, j_tbls, j_cols);
if (!qcols) {
addReply(c, shared.joincolumnlisterror);
return;
}
RANGE_CHECK_OR_REPLY(c->argv[3]->ptr,)

joinGeneric(c, NULL, j_indxs, j_tbls, j_cols, n_ind, qcols, low, high,
-1, 0, 0, NULL, /* STORE args */
-1, -1, 1, -1, /* ORDER BY args */
NULL); /* IN() args */
}
#endif
11 changes: 8 additions & 3 deletions join.h
Expand Up @@ -18,7 +18,9 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
#ifndef __JOINSTORE__H
#define __JOINSTORE__H

#include "adlist.h"
#include "redis.h"

#include "common.h"

void freeJoinRowObject(robj *o);
Expand Down Expand Up @@ -48,12 +50,14 @@ void joinGeneric(redisClient *c,
int obt,
int obc,
bool asc,
int lim);
int lim,
list *inl);


void jstoreCommit(redisClient *c,
int sto,
robj *range,
robj *low,
robj *high,
robj *nname,
int j_indxs[MAX_JOIN_INDXS],
int j_tbls [MAX_JOIN_INDXS],
Expand All @@ -63,6 +67,7 @@ void jstoreCommit(redisClient *c,
int obt,
int obc,
bool asc,
int lim);
int lim,
list *inl);

#endif /* __JOINSTORE__H */
4 changes: 1 addition & 3 deletions redis.c
Expand Up @@ -796,7 +796,6 @@ void denormCommand(redisClient *c);

void ikeysCommand(redisClient *c);

void legacyJoinCommand(redisClient *c);
void legacyTableCommand(redisClient *c);
void legacyInsertCommand(redisClient *c);
#endif /* ALSOSQL END */
Expand Down Expand Up @@ -935,7 +934,6 @@ static struct redisCommand cmdTable[] = {
{"norm", normCommand, -2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1,1},
{"denorm", denormCommand, -3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1,1},

{"legacyjoin", legacyJoinCommand, 4,REDIS_CMD_INLINE,NULL,1,1,1,1},
{"legacytable", legacyTableCommand, -3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1,1},
{"legacyinsert", legacyInsertCommand, -3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1,1},
{"legacyindex", legacyIndexCommand, 3,REDIS_CMD_INLINE,NULL,1,1,1,1},
Expand Down Expand Up @@ -1786,7 +1784,7 @@ static void createSharedObjects(void) {
shared.join_on_multi_col = createObject(REDIS_STRING,sdsnew(
"-ERR SELECT: JOIN: Only SINGLE index joins are supported\r\n"));
shared.join_requires_range = createObject(REDIS_STRING,sdsnew(
"-ERR SELECT: JOIN: A range must be specified when joining (e.g. tbl.col BETWEEN x AND Y) -> Full Table Scans when joining are purposefully prohibited\r\n"));
"-ERR SELECT: JOIN: A range must be specified when joining, e.g. [tbl.col BETWEEN x AND Y] or [tbl.col IN (1,2,3)] - Use SCANSELECT for FullTableJoins \r\n"));
shared.join_order_by_tbl = createObject(REDIS_STRING,sdsnew(
"-ERR SELECT: JOIN: ORDER BY tablename.columname - table does not exist\r\n"));
shared.join_order_by_col = createObject(REDIS_STRING,sdsnew(
Expand Down

0 comments on commit 7a37fc8

Please sign in to comment.