Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetItemLinqQueryable now works with null query #561

Merged
merged 7 commits into from Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Expand Up @@ -91,7 +91,14 @@ public static bool IsPrimitive(this object obj)
/// </example>
internal static string ToSqlQueryText<T>(this IQueryable<T> query)
{
return ((CosmosLinqQuery<T>)query).ToSqlQueryText();
CosmosLinqQuery<T> linqQuery = query as CosmosLinqQuery<T>;

if (linqQuery == null)
{
throw new ArgumentOutOfRangeException(nameof(linqQuery), "ToSqlQueryText is only supported on cosmos LINQ query operations");
}

return linqQuery.ToSqlQueryText();
}

/// <summary>
Expand Down
74 changes: 31 additions & 43 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Expand Up @@ -11,8 +11,6 @@ namespace Microsoft.Azure.Cosmos.Linq
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;

/// <summary>
Expand All @@ -21,54 +19,54 @@ namespace Microsoft.Azure.Cosmos.Linq
/// <seealso cref="CosmosLinqQueryProvider"/>
internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<T>
{
private readonly Expression expression;
private readonly CosmosLinqQueryProvider queryProvider;
private readonly Guid correlatedActivityId;

private readonly ContainerCore container;
private readonly CosmosQueryClientCore queryClient;
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly CosmosResponseFactory responseFactory;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution = false;
private readonly string continuationToken;

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
Expression expression,
bool allowSynchronousQueryExecution)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosJsonSerializer = cosmosJsonSerializer ?? throw new ArgumentNullException(nameof(cosmosJsonSerializer));
this.responseFactory = responseFactory ?? throw new ArgumentNullException(nameof(responseFactory));
this.queryClient = queryClient ?? throw new ArgumentNullException(nameof(queryClient));
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.expression = expression ?? Expression.Constant(this);
this.Expression = expression ?? Expression.Constant(this);
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.correlatedActivityId = Guid.NewGuid();

this.queryProvider = new CosmosLinqQueryProvider(
this.container,
this.cosmosJsonSerializer,
this.queryClient,
container,
responseFactory,
queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution,
this.queryClient.OnExecuteScalarQueryCallback);
this.correlatedActivityId = Guid.NewGuid();
}

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution)
: this(
container,
cosmosJsonSerializer,
responseFactory,
queryClient,
continuationToken,
cosmosQueryRequestOptions,
Expand All @@ -79,7 +77,7 @@ internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<

public Type ElementType => typeof(T);

public Expression Expression => this.expression;
public Expression Expression { get; }

public IQueryProvider Provider => this.queryProvider;

Expand All @@ -100,13 +98,12 @@ public IEnumerator<T> GetEnumerator()
" use GetItemsQueryIterator to execute asynchronously");
}

FeedIterator localQueryExecutionContext = this.CreateCosmosQueryExecutionContext();
while (localQueryExecutionContext.HasMoreResults)
FeedIterator<T> localFeedIterator = this.CreateFeedIterator(false);
while (localFeedIterator.HasMoreResults)
{
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
ResponseMessage responseMessage = TaskHelper.InlineIfPossible(() => localQueryExecutionContext.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult();
FeedResponse<T> items = TaskHelper.InlineIfPossible(() => localFeedIterator.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult();
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
FeedResponse<T> items = this.container.ClientContext.ResponseFactory.CreateQueryFeedResponse<T>(responseMessage);

foreach (T item in items)
{
Expand All @@ -126,7 +123,7 @@ IEnumerator IEnumerable.GetEnumerator()

public override string ToString()
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression);
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);
if (querySpec != null)
{
return JsonConvert.SerializeObject(querySpec);
Expand All @@ -137,21 +134,13 @@ public override string ToString()

public string ToSqlQueryText()
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression);
if (querySpec != null)
{
return (querySpec.QueryText);
}

return this.container.LinkUri.ToString();
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);
return querySpec?.QueryText;
}

public FeedIterator<T> ToFeedIterator()
{
return this.container.GetItemQueryIterator<T>(
queryDefinition: new QueryDefinition(this.ToSqlQueryText()),
continuationToken: this.continuationToken,
requestOptions: this.cosmosQueryRequestOptions);
return this.CreateFeedIterator(true);
}

public void Dispose()
Expand All @@ -169,20 +158,19 @@ Task<DocumentFeedResponse<dynamic>> IDocumentQuery<T>.ExecuteNextAsync(Cancellat
throw new NotImplementedException();
}

private FeedIterator CreateCosmosQueryExecutionContext()
private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
{
return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(T),
sqlQuerySpec: DocumentQueryEvaluator.Evaluate(this.expression),
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.container.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: querySpec,
isContinuationExcpected: isContinuationExcpected,
continuationToken: this.continuationToken,
queryRequestOptions: this.cosmosQueryRequestOptions,
resourceLink: this.container.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
requestOptions: this.cosmosQueryRequestOptions);

return new FeedIteratorCore<T>(
streamIterator,
this.responseFactory.CreateQueryFeedResponse<T>);
}
}
}
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs
Expand Up @@ -16,23 +16,23 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider
{
private readonly ContainerCore container;
private readonly CosmosQueryClientCore queryClient;
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly CosmosResponseFactory responseFactory;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution;
private readonly Action<IQueryable> onExecuteScalarQueryCallback;
private readonly string continuationToken;

public CosmosLinqQueryProvider(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution,
Action<IQueryable> onExecuteScalarQueryCallback = null)
{
this.container = container;
this.cosmosJsonSerializer = cosmosJsonSerializer;
this.responseFactory = responseFactory;
this.queryClient = queryClient;
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
Expand All @@ -44,7 +44,7 @@ public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
{
return new CosmosLinqQuery<TElement>(
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -59,7 +59,7 @@ public IQueryable CreateQuery(Expression expression)
return (IQueryable)Activator.CreateInstance(
documentQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -73,7 +73,7 @@ public TResult Execute<TResult>(Expression expression)
CosmosLinqQuery<TResult> cosmosLINQQuery = (CosmosLinqQuery<TResult>)Activator.CreateInstance(
cosmosQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -90,7 +90,7 @@ public object Execute(Expression expression)
CosmosLinqQuery<object> cosmosLINQQuery = (CosmosLinqQuery<object>)Activator.CreateInstance(
cosmosQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand Down
Expand Up @@ -241,36 +241,11 @@ internal partial class ContainerCore : Container
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
requestOptions = requestOptions ?? new QueryRequestOptions();

if (requestOptions.IsEffectivePartitionKeyRouting)
{
requestOptions.PartitionKey = null;
}

if (queryDefinition == null)
{
return new FeedIteratorCore(
this.ClientContext,
this.LinkUri,
resourceType: ResourceType.Document,
queryDefinition: null,
continuationToken: continuationToken,
options: requestOptions);
}

return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(QueryResponse),
sqlQuerySpec: queryDefinition.ToSqlQuerySpec(),
return this.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: queryDefinition?.ToSqlQuerySpec(),
isContinuationExcpected: true,
continuationToken: continuationToken,
queryRequestOptions: requestOptions,
resourceLink: this.LinkUri,
isContinuationExpected: true,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
requestOptions: requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
Expand Down Expand Up @@ -321,7 +296,7 @@ internal partial class ContainerCore : Container

return new CosmosLinqQuery<T>(
this,
this.ClientContext.CosmosSerializer,
this.ClientContext.ResponseFactory,
(CosmosQueryClientCore)this.queryClient,
continuationToken,
requestOptions,
Expand Down Expand Up @@ -397,6 +372,49 @@ public override Batch CreateBatch(PartitionKey partitionKey)
options: cosmosQueryRequestOptions);
}

/// <summary>
/// Helper method to create a stream feed iterator.
/// It decides if it is a query or read feed and create
/// the correct instance.
/// </summary>
internal FeedIterator GetItemQueryStreamIteratorInternal(
SqlQuerySpec sqlQuerySpec,
bool isContinuationExcpected,
string continuationToken,
QueryRequestOptions requestOptions)
{
requestOptions = requestOptions ?? new QueryRequestOptions();

if (requestOptions.IsEffectivePartitionKeyRouting)
{
requestOptions.PartitionKey = null;
}

if (sqlQuerySpec == null)
{
return new FeedIteratorCore(
this.ClientContext,
this.LinkUri,
resourceType: ResourceType.Document,
queryDefinition: null,
continuationToken: continuationToken,
options: requestOptions);
}

return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(QueryResponse),
sqlQuerySpec: sqlQuerySpec,
continuationToken: continuationToken,
queryRequestOptions: requestOptions,
resourceLink: this.LinkUri,
isContinuationExpected: isContinuationExcpected,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
}

// Extracted partition key might be invalid as CollectionCache might be stale.
// Stale collection cache is refreshed through PartitionKeyMismatchRetryPolicy
// and partition-key is extracted again.
Expand Down