Use the following Azure Databricks storage setup block only if you are using Azure Databricks. You can refer to the instructions here to get started:
https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access

If you are using Synapse Spark and if your data is residing on the storage attached to the Synapse Spark workspace, you can skip the below storage setup section.

In [None]:
%scala
val storageAccountName = "<INSERT STORAGE ACCOUNT>"
val fileSystemName = "<INSERT CONTAINER NAME>"

val commonPath = "abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net"

# AAD Application Details
val appID = "<INSERT APP ID>"
val secret = "<INSERT SECRET>"
val tenantID = "<INSERT TENANT ID>"

spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

In [None]:
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

val tripsParquetPath = commonPath + "/hyperspace/trips/"
val driverParquetPath = commonPath + "/hyperspace/driver/"

// Generate sample trips data
val tripSchema = new StructType().add("tripId", StringType).add("driverId", StringType).add("customerId",StringType).add("cabId",StringType).add("tripDate",StringType).add("startLocation",StringType).add("endLocation",StringType)

val tripData = Seq(
  Row("100", "200", "300", "400", "20240101", "New York", "New Jersey"),
  Row("101", "201", "301", "401", "20240102", "Tempe", "Phoenix"),
  Row("102", "202", "302", "402", "20240103", "San Jose", "San Franciso"),
  Row("103", "203", "303", "403", "20240102", "New York", "Boston"),
  Row("104", "204", "304", "404", "20240103", "New York", "Washington"),
  Row("105", "205", "305", "405", "20240201", "Miami", "Fort Lauderdale"),
  Row("106", "206", "306", "406", "20240202", "Seattle", "Redmond"),
  Row("107", "207", "307", "407", "20240203", "Los Angeles", "San Diego"),
  Row("108", "208", "308", "408", "20240301", "Phoenix", "Las Vegas"),
  Row("109", "209", "309", "409", "20240302", "Washington", "Baltimore"),
  Row("110", "210", "310", "410", "20240303", "Dallas", "Austin"),
  Row("111", "211", "311", "411", "20240303", "New York", "New Jersey"),
  Row("112", "212", "312", "412", "20240304", "New York", "Boston"),
  Row("113", "212", "312", "412", "20240401", "San Jose", "San Ramon"),
  Row("114", "212", "312", "412", "20240404", "San Jose", "Oakland"),
  Row("115", "212", "312", "412", "20240404", "Tempe", "Scottsdale"),
  Row("116", "212", "312", "412", "20240405", "Washington", "Atlanta"),
  Row("117", "212", "312", "412", "20240405", "Seattle", "Portland"),
  Row("118", "212", "312", "412", "20240405", "Miami", "Tampa")
)

// Write Trips to Parquet
val tripWriteDF = spark.createDataFrame(spark.sparkContext.parallelize(tripData),tripSchema)
tripWriteDF.write.mode("overwrite").parquet(tripsParquetPath)

val driverSchema = new StructType().add("driverId", StringType).add("name", StringType).add("license",StringType).add("gender",StringType).add("salary",IntegerType)

val driverData = Seq(
  Row("200", "Alice", "A224455", "Female", 3000),
  Row("202", "Bryan","B992244","Male",4000),
  Row("204", "Catherine","C887733","Female",4000),
  Row("208", "Daryl","D229988","Male",3000),
  Row("212", "Jenny","J663300","Female", 5000)
)
// Write Driver to Parquet
val driverWriteDF = spark.createDataFrame(spark.sparkContext.parallelize(driverData),driverSchema)
driverWriteDF.write.mode("overwrite").parquet(driverParquetPath)



In [None]:
// Let us read back the files to check if the data is showing up correctly
val tripsDF: DataFrame = spark.read.parquet(tripsParquetPath)
val driverDF: DataFrame = spark.read.parquet(driverParquetPath)

// Verify the data is available and correct
tripsDF.show()
driverDF.show()

In [None]:
// Now let us try to join the tables and create a query, which we can later optimize using Hyperspace indexing
val driverFilter: DataFrame = tripsDF.join(driverDF, tripsDF("driverId") === driverDF("driverId")).select(tripsDF("tripId"), driverDF("name"))
driverFilter.show()

driverFilter.explain(true)

In [None]:
// Let us try the same query with Hypserspace enabled now

// Create an instance of Hyperspace
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

val hs: Hyperspace = Hyperspace()

// Delete and vacuum the index if you are trying to rerun the query
//hs.deleteIndex("TripIndex")
//hs.deleteIndex("DriverIndex")
//hs.vacuumIndex("TripIndex")
//hs.vacuumIndex("DriverIndex")

// Create the trips and driver indexes
hs.createIndex(tripsDF, IndexConfig("TripIndex", indexedColumns = Seq("driverId"), includedColumns = Seq("tripId")))
hs.createIndex(driverDF, IndexConfig("DriverIndex", indexedColumns = Seq("driverId"), includedColumns = Seq("name")))

// List the indexes to check if the new indexes have been created
hs.indexes.show()

In [None]:
// Enable Hyperspace
spark.enableHyperspace

// Read back the same trip and driver parquet data into dataframes again
val tripIndexDF: DataFrame = spark.read.parquet(tripsParquetPath)
val driverIndexDF: DataFrame = spark.read.parquet(driverParquetPath)

tripIndexDF.show(5)
driverIndexDF.show(5)

In [None]:
// Run a Join query again
val filterJoin: DataFrame = tripIndexDF.join(driverIndexDF, tripIndexDF("driverId") === driverIndexDF("driverId")).select(tripIndexDF("tripId"), driverIndexDF("name"))
filterJoin.show()

In [None]:
// Check the comparision of the queryplan with and without Index

spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hs.explain(filterJoin)(displayHTML(_))