Use the following Azure Databricks storage setup block only if you are using Azure Databricks. You can refer to the instructions here to get started:
https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access

If you are using Synapse Spark and your data resides on the storage account attached to the Synapse Spark workspace, you can skip the storage setup section below.

In [None]:
%scala
// ADLS Gen2 storage account and container (file system) that hold the data.
val storageAccountName = "<INSERT STORAGE ACCOUNT>"
val fileSystemName = "<INSERT CONTAINER NAME>"

// DFS endpoint host of the storage account; every per-account Spark setting below is keyed by it.
val storageHost = s"${storageAccountName}.dfs.core.windows.net"

// Root URI of the container; the other cells build their paths on top of this value.
val commonPath = s"abfss://${fileSystemName}@${storageHost}"

// AAD Application Details
// NOTE(review): avoid committing a plaintext client secret; consider reading it from a
// Databricks secret scope (dbutils.secrets.get) instead.
val appID = "<INSERT APP ID>"
val secret = "<INSERT SECRET>"
val tenantID = "<INSERT TENANT ID>"

// Configure OAuth (client-credentials) access to the storage account for this Spark session.
spark.conf.set(s"fs.azure.account.auth.type.${storageHost}", "OAuth")
spark.conf.set(s"fs.azure.account.oauth.provider.type.${storageHost}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(s"fs.azure.account.oauth2.client.id.${storageHost}", appID)
spark.conf.set(s"fs.azure.account.oauth2.client.secret.${storageHost}", secret)
spark.conf.set(s"fs.azure.account.oauth2.client.endpoint.${storageHost}", s"https://login.microsoftonline.com/${tenantID}/oauth2/token")

// Temporarily allow the file system to be created on first access, touch the container root,
// then switch the flag back off. The reset sits in `finally` so that a failed listing
// (bad credentials, wrong account name, ...) does not leave the flag enabled for the session.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
try {
  dbutils.fs.ls(commonPath + "/")
} finally {
  spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
}

In [None]:
%scala

// Generate small sample Trips and Fares data sets and write them to ADLS Gen2:
// trips as CSV (with a header row) and fares as Parquet.
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

val tripsCSVPath = commonPath + "/batch/csv/trips"
val faresParquetPath = commonPath + "/batch/parquet/fares"

// Generate sample trips data
val tripSchema = new StructType()
  .add("tripId", IntegerType)
  .add("driverId", IntegerType)
  .add("customerId", IntegerType)
  .add("cabId", IntegerType)
  .add("tripDate", StringType)
  .add("startLocation", StringType)
  .add("endLocation", StringType)

val tripData = Seq(
  Row(100, 200, 300, 400, "20220101", "New York", "New Jersey"),
  Row(101, 201, 301, 401, "20220102", "Tempe", "Phoenix"),
  Row(102, 202, 302, 402, "20220103", "San Jose", "San Francisco"),
  Row(103, 203, 303, 403, "20220102", "New York", "Boston"),
  Row(104, 204, 304, 404, "20220103", "New York", "Washington"),
  Row(105, 205, 305, 405, "20220201", "Miami", "Fort Lauderdale"),
  Row(106, 206, 306, 406, "20220202", "Seattle", "Redmond"),
  Row(107, 207, 307, 407, "20220203", "Los Angeles", "San Diego"),
  Row(108, 208, 308, 408, "20220301", "Phoenix", "Las Vegas"),
  Row(109, 209, 309, 409, "20220302", "Washington", "Baltimore"),
  Row(110, 210, 310, 410, "20220303", "Dallas", "Austin")
)

// Write Trips to CSV files (one header row per part file); any existing output is replaced.
val tripDF = spark.createDataFrame(spark.sparkContext.parallelize(tripData), tripSchema)
tripDF.printSchema()
tripDF.show(false)
tripDF.write.mode(SaveMode.Overwrite).option("header", "true").csv(tripsCSVPath)

// Generate sample fares data
val fareSchema = new StructType()
  .add("tripId", IntegerType)
  .add("fare", IntegerType)
  .add("currency", StringType)

val fareData = Seq(
  Row(100, 100, "USD"),
  Row(101, 20, "USD"),
  Row(102, 25, "USD"),
  Row(103, 140, "USD"),
  Row(104, 340, "USD"),
  Row(105, 75, "USD"),
  Row(106, 50, "USD"),
  Row(107, 125, "USD"),
  Row(108, 40, "USD"),
  Row(109, 80, "USD"),
  Row(110, 160, "USD")
)

// Write Fares to Parquet files; any existing output is replaced.
// No "header" option here: Parquet stores the schema in the files, so that option only applies to CSV.
val faresDF = spark.createDataFrame(spark.sparkContext.parallelize(fareData), fareSchema)
faresDF.printSchema()
faresDF.show(false)
faresDF.write.mode(SaveMode.Overwrite).parquet(faresParquetPath)


In [None]:
%scala
// Sample batch transformation using Azure Databricks (Scala): read the trips CSV and the
// fares Parquet data, join them on tripId, total the fares per start location and write
// the result back to ADLS Gen2 as Parquet.
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

val tripsCSVPath = commonPath + "/batch/csv/trips/*"
val faresParquetPath = commonPath + "/batch/parquet/fares/*"
val outputParquetPath = commonPath + "/batch/parquet/output"

// Read the Trip data (stored as CSV files) and the Fares data (stored as Parquet files)
val tripsSchema = new StructType()
  .add("tripId", IntegerType)
  .add("driverId", IntegerType)
  .add("customerId", IntegerType)
  .add("cabId", IntegerType)
  // tripDate is kept as a string (e.g. "20220101") to match the schema the trips data was written with.
  .add("tripDate", StringType)
  .add("startLocation", StringType)
  .add("endLocation", StringType)

val tripsCSV = spark.read.format("csv")
  .option("header", "true")
  .schema(tripsSchema)
  .load(tripsCSVPath)
tripsCSV.printSchema()
tripsCSV.show(false)

val faresSchema = new StructType()
  .add("tripId", IntegerType)
  .add("fare", IntegerType)
  .add("currency", StringType)

val faresParquet = spark.read.format("parquet")
  .schema(faresSchema)
  .load(faresParquetPath)
faresParquet.printSchema()
faresParquet.show(false)

// Join them on tripId and group by startLocation, summing the fare per location.
// The aggregated column is named "sum(fare)" by Spark.
val joinDF = tripsCSV
  .join(faresParquet, tripsCSV("tripId") === faresParquet("tripId"), "inner")
  .groupBy("startLocation")
  .sum("fare")

// Print the output table with columns: City and Fare
val outputDF = joinDF.select(
  col("startLocation").alias("City"),
  col("sum(fare)").alias("Fare")
)
display(outputDF)

// Finally, write the output back to ADLS Gen2 under the batch/parquet/output folder,
// replacing any previous result.
outputDF.write.mode(SaveMode.Overwrite).parquet(outputParquetPath)
