Skip to content

Commit

Permalink
Merge branch 'release/3.7.0' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Hyldahl committed Jul 19, 2023
2 parents c08d0bd + ed9d0fe commit 26570e4
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 87 deletions.
2 changes: 2 additions & 0 deletions Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
<PackageReference Update="AutoFixture.Xunit2" Version="4.11.0" />
<PackageReference Update="AutoFixture.AutoNSubstitute" Version="4.18.0" />
<PackageReference Update="coverlet.msbuild" Version="2.8.0" />
<PackageReference Update="CluedIn.Core" Version="3.6.1" />
<PackageReference Update="CluedIn.DataStore" Version="3.6.1" />
<PackageReference Update="xunit" Version="2.4.1" />
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Update="Moq" Version="4.13.1" />
Expand Down
2 changes: 1 addition & 1 deletion src/Connector.SqlServer/Connector.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<EmbeddedResource Include="Resources\microsoft-sql-server-logo.svg" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="CluedIn.Connector.Common" />
<PackageReference Include="CluedIn.Core" />
</ItemGroup>

<!-- Disable to above cluedin assets and enable the below for local debugging -->
Expand Down
46 changes: 46 additions & 0 deletions src/Connector.SqlServer/Connector/ConnectionAndTransaction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using Microsoft.Data.SqlClient;
using System;
using System.Data;
using System.Threading.Tasks;

namespace CluedIn.Connector.SqlServer.Connector
{
public class ConnectionAndTransaction : IDisposable, IAsyncDisposable
{
public readonly SqlConnection Connection;
public readonly SqlTransaction Transaction;

public ConnectionAndTransaction(SqlConnection connection, SqlTransaction transaction)
{
Connection = connection;
this.Transaction = transaction;
}

public void Dispose()
{
Transaction.Dispose();
Connection.Dispose();
}

public async ValueTask DisposeAsync()
{
if (Transaction is IAsyncDisposable t)
{
await t.DisposeAsync();
}
else
{
Transaction.Dispose();
}

if (Connection is IAsyncDisposable c)
{
await c.DisposeAsync();
}
else
{
Connection.Dispose();
}
}
}
}
9 changes: 6 additions & 3 deletions src/Connector.SqlServer/Connector/ISqlClient.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
using CluedIn.Connector.Common.Clients;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

namespace CluedIn.Connector.SqlServer.Connector
{
public interface ISqlClient : ITransactionalClientBaseV2<SqlConnection, SqlTransaction, SqlParameter>
public interface ISqlClient
{
Task<SqlConnection> BeginConnection(IReadOnlyDictionary<string, object> config);

Task<DataTable> GetTableColumns(SqlConnection connection, string tableName, string schema);

Task<DataTable> GetTables(SqlConnection connection, string tableName = null, string schema = null);

Task<ConnectionAndTransaction> BeginTransaction(IReadOnlyDictionary<string, object> config);

Task ExecuteCommandInTransactionAsync(SqlTransaction transaction, string commandText, IEnumerable<SqlParameter> param = null);
}
}
79 changes: 75 additions & 4 deletions src/Connector.SqlServer/Connector/SqlClient.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using CluedIn.Connector.Common.Clients;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

namespace CluedIn.Connector.SqlServer.Connector
{
public class SqlClient : TransactionalClientBaseV2<SqlConnection, SqlTransaction, SqlParameter>, ISqlClient
public class SqlClient : ISqlClient
{
private readonly int _defaultPort = 1433;

public override string BuildConnectionString(IReadOnlyDictionary<string, object> config)
public string BuildConnectionString(IReadOnlyDictionary<string, object> config)
{
var connectionStringBuilder = new SqlConnectionStringBuilder
{
Expand Down Expand Up @@ -73,5 +73,76 @@ public Task<DataTable> GetTables(SqlConnection connection, string tableName = nu

return Task.FromResult(connection.GetSchema("tables", restrictionValues));
}

public async Task ExecuteCommandInTransactionAsync(SqlTransaction transaction, string commandText, IEnumerable<SqlParameter> param = null)
{
var cmd = transaction.Connection.CreateCommand();
cmd.CommandText = commandText;
cmd.Transaction = transaction;

if (param != null)
{
foreach (var parameter in param)
{
SanitizeParameter(parameter);
cmd.Parameters.Add(parameter);
}
}

await cmd.ExecuteNonQueryAsync();
}

public async Task<ConnectionAndTransaction> BeginTransaction(IReadOnlyDictionary<string, object> config)
{
var connectionString = BuildConnectionString(config);
var connection = Activator.CreateInstance(typeof(SqlConnection), connectionString) as SqlConnection;

// ReSharper disable once PossibleNullReferenceException
await connection.OpenAsync();
return new ConnectionAndTransaction(connection, await connection.BeginTransactionAsync() as SqlTransaction);
}

public static void SanitizeParameter(IDbDataParameter parameter)
{
// We need to check if the object is an typeof List, Array, etc. If it is, then the process will fail because of "The CLR Type <> isn't supported" error.

parameter.Value ??= DBNull.Value;
var parameterType = parameter.Value.GetType();

// check if parameterType is an Array or List, covert the Value to string.
if (parameterType.IsArray || parameterType.IsGenericType)
{
parameter.Value = string.Join(",", ((IList)parameter.Value).Cast<string>());
}

// TODO: Further investigate, futureproof and test a proper way to handle if the Value's object is not a List<>.
// If not handled in the above condition, it will fail when we add the Parameter to the command.
}

public async Task<DataTable> GetTableColumns(SqlTransaction transaction, string tableName, string schema = null)
{
return await GetRestrictedSchema(transaction.Connection, "Columns", tableName, schema);
}

public async Task<DataTable> GetTables(SqlTransaction transaction, string name = null, string schema = null)
{
return await GetRestrictedSchema(transaction.Connection, "Tables", name, schema);
}

public static Task<DataTable> GetRestrictedSchema(
SqlConnection connection,
string collectionName,
string tableName = null,
string schema = null)
{
if (string.IsNullOrWhiteSpace(tableName))
{
tableName = null;
}

// Read more about syntax of restrictions here: https://learn.microsoft.com/en-us/dotnet/framework/data/adonet/schema-restrictions
var restrictions = new[] { null, schema, tableName, null };
return Task.FromResult(connection.GetSchema(collectionName, restrictions));
}
}
}
62 changes: 56 additions & 6 deletions src/Connector.SqlServer/Connector/SqlServerConnector.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using CluedIn.Connector.Common.Connectors;
using CluedIn.Connector.SqlServer.Exceptions;
using CluedIn.Connector.SqlServer.Exceptions;
using CluedIn.Connector.SqlServer.Utils;
using CluedIn.Connector.SqlServer.Utils.TableDefinitions;
using CluedIn.Core;
Expand All @@ -12,26 +11,55 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace CluedIn.Connector.SqlServer.Connector
{
public class SqlServerConnector : SqlConnectorBaseV2<SqlConnection, SqlTransaction, SqlParameter>
public class SqlServerConnector : ConnectorBaseV2
{
private readonly ILogger<SqlServerConnector> _logger;
private readonly ISqlClient _client;
private readonly ISqlServerConstants _constants;

public static string DefaultSizeForFieldConfigurationKey = "SqlConnector.DefaultSizeForField";

public SqlServerConnector(
ILogger<SqlServerConnector> logger,
ISqlClient client,
ISqlServerConstants constants)
: base(logger, client, constants.ProviderId, supportsRetrievingLatestEntityPersistInfo: false)
: base(constants.ProviderId, supportsRetrievingLatestEntityPersistInfo: false)
{
_logger = logger;
_client = client;
_constants = constants;

}

public override async Task<string> GetValidContainerName(ExecutionContext executionContext, Guid connectorProviderDefinitionId, string containerName)
{
var config = await AuthenticationDetailsHelper.GetAuthenticationDetails(executionContext, connectorProviderDefinitionId);
await using var connectionAndTransaction = await _client.BeginTransaction(config.Authentication);
var transaction = connectionAndTransaction.Transaction;

var cleanName = containerName.ToSanitizedSqlName();

if (!await CheckTableExists(executionContext, connectorProviderDefinitionId, transaction, cleanName))
{
return cleanName;
}

// If exists, append count like in windows explorer
var count = 0;
string newName;
do
{
count++;
newName = $"{cleanName}{count}";
} while (await CheckTableExists(executionContext, connectorProviderDefinitionId, transaction, newName));

return newName;
}

public override IReadOnlyCollection<StreamMode> GetSupportedModes()
Expand Down Expand Up @@ -309,6 +337,23 @@ public override async Task<IAsyncEnumerable<ConnectorLatestEntityPersistInfo>> G
return result;
}

public override async Task<ConnectionVerificationResult> VerifyConnection(ExecutionContext executionContext, IReadOnlyDictionary<string, object> configurationData)
{
try
{
await using var connectionAndTransaction = await _client.BeginTransaction(configurationData);
var connectionIsOpen = connectionAndTransaction.Connection.State == ConnectionState.Open;
await connectionAndTransaction.DisposeAsync();

return new ConnectionVerificationResult(connectionIsOpen);
}
catch (Exception e)
{
_logger.LogError(e, "Error verifying connection");
return new ConnectionVerificationResult(false, e.Message);
}
}

public override async Task CreateContainer(ExecutionContext executionContext, Guid connectorProviderDefinitionId, IReadOnlyCreateContainerModelV2 model)
{
await ExecuteWithRetryAsync(async () =>
Expand Down Expand Up @@ -458,6 +503,11 @@ public override async Task RenameContainer(ExecutionContext executionContext, IR
});
}

public override Task<string> GetValidMappingDestinationPropertyName(ExecutionContext executionContext, Guid connectorProviderDefinitionId, string propertyName)
{
return Task.FromResult(propertyName.ToSanitizedSqlName());
}

private IEnumerable<SqlServerConnectorCommand> GetRenameTablesCommands(IReadOnlyStreamModel streamModel, SqlTableName oldMainTableName, SqlTableName newMainTableName, DateTimeOffset suffixDate, SqlName schema)
{
var builder = new StringBuilder();
Expand Down Expand Up @@ -634,7 +684,7 @@ from INFORMATION_SCHEMA.TABLES
}
}

protected override async Task<bool> CheckTableExists(ExecutionContext executionContext, Guid connectorProviderDefinitionId, SqlTransaction transaction, string name)
protected async Task<bool> CheckTableExists(ExecutionContext executionContext, Guid connectorProviderDefinitionId, SqlTransaction transaction, string name)
{
var config = await AuthenticationDetailsHelper.GetAuthenticationDetails(executionContext, connectorProviderDefinitionId);
var tableName = SqlTableName.FromUnsafeName(name, config);
Expand Down
17 changes: 8 additions & 9 deletions src/Connector.SqlServer/Connector/SqlServerConnectorCommand.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
using CluedIn.Connector.Common;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient;
using System.Collections.Generic;
using System.Linq;

namespace CluedIn.Connector.SqlServer.Connector
{
public class SqlServerConnectorCommand : CommandBase<SqlParameter>
public class SqlServerConnectorCommand
{
}
public string Text;
public IEnumerable<SqlParameter> Parameters;

public static class SqlServerConnectorCommandExtensions
{
public static SqlCommand ToSqlCommand(this SqlServerConnectorCommand sqlServerConnectorCommand, SqlTransaction transaction)
public SqlCommand ToSqlCommand(SqlTransaction transaction)
{
var sqlCommand = transaction.Connection.CreateCommand();
sqlCommand.CommandText = sqlServerConnectorCommand.Text;
sqlCommand.Parameters.AddRange(sqlServerConnectorCommand.Parameters.ToArray());
sqlCommand.CommandText = Text;
sqlCommand.Parameters.AddRange(Parameters.ToArray());
sqlCommand.Transaction = transaction;

return sqlCommand;
Expand Down
14 changes: 14 additions & 0 deletions src/Connector.SqlServer/CrawlJobDataWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using CluedIn.Core.Crawling;
using System.Collections.Generic;

namespace CluedIn.Connector.SqlServer;

public class CrawlJobDataWrapper : CrawlJobData
{
public CrawlJobDataWrapper(IDictionary<string, object> configurations)
{
Configurations = configurations;
}

public IDictionary<string, object> Configurations { get; }
}
7 changes: 5 additions & 2 deletions src/Connector.SqlServer/ISqlServerConstants.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using CluedIn.Connector.Common.Configurations;
using CluedIn.Core.Providers;
using System;

namespace CluedIn.Connector.SqlServer
{
public interface ISqlServerConstants : IConfigurationConstants
public interface ISqlServerConstants : IExtendedProviderMetadata
{
Guid ProviderId { get; }
IProviderMetadata CreateProviderMetadata();
}
}
3 changes: 0 additions & 3 deletions src/Connector.SqlServer/InstallComponents.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Castle.MicroKernel.Registration;
using Castle.MicroKernel.SubSystems.Configuration;
using Castle.Windsor;
using CluedIn.Connector.Common.Features;
using CluedIn.Connector.SqlServer.Connector;
using System.Reflection;

Expand All @@ -12,8 +11,6 @@ public class InstallComponents : IWindsorInstaller
public void Install(IWindsorContainer container, IConfigurationStore store)
{
var asm = Assembly.GetExecutingAssembly();
container.Register(Types.FromAssembly(asm).BasedOn<IFeatureStore>().WithServiceFromInterface()
.If(t => !t.IsAbstract).LifestyleSingleton());
container.Register(Component.For<ISqlClient>().ImplementedBy<SqlClient>().OnlyNewServices());
container.Register(Component.For<ISqlServerConstants>().ImplementedBy<SqlServerConstants>().LifestyleSingleton());
}
Expand Down

0 comments on commit 26570e4

Please sign in to comment.