[DR-3038] Fix Azure Synapse Querying #1478

Merged
merged 6 commits into from
Jul 11, 2023
Changes from 2 commits
@@ -56,6 +56,7 @@ public final class IngestUtils {
private static final String TARGET_DATA_SOURCE_PREFIX = "target_data_source_";
private static final String INGEST_TABLE_NAME_PREFIX = "ingest_";
private static final String SCRATCH_TABLE_NAME_PREFIX = "scratch_";
private static final String FLIGHT_ID_PREFIX = "flight_";

private static final Logger logger = LoggerFactory.getLogger(IngestUtils.class);

@@ -386,7 +387,7 @@ public static List<Column> getDatasetFileRefColumns(

// Note: this is the unqualified path (e.g. it gets used in metadata and scratch directories)
public static String getParquetFilePath(String targetTableName, String flightId) {
- return "parquet/" + targetTableName + "/" + flightId + ".parquet";
+ return "parquet/" + targetTableName + "/" + FLIGHT_ID_PREFIX + flightId + ".parquet";
Member

Would it make sense to create a constant for parquet/ too? It's used four times in this file.
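
A minimal sketch of what that suggestion could look like. The class name ParquetPaths and the constant names PARQUET_DIR_PREFIX and PARQUET_FILE_SUFFIX are assumptions made for illustration; the PR itself only defines FLIGHT_ID_PREFIX.

```java
// Hypothetical stand-in for IngestUtils; only the path-building logic is sketched.
final class ParquetPaths {
  // Assumed constant names: the diff does not define constants for these literals.
  private static final String PARQUET_DIR_PREFIX = "parquet/";
  private static final String PARQUET_FILE_SUFFIX = ".parquet";
  // Mirrors the FLIGHT_ID_PREFIX constant added in this PR.
  private static final String FLIGHT_ID_PREFIX = "flight_";

  private ParquetPaths() {}

  // Unqualified path, same shape as the updated getParquetFilePath in the diff.
  static String getParquetFilePath(String targetTableName, String flightId) {
    return PARQUET_DIR_PREFIX
        + targetTableName
        + "/"
        + FLIGHT_ID_PREFIX
        + flightId
        + PARQUET_FILE_SUFFIX;
  }
}
```

With the literals in one place, the other call sites in the file could build their paths from the same constants instead of repeating "parquet/".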

}

/**
10 changes: 7 additions & 3 deletions src/test/java/bio/terra/common/SynapseUtils.java
Member

@pshapiro4broad pshapiro4broad Jul 7, 2023

While looking at this file I noticed synapseTestCleanup() calls logger like this:

      logger.warn("[Cleanup exception] Unable to drop tables.", ex.getMessage());

This is incorrect: the format string has no {} placeholder, so the extra argument is silently dropped. logger.warn() does accept an exception as a final parameter, but this code is passing in a String. For this to work as expected, it needs to format the message into the string:

      logger.warn("[Cleanup exception] Unable to drop tables. {}", ex.getMessage());

or it could pass the exception in:

      logger.warn("[Cleanup exception] Unable to drop tables.", ex);

(Although I doubt the value of these log statements as I don't think anything is looking at the log outputs to monitor for these messages, so they won't be acted upon.)
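
To make the difference concrete, here is a small sketch of the placeholder rule. It is a simplified stand-in written for this note, not SLF4J itself: the class PlaceholderLogDemo and its render method are hypothetical and only mimic how each {} consumes one argument. It does not model the special handling of a trailing exception.

```java
// Simplified stand-in for SLF4J-style message formatting; NOT the real library.
final class PlaceholderLogDemo {
  private PlaceholderLogDemo() {}

  // Replaces each "{}" in the template with the next argument, left to right.
  // Arguments without a matching placeholder are ignored.
  static String render(String template, Object... args) {
    StringBuilder out = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while (argIndex < args.length && (at = template.indexOf("{}", from)) >= 0) {
      out.append(template, from, at).append(args[argIndex++]);
      from = at + 2;
    }
    // Copy whatever is left of the template after the last substitution.
    out.append(template.substring(from));
    return out.toString();
  }
}
```

Under this rule, rendering "[Cleanup exception] Unable to drop tables." with one String argument returns the template unchanged, while the version ending in {} appends the exception message.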

@@ -407,11 +407,11 @@ public void performIngest(
// 3 - Retrieve info about database schema so that we can populate the parquet create query
String tableName = destinationTable.getName();
String destinationParquetFile =
- FolderType.METADATA.getPath("parquet/" + tableName + "/" + ingestFlightId + ".parquet");
+ FolderType.METADATA.getPath(IngestUtils.getParquetFilePath(tableName, ingestFlightId));
Contributor

Nice, thanks for refactoring out to call the existing utility method!


String scratchParquetFile =
FolderType.SCRATCH.getPath(
- "parquet/" + SCRATCH_TABLE_NAME_PREFIX + tableName + "/" + ingestFlightId + ".parquet");
+ IngestUtils.getParquetFilePath(SCRATCH_TABLE_NAME_PREFIX + tableName, ingestFlightId));

// 4 - Create parquet files via external table
// All inputs should be sanitized before passed into this method
@@ -492,7 +492,11 @@ public DatasetTable ingestIntoAllDataTypesTable(

String scratchParquetFile =
FolderType.SCRATCH.getPath(
- "parquet/scratch_" + destinationTable.getName() + "/" + randomFlightId + ".parquet");
+ "parquet/scratch_"
+ + destinationTable.getName()
+ + "/flight"
Member

@pshapiro4broad pshapiro4broad Jul 7, 2023

Should this be /flight_ (to match FLIGHT_ID_PREFIX)? And if so, could this use IngestUtils.getParquetFilePath() instead?
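
A small sketch of why the helper is the safer choice here. The class ScratchPathCheck and its method names are hypothetical; getParquetFilePath is copied from the diff above. Whether the two paths agree in the real test depends on what randomFlightId contains, which this excerpt does not show.

```java
// Hypothetical comparison of the hand-built scratch path with the helper's output.
final class ScratchPathCheck {
  private static final String FLIGHT_ID_PREFIX = "flight_";
  private static final String SCRATCH_TABLE_NAME_PREFIX = "scratch_";

  private ScratchPathCheck() {}

  // Copy of the updated helper as it appears in this PR's diff.
  static String getParquetFilePath(String targetTableName, String flightId) {
    return "parquet/" + targetTableName + "/" + FLIGHT_ID_PREFIX + flightId + ".parquet";
  }

  // The path as concatenated by hand in the test utility ("/flight" with no underscore).
  static String handBuiltScratchPath(String tableName, String flightId) {
    return "parquet/scratch_" + tableName + "/flight" + flightId + ".parquet";
  }

  // The same path produced through the helper, so the prefix cannot drift.
  static String helperScratchPath(String tableName, String flightId) {
    return getParquetFilePath(SCRATCH_TABLE_NAME_PREFIX + tableName, flightId);
  }
}
```

For a flight id that does not start with an underscore, the hand-built version yields .../flightabc.parquet while the helper yields .../flight_abc.parquet, so the two only match by accident of the id's first character.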

+ + randomFlightId
+ + ".parquet");
addParquetFileName(scratchParquetFile, datasetStorageAccountResource);
addParquetFileName(
IngestUtils.getParquetFilePath(destinationTable.getName(), randomFlightId),
@@ -5,13 +5,15 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import bio.terra.model.IngestRequestModel;
import bio.terra.model.IngestRequestModel.FormatEnum;
import bio.terra.service.dataset.exception.InvalidBlobURLException;
import bio.terra.service.job.JobMapKeys;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.ShortUUID;
import com.azure.storage.blob.BlobUrlParts;
import java.util.stream.Stream;
import org.junit.jupiter.api.Tag;
@@ -123,6 +125,14 @@ void testShouldIgnoreUserSpecifiedRowIds() {
IngestUtils.shouldIgnoreUserSpecifiedRowIds(flightMapMerge));
}

@Test
void testGetParquetFilePath() {
String targetTableName = "sample";
String flightId = "_" + ShortUUID.get();
String expectedPath = "parquet/" + targetTableName + "/flight_" + flightId + ".parquet";
assertEquals(expectedPath, IngestUtils.getParquetFilePath(targetTableName, flightId));
}

/**
* @param updateStrategy to specify on a new stub ingest request
* @return a new FlightMap whose ingest request contains the provided update strategy
@@ -294,6 +294,33 @@ public void testSnapshotByQuery() throws SQLException, IOException {
createSnapshotRowIdParquet(snapshotByQueryTableRowCounts);
}

@Test
public void testQuerySnapshotWithUnderscoreInFlightId() throws SQLException, IOException {
String ingestFlightId = "_" + UUID.randomUUID().toString();
SnapshotTable participantSnapshotTable = setupParticipantTable(ingestFlightId);
List<SnapshotTable> tables = List.of(participantSnapshotTable);
snapshot.snapshotTables(tables);
Map<String, Long> tableRowCounts =
azureSynapsePdao.createSnapshotParquetFiles(
tables, snapshotId, sourceDatasetDataSourceName, snapshotDataSourceName, false, null);
synapseUtils.addTableName(IngestUtils.formatSnapshotTableName(snapshotId, "participant"));

String snapshotParquetFileName =
IngestUtils.getSnapshotParquetFilePathForQuery(participantTable.getName());
synapseUtils.addParquetFileName(snapshotParquetFileName, snapshotStorageAccountResource);
List<String> snapshotFirstNames =
synapseUtils.readParquetFileStringColumn(
snapshotParquetFileName, snapshotDataSourceName, "first_name", true);
assertThat(
"List of names in snapshot should equal the dataset names",
snapshotFirstNames,
equalTo(List.of("Sally", "Bobby", "Freddy", "Charles")));
assertThat(
"Table row count should equal 4 for destination table",
tableRowCounts.get(participantTable.getName()),
equalTo(4L));
}

private void setupFourTableDataset() throws SQLException, IOException {
// Prep dataset data for snapshot
snapshot = new Snapshot().id(snapshotId);
@@ -447,8 +474,8 @@ private void createSnapshotRowIdParquet(Map<String, Long> tableRowCounts) throws
assertThat("Snapshot contains expected number of rows", snapshotRowIds.size(), equalTo(5));
}

- private SnapshotTable setupParticipantTable() throws SQLException, IOException {
- String participantTableIngestFlightId = UUID.randomUUID().toString();
+ private SnapshotTable setupParticipantTable(String participantTableIngestFlightId)
+ throws SQLException, IOException {
participantTable =
synapseUtils.ingestIntoTable(
"ingest-test-dataset-table-participant.json",
@@ -462,6 +489,11 @@ private SnapshotTable setupParticipantTable() throws SQLException, IOException {
return setupSnapshotTable(participantTable);
}

private SnapshotTable setupParticipantTable() throws SQLException, IOException {
String participantTableIngestFlightId = UUID.randomUUID().toString();
return setupParticipantTable(participantTableIngestFlightId);
}

private SnapshotTable setupSampleTable() throws SQLException, IOException {
String sampleTableIngestFlightId = UUID.randomUUID().toString();
sampleTable =