Skip to content

Commit

Permalink
Add DiagnosticStream to QueryPlanner & ByteCodeInterpreter.
Browse files Browse the repository at this point in the history
  • Loading branch information
danluu committed Oct 19, 2016
1 parent da5bbd2 commit c20bf78
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 37 deletions.
8 changes: 5 additions & 3 deletions inc/BitFunnel/Plan/Factories.h
Expand Up @@ -56,10 +56,12 @@ namespace BitFunnel


// TODO: get rid of these convenience methods? // TODO: get rid of these convenience methods?
std::vector<DocId> RunQueryPlanner(TermMatchNode const & tree, std::vector<DocId> RunQueryPlanner(TermMatchNode const & tree,
ISimpleIndex const & index, ISimpleIndex const & index,
IDiagnosticStream* diagnosticStream); IDiagnosticStream & diagnosticStream);


std::vector<DocId> RunSimplePlanner(TermMatchNode const & tree, ISimpleIndex const & index); std::vector<DocId> RunSimplePlanner(TermMatchNode const & tree,
ISimpleIndex const & index,
IDiagnosticStream & diagnosticStream);


} }
} }
3 changes: 3 additions & 0 deletions inc/BitFunnel/Utilities/Factories.h
Expand Up @@ -32,6 +32,7 @@
namespace BitFunnel namespace BitFunnel
{ {
class IBlockAllocator; class IBlockAllocator;
class IDiagnosticStream;
class IObjectFormatter; class IObjectFormatter;
class ITaskProcessor; class ITaskProcessor;
class ITokenManager; class ITokenManager;
Expand All @@ -41,6 +42,8 @@ namespace BitFunnel
std::unique_ptr<IBlockAllocator> std::unique_ptr<IBlockAllocator>
CreateBlockAllocator(size_t blockSize, size_t totalBlockCount); CreateBlockAllocator(size_t blockSize, size_t totalBlockCount);


std::unique_ptr<IDiagnosticStream> CreateDiagnosticStream(std::ostream& stream);

// TODO: return unique_ptr. // TODO: return unique_ptr.
IObjectFormatter* CreateObjectFormatter(std::ostream& output); IObjectFormatter* CreateObjectFormatter(std::ostream& output);


Expand Down
1 change: 1 addition & 0 deletions src/Common/Utilities/src/CMakeLists.txt
Expand Up @@ -5,6 +5,7 @@ set(CPPFILES
Allocator.cpp Allocator.cpp
BlockAllocator.cpp BlockAllocator.cpp
ConsoleLogger.cpp ConsoleLogger.cpp
DiagnosticStream.cpp
Exceptions.cpp Exceptions.cpp
FileHeader.cpp FileHeader.cpp
Logging.cpp Logging.cpp
Expand Down
74 changes: 74 additions & 0 deletions src/Common/Utilities/src/DiagnosticStream.cpp
@@ -0,0 +1,74 @@
#include "DiagnosticStream.h"
#include "BitFunnel/Utilities/Factories.h"


namespace BitFunnel
{
std::unique_ptr<IDiagnosticStream> Factories::CreateDiagnosticStream(std::ostream& stream)
{
return std::unique_ptr<IDiagnosticStream>(new DiagnosticStream(stream));
}


DiagnosticStream::DiagnosticStream(std::ostream& stream)
: m_stream(stream)
{
}


void DiagnosticStream::Enable(char const * diagnostic)
{
// TODO: REVIEW: check for duplicates?
m_enabled.push_back(diagnostic);
}


void DiagnosticStream::Disable(char const * diagnostic)
{
// TODO: REVIEW: check for mismatch?
for (unsigned i = 0 ; i < m_enabled.size(); ++i)
{
if (m_enabled[i].compare(diagnostic) == 0)
{
m_enabled.erase(m_enabled.begin() + i);

// TODO: REVIEW: Assuming there is only one to remove.
break;
}
}
}


// Returns true if text starts with prefix.
bool StartsWith(char const * text, std::string const & prefix)
{
for (unsigned i = 0 ; i < prefix.size(); ++i)
{
if (*text != prefix[i])
{
return false;
}
++text;
}
return true;
}


bool DiagnosticStream::IsEnabled(char const * diagnostic) const
{
for (unsigned i = 0; i < m_enabled.size(); ++i)
{
if (StartsWith(diagnostic, m_enabled[i]))
{
return true;
}
}
return false;
}


std::ostream& DiagnosticStream::GetStream()
{
return m_stream;
}
}
49 changes: 49 additions & 0 deletions src/Common/Utilities/src/DiagnosticStream.h
@@ -0,0 +1,49 @@
#pragma once

#include <string> // Embeds std::string.
#include <vector> // Embeds std::vector.

#include "BitFunnel/IDiagnosticStream.h" // Inherits from IDiagnosticStream.
#include "BitFunnel/NonCopyable.h" // Inherits from NonCopyable.


namespace BitFunnel
{
//*************************************************************************
//
// DiagnosticStream implements the IDiagnosticStream interface to provice
// an std::ostream used to output diagnostic information.
//
// DiagnosticStream maintains a list of diagnostic keyword prefixes that
// enable diagnostics for various parts of the system. Code with the
// ability to emit diagnostic information should use the IsEnabled() method
// to determine whether to actually write to the diagnostic stream.
//
//*************************************************************************
class DiagnosticStream : public IDiagnosticStream, NonCopyable
{
public:
// Constructs a DiagnosticStream that outputs to the specified stream.
DiagnosticStream(std::ostream& stream);

// Adds the diagnostic keyword prefix to the list of prefixes that
// enable diagnostics.
void Enable(char const * diagnostic);

// Removes the diagnostic keyword prefix from the list of prefixes
// that enable diagnostics.
void Disable(char const * diagnostic);

// Returns true if the diagnostic keyword begins with one of the
// prefixes.
bool IsEnabled(char const * diagnostic) const;

// Returns a reference to the stream used for diagnostic output.
std::ostream& GetStream();

private:
std::ostream& m_stream;

std::vector<std::string> m_enabled;
};
}
19 changes: 17 additions & 2 deletions src/Plan/src/ByteCodeInterpreter.cpp
Expand Up @@ -24,6 +24,7 @@
#include <limits> #include <limits>


#include "BitFunnel/Exceptions.h" #include "BitFunnel/Exceptions.h"
#include "BitFunnel/IDiagnosticStream.h"
#include "BitFunnel/Plan/IResultsProcessor.h" #include "BitFunnel/Plan/IResultsProcessor.h"
#include "ByteCodeInterpreter.h" #include "ByteCodeInterpreter.h"
#include "LoggerInterfaces/Check.h" #include "LoggerInterfaces/Check.h"
Expand Down Expand Up @@ -63,14 +64,16 @@ Decide on type of Slices
size_t sliceCount, size_t sliceCount,
char * const * sliceBuffers, char * const * sliceBuffers,
size_t iterationsPerSlice, size_t iterationsPerSlice,
ptrdiff_t const * rowOffsets) ptrdiff_t const * rowOffsets,
IDiagnosticStream& diagnosticStream)
: m_code(code.GetCode()), : m_code(code.GetCode()),
m_jumpTable(code.GetJumpTable()), m_jumpTable(code.GetJumpTable()),
m_resultsProcessor(resultsProcessor), m_resultsProcessor(resultsProcessor),
m_sliceCount(sliceCount), m_sliceCount(sliceCount),
m_sliceBuffers(sliceBuffers), m_sliceBuffers(sliceBuffers),
m_iterationsPerSlice(iterationsPerSlice), m_iterationsPerSlice(iterationsPerSlice),
m_rowOffsets(rowOffsets) m_rowOffsets(rowOffsets),
m_diagnosticStream(diagnosticStream)
{ {
} }


Expand Down Expand Up @@ -116,13 +119,25 @@ Decide on type of Slices
m_ip = m_code.data(); m_ip = m_code.data();
m_offset = iteration; m_offset = iteration;


if (m_diagnosticStream.IsEnabled("bytecode/run"))
{
std::ostream& out = m_diagnosticStream.GetStream();
out << "--------------------" << std::endl;
out << "ByteCode RunOneIteration:" << std::endl;
}

while (m_ip->GetOpcode() != Opcode::End) while (m_ip->GetOpcode() != Opcode::End)
{ {
const Opcode opcode = m_ip->GetOpcode(); const Opcode opcode = m_ip->GetOpcode();
const unsigned row = m_ip->GetRow(); const unsigned row = m_ip->GetRow();
const unsigned delta = m_ip->GetDelta(); const unsigned delta = m_ip->GetDelta();
const bool inverted = m_ip->IsInverted(); const bool inverted = m_ip->IsInverted();


if (m_diagnosticStream.IsEnabled("bytecode/run"))
{
std::ostream& out = m_diagnosticStream.GetStream();
out << "Opcode: " << opcode << std::endl;
}
switch (opcode) switch (opcode)
{ {
case Opcode::AndRow: case Opcode::AndRow:
Expand Down
6 changes: 5 additions & 1 deletion src/Plan/src/ByteCodeInterpreter.h
Expand Up @@ -34,6 +34,7 @@
namespace BitFunnel namespace BitFunnel
{ {
class ByteCodeGenerator; class ByteCodeGenerator;
class IDiagnosticStream;
class IResultsProcessor; class IResultsProcessor;


//************************************************************************* //*************************************************************************
Expand Down Expand Up @@ -69,7 +70,8 @@ namespace BitFunnel
size_t sliceCount, size_t sliceCount,
char * const * sliceBuffers, char * const * sliceBuffers,
size_t iterationsPerSlice, size_t iterationsPerSlice,
ptrdiff_t const * rowOffsets); ptrdiff_t const * rowOffsets,
IDiagnosticStream & diagnosticStream);


// Runs the instruction sequence for a specified number of iterations. // Runs the instruction sequence for a specified number of iterations.
// Each iteration processes a single quadword of row data at the // Each iteration processes a single quadword of row data at the
Expand Down Expand Up @@ -216,6 +218,8 @@ namespace BitFunnel


// TODO: Formalize definition and usage of zero flag. // TODO: Formalize definition and usage of zero flag.
bool m_zeroFlag; bool m_zeroFlag;

IDiagnosticStream& m_diagnosticStream;
}; };


inline std::ostream& operator<<(std::ostream& out, const ByteCodeInterpreter::Opcode value) inline std::ostream& operator<<(std::ostream& out, const ByteCodeInterpreter::Opcode value)
Expand Down
40 changes: 21 additions & 19 deletions src/Plan/src/QueryPlanner.cpp
Expand Up @@ -54,7 +54,7 @@ namespace BitFunnel
// way SimplePlanner is connected. // way SimplePlanner is connected.
std::vector<DocId> Factories::RunQueryPlanner(TermMatchNode const & tree, std::vector<DocId> Factories::RunQueryPlanner(TermMatchNode const & tree,
ISimpleIndex const & index, ISimpleIndex const & index,
IDiagnosticStream* diagnosticStream) IDiagnosticStream& diagnosticStream)
{ {
// TODO: this really shouldn't create its own allocator. // TODO: this really shouldn't create its own allocator.
Allocator allocator(4096*16); Allocator allocator(4096*16);
Expand Down Expand Up @@ -92,18 +92,18 @@ namespace BitFunnel
ISimpleIndex const & index, ISimpleIndex const & index,
// IThreadResources& threadResources, // IThreadResources& threadResources,
IAllocator& allocator, IAllocator& allocator,
IDiagnosticStream* diagnosticStream) IDiagnosticStream& diagnosticStream)
// bool generateNonBodyPlan, // bool generateNonBodyPlan,
// unsigned maxIterationsScannedBetweenTerminationChecks) // unsigned maxIterationsScannedBetweenTerminationChecks)
: m_resultsProcessor(Factories::CreateSimpleResultsProcessor()) : m_resultsProcessor(Factories::CreateSimpleResultsProcessor())
// : // m_x64FunctionGeneratorWrapper(threadResources), // : // m_x64FunctionGeneratorWrapper(threadResources),
// m_maxIterationsScannedBetweenTerminationChecks(maxIterationsScannedBetweenTerminationChecks) // m_maxIterationsScannedBetweenTerminationChecks(maxIterationsScannedBetweenTerminationChecks)
{ {
if (diagnosticStream != nullptr && diagnosticStream->IsEnabled("planning/term")) if (diagnosticStream.IsEnabled("planning/term"))
{ {
std::ostream& out = diagnosticStream->GetStream(); std::ostream& out = diagnosticStream.GetStream();
std::unique_ptr<IObjectFormatter> std::unique_ptr<IObjectFormatter>
formatter(Factories::CreateObjectFormatter(diagnosticStream->GetStream())); formatter(Factories::CreateObjectFormatter(diagnosticStream.GetStream()));


out << "--------------------" << std::endl; out << "--------------------" << std::endl;
out << "Term Plan:" << std::endl; out << "Term Plan:" << std::endl;
Expand All @@ -117,11 +117,11 @@ namespace BitFunnel
// generateNonBodyPlan, // generateNonBodyPlan,
allocator); allocator);


if (diagnosticStream != nullptr && diagnosticStream->IsEnabled("planning/row")) if (diagnosticStream.IsEnabled("planning/row"))
{ {
std::ostream& out = diagnosticStream->GetStream(); std::ostream& out = diagnosticStream.GetStream();
std::unique_ptr<IObjectFormatter> std::unique_ptr<IObjectFormatter>
formatter(Factories::CreateObjectFormatter(diagnosticStream->GetStream())); formatter(Factories::CreateObjectFormatter(diagnosticStream.GetStream()));


out << "--------------------" << std::endl; out << "--------------------" << std::endl;
out << "Row Plan:" << std::endl; out << "Row Plan:" << std::endl;
Expand All @@ -131,11 +131,11 @@ namespace BitFunnel


m_planRows = &rowPlan.GetPlanRows(); m_planRows = &rowPlan.GetPlanRows();


if (diagnosticStream != nullptr && diagnosticStream->IsEnabled("planning/planrows")) if (diagnosticStream.IsEnabled("planning/planrows"))
{ {
std::ostream& out = diagnosticStream->GetStream(); std::ostream& out = diagnosticStream.GetStream();
std::unique_ptr<IObjectFormatter> std::unique_ptr<IObjectFormatter>
formatter(Factories::CreateObjectFormatter(diagnosticStream->GetStream())); formatter(Factories::CreateObjectFormatter(diagnosticStream.GetStream()));


out << "--------------------" << std::endl; out << "--------------------" << std::endl;
out << "IPlanRows:" << std::endl; out << "IPlanRows:" << std::endl;
Expand All @@ -161,11 +161,11 @@ namespace BitFunnel
allocator); allocator);




if (diagnosticStream != nullptr && diagnosticStream->IsEnabled("planning/rewrite")) if (diagnosticStream.IsEnabled("planning/rewrite"))
{ {
std::ostream& out = diagnosticStream->GetStream(); std::ostream& out = diagnosticStream.GetStream();
std::unique_ptr<IObjectFormatter> std::unique_ptr<IObjectFormatter>
formatter(Factories::CreateObjectFormatter(diagnosticStream->GetStream())); formatter(Factories::CreateObjectFormatter(diagnosticStream.GetStream()));


out << "--------------------" << std::endl; out << "--------------------" << std::endl;
out << "Rewritten Plan:" << std::endl; out << "Rewritten Plan:" << std::endl;
Expand All @@ -178,11 +178,11 @@ namespace BitFunnel
compiler.Compile(rewritten); compiler.Compile(rewritten);
CompileNode const & compileTree = compiler.CreateTree(c_maxRankValue); CompileNode const & compileTree = compiler.CreateTree(c_maxRankValue);


if (diagnosticStream != nullptr && diagnosticStream->IsEnabled("planning/compile")) if (diagnosticStream.IsEnabled("planning/compile"))
{ {
std::ostream& out = diagnosticStream->GetStream(); std::ostream& out = diagnosticStream.GetStream();
std::unique_ptr<IObjectFormatter> std::unique_ptr<IObjectFormatter>
formatter(Factories::CreateObjectFormatter(diagnosticStream->GetStream())); formatter(Factories::CreateObjectFormatter(diagnosticStream.GetStream()));


out << "--------------------" << std::endl; out << "--------------------" << std::endl;
out << "Compile Nodes:" << std::endl; out << "Compile Nodes:" << std::endl;
Expand Down Expand Up @@ -223,14 +223,16 @@ namespace BitFunnel
size_t sliceCount = sliceBuffers.size(); size_t sliceCount = sliceBuffers.size();


// Iterations per slice calculation. // Iterations per slice calculation.
auto iterationsPerSlice = shard.GetSliceCapacity() >> 6 >> c_maxRankValue; const int c_horribleRankHack = 3;
auto iterationsPerSlice = shard.GetSliceCapacity() >> 6 >> c_horribleRankHack;


ByteCodeInterpreter intepreter(m_code, ByteCodeInterpreter intepreter(m_code,
*m_resultsProcessor, *m_resultsProcessor,
sliceCount, sliceCount,
reinterpret_cast<char* const *>(sliceBuffers.data()), reinterpret_cast<char* const *>(sliceBuffers.data()),
iterationsPerSlice, iterationsPerSlice,
rowSet.GetRowOffsets(c_shardId)); rowSet.GetRowOffsets(c_shardId),
diagnosticStream);


intepreter.Run(); intepreter.Run();


Expand Down
2 changes: 1 addition & 1 deletion src/Plan/src/QueryPlanner.h
Expand Up @@ -51,7 +51,7 @@ namespace BitFunnel
ISimpleIndex const & index, ISimpleIndex const & index,
// IThreadResources& threadResources, // IThreadResources& threadResources,
IAllocator& allocator, IAllocator& allocator,
IDiagnosticStream* diagnosticStream); IDiagnosticStream& diagnosticStream);
// bool generateNonBodyPlan, // bool generateNonBodyPlan,
// unsigned maxIterationsScannedBetweenTerminationChecks); // unsigned maxIterationsScannedBetweenTerminationChecks);


Expand Down
9 changes: 8 additions & 1 deletion src/Plan/src/QueryRunner.cpp
Expand Up @@ -20,12 +20,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE. // THE SOFTWARE.


#include <iostream> // Used for DiagnosticStream ref; not actually used.
#include <memory> // Used for std::unique_ptr of diagnosticStream. Probably temporary.
#include <ostream> #include <ostream>
#include <sstream> #include <sstream>


#include "Allocator.h" #include "Allocator.h"
#include "BitFunnel/Configuration/Factories.h" #include "BitFunnel/Configuration/Factories.h"
#include "BitFunnel/Configuration/IStreamConfiguration.h" #include "BitFunnel/Configuration/IStreamConfiguration.h"
#include "BitFunnel/IDiagnosticStream.h"
#include "BitFunnel/Plan/Factories.h" #include "BitFunnel/Plan/Factories.h"
#include "BitFunnel/Plan/QueryRunner.h" #include "BitFunnel/Plan/QueryRunner.h"
#include "BitFunnel/Utilities/Factories.h" #include "BitFunnel/Utilities/Factories.h"
Expand Down Expand Up @@ -124,9 +127,13 @@ namespace BitFunnel
QueryParser parser(s, m_config, *m_allocator); QueryParser parser(s, m_config, *m_allocator);
auto tree = parser.Parse(); auto tree = parser.Parse();


// TODO: remove diagnosticStream and replace with nullable.
auto diagnosticStream = Factories::CreateDiagnosticStream(std::cout);
if (tree != nullptr) if (tree != nullptr)
{ {
auto observed = Factories::RunSimplePlanner(*tree, m_index); auto observed = Factories::RunSimplePlanner(*tree,
m_index,
*diagnosticStream);
} }
} }


Expand Down

0 comments on commit c20bf78

Please sign in to comment.