From 9be8b5c43996c86791897e5b01dfc0367455040a Mon Sep 17 00:00:00 2001 From: CrawfordCurrie Date: Tue, 4 Mar 2014 12:23:17 +0000 Subject: [PATCH] Item9808: limit data sizes and generate indexes for tables using a per-installation schema. git-svn-id: http://svn.foswiki.org/trunk/DBIStoreContrib@17302 0b4bb1d4-4e5a-0410-9cc4-b2b747904278 --- data/System/DBIStoreContrib.txt | 55 +--- data/System/DBIStoreTest.txt | 14 +- lib/Foswiki/Contrib/DBIStoreContrib.pm | 4 +- .../Contrib/DBIStoreContrib/Config.spec | 68 +++- .../Contrib/DBIStoreContrib/DBIStore.pm | 187 ++++++----- .../Contrib/DBIStoreContrib/HoistSQL.pm | 295 ++++++++---------- .../Contrib/DBIStoreContrib/Personality.pm | 80 ++++- .../DBIStoreContrib/Personality/ODBC.pm | 56 ++-- 8 files changed, 415 insertions(+), 344 deletions(-) diff --git a/data/System/DBIStoreContrib.txt b/data/System/DBIStoreContrib.txt index c77048b5b7..b90ed9f455 100644 --- a/data/System/DBIStoreContrib.txt +++ b/data/System/DBIStoreContrib.txt @@ -23,51 +23,8 @@ Contains all wiki topics. Contains the names of all meta-tables. * =name= - table name, e.g. =TOPICINFO=, =FORM= etc -At a minimum, =metatypes= will include the names of all the -tables listed below. The columns of these tables have identical semantics -(and syntax) to their corresponding =%META:= statements - -=TOPICINFO= %BR% - * =TEXT author= - * =TEXT version= - * =TEXT date= - * =TEXT format= - * =TEXT reprev= - * =TEXT rev= - * =TEXT comment= - * =TEXT encoding= - -=TOPICMOVED= %BR% - * =TEXT from= - * =TEXT to= - * =TEXT by= - * =TEXT date= - -=TOPICPARENT= %BR% - * =TEXT name= - -=FILEATTACHMENT= %BR% - * =TEXT name= - * =TEXT version= - * =TEXT path= - * =TEXT size= - * =TEXT date= - * =TEXT user= - * =TEXT comment= - * =TEXT attr= - -=FORM= %BR% - * =TEXT name= - -=FIELD= %BR% - * =TEXT name= - * =TEXT value= - * =TEXT title= - -=PREFERENCE= %BR% - * =TEXT name= - * =TEXT value= - * =TEXT type= +The full schema, including the types used to represent different data can +be found in =configure= under the settings for the Extension. Note that file attachments are *not* stored in the database at this time. @@ -97,6 +54,14 @@ Date conversion using the =d2n= operator is not supported. ---+++ Row indexes Integer indexes are not supported. Use queries instead. +---+++ Representational types +The type defined in the schema must be long enough to store any possible +value for the given field, but be as short as possible to maximise the +DB's chance of building a decent index for it. + +---+++ length of an array +The =length()= operator only works on string data, not on tables. + ---++ Installation Instructions %$INSTALL_INSTRUCTIONS% diff --git a/data/System/DBIStoreTest.txt b/data/System/DBIStoreTest.txt index efc04a8c22..3d1f4970c4 100644 --- a/data/System/DBIStoreTest.txt +++ b/data/System/DBIStoreTest.txt @@ -20,15 +20,18 @@ To reload this topic, click on %SCRIPTURL{"view"}%/%WEB%/%TOPIC%?dbistore_update To reload the companion form topic, click on %SCRIPTURL{"view"}%/%WEB%/DBIStoreTestForm?dbistore_update=1 - * Non-query: %XA 2 %XB "1" %SOP%%XD% * False: %XA% 0 %XB% "0" %XC% * True: %XA% 2 %XB% "1" %XC% - * Table.selector: %XA% 2 %XB% "form.name='DBIStoreTestForm' AND length('x')=1" %XC% - * Array: %XA% 1 %XB% "fields[name='string'].value='String'" %XC% - * Field: %XA% 2 %XB% "number" %XC% + * > literal: %XA% 0 %XB% "3>20" %XC% + * < literal: %XA% 0 %XB% "'3'<'20'" %XC% + * Simple regex: %XA% 2 %XB% "'AA'=~'A'" %XC% + * Numeric Field: %XA% 2 %XB% "number" %XC% * Boolean field: %XA% 1 %XB% "boolean" %XC% * Field cmp: %XA% 1 %XB% "number=99" %XC% - * Simple regex: %XA% 2 %XB% "'AA'=~'A'" %XC% + * Field cmp: %XA% 1 %XB% "99=number" %XC% + * Field uop: %XA% 1 %XB% "length(number)=2" %XC% + * Array: %XA% 1 %XB% "fields[name='string'].value='String'" %XC% + * Table.selector: %XA% 2 %XB% "form.name='DBIStoreTestForm' AND length('x')=1" %XC% * Simple LIKE: %XA% 1 %XB% "name~'DBIStoreT*orm'" %XC% * Simple AND: %XA% 1 %XB% "number=99 AND string='String'" %XC% * Simple OR: %XA% 1 %XB% "number=99 OR string='String'" %XC% @@ -52,6 +55,7 @@ To reload the companion form topic, click on %SCRIPTURL{"view"}%/%WEB%/DBIStoreT * Escapes %XA% 0 %XB% "name =~ '\\\' OR name ~ '\\\' OR name = '\\\'" %XC% * Escapes %XA% 0 %XB% "name =~ '\\.x.y\\\'" %XC% * Table=Table %#TA% 0 %#TB% "fields=attachments" %#TC% + * Non-query: %XA 2 %XB "1" %SOP%%XD% %META:FORM{name="DBIStoreTestForm"}% %META:FIELD{name="number" attributes="" title="number" value="99"}% diff --git a/lib/Foswiki/Contrib/DBIStoreContrib.pm b/lib/Foswiki/Contrib/DBIStoreContrib.pm index 09723d3c17..9ce9c762c3 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib.pm +++ b/lib/Foswiki/Contrib/DBIStoreContrib.pm @@ -4,8 +4,8 @@ package Foswiki::Contrib::DBIStoreContrib; use strict; use Foswiki (); -our $VERSION = '1.1'; # plugin version is also locked to this -our $RELEASE = '25 Feb 2014'; +our $VERSION = '1.1'; # plugin version is also locked to this +our $RELEASE = '4 Mar 2014'; # Very verbose debugging. Used by all modules in the suite. use constant MONITOR => 0; diff --git a/lib/Foswiki/Contrib/DBIStoreContrib/Config.spec b/lib/Foswiki/Contrib/DBIStoreContrib/Config.spec index 2f8f6e7c47..f23ca322cb 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib/Config.spec +++ b/lib/Foswiki/Contrib/DBIStoreContrib/Config.spec @@ -25,4 +25,70 @@ $Foswiki::cfg{Extensions}{DBIStoreContrib}{SQLite}{PCRE} = '/usr/lib/sqlite3/pcr # encountered in topic text. This should not normally be required, as plugins # should register all META that they create. Note that only META:NAME where # NAME matches /^[A-Z][A_Z0-9_]+$/ will be loaded. -$Foswiki::cfg{Extensions}{DBIStoreContrib}{AutoloadUnknownMETA} = 0; \ No newline at end of file +$Foswiki::cfg{Extensions}{DBIStoreContrib}{AutoloadUnknownMETA} = 0; +# **PERL** +# If a column isn't found in the schema, it will use the _DEFAULT type. +# You should extend this table as required by extra meta-data found in +# your wiki. +# If an entry for a column is a string starting with an underscore, +# that string will be used as an index to get the 'real' schema for +# the column. +# The pseudo-type _DEFAULT must exist and must be a text type. +# If {index} is true, then an index will be created for that column. +$Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema} = { + _DEFAULT => { type => 'TEXT' }, + _USERNAME => { type => 'VARCHAR(64)', index => 1 }, + _DATE => { type => 'VARCHAR(32)' }, + topic => { + web => { type => 'VARCHAR(256)', index => 1 }, + name => { type => 'VARCHAR(128)', index => 1 }, + text => '_DEFAULT', + raw => '_DEFAULT' + }, + metatypes => { + name => { type => 'VARCHAR(63)', index => 1 }, + }, + TOPICINFO => { + author => '_USERNAME', + version => { type => 'VARCHAR(256)' }, + date => '_DATE', + format => { type => 'VARCHAR(32)' }, + reprev => { type => 'VARCHAR(32)' }, + rev => { type => 'VARCHAR(32)' }, + comment => { type => 'VARCHAR(512)' }, + encoding => { type => 'VARCHAR(32)' }, + }, + TOPICMOVED => { + from => { type => 'VARCHAR(256)' }, + to => { type => 'VARCHAR(256)' }, + by => { type => 'VARCHAR(256)' }, + date => '_DATE', + }, + TOPICPARENT => { + name => { type => 'VARCHAR(256)', index => 1 }, + }, + FILEATTACHMENT => { + name => { type => 'VARCHAR(256)', index => 1 }, + version => { type => 'VARCHAR(32)' }, + path => { type => 'VARCHAR(256)' }, + size => { type => 'VARCHAR(32)' }, + date => '_DATE', + user => '_USERNAME', + comment => { type => 'VARCHAR(512)' }, + attr => { type => 'VARCHAR(32)' }, + }, + FORM => { + name => { type => 'VARCHAR(256)', index => 1 }, + }, + FIELD => { + name => { type => 'VARCHAR(128)', index => 1 }, + value => { type => 'VARCHAR(512)', index => 1 }, + title => { type => 'VARCHAR(256)' }, + }, + PREFERENCE => { + name => { type => 'VARCHAR(64)', index => 1 }, + value => { type => 'VARCHAR(512)' }, + type => { type => 'VARCHAR(32)' }, + } +}; + diff --git a/lib/Foswiki/Contrib/DBIStoreContrib/DBIStore.pm b/lib/Foswiki/Contrib/DBIStoreContrib/DBIStore.pm index ff316d1e52..7b3fcd9e44 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib/DBIStore.pm +++ b/lib/Foswiki/Contrib/DBIStoreContrib/DBIStore.pm @@ -33,7 +33,7 @@ use constant MONITOR => Foswiki::Contrib::DBIStoreContrib::MONITOR; # TODO: SMELL: convert to using $session->{store} perhaps? our $db; # singleton instance of this class our $personality; # personality module for the selected DSN -our ( $CQ, $TEXT ); +our $CQ; # character string quote # @ISA not used, as its set by magic, and we don't want to import more functions # our @ISA = ('Foswiki::Store::Store'); @@ -81,8 +81,6 @@ sub _connect { return 1 if $this->{handle} && !$hard_reset; - $this->{schema} ||= {}; - unless ( $this->{handle} ) { if ($Foswiki::inUnitTestMode) { @@ -110,30 +108,54 @@ sub _connect { # Custom code to put DB's into ANSI mode and clean up error reporting personality()->startup(); - $CQ = personality()->{string_quote}; - $TEXT = personality()->{text_type}; + $CQ = personality()->{string_quote}; } # Check if the DB is initialised with a quick sniff of the tables # to see if all the ones we expect are there if ( personality()->table_exists( 'metatypes', 'topic' ) ) { + if (MONITOR) { + + # Check metatypes integrity + my $tables = + $this->{handle}->selectcol_arrayref('SELECT name FROM metatypes'); + foreach my $table (@$tables) { + unless ( personality()->table_exists($table) ) { + _say "$table is in metatypes but does not exist"; + } + } + } return 1 unless ($hard_reset); _say "HARD RESET" if MONITOR; + } + elsif (MONITOR) { + _say "Base tables don't exist"; + ASSERT(0); + } - # Hard reset; strip down all existing tables + # Hard reset; strip down all existing tables - # The metatypes table is how we know which tables are ours. - my $tables = + # The metatypes table is how we know which tables are ours. Add + # this to the schema. + my @tables = keys %{ $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema} }; + if ( personality()->table_exists('metatypes') ) { + my $mts = $this->{handle}->selectcol_arrayref('SELECT name FROM metatypes'); - - foreach my $table ( @$tables, 'topic', 'metatypes' ) { - if ( personality()->table_exists($table) ) { - $this->{handle} - ->do( 'DROP TABLE ' . personality()->safe_id($table) ); + foreach my $t (@$mts) { + unless ( grep( /^$t$/, @tables ) ) { + push( @tables, $t ); } } } + foreach my $table (@tables) { + if ( personality()->table_exists($table) ) { + $this->{handle} + ->do( 'DROP TABLE ' . personality()->safe_id($table) ); + _say "Dropped $table" if MONITOR; + } + } + # No topic table, or we've had a hard reset _say "Loading DB schema" if MONITOR; $this->_createTables(); @@ -149,17 +171,40 @@ sub _connect { # Create the table for the given META - PRIVATE sub _createTableForMETA { - my ( $this, $t, @colnames ) = @_; - my $cols = join( ',', - 'tid INT', map { personality()->safe_id($_) . " $TEXT" } @colnames ); + my ( $this, $t ) = @_; + my $schema = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$t}; + + # Create table + my $cols = join( + ',', + 'tid INT', + map { + personality()->safe_id($_) . ' ' + . personality()->column_type( $t, $_ ) + } grep( !/^_/, keys %$schema ) + ); my $sn = personality()->safe_id($t); my $sql = "CREATE TABLE $sn ( $cols )"; _say $sql if MONITOR; $this->{handle}->do($sql); - $this->{schema}->{$t} = { map { $_ => 1 } @colnames }; # Add the table to the table of tables $this->{handle}->do("INSERT INTO metatypes (name) VALUES ( $CQ$t$CQ )"); + + # Create indexes + while ( my ( $col, $v ) = each %$schema ) { + $v = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$v} + unless ( ref($v) ); + if ( $v->{index} ) { + my $sql = + 'CREATE INDEX ' + . personality()->safe_id("IX_${t}_${col}") . ' ON ' + . personality()->safe_id($t) . '(' + . personality()->safe_id($col) . ')'; + _say $sql if MONITOR; + $this->{handle}->do($sql); + } + } } # Create all the base tables in the DB (including all default META: tables) - PRIVATE @@ -168,44 +213,32 @@ sub _createTables { # Create the topic table. This links the web name, topic name, # topic text and raw text of the topic. - $this->{handle}->do(<safe_id($c) . ' ' + . personality()->column_type( 'topic', $c ) ); + } + my $colst = join( ',', @cols ); + $this->{handle}->do("CREATE TABLE topic ($colst,UNIQUE (tid))"); # Now create the meta-table of known META: tables - $this->{handle}->do(<{handle}->do( 'CREATE TABLE metatypes(name ' + . personality()->column_type( 'metatypes', 'name' ) + . ')' ); # Create the tables for each registered META: type - $this->{schema} = {}; - foreach my $type ( keys(%Foswiki::Meta::VALIDATE) ) { - _say "Creating VALIDATE table for $type" if MONITOR; - my @attrs; - my $t = $Foswiki::Meta::VALIDATE{$type}; - if ($t) { - - # Get the col names from VALIDATE - foreach my $g (qw(require allow other)) { - if ( defined $t->{$g} ) { - - # The if is critical; without it, we change the schema - foreach ( @{ $t->{$g} } ) { - push( @attrs, $_ ); - } - } - } + foreach my $type ( + keys( %{ $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema} } ) ) + { + + next if $type =~ /^(_.*|topic|metatypes)$/; + + my $t = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$type}; + unless ( $t =~ /^_/ ) { + _say "Creating table for $type" if MONITOR; + $this->_createTableForMETA($type); } - $this->_createTableForMETA( $type, @attrs ); } $this->{handle}->do('COMMIT') if personality()->{requires_COMMIT}; } @@ -266,45 +299,42 @@ sub _inner_insert { # Make sure it's registered. next - unless ( defined $Foswiki::Meta::VALIDATE{$type} + unless ( + defined $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$type} || $Foswiki::cfg{Extensions}{DBIStoreContrib}{AutoloadUnknownMETA} && $type =~ /^[A-Z][A-Z0-9_]+$/ ); # Make sure the table exists - unless ( $this->{schema}->{$type} ) { + my $schema = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$type}; - # The table is not in the schema + unless ($schema) { + + # The table is not in the schema. Is it in the DB? if ( personality()->table_exists($type) ) { # Pull the column names from the DB - $this->{schema}->{$type} = - { map { $_ => 1 } personality()->get_columns($type) }; + $schema = + { map { $_ => '_DEFAULT' } + personality()->get_columns($type) }; + $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$type} = + $schema; } else { - # The table is not in the DB either - my %attrs; - my $t = $Foswiki::Meta::VALIDATE{$type}; - if ($t) { - - # Get the col names from VALIDATE - foreach my $g (qw(require allow other)) { - if ( defined $t->{$g} ) { - foreach ( @{ $t->{$g} } ) { - $attrs{$_} = 1; - } - } - } - } + # The table is not in the DB either. Try deduce the schema + # from the data. + $schema = {}; # Check the entries to ensure we have picked up all the # columns. We read *all* entries so we get all columns. foreach my $item ( $mo->find($type) ) { - foreach ( keys(%$item) ) { - $attrs{$_} = 1; + foreach my $col ( keys(%$item) ) { + $schema->{$col} ||= '_DEFAULT'; } } _say "Creating fly table for $type" if MONITOR; - $this->_createTableForMETA( $type, keys %attrs ); + $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}->{$type} = + $schema; + $this->_createTableForMETA($type); } } @@ -321,20 +351,20 @@ sub _inner_insert { # Check that the table is configured to accept this data foreach my $kn (@kns) { - unless ( $this->{schema}->{$type}->{$kn} ) { + unless ( $schema->{$kn} ) { # The column is not in the schema unless ( personality()->column_exists( $type, $kn ) ) { # The column might be in the DB but not in # the schema. This is unlikely, but possible. + $schema->{$kn} = + personality()->column_type( $type, $kn ); $this->{handle}->do( 'ALTER TABLE ' . personality()->safe_id($type) . ' ADD ' - . personality()->safe_id($kn) - . " $TEXT" ); + . personality()->safe_id($kn) . ' ' + . $schema->{$kn} ); } - $this->{schema}->{$type} = - { map { $_ => 1 } personality()->get_columns($type) }; } # The column might be in the schema but not in the DB @@ -355,9 +385,6 @@ sub _inner_insert { _say "$sql [tid," . join( ',', map { $item->{$_} } @kns ) . ']' if MONITOR; - #_say "$sql tid," - # . join(';', map { (defined $item->{$_}?$item->{$_}:"") } @kns) - # if MONITOR; $this->{handle}->do( $sql, {}, $tid, map { _convertToUTF8( $item->{$_} ) } @kns ); } diff --git a/lib/Foswiki/Contrib/DBIStoreContrib/HoistSQL.pm b/lib/Foswiki/Contrib/DBIStoreContrib/HoistSQL.pm index 00c195a21c..2ca510a286 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib/HoistSQL.pm +++ b/lib/Foswiki/Contrib/DBIStoreContrib/HoistSQL.pm @@ -59,7 +59,7 @@ BEGIN { } # Coverage; add _COVER(__LINE__) where you want a visit recorded. -use constant COVER => 1; +use constant COVER => 0; our %covered; BEGIN { @@ -102,16 +102,17 @@ sub _SELECT { $info = _personality()->make_comment( $opts{monitor} ); } my $pick = $opts{select}; - my $sql = "SELECT$info $pick"; + if ( ref($pick) ) { + $pick = join( ',', @$pick ); + } + my $sql = "SELECT$info $pick"; while ( my ( $opt, $val ) = each %opts ) { - next unless ( $opt =~ /^[A-Z]/ - && defined $val ); + next unless ( $opt && $opt =~ /^[A-Z]/ && defined $val ); + $val = [$val] unless ( ref($val) ); - # Bracket sub-selects for FROM etc - if ( $val =~ /^SELECT/ ) { - $val = "($val)"; - } - $sql .= " $opt $val"; + # Bracket subqueries for FROM etc + $sql .= + " $opt " . join( ',', map { $_ =~ /^SELECT/ ? "($_)" : $_ } @$val ); } return $sql; } @@ -168,17 +169,10 @@ my %uop_map = ( uc => sub { _simple_uop( 'UPPER', STRING, @_ ) }, length => sub { my ( $arg, $atype ) = @_; - if ( $atype == UNKNOWN ) { - - # Assume a table - return ( "COUNT(*)", NUMBER ); - } - else { - if ( $atype ne STRING ) { - $arg = _cast( $arg, $atype, STRING ); - } - return ( _personality()->length($arg), NUMBER ); + if ( $atype != STRING && $atype != UNKNOWN ) { + $arg = _cast( $arg, $atype, STRING ); } + return ( _personality()->length($arg), NUMBER ); }, d2n => sub { my ( $arg, $atype ) = @_; @@ -319,7 +313,7 @@ sub hoist { if ( $h{is_table_name} ) { $h{sql} = "topic.tid IN (SELECT tid FROM ($h{sql}) AS $alias)"; } - elsif ( $h{sql} =~ /^SELECT/ ) { + elsif ( $h{is_select} ) { # It's a table; test if the selector is a true value my $where = ''; @@ -390,7 +384,7 @@ sub _hoist { $result{type} = UNKNOWN; if ( !ref( $node->{op} ) ) { - if ( $node->{op} eq STRING ) { + if ( $node->{op} == STRING ) { # Conbvert to an escaped SQL string my $s = $node->{params}[0]; @@ -431,7 +425,7 @@ sub _hoist { my $from_alias; my $tid_constraint = ''; - if ( $lhs{is_table_name} || $lhs{sql} =~ /^SELECT/ ) { + if ( $lhs{is_table_name} || $lhs{is_select} ) { $from_alias = _alias(__LINE__); $lhs{sql} = _AS( $lhs{sql} => $from_alias ); @@ -447,7 +441,7 @@ sub _hoist { my %where = _hoist( $node->{params}[1], $from_alias ); - if ( $where{sql} =~ /^SELECT/ ) { + if ( $where{is_select} ) { $where{sql} = "EXISTS($where{sql})"; } elsif ( $where{is_table_name} ) { @@ -476,6 +470,7 @@ sub _hoist { WHERE => $where, monitor => __LINE__ ); + $result{is_select} = 1; $result{has_where} = length($where); $result{type} = STRING; $result{ignore_tid} = $lhs{ignore_tid}; @@ -493,32 +488,23 @@ sub _hoist { } $result{sel} = $rhs->{params}[0]; - if ( $lhs{sql} =~ /^SELECT/ ) { - my $lhs_alias = _alias(__LINE__); - $result{sql} = _SELECT( - select => "$result{sel},$lhs_alias.tid", - FROM => _AS( $lhs{sql} => $lhs_alias ), - monitor => __LINE__ - ); + my $alias = _alias(__LINE__); + my @selects = ("$alias.tid"); + if ( $lhs{is_select} ) { + push( @selects, $result{sel} ); } elsif ( $lhs{is_table_name} ) { - my $alias = _alias(__LINE__); - $result{sql} = _SELECT( - select => "$alias.$result{sel},$alias.tid", - FROM => _AS( $lhs{sql} => $alias ), - - #-MySQL # Only need WHERE if the LHS is not the in_table - #-MySQL WHERE => (!$in_table || $lhs{sql} ne $in_table) - #-MySQL ? ("$alias.tid=" . ( $in_table - #-MySQL ? "$in_table.tid" - #-MySQL : 'tid' )) - #-MySQL : undef - monitor => __LINE__ - ); + push( @selects, "$alias.$result{sel}" ); } else { _abort( "Expected a table on the LHS of '.':", $node ); } + $result{sql} = _SELECT( + select => \@selects, + FROM => _AS( $lhs{sql} => $alias ), + monitor => __LINE__ + ); + $result{is_select} = 1; $result{type} = STRING; $result{ignore_tid} = $lhs{ignore_tid}; @@ -533,12 +519,12 @@ sub _hoist { my $lhs = $node->{params}[0]; my %lhs = _hoist( $node->{params}[0], undef ); my $lhs_where; - my $select = ''; - my $wtn = _personality() + my @selects; + my $wtn = _personality() ->strcat( "$topic_alias.web", "$CQ.$CQ", "$topic_alias.name" ); - if ( $lhs{sql} =~ /^SELECT/ ) { + if ( $lhs{is_select} ) { my $tnames = _alias(__LINE__); - $select = _AS( $lhs{sql} => $tnames ) . ','; + push( @selects, _AS( $lhs{sql} => $tnames ) ); my $tname_sel = $tnames; $tname_sel = "$tnames.$lhs{sel}" if $lhs{sel}; $lhs_where = "($topic_alias.name=$tname_sel OR ($wtn)=$tname_sel)"; @@ -557,7 +543,7 @@ sub _hoist { # Expand the RHS *without* a constraint on the topic table my %rhs = _hoist( $node->{params}[1], undef ); - unless ( $rhs{sql} =~ /^SELECT/ || $rhs{is_table_name} ) { + unless ( $rhs{is_select} || $rhs{is_table_name} ) { # We *could* handle this without an error, but it would # be pretty meaningless e.g. 'Topic'/1 @@ -570,38 +556,38 @@ sub _hoist { my $tid_constraint = "$sexpr_alias.tid IN (" . _SELECT( select => 'tid', - FROM => _AS( 'topic', $topic_alias ), + FROM => _AS( 'topic' => $topic_alias ), WHERE => $lhs_where, monitor => __LINE__ ) . ")"; + push( @selects, _AS( $rhs{sql} => $sexpr_alias ) ); $result{sql} = _SELECT( # select all columns (which will include tid) select => "$sexpr_alias.*", - FROM => $select . _AS( $rhs{sql} => $sexpr_alias ), + FROM => \@selects, WHERE => $tid_constraint, monitor => __LINE__ ); + $result{is_select} = 1; $result{has_where} = 1; $result{type} = $rhs{type}; $result{ignore_tid} = 1; } elsif ( $arity == 2 && defined $bop_map{$op} ) { - my $lhs = $node->{params}[0]; - my %lhs = _hoist( $lhs, $in_table ); - my $lhs_is_SELECT = $lhs{sql} =~ /^SELECT/; + my $lhs = $node->{params}[0]; + my %lhs = _hoist( $lhs, $in_table ); - my $rhs = $node->{params}[1]; - my %rhs = _hoist( $rhs, $in_table ); - my $rhs_is_SELECT = $rhs{sql} =~ /^SELECT/; + my $rhs = $node->{params}[1]; + my %rhs = _hoist( $rhs, $in_table ); my $opfn = $bop_map{$op}; - if ( ( $lhs_is_SELECT || $lhs{is_table_name} ) - && ( $rhs_is_SELECT || $rhs{is_table_name} ) ) + if ( ( $lhs{is_select} || $lhs{is_table_name} ) + && ( $rhs{is_select} || $rhs{is_table_name} ) ) { # TABLE - TABLE @@ -658,11 +644,12 @@ sub _hoist { my $union_sql = _UNION( $lhs_sql, $rhs_sql ); $result{sql} = _SELECT( - select => 'DISTINCT ' - . _AS( $TRUE => $result{sel} ) . ",tid", + select => + [ 'DISTINCT ' . _AS( $TRUE => $result{sel} ), 'tid' ], FROM => _AS( $union_sql, $union_alias ), monitor => __LINE__ ); + $result{is_select} = 1; $result{type} = $TRUE_TYPE; $result{ignore_tid} = 0; } @@ -701,9 +688,8 @@ sub _hoist { $tid_table = 'topic,'; } $result{sql} = _SELECT( - select => 'DISTINCT ' - . _AS( $expr => $result{sel} ) - . ",$ret_tid", + select => + [ 'DISTINCT ' . _AS( $expr => $result{sel} ), $ret_tid ], FROM => $tid_table . _AS( $lhs{sql} => $lhs_alias, @@ -712,90 +698,36 @@ sub _hoist { WHERE => $where, monitor => __LINE__ ); + $result{is_select} = 1; $result{has_where} = length($where); $result{type} = $optype; } } - elsif ( $lhs_is_SELECT || $lhs{is_table_name} ) { + elsif ( $lhs{is_select} || $lhs{is_table_name} ) { # TABLE - CONSTANT - my $lhs_alias = _alias(__LINE__); - my $l_sel = $lhs_alias; - $l_sel = "$lhs_alias.$lhs{sel}" if $lhs{sel}; - - $result{sel} = _alias(__LINE__); - $result{ignore_tid} = 0; - - my ( $expr, $optype ) = &$opfn( - $l_sel => $lhs{type}, - $rhs{sql} => $rhs{type} - ); - - my $where; - if ( $optype == BOOLEAN ) { - $where = $expr; - $expr = $TRUE; - $optype = $TRUE_TYPE; - } - - my $ret_tid = "$lhs_alias.tid"; - my $tid_table = ''; - if ( $lhs{ignore_tid} ) { - - # ignore tid coming from the subexpression - $ret_tid = 'topic.tid'; - $tid_table = 'topic,'; - } - - $result{sql} = _SELECT( - select => _AS( $expr => $result{sel} ) . ",$ret_tid", - FROM => $tid_table . _AS( $lhs{sql} => $lhs_alias ), - WHERE => $where, - monitor => __LINE__ . " $op" - ); - $result{has_where} = length($where); - $result{type} = $optype; - + my $operate = sub { + my $sel = shift; + return &$opfn( + $sel => $lhs{type}, + $rhs{sql} => $rhs{type} + ); + }; + _genSingleTableSELECT( \%lhs, $operate, \%result, + __LINE__ . " $op" ); } - elsif ($rhs_is_SELECT) { + elsif ( $rhs{is_select} ) { # CONSTANT - TABLE - my $rhs_alias = _alias(__LINE__); - my $r_sel = $rhs_alias; - $r_sel = "$rhs_alias.$rhs{sel}" if $rhs{sel}; - - $result{sel} = _alias(__LINE__); - $result{ignore_tid} = 0; - - my ( $expr, $optype ) = &$opfn( - $lhs{sql} => $lhs{type}, - $r_sel => $rhs{type} - ); - - my $where; - if ( $optype == BOOLEAN ) { - $where = $expr; - $expr = $TRUE; - $optype = $TRUE_TYPE; - } - - my $ret_tid = "$rhs_alias.tid"; - my $tid_table = ''; - if ( $rhs{ignore_tid} ) { - - # ignore tid coming from the subexpression - $ret_tid = 'topic.tid'; - $tid_table = 'topic,'; - } - - $result{sql} = _SELECT( - select => _AS( $expr => $result{sel} ) . ",$ret_tid", - FROM => $tid_table . _AS( $rhs{sql} => $rhs_alias ), - WHERE => $where, - monitor => __LINE__ - ); - $result{has_where} = length($where); - $result{type} = $optype; + my $operate = sub { + my $sel = shift; + return &$opfn( + $lhs{sql} => $lhs{type}, + $sel => $rhs{type} + ); + }; + _genSingleTableSELECT( \%rhs, $operate, \%result, + __LINE__ . " $op" ); } else { @@ -809,36 +741,14 @@ sub _hoist { elsif ( $arity == 1 && defined $uop_map{$op} ) { my $opfn = $uop_map{$op}; my %kid = _hoist( $node->{params}[0], $in_table ); - if ( $kid{sql} =~ /^SELECT/ || $kid{is_table_name} ) { - my $arg_alias = _alias(__LINE__); - my $arg_sel = $arg_alias; - $arg_sel = "$arg_alias.$kid{sel}" if $kid{sel}; - $result{sel} = _alias(__LINE__); - $result{ignore_tid} = 0; - my ( $expr, $optype ) = &$opfn( $arg_sel => UNKNOWN ); - - my $where; - if ( $optype == BOOLEAN ) { - $where = $expr; - $expr = $TRUE; - $optype = $TRUE_TYPE; - } - my $ret_tid = 'tid'; - my $tid_table = ''; - if ( $kid{ignore_tid} ) { + if ( $kid{is_select} || $kid{is_table_name} ) { + my $operate = sub { + my $sel = shift; + return &$opfn( $sel => UNKNOWN ); + }; + _genSingleTableSELECT( \%kid, $operate, \%result, + __LINE__ . " $op" ); - # ignore tid coming from the subexpression - $ret_tid = 'topic.tid'; - $tid_table = 'topic,'; - } - $result{sql} = _SELECT( - select => _AS( $expr => $result{sel} ) . ",$ret_tid", - FROM => $tid_table . _AS( $kid{sql} => $arg_alias ), - WHERE => $where, - monitor => __LINE__ - ); - $result{has_where} = length($expr); - $result{type} = $optype; } else { ( $result{sql}, $result{type} ) = &$opfn( $kid{sql}, $kid{type} ); @@ -848,15 +758,54 @@ sub _hoist { else { _abort( "Don't know how to hoist '$op':", $node ); } - if (MONITOR) { - print STDERR "Hoist " . recreate($node) . " ->\n"; - print STDERR "select $result{sel} from\n" if $result{sel}; - print STDERR "table name\n" if $result{is_table_name}; - print STDERR _format_SQL( $result{sql} ) . "\n"; - } + + # if (MONITOR) { + # print STDERR "Hoist " . recreate($node) . " ->\n"; + # print STDERR "select $result{sel} from\n" if $result{sel}; + # print STDERR "table name\n" if $result{is_table_name}; + # print STDERR _format_SQL( $result{sql} ) . "\n"; + # } return %result; } +sub _genSingleTableSELECT { + my ( $table, $operate, $result, $monitor ) = @_; + + my $alias = _alias(__LINE__); + my $sel = $alias; + $sel = "$alias.$table->{sel}" if $table->{sel}; + + $result->{sel} = _alias(__LINE__); + $result->{ignore_tid} = 0; + my ( $expr, $optype ) = &$operate($sel); + + my $where; + if ( $optype == BOOLEAN ) { + $where = $expr; + $expr = $TRUE; + $optype = $TRUE_TYPE; + } + + my $ret_tid = "$alias.tid"; + my @froms = ( _AS( $table->{sql} => $alias ) ); + if ( $table->{ignore_tid} ) { + + # ignore tid coming from the subexpression + $ret_tid = 'topic.tid'; + unshift( @froms, 'topic' ); + } + + $result->{sql} = _SELECT( + select => [ _AS( $expr => $result->{sel} ), $ret_tid ], + FROM => \@froms, + WHERE => $where, + monitor => $monitor + ); + $result->{is_select} = 1; + $result->{has_where} = length($where); + $result->{type} = $optype; +} + # Generate a cast statement, if necessary. # from a child node type. # $arg - the SQL being cast diff --git a/lib/Foswiki/Contrib/DBIStoreContrib/Personality.pm b/lib/Foswiki/Contrib/DBIStoreContrib/Personality.pm index 8efbe7c443..6526739d5b 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib/Personality.pm +++ b/lib/Foswiki/Contrib/DBIStoreContrib/Personality.pm @@ -37,12 +37,28 @@ sub new { my ( $class, $dbistore ) = @_; my $this = bless( { - store => $dbistore, - requires_COMMIT => 1, - string_quote => "'", - text_type => 'TEXT', - true_value => '1=1', - true_type => Foswiki::Contrib::DBIStoreContrib::BOOLEAN, + store => $dbistore, + + # The quote to use around constant strings + string_quote => "'", + + # If the DB is running *without* auto-commit enabled, then this + # is required. + requires_COMMIT => 0, + + # A DB with native BOOLEAN can use a simple boolean expression + # here. Without BOOLEAN support a more convoluted route is + # required. + true_value => '1=1', + + # If the DB has a native BOOLEAN type this is BOOLEAN. If it + # has to use a BIT value, this will be PSEUDO_BOOL. + true_type => Foswiki::Contrib::DBIStoreContrib::BOOLEAN, + + # Numeric shadow columns? If true, generate a FLOAT column + # for each META: column, and do a perl data conversion of + # the text data into it when saving. + use_shadows => 0, }, $class ); @@ -86,7 +102,8 @@ sub startup { =begin TML ---+ table_exists(table_name [, table_name]*) -> boolean -Determine if a table exists +Determine if a table exists. All tables named in parameters +must exist. =cut @@ -99,8 +116,8 @@ sub table_exists { SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME IN ($tables) SQL - my @rows = $this->{store}->{handle}->selectrow_array($sql); - return scalar(@rows); + my $rows = $this->{store}->{handle}->selectall_arrayref($sql); + return scalar(@$rows) == scalar(@_); } =begin TML @@ -187,14 +204,13 @@ sub wildcard { my @exprs; if ( $rhs =~ s/^'(.*)'$/$1/ ) { foreach my $spec ( split( /(?:,\s*|\|)/, $rhs ) ) { - $spec =~ s/(['.])/\\$1/g; my $like = 0; - $like = 1 if $spec =~ s/\*/.*/g; - $like = 1 if $spec =~ s/\?/./g; + $like = 1 if $spec =~ s/(['.%_])/[$1]/g; + $like = 1 if $spec =~ s/\*/%/g; + $like = 1 if $spec =~ s/\?/_/g; if ($like) { - $spec = "'^$spec\$'"; - my $res = $this->regexp( $lhs, $spec ); + my $res = "$lhs LIKE '$spec'"; push( @exprs, $res ); } else { @@ -267,7 +283,10 @@ Cast a datum to a character string type for comparison sub cast_to_text { my ( $this, $d ) = @_; - return "CAST(($d) AS $this->{text_type})"; + return + "CAST(($d) AS " + . $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{_DEFAULT}{type} + . ')'; } =begin TML @@ -294,6 +313,37 @@ sub strcat { return join( '||', @_ ); } +=begin TML + +---++ column_type($table, $column) -> $typename +Determine the best text type to use to represent the given column. + +The default implementation uses the +{Extensions}{DBIStoreContrib}{Schema} table to map from a +table name, column name to a type. + +=cut + +sub column_type { + my ( $this, $table, $column ) = @_; + + my $l = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$table}; + $l = $l->{$column} if $l; + if ( defined $l ) { + + # If the type name starts with an underscore, map to a default + # type name + if ( $l =~ /^_/ ) { + $l = $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{$l}; + ASSERT($l) if DEBUG; + } + return $l->{type}; + } + print STDERR "WARNING: Could not determine a type for $table.$column\n" + if DEBUG; + return $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{_DEFAULT}{type}; +} + 1; __DATA__ diff --git a/lib/Foswiki/Contrib/DBIStoreContrib/Personality/ODBC.pm b/lib/Foswiki/Contrib/DBIStoreContrib/Personality/ODBC.pm index 370f25f12b..10bc372877 100644 --- a/lib/Foswiki/Contrib/DBIStoreContrib/Personality/ODBC.pm +++ b/lib/Foswiki/Contrib/DBIStoreContrib/Personality/ODBC.pm @@ -31,7 +31,10 @@ sub new { TRY_CONVERT TSEQUAL UNPIVOT UPDATETEXT USE USER VARYING VIEW WAITFOR WHILE WITHIN GROUP WRITETEXT/ ); - $this->{text_type} = 'VARCHAR(MAX)'; + + # Override the default type in the schema + $Foswiki::cfg{Extensions}{DBIStoreContrib}{Schema}{_DEFAULT}{type} = + 'VARCHAR(MAX)'; $this->{true_value} = 'CAST(1 AS BIT)'; $this->{true_type} = Foswiki::Contrib::DBIStoreContrib::PSEUDO_BOOL; $this->{requires_COMMIT} = 0; @@ -51,30 +54,37 @@ SELECT 1 WHERE OBJECT_ID('dbo.foswiki_CONVERT') IS NOT NULL SQL if ( $exists == 0 ) { - # make_number derived from is_numeric by Dmitri Golovan of Micralyne. + # Error-tolerant number conversion. Works like perl. $this->{store}->{handle}->do(<<'SQL'); CREATE FUNCTION foswiki_CONVERT( @value VARCHAR(MAX) ) RETURNS FLOAT AS -BEGIN - RETURN ( - CASE - WHEN @value NOT LIKE '%[^-0-9.+]%' - AND ( - CHARINDEX('.', @value) = 0 - OR - CHARINDEX('.', @value) > 0 AND LEN(@value) > 1 - AND LEN(@value) - LEN(REPLACE(@value, '.', '')) = 1 - ) - AND ( - CHARINDEX('-', @value)=0 - OR - CHARINDEX('-', @value) = 1 AND LEN(@value) > 1 - AND CHARINDEX('-', @value, 2) = 0 - ) - THEN CONVERT(FLOAT, @value) - ELSE 0 - END - ) -END + BEGIN + IF @value LIKE '%[^-+0-9eE]%' RETURN 0 + IF NOT ( @value LIKE '[0-9]%' OR @value LIKE '[-+][0-9]%') RETURN 0 + -- definitely have a number; just need to find the end + DECLARE @s INT + DECLARE @ss VARCHAR(2) + SET @s = 1 + IF @value LIKE '[-+]%' SET @s = 2 + SET @ss = SUBSTRING(@value, @s, 2) + WHILE @ss LIKE '[0-9]%' + BEGIN + SET @s = @s + 1 + SET @ss = SUBSTRING(@value, @s, 2); + END + IF @ss LIKE '.[0-9eE]' + BEGIN -- fractional part + SET @s = @s + 1; + WHILE SUBSTRING(@value, @s, 1) LIKE '[0-9]' SET @s = @s + 1 + SET @ss = SUBSTRING(@value, @s, 2); + END + IF @ss LIKE '[eE][-+0-9]' + BEGIN --- exponent + SET @s = @s + 2; -- skip e and sign or first digit + WHILE SUBSTRING(@value, @s, 1) LIKE '[0-9]' SET @s = @s + 1 + END + IF @s <= DATALENGTH(@value) RETURN 0 + RETURN CONVERT(FLOAT, @value) + END SQL } }