In [1]:
# Read the OAuth client credentials (created beforehand with Databricks secrets)
# from the "Lambda" secret scope. Keys are fetched in the order listed.
# More info: https://docs.azuredatabricks.net/user-guide/secrets/secrets.html
ClientId, ClientSecret, TenantName = (
    dbutils.secrets.get(scope="Lambda", key=secret_key)
    for secret_key in ("ClientId", "ClientSecret", "TenantName")
)

In [2]:
# Point the ADLS (adl://) client at the OAuth client-credential flow using the
# secrets loaded above; the refresh URL is the tenant-specific token endpoint.
adls_oauth_settings = {
    "dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
    "dfs.adls.oauth2.client.id": ClientId,
    "dfs.adls.oauth2.credential": ClientSecret,
    "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/" + TenantName + "/oauth2/token",
}
# dicts preserve insertion order, so the settings are applied in the order above
for conf_key, conf_value in adls_oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

In [3]:
# Work out the hour-stamped prefix of the file that is still being written
# (e.g. "2024/01/15/13_") so that file can be excluded from the batch read.
# Assumes files are laid out as <yyyy>/<mm>/<dd>/<HH>_*.csv in UTC - TODO confirm.
# NOTE: ts is fixed when this cell runs; re-run it before each batch.
import datetime
# datetime.utcnow() is deprecated (Python 3.12+) and returns a naive value;
# an explicitly UTC-aware "now" produces the same formatted string.
ts = datetime.datetime.now(datetime.timezone.utc).strftime('%Y/%m/%d/%H_')

In [4]:
## Infer the schema once from a known sample file; passing an explicit schema
## to the full read below avoids an extra inference pass over every file.
## Uses the notebook's SparkSession (`spark`, as in the other cells) and the
## built-in "csv" source, which replaces the legacy com.databricks.spark.csv alias.
sampDF = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("adl://[ADLS ACCOUNT].azuredatalakestore.net/[SAMPLE FILE LOCATION]")
)

## Create a DataFrame over all files (<root>/*/*/*/*.csv) using the inferred
## schema, and append each row's source file URI as a "filename" column so
## individual files can be filtered out later.
speedDF = (
    spark.read
    .format("csv")
    .schema(sampDF.schema)
    .option("header", "true")
    .load("adl://[ADLS ACCOUNT].azuredatalakestore.net/[ROOT FOLDER FOR DATASET]/*/*/*/*.csv")
    .selectExpr('*', 'input_file_name() as filename')
)

In [5]:
# Use the filename column to drop rows coming from the file that is still being
# written (identified by the hour-stamped prefix `ts`).
# input_file_name() yields the full file URI, so this must be a substring test:
# Column.like(ts) with no '%' wildcards is an exact-equality match (and '_' in
# ts is a LIKE wildcard), so it never matched and nothing was excluded.
filterDF = speedDF.filter(~speedDF.filename.contains(ts))

# Expose the remaining rows to Spark SQL as the "Sales" view.
filterDF.createOrReplaceTempView("Sales")

In [6]:
# Minimal stand-in for the real ETL process: total sales per product,
# computed with Spark SQL against the "Sales" temp view.
sales_by_product_sql = "SELECT productname ProductName, sum(salesamount) TotalSales FROM Sales GROUP BY productname"
aggDF = spark.sql(sales_by_product_sql)

In [7]:
# Results go to SQL DW via a Blob storage staging area, so a second set of
# secrets is needed: the SQL DW login and the Blob storage account key.
UserName, Password, BlobKey = (
    dbutils.secrets.get(scope="Lambda", key=secret_key)
    for secret_key in ("username", "password", "blobkey")
)

# Register the Blob storage account key in this session's Spark conf.
# NOTE(review): [BLOB ACCOUNT NAME] here and [BLOB ACCOUNT] in the write's
# tempDir URL must be the same storage account - keep the placeholders in sync.
spark.conf.set("fs.azure.account.key.[BLOB ACCOUNT NAME].blob.core.windows.net", BlobKey)

In [8]:
# Finally, push the aggregated results into SQL DW (loaded via PolyBase from the
# Blob staging directory in tempDir). forward_spark_azure_storage_credentials
# hands the session's Blob account key to SQL DW; "overwrite" mode replaces the
# existing dbo.BatchLayer data on each run.
(
    aggDF.write
    .format("com.databricks.spark.sqldw")
    .options(
        url="jdbc:sqlserver://[SQLDW SERVER NAME].database.windows.net;database=[DATABASE NAME]",
        user=UserName,
        password=Password,
        tempDir="wasbs://[BLOB CONTAINER]@[BLOB ACCOUNT].blob.core.windows.net/Files",
        forward_spark_azure_storage_credentials="true",
        dbtable="dbo.BatchLayer",
    )
    .mode("overwrite")
    .save()
)