* Fix compilation
ag-ramachandran committed Jul 2, 2024
1 parent 79f6da5 commit ac0a9e6
Showing 2 changed files with 47 additions and 58 deletions.
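The change that fixes the build is a switch from AAD application-credential authentication to pre-acquired access-token authentication in the E2E sinks, driven through the shared kustoTestConnectionOptions test helper. A minimal sketch of the pattern, using only builder and option names that appear in the diff below (the surrounding test scaffolding is assumed):

  // Old form, removed by this commit: client id / secret / authority credentials
  // ConnectionStringBuilder.createWithAadApplicationCredentials(
  //   s"https://$cluster.kusto.windows.net", appId, appKey, authority)

  // New form: authenticate the admin client with an AAD access token
  import com.microsoft.azure.kusto.data.ClientFactory
  import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder

  val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
    kustoTestConnectionOptions.cluster,
    kustoTestConnectionOptions.accessToken)
  val kustoAdminClient = ClientFactory.createClient(engineKcsb)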
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.


package com.microsoft.kusto.spark

import com.microsoft.azure.kusto.data.ClientFactory
@@ -10,7 +9,10 @@ import com.microsoft.kusto.spark.KustoTestUtils.getSystemTestOptions
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode}
import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, ReadMode}
import com.microsoft.kusto.spark.sql.extension.SparkExtension.DataFrameReaderExtension
import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTempTableCreateCommand
import com.microsoft.kusto.spark.utils.CslCommandsGenerator.{
generateTableAlterStreamIngestionCommand,
generateTempTableCreateCommand
}
import com.microsoft.kusto.spark.utils.{KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.spark.SparkContext
import org.apache.spark.sql._
@@ -31,7 +33,6 @@ class KustoSinkBatchE2E extends AnyFlatSpec with BeforeAndAfterAll {
private val rowId2 = new AtomicInteger(1)
private val timeoutMs: Int = 8 * 60 * 1000 // 8 minutes
private val sleepTimeTillTableCreate: Int = 3 * 60 * 1000 // 2 minutes

private def newRow(): String = s"row-${rowId.getAndIncrement()}"
private def newAllDataTypesRow(v: Int): (
String,
@@ -394,7 +395,7 @@ class KustoSinkBatchE2E extends AnyFlatSpec with BeforeAndAfterAll {
KustoTestUtils.validateResultsAndCleanup(
kustoAdminClient,
table,
database,
kustoTestConnectionOptions.database,
expectedNumberOfRows,
timeoutMs,
tableCleanupPrefix = prefix)
@@ -405,27 +406,25 @@ class KustoSinkBatchE2E extends AnyFlatSpec with BeforeAndAfterAll {
val df = rows.toDF("name", "value")
val prefix = "KustoBatchSinkE2EIngestStreamIngestion"
val table = KustoQueryUtils.simplifyName(s"${prefix}_${UUID.randomUUID()}")
val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
s"https://$cluster.kusto.windows.net",
appId,
appKey,
authority)
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
kustoTestConnectionOptions.cluster,
kustoTestConnectionOptions.accessToken)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)
kustoAdminClient.execute(
database,
kustoTestConnectionOptions.database,
generateTempTableCreateCommand(table, columnsTypesAndNames = "ColA:string, ColB:int"))
kustoAdminClient.execute(database, generateTableAlterStreamIngestionCommand(table))
kustoAdminClient.execute(
kustoTestConnectionOptions.database,
generateTableAlterStreamIngestionCommand(table))

Thread.sleep(sleepTimeTillTableCreate)

df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_CLUSTER, kustoTestConnectionOptions.cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, kustoTestConnectionOptions.database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_ACCESS_TOKEN, kustoTestConnectionOptions.accessToken)
.option(KustoSinkOptions.KUSTO_WRITE_MODE, "Stream")
.mode(SaveMode.Append)
.save()
@@ -1,12 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.


package com.microsoft.kusto.spark

import com.microsoft.azure.kusto.data.ClientFactory
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
import com.microsoft.kusto.spark.KustoTestUtils.{KustoConnectionOptions, getSystemTestOptions}
import com.microsoft.kusto.spark.KustoTestUtils.getSystemTestOptions
import com.microsoft.kusto.spark.common.KustoDebugOptions
import com.microsoft.kusto.spark.datasink.{
KustoSinkOptions,
@@ -47,8 +46,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
super.afterAll()
sc.stop()
}
private lazy val kustoConnectionOptions: KustoConnectionOptions =
getSystemTestOptions
private lazy val kustoTestConnectionOptions = getSystemTestOptions

val csvPath: String = System.getProperty("path", "connector/src/test/resources/TestData/csv")
val customSchema: StructType = new StructType()
@@ -59,8 +57,8 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val prefix = "KustoStreamingSparkE2E_Ingest"
val table = s"${prefix}_${UUID.randomUUID().toString.replace("-", "_")}"
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
kustoConnectionOptions.cluster,
kustoConnectionOptions.accessToken)
kustoTestConnectionOptions.cluster,
kustoTestConnectionOptions.accessToken)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)

val csvDf = spark.readStream
@@ -83,10 +81,10 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val kustoQ = csvDf.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoSinkOptions.KUSTO_CLUSTER -> kustoConnectionOptions.cluster,
KustoSinkOptions.KUSTO_CLUSTER -> kustoTestConnectionOptions.cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> kustoConnectionOptions.database,
KustoSinkOptions.KUSTO_ACCESS_TOKEN -> kustoConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_DATABASE -> kustoTestConnectionOptions.database,
KustoSinkOptions.KUSTO_ACCESS_TOKEN -> kustoTestConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS -> SinkTableCreationMode.CreateIfNotExist.toString,
KustoDebugOptions.KUSTO_ENSURE_NO_DUPLICATED_BLOBS -> true.toString,
KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON -> sp.toString))
@@ -99,7 +97,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
KustoTestUtils.validateResultsAndCleanup(
kustoAdminClient,
table,
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
expectedNumberOfRows,
timeoutMs - sleepTimeTillTableCreate,
tableCleanupPrefix = prefix)
@@ -109,12 +107,12 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val prefix = "KustoStreamingSparkE2EAsync_Ingest"
val table = s"${prefix}_${UUID.randomUUID().toString.replace("-", "_")}"
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
kustoConnectionOptions.cluster,
kustoConnectionOptions.accessToken)
kustoTestConnectionOptions.cluster,
kustoTestConnectionOptions.accessToken)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)

kustoAdminClient.execute(
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
generateTempTableCreateCommand(table, columnsTypesAndNames = "ColA:string, ColB:int"))

val csvDf = spark.readStream
@@ -132,10 +130,10 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val kustoQ = csvDf.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoSinkOptions.KUSTO_CLUSTER -> kustoConnectionOptions.cluster,
KustoSinkOptions.KUSTO_CLUSTER -> kustoTestConnectionOptions.cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> kustoConnectionOptions.database,
KustoSinkOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_DATABASE -> kustoTestConnectionOptions.database,
KustoSinkOptions.KUSTO_AAD_APP_ID -> kustoTestConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC -> "true"))
.trigger(Trigger.Once)

@@ -144,7 +142,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
KustoTestUtils.validateResultsAndCleanup(
kustoAdminClient,
table,
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
expectedNumberOfRows,
timeoutMs,
tableCleanupPrefix = prefix)
@@ -153,20 +151,18 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
"KustoStreamingSinkStreamingIngestion" should "ingest structured data to a Kusto cluster using stream ingestion" taggedAs KustoE2E in {
val prefix = "KustoStreamingSparkE2E_StreamIngest"
val table = s"${prefix}_${UUID.randomUUID().toString.replace("-", "_")}"
val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
s"https://${kustoConnectionOptions.cluster}.kusto.windows.net",
kustoConnectionOptions.appId,
kustoConnectionOptions.appKey,
kustoConnectionOptions.authority)
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
s"https://${kustoTestConnectionOptions.cluster}.kusto.windows.net",
kustoTestConnectionOptions.accessToken)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)
kustoAdminClient.execute(
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
generateTempTableCreateCommand(table, columnsTypesAndNames = "ColA:string, ColB:int"))
kustoAdminClient.execute(
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
generateTableAlterStreamIngestionCommand(table))
kustoAdminClient.execute(
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
generateClearStreamingIngestionCacheCommand(table))

val csvDf = spark.readStream
@@ -178,12 +174,10 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val kustoQ = csvDf.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoSinkOptions.KUSTO_CLUSTER -> kustoConnectionOptions.cluster,
KustoSinkOptions.KUSTO_CLUSTER -> kustoTestConnectionOptions.cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> kustoConnectionOptions.database,
KustoSinkOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSinkOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey,
KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> kustoConnectionOptions.authority,
KustoSinkOptions.KUSTO_DATABASE -> kustoTestConnectionOptions.database,
KustoSinkOptions.KUSTO_ACCESS_TOKEN -> kustoTestConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_WRITE_MODE -> WriteMode.Stream.toString))
.trigger(Trigger.Once)

@@ -192,7 +186,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
KustoTestUtils.validateResultsAndCleanup(
kustoAdminClient,
table,
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
expectedNumberOfRows,
10,
tableCleanupPrefix = prefix)
@@ -201,11 +195,9 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
"KustoStreamingSinkStreamingIngestionWithCreate" should "ingest structured data to a Kusto cluster using stream ingestion" taggedAs KustoE2E in {
val prefix = "KustoStreamingSparkE2E_StreamIngest"
val table = s"${prefix}_${UUID.randomUUID().toString.replace("-", "_")}"
val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
s"https://${kustoConnectionOptions.cluster}.kusto.windows.net",
kustoConnectionOptions.appId,
kustoConnectionOptions.appKey,
kustoConnectionOptions.authority)
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
s"https://${kustoTestConnectionOptions.cluster}.kusto.windows.net",
kustoTestConnectionOptions.accessToken)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)

val csvDf = spark.readStream
@@ -217,12 +209,10 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
val kustoQ = csvDf.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoSinkOptions.KUSTO_CLUSTER -> kustoConnectionOptions.cluster,
KustoSinkOptions.KUSTO_CLUSTER -> kustoTestConnectionOptions.cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> kustoConnectionOptions.database,
KustoSinkOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSinkOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey,
KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> kustoConnectionOptions.authority,
KustoSinkOptions.KUSTO_DATABASE -> kustoTestConnectionOptions.database,
KustoSinkOptions.KUSTO_ACCESS_TOKEN -> kustoTestConnectionOptions.accessToken,
KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS -> SinkTableCreationMode.CreateIfNotExist.toString,
KustoSinkOptions.KUSTO_WRITE_MODE -> WriteMode.Stream.toString))
.trigger(Trigger.Once)
@@ -232,7 +222,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
KustoTestUtils.validateResultsAndCleanup(
kustoAdminClient,
table,
kustoConnectionOptions.database,
kustoTestConnectionOptions.database,
expectedNumberOfRows,
10,
tableCleanupPrefix = prefix)
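On the Spark side the same token flows in through KustoSinkOptions.KUSTO_ACCESS_TOKEN, replacing the KUSTO_AAD_APP_ID / KUSTO_AAD_APP_SECRET / KUSTO_AAD_AUTHORITY_ID triple. A minimal batch-write sketch under the same assumptions (a DataFrame df and a target table name are taken as given, as in the tests above):

  import com.microsoft.kusto.spark.datasink.KustoSinkOptions
  import org.apache.spark.sql.SaveMode

  df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option(KustoSinkOptions.KUSTO_CLUSTER, kustoTestConnectionOptions.cluster)
    .option(KustoSinkOptions.KUSTO_DATABASE, kustoTestConnectionOptions.database)
    .option(KustoSinkOptions.KUSTO_TABLE, table)
    .option(KustoSinkOptions.KUSTO_ACCESS_TOKEN, kustoTestConnectionOptions.accessToken)
    .option(KustoSinkOptions.KUSTO_WRITE_MODE, "Stream")
    .mode(SaveMode.Append)
    .save()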