Skip to content

Commit

Permalink
Task/rdmp 191 extract to db improvements (#1862)
Browse files Browse the repository at this point in the history
* WORKING TRIGGER

* working update

* fix build

* better trigger

* sort of working

* fix clash

* add null check

* add todo note

* add come notes

* update changelog

* attempt to fix tests

* only add column once

* tidy up code

* move pk dropping

* put pk in correct place

* reorder

* add check for trigger

* better check for PK

* start of index

* working indexes

* tidy up code

* index tests

* interim test

* working triggers

* allow nulls

* fix patch

* fix typo

* tidy up

* comment out test

* re-add test

* dispose of pipeline

* tidy up

* add more tests

* update test

* improved pk matching

* fuix up tests

* updates from review

* tidy up tests
  • Loading branch information
JFriel committed Jun 27, 2024
1 parent 915603e commit 2202774
Show file tree
Hide file tree
Showing 7 changed files with 954 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ This component has several configurable options that are detailed below
[Project]: ../CodeTutorials/Glossary.md#Project
[ExtractionProgress]: ../CodeTutorials/Glossary.md#ExtractionProgress
[Catalogue]: ../CodeTutorials/Glossary.md#Catalogue


## Notes on updating an extraction
Using the "Append Data If Table Exists" option within this extraction destination component will allow you to re-extract additional information to a database, however there are a number of caveats and gotchas with this.
* Archive Table
* Similar to data loads, these database tables will come with an _Archive table that will auto-populate when new extractions are run against the database.
* Data Structure changing
* While the extractor can handle columns being removed, it does not support columns being added beyond the first extraction. For this you will need to write the data to a new database table. It is recommended that the extraction is cloned in this case.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using System.IO;
using System.Linq;
using SynthEHR;
using SynthEHR.Datasets;
using SynthEHR.Datasets;
using FAnsi.Discovery;
using NUnit.Framework;
using Rdmp.Core.CommandExecution;
Expand All @@ -33,6 +33,7 @@
using FAnsi;
using FAnsi.Discovery.QuerySyntax;
using TypeGuesser;
using Rdmp.Core.DataLoad.Triggers;

namespace Rdmp.Core.Tests.DataExport.DataExtraction;

Expand Down Expand Up @@ -530,7 +531,7 @@ public void ReExtractToADatabaseWithNewDataAndNoPKs()

dt = destinationTable.GetDataTable();

Assert.That(dt.Rows, Has.Count.EqualTo(2)); //The release ID is a PK, so won't extract the same relase ID twice
Assert.That(dt.Rows, Has.Count.EqualTo(2));
}

[Test]
Expand Down Expand Up @@ -760,12 +761,19 @@ public void ReExtractToADatabaseWithNewDataAndSinglePK()
_ = new AggregateConfiguration(CatalogueRepository, catalogue, "UnitTestShortcutAggregate");
conf.SaveToDatabase();
agg1.SaveToDatabase();
cic.RootCohortAggregateContainer.AddChild(agg12, 0);
cic.SaveToDatabase();
cic2.RootCohortAggregateContainer.AddChild(agg12, 0);
cic2.SaveToDatabase();
var dim2 = new AggregateDimension(CatalogueRepository, ei, agg12);
dim2.SaveToDatabase();
agg12.SaveToDatabase();

newCohortCmd = new ExecuteCommandCreateNewCohortByExecutingACohortIdentificationConfiguration(
new ThrowImmediatelyActivator(RepositoryLocator),
cic2,
newExternal,
"MyCohort",
project,
cohortPipeline
);
newCohortCmd.Execute();
ExtractableCohort _extractableCohort2 = new ExtractableCohort(DataExportRepository, newExternal, 2);
ec.Cohort_ID = _extractableCohort2.ID;
Expand Down Expand Up @@ -1052,4 +1060,265 @@ public void ReExtractToADatabaseWithNewDataAndExtractionIdentifierIsPK()

Assert.That(dt.Rows, Has.Count.EqualTo(2));
}

[Test]
public void ExtractToDatabaseUseTriggers()
{
    var db = GetCleanedServer(FAnsi.DatabaseType.MicrosoftSQLServer);

    // --- Arrange: create a catalogue by bulk-importing a generated CSV file ---
    var csvFile = CreateFileInForLoading("bob.csv", 1, new Random(5000));

    // Create the 'out of the box' RDMP pipelines (which includes the CSV bulk importer pipeline)
    var creator = new CataloguePipelinesAndReferencesCreation(
        RepositoryLocator, UnitTestLoggingConnectionString, DataQualityEngineConnectionString);

    // Find the CSV loading pipeline, creating the default pipelines first if it does not exist yet
    var pipe = CatalogueRepository.GetAllObjects<Pipeline>().OrderByDescending(p => p.ID)
        .FirstOrDefault(p => p.Name.Contains("BULK INSERT: CSV Import File (automated column-type detection)"));

    if (pipe is null)
    {
        creator.CreatePipelines(new PlatformDatabaseCreationOptions());
        pipe = CatalogueRepository.GetAllObjects<Pipeline>().OrderByDescending(p => p.ID)
            .FirstOrDefault(p => p.Name.Contains("BULK INSERT: CSV Import File (automated column-type detection)"));
    }

    // Run an import of the file using the pipeline
    var cmd = new ExecuteCommandCreateNewCatalogueByImportingFile(
        new ThrowImmediatelyActivator(RepositoryLocator),
        csvFile,
        null, db, pipe, null);
    cmd.Execute();

    // Mark the 'chi' column as both primary key and extraction identifier
    var catalogue = CatalogueRepository.GetAllObjects<Catalogue>().FirstOrDefault(c => c.Name == "bob");
    var chiColumnInfo = catalogue.CatalogueItems.First(ci => ci.Name == "chi");
    var ei = chiColumnInfo.ExtractionInformation;
    ei.IsPrimaryKey = true;
    ei.IsExtractionIdentifier = true;
    ei.SaveToDatabase();

    var project = new Project(DataExportRepository, "MyProject")
    {
        ProjectNumber = 500,
        ExtractionDirectory = Path.GetTempPath()
    };
    project.SaveToDatabase();

    // Build a cohort identification configuration over the catalogue
    var cic = new CohortIdentificationConfiguration(CatalogueRepository, "Cohort1");
    cic.CreateRootContainerIfNotExists();
    var agg1 = new AggregateConfiguration(CatalogueRepository, catalogue, "agg1");
    var conf = new AggregateConfiguration(CatalogueRepository, catalogue, "UnitTestShortcutAggregate");
    conf.SaveToDatabase();
    agg1.SaveToDatabase();
    cic.RootCohortAggregateContainer.AddChild(agg1, 0);
    cic.SaveToDatabase();
    var dim = new AggregateDimension(CatalogueRepository, ei, agg1);
    dim.SaveToDatabase();
    agg1.SaveToDatabase();

    // --- Create an external cohort database (definition + cohort tables) ---
    var cohortDatabaseName = TestDatabaseNames.GetConsistentName("CohortDatabase");
    const string cohortTableName = "Cohort";
    const string definitionTableName = "CohortDefinition";
    const string externalCohortTableNameInCatalogue = "CohortTests";
    const string releaseIdentifierFieldName = "ReleaseId";
    const string definitionTableForeignKeyField = "cohortDefinition_id";

    var _cohortDatabase = DiscoveredServerICanCreateRandomDatabasesAndTablesOn.ExpectDatabase(cohortDatabaseName);
    if (_cohortDatabase.Exists())
        DeleteTables(_cohortDatabase);
    else
        _cohortDatabase.Create();

    _cohortDatabase.CreateTable(definitionTableName, new[]
    {
        new DatabaseColumnRequest("id", new DatabaseTypeRequest(typeof(int)))
            { AllowNulls = false, IsAutoIncrement = true, IsPrimaryKey = true },
        new DatabaseColumnRequest("projectNumber", new DatabaseTypeRequest(typeof(int))) { AllowNulls = false },
        new DatabaseColumnRequest("version", new DatabaseTypeRequest(typeof(int))) { AllowNulls = false },
        new DatabaseColumnRequest("description", new DatabaseTypeRequest(typeof(string), 3000))
            { AllowNulls = false },
        new DatabaseColumnRequest("dtCreated", new DatabaseTypeRequest(typeof(DateTime)))
            { AllowNulls = false, Default = MandatoryScalarFunctions.GetTodaysDate }
    });

    var foreignKey =
        new DatabaseColumnRequest(definitionTableForeignKeyField, new DatabaseTypeRequest(typeof(int)), false)
            { IsPrimaryKey = true };

    _cohortDatabase.CreateTable(cohortTableName, new[]
    {
        new DatabaseColumnRequest("chi",
            new DatabaseTypeRequest(typeof(string)), false)
        {
            IsPrimaryKey = true,

            // if there is a single collation amongst private identifier prototype references we must use that collation
            // when creating the private column so that the DBMS can link them no bother
            Collation = null
        },
        new DatabaseColumnRequest(releaseIdentifierFieldName, new DatabaseTypeRequest(typeof(string), 300))
            { AllowNulls = true },
        foreignKey
    });

    var newExternal =
        new ExternalCohortTable(DataExportRepository, "TestExternalCohort", DatabaseType.MicrosoftSQLServer)
        {
            Database = cohortDatabaseName,
            Server = _cohortDatabase.Server.Name,
            DefinitionTableName = definitionTableName,
            TableName = cohortTableName,
            Name = externalCohortTableNameInCatalogue,
            Username = _cohortDatabase.Server.ExplicitUsernameIfAny,
            Password = _cohortDatabase.Server.ExplicitPasswordIfAny,
            PrivateIdentifierField = "chi",
            ReleaseIdentifierField = "ReleaseId",
            DefinitionTableForeignKeyField = "cohortDefinition_id"
        };
    newExternal.SaveToDatabase();

    // Commit the first cohort (version 1) by executing the cic
    var cohortPipeline = CatalogueRepository.GetAllObjects<Pipeline>()
        .First(p => p.Name == "CREATE COHORT:By Executing Cohort Identification Configuration");
    var newCohortCmd = new ExecuteCommandCreateNewCohortByExecutingACohortIdentificationConfiguration(
        new ThrowImmediatelyActivator(RepositoryLocator),
        cic,
        newExternal,
        "MyCohort",
        project,
        cohortPipeline
    );
    newCohortCmd.Execute();
    var _extractableCohort = new ExtractableCohort(DataExportRepository, newExternal, 1);

    var ec = new ExtractionConfiguration(DataExportRepository, project)
    {
        Name = "ext1",
        Cohort_ID = _extractableCohort.ID
    };
    ec.AddDatasetToConfiguration(new ExtractableDataSet(DataExportRepository, catalogue));
    ec.SaveToDatabase();

    // --- Build an extraction pipeline with a "to database" destination that appends on re-extraction ---
    var extractionPipeline = new Pipeline(CatalogueRepository, "Empty extraction pipeline 4");
    var component = new PipelineComponent(CatalogueRepository, extractionPipeline,
        typeof(ExecuteFullExtractionToDatabaseMSSql), 0, "MS SQL Destination");
    var destinationArguments = component.CreateArgumentsForClassIfNotExists<ExecuteFullExtractionToDatabaseMSSql>()
        .ToList();
    var argumentServer = destinationArguments.Single(a => a.Name == "TargetDatabaseServer");
    var argumentDbNamePattern = destinationArguments.Single(a => a.Name == "DatabaseNamingPattern");
    var argumentTblNamePattern = destinationArguments.Single(a => a.Name == "TableNamingPattern");
    var reExtract = destinationArguments.Single(a => a.Name == "AppendDataIfTableExists");
    Assert.That(argumentServer.Name, Is.EqualTo("TargetDatabaseServer"));

    var _extractionServer = new ExternalDatabaseServer(CatalogueRepository, "myserver", null)
    {
        Server = DiscoveredServerICanCreateRandomDatabasesAndTablesOn.Name,
        Username = DiscoveredServerICanCreateRandomDatabasesAndTablesOn.ExplicitUsernameIfAny,
        Password = DiscoveredServerICanCreateRandomDatabasesAndTablesOn.ExplicitPasswordIfAny
    };
    _extractionServer.SaveToDatabase();

    argumentServer.SetValue(_extractionServer);
    argumentServer.SaveToDatabase();
    argumentDbNamePattern.SetValue($"{TestDatabaseNames.Prefix}$p_$n");
    argumentDbNamePattern.SaveToDatabase();
    argumentTblNamePattern.SetValue("$c_$d");
    argumentTblNamePattern.SaveToDatabase();
    // AppendDataIfTableExists=true is the behaviour under test (re-extract into the existing table)
    reExtract.SetValue(true);
    reExtract.SaveToDatabase();

    var component2 = new PipelineComponent(CatalogueRepository, extractionPipeline,
        typeof(ExecuteCrossServerDatasetExtractionSource), -1, "Source");
    var arguments2 = component2.CreateArgumentsForClassIfNotExists<ExecuteCrossServerDatasetExtractionSource>()
        .ToArray();
    arguments2.Single(a => a.Name.Equals("AllowEmptyExtractions")).SetValue(false);
    arguments2.Single(a => a.Name.Equals("AllowEmptyExtractions")).SaveToDatabase();

    // configure the components as the source and destination of the pipeline
    extractionPipeline.DestinationPipelineComponent_ID = component.ID;
    extractionPipeline.SourcePipelineComponent_ID = component2.ID;
    extractionPipeline.SaveToDatabase();

    var dbname = TestDatabaseNames.GetConsistentName($"{project.Name}_{project.ProjectNumber}");
    var dbToExtractTo = DiscoveredServerICanCreateRandomDatabasesAndTablesOn.ExpectDatabase(dbname);
    if (dbToExtractTo.Exists())
        dbToExtractTo.Drop();

    // --- Act: first extraction ---
    var runner = new ExtractionRunner(new ThrowImmediatelyActivator(RepositoryLocator), new ExtractionOptions
    {
        Command = CommandLineActivity.run,
        ExtractionConfiguration = ec.ID.ToString(),
        ExtractGlobals = true,
        Pipeline = extractionPipeline.ID.ToString()
    });

    var returnCode = runner.Run(
        RepositoryLocator,
        ThrowImmediatelyDataLoadEventListener.Quiet,
        ThrowImmediatelyCheckNotifier.Quiet,
        new GracefulCancellationToken());

    Assert.That(returnCode, Is.EqualTo(0), "Return code from runner was non zero");

    // --- Assert: one row extracted and the trigger-maintained special columns are present ---
    var destinationTable = dbToExtractTo.ExpectTable("ext1_bob");
    Assert.That(destinationTable.Exists());

    var dt = destinationTable.GetDataTable();

    Assert.That(dt.Rows, Has.Count.EqualTo(1));
    var columns = dt.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();
    Assert.That(columns, Does.Contain(SpecialFieldNames.DataLoadRunID));
    Assert.That(columns, Does.Contain(SpecialFieldNames.ValidFrom));

    // Add a new person to the source data so that a second cohort/extraction picks them up
    var tbl = db.DiscoverTables(false).First();
    tbl.Insert(new Dictionary<string, object>
    {
        { "chi","1111111111"},
        {"notes","T"},
        {"dtCreated", new DateTime(2001, 1, 2) },
        {"century",19},
        {"surname","1234"},
        {"forename","yes"},
        {"sex","M"},
    });

    // Build a second cic and commit a second cohort (version 2) from it.
    // NOTE: agg12/the new command must reference cic2, not cic (same fix as the other re-extraction tests).
    var cic2 = new CohortIdentificationConfiguration(CatalogueRepository, "Cohort1");
    cic2.CreateRootContainerIfNotExists();
    var agg12 = new AggregateConfiguration(CatalogueRepository, catalogue, "agg1");
    _ = new AggregateConfiguration(CatalogueRepository, catalogue, "UnitTestShortcutAggregate");
    conf.SaveToDatabase();
    agg1.SaveToDatabase();
    cic2.RootCohortAggregateContainer.AddChild(agg12, 0);
    cic2.SaveToDatabase();
    var dim2 = new AggregateDimension(CatalogueRepository, ei, agg12);
    dim2.SaveToDatabase();
    agg12.SaveToDatabase();

    newCohortCmd = new ExecuteCommandCreateNewCohortByExecutingACohortIdentificationConfiguration(
        new ThrowImmediatelyActivator(RepositoryLocator),
        cic2,
        newExternal,
        "MyCohort",
        project,
        cohortPipeline
    );
    newCohortCmd.Execute();
    var _extractableCohort2 = new ExtractableCohort(DataExportRepository, newExternal, 2);
    ec.Cohort_ID = _extractableCohort2.ID;
    ec.SaveToDatabase();

    // --- Act: second extraction appends into the existing destination table ---
    runner = new ExtractionRunner(new ThrowImmediatelyActivator(RepositoryLocator), new ExtractionOptions
    {
        Command = CommandLineActivity.run,
        ExtractionConfiguration = ec.ID.ToString(),
        ExtractGlobals = true,
        Pipeline = extractionPipeline.ID.ToString()
    });

    returnCode = runner.Run(
        RepositoryLocator,
        ThrowImmediatelyDataLoadEventListener.Quiet,
        ThrowImmediatelyCheckNotifier.Quiet,
        new GracefulCancellationToken());

    Assert.That(returnCode, Is.EqualTo(0), "Return code from runner was non zero");

    Assert.That(destinationTable.Exists());

    dt = destinationTable.GetDataTable();

    // Both the original row and the newly inserted person should now be present
    Assert.That(dt.Rows, Has.Count.EqualTo(2));
}
}
Loading

0 comments on commit 2202774

Please sign in to comment.