Skip to content

Commit

Permalink
Merge branch 'postgresql-dialect' into sample-application-django
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 18, 2023
2 parents 697a332 + a8022c2 commit 5ea9615
Show file tree
Hide file tree
Showing 43 changed files with 2,607 additions and 578 deletions.
5 changes: 5 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ updates:
directory: "/"
schedule:
interval: "daily"

- package-ecosystem: "nuget"
directory: "/src/test/csharp/pgadapter_npgsql_tests"
schedule:
interval: "daily"
19 changes: 19 additions & 0 deletions docs/psycopg3.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,25 @@ with psycopg.connect("host=localhost port=5432 dbname=my-database") as conn:
conn.commit()
```

#### Stale Reads
Read-only transactions and connections using AUTOCOMMIT will by default use strong reads for queries.
Cloud Spanner also supports stale reads. A strong read is a read at a current timestamp and is
guaranteed to see all data that has been committed up until the start of this read. Cloud Spanner
defaults to using strong reads to serve read requests.

A stale read is read at a timestamp in the past. If your application is latency sensitive but
tolerant of stale data, then stale reads can provide performance benefits.
See also https://cloud.google.com/spanner/docs/reads#read_types

You can create a connection that will use stale reads in autocommit mode by adding the following to
the connection string:

```python
with psycopg.connect("host=localhost port=5432 dbname=my-database options='-c spanner.read_only_staleness=\\'MAX_STALENESS\\ 10s\\''", autocommit=True) as conn:
# This connection will read data that is up to 10 seconds stale.
conn.execute("SELECT * FROM my_table WHERE key='my-key'")
```

### Batching / Pipelining
Use [batching / pipelining](https://www.psycopg.org/psycopg3/docs/advanced/pipeline.html) for
optimal performance when executing multiple statements. This both saves round-trips between your
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
com.google.cloud.spanner.pgadapter.nodejs.NodeJSTest
</excludedTests>

<spanner.version>6.35.1</spanner.version>
<spanner.version>6.35.2</spanner.version>
<junixsocket.version>2.6.2</junixsocket.version>
</properties>

Expand Down Expand Up @@ -85,7 +85,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.3</version>
<version>42.5.4</version>
</dependency>
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,34 @@

package com.google.cloud.spanner.pgadapter.parsers;

import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.pgadapter.error.PGException;
import com.google.cloud.spanner.pgadapter.error.PGExceptionFactory;
import com.google.cloud.spanner.pgadapter.error.SQLState;
import com.google.cloud.spanner.pgadapter.session.SessionState;
import com.google.cloud.spanner.pgadapter.statements.SimpleParser;
import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.text.StringEscapeUtils;
import org.postgresql.core.Oid;
import org.postgresql.util.ByteConverter;

/**
* Translate wire protocol to array. Since arrays house any other specified types (including
Expand All @@ -39,23 +54,44 @@ public class ArrayParser extends Parser<List<?>> {
private static final String SPANNER_ARRAY_OPEN = "[", SPANNER_ARRAY_CLOSE = "]";

private final Type arrayElementType;
private final int elementOid;
private final boolean isStringEquivalent;
private final SessionState sessionState;

public ArrayParser(ResultSet item, int position, SessionState sessionState) {
Preconditions.checkArgument(!item.isNull(position));
this.sessionState = sessionState;
this.arrayElementType = item.getColumnType(position).getArrayElementType();
this.elementOid = Parser.toOid(item.getColumnType(position).getArrayElementType());
if (this.arrayElementType.getCode() == Code.ARRAY) {
throw new IllegalArgumentException(
"Spanner does not support embedded Arrays."
+ " If you are seeing this, something went wrong!");
}
this.item = toList(item.getValue(position), this.arrayElementType.getCode());
this.isStringEquivalent = stringEquivalence(this.arrayElementType.getCode());
}

public ArrayParser(
byte[] item,
FormatCode formatCode,
SessionState sessionState,
Type arrayElementType,
int elementOid) {
this.sessionState = sessionState;
this.arrayElementType = arrayElementType;
this.elementOid = elementOid;
this.isStringEquivalent = stringEquivalence(elementOid);
if (item != null) {
this.arrayElementType = item.getColumnType(position).getArrayElementType();
if (this.arrayElementType.getCode() == Code.ARRAY) {
throw new IllegalArgumentException(
"Spanner does not support embedded Arrays."
+ " If you are seeing this, something went wrong!");
switch (formatCode) {
case TEXT:
this.item = stringArrayToList(new String(item, StandardCharsets.UTF_8), elementOid);
break;
case BINARY:
this.item = binaryArrayToList(item);
break;
default:
}
this.item = toList(item.getValue(position), this.arrayElementType.getCode());
this.isStringEquivalent = stringEquivalence(this.arrayElementType.getCode());
} else {
arrayElementType = null;
isStringEquivalent = false;
}
}

Expand Down Expand Up @@ -90,6 +126,67 @@ private List<?> toList(Value value, Code arrayElementType) {
}
}

private List<?> stringArrayToList(String value, int elementOid) {
Preconditions.checkNotNull(value);
List<String> values =
SimpleParser.readArrayLiteral(value, this.isStringEquivalent, elementOid == Oid.BYTEA);
ArrayList<Object> result = new ArrayList<>(values.size());
for (String element : values) {
if (element == null) {
result.add(null);
} else {
result.add(
Parser.create(
sessionState,
element.getBytes(StandardCharsets.UTF_8),
elementOid,
FormatCode.TEXT)
.item);
}
}
return result;
}

private List<?> binaryArrayToList(byte[] value) {
Preconditions.checkNotNull(value);
byte[] buffer = new byte[20];
try (DataInputStream dataStream = new DataInputStream(new ByteArrayInputStream(value))) {
dataStream.readFully(buffer);
int dimensions = ByteConverter.int4(buffer, 0);
if (dimensions != 1) {
throw PGExceptionFactory.newPGException(
"Only single-dimension arrays are supported", SQLState.InvalidParameterValue);
}
// Null flag indicates whether there is at least one null element in the array. This is
// ignored by PGAdapter, as we check for null elements in the conversion function below.
int nullFlag = ByteConverter.int4(buffer, 4);
int oid = ByteConverter.int4(buffer, 8);
int size = ByteConverter.int4(buffer, 12);
// Lower bound indicates whether the lower bound of the array is 1 or 0. This is irrelevant
// for Cloud Spanner.
int lowerBound = ByteConverter.int4(buffer, 16);
ArrayList<Object> result = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
buffer = new byte[4];
dataStream.readFully(buffer);
int elementSize = ByteConverter.int4(buffer, 0);
if (elementSize == -1) {
result.add(null);
} else {
buffer = new byte[elementSize];
dataStream.readFully(buffer);
result.add(Parser.create(sessionState, buffer, oid, FormatCode.BINARY).item);
}
}
return result;
} catch (IOException exception) {
throw PGException.newBuilder("Invalid array value")
.setSQLState(SQLState.InvalidParameterValue)
.setCause(exception)
.build();
}
}

@Override
public List<?> getItem() {
return this.item;
Expand All @@ -109,6 +206,28 @@ private boolean stringEquivalence(Code arrayElementType) {
|| arrayElementType == Code.PG_JSONB;
}

/**
* Whether a type is represented as string.
*
* @param arrayElementTypeOid The type to check.
* @return True if the type uses strings, false otherwise.
*/
private boolean stringEquivalence(int arrayElementTypeOid) {
switch (arrayElementTypeOid) {
case Oid.BYTEA:
case Oid.DATE:
case Oid.TIMESTAMPTZ:
case Oid.TIMESTAMP:
case Oid.VARCHAR:
case Oid.UUID:
case Oid.TEXT:
case Oid.JSONB:
return true;
default:
return false;
}
}

/**
* Put quotes around the item if it is string equivalent, otherwise do not modify it.
*
Expand All @@ -121,9 +240,7 @@ private String stringify(String value) {
return "NULL";
}
if (this.isStringEquivalent) {
if (value.indexOf('\\') > -1) {
value = value.replace("\\", "\\\\");
}
value = StringEscapeUtils.escapeJava(value);
return STRING_TOGGLE + value + STRING_TOGGLE;
}
return value;
Expand Down Expand Up @@ -190,9 +307,83 @@ protected byte[] binaryParse() {
}
}

@SuppressWarnings("unchecked")
@Override
public void bind(Statement.Builder statementBuilder, String name) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNIMPLEMENTED, "Array parameters not yet supported");
switch (elementOid) {
case Oid.BIT:
case Oid.BOOL:
statementBuilder.bind(name).toBoolArray((List<Boolean>) this.item);
break;
case Oid.INT2:
if (this.item == null) {
statementBuilder.bind(name).toInt64Array((long[]) null);
} else {
statementBuilder
.bind(name)
.toInt64Array(
((List<Short>) this.item)
.stream()
.map(s -> s == null ? null : s.longValue())
.collect(Collectors.toList()));
}
break;
case Oid.INT4:
if (this.item == null) {
statementBuilder.bind(name).toInt64Array((long[]) null);
} else {
statementBuilder
.bind(name)
.toInt64Array(
((List<Integer>) this.item)
.stream()
.map(i -> i == null ? null : i.longValue())
.collect(Collectors.toList()));
}
break;
case Oid.INT8:
statementBuilder.bind(name).toInt64Array((List<Long>) this.item);
break;
case Oid.NUMERIC:
statementBuilder.bind(name).toPgNumericArray((List<String>) this.item);
break;
case Oid.FLOAT4:
if (this.item == null) {
statementBuilder.bind(name).toFloat64Array((double[]) null);
} else {
statementBuilder
.bind(name)
.toFloat64Array(
((List<Float>) this.item)
.stream()
.map(f -> f == null ? null : f.doubleValue())
.collect(Collectors.toList()));
}
break;
case Oid.FLOAT8:
statementBuilder.bind(name).toFloat64Array((List<Double>) this.item);
break;
case Oid.UUID:
case Oid.VARCHAR:
case Oid.TEXT:
statementBuilder.bind(name).toStringArray((List<String>) this.item);
break;
case Oid.JSONB:
statementBuilder.bind(name).toPgJsonbArray((List<String>) this.item);
break;
case Oid.BYTEA:
statementBuilder.bind(name).toBytesArray((List<ByteArray>) this.item);
break;
case Oid.TIMESTAMPTZ:
case Oid.TIMESTAMP:
statementBuilder.bind(name).toTimestampArray((List<Timestamp>) this.item);
break;
case Oid.DATE:
statementBuilder.bind(name).toDateArray((List<Date>) this.item);
break;
default:
throw PGExceptionFactory.newPGException(
"Unsupported array element type: " + arrayElementType, SQLState.InvalidParameterValue);
}
}
}
Loading

0 comments on commit 5ea9615

Please sign in to comment.