
initial commit

Simon D'Morias authored and Simon D'Morias committed Nov 30, 2018
1 parent 36e4134 commit 1465952971f29de6c4690948fd021146bdf2bef2
Showing with 125 additions and 0 deletions.
  1. +1 −0 .gitignore
  2. +28 −0 CreateJobExample.ps1
  3. +4 −0 DataLake/Test1.csv
  4. +20 −0 Jobs/Job1.py
  5. +7 −0 Utils/helpersfunctions.py
  6. +10 −0 build.ps1
  7. +19 −0 deploy.ps1
  8. +36 −0 main.py
.gitignore
@@ -328,3 +328,4 @@ ASALocalRun/

# MFractors (Xamarin productivity tool) working folder
.mfractor/
MyBearerToken.txt

CreateJobExample.ps1
@@ -0,0 +1,28 @@
Set-Location $PSScriptRoot
if (!(Get-Module -ListAvailable -Name azure.databricks.cicd.Tools)) {
    Install-Module azure.databricks.cicd.Tools -Force -Scope CurrentUser
}
Import-Module -Name azure.databricks.cicd.Tools
$BearerToken = Get-Content "MyBearerToken.txt" # Create this file in this folder containing only your bearer token
$Region = "westeurope"

$JobName = "MyApplication-Test-PythonJob"
$SparkVersion = "4.1.x-scala2.11"
$NodeType = "Standard_D3_v2"
$MinNumberOfWorkers = 1
$MaxNumberOfWorkers = 1
$Timeout = 1000
$MaxRetries = 1
$ScheduleCronExpression = "0 15 22 ? * *" # Quartz cron syntax: run daily at 22:15
$Timezone = "UTC"
$PythonPath = "dbfs:/MyApplication/Code/main.py" # matches the main.py uploaded by deploy.ps1
$PythonParameters = "Job1.MyMethod", "2018/11/30"

Add-DatabricksPythonJob -BearerToken $BearerToken -Region $Region -JobName $JobName `
-SparkVersion $SparkVersion -NodeType $NodeType `
-MinNumberOfWorkers $MinNumberOfWorkers -MaxNumberOfWorkers $MaxNumberOfWorkers `
-Timeout $Timeout -MaxRetries $MaxRetries `
-ScheduleCronExpression $ScheduleCronExpression `
-Timezone $Timezone -PythonPath $PythonPath `
-PythonParameters $PythonParameters `
-PythonVersion 3
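
Usage note: once deploy.ps1 has pushed the code to DBFS, running this script from the repository root registers the scheduled Databricks job, e.g.:

    ./CreateJobExample.ps1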

DataLake/Test1.csv
@@ -0,0 +1,4 @@
col1,col2,col3
1,2,3
4,5,6
7,8,9

Jobs/Job1.py
@@ -0,0 +1,20 @@
import os


from helpersfunctions import lookupDimensionKey
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def MyMethod(slot='2018/01/01'):
    print(slot)
    filePath = os.path.join(spark.conf.get("ADLS"), 'Test1.csv')
    df = spark.read.format('csv').options(header='true', inferSchema=True).load(filePath)
    df = lookupDimensionKey(df)
    df.show()
    return


if __name__ == "__main__":
    MyMethod()

Utils/helpersfunctions.py
@@ -0,0 +1,7 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import os
spark = SparkSession.builder.getOrCreate()

def lookupDimensionKey(df):
    # Stub lookup: stamp every row with a fixed surrogate key
    return df.withColumn("DimensionSKey", lit(1))

build.ps1
@@ -0,0 +1,10 @@
Set-Location $PSScriptRoot
Remove-Item ./bin -Recurse -Force -ErrorAction:SilentlyContinue
New-Item ./bin -ItemType Directory -Force | Out-Null
Copy-Item "Jobs/*.py" ./bin
Copy-Item "Utils/*.py" ./bin
$source = Resolve-Path ./bin/*.py
$ZipFilePath = "./bin/scripts" # Compress-Archive appends .zip, producing ./bin/scripts.zip
Compress-Archive -LiteralPath $source -DestinationPath $ZipFilePath
Remove-Item ./bin/*.py -Force
Copy-Item "./main.py" ./bin

deploy.ps1
@@ -0,0 +1,19 @@
Set-Location $PSScriptRoot
./build.ps1 # build.ps1 takes no parameters; it stages main.py and scripts.zip under ./bin

if (!(Get-Module -ListAvailable -Name azure.databricks.cicd.Tools)) {
Install-Module azure.databricks.cicd.Tools -Force -Scope CurrentUser
}
Import-Module -Name azure.databricks.cicd.Tools

$BearerToken = Get-Content -Path ./MyBearerToken.txt -Raw # Create this file with just your bearer token in it; it is excluded from source control via .gitignore
$Region = "westeurope"
$localBinfolder = Join-Path $PSScriptRoot "/bin/"
$TargetDBFSFolderCode = "/MyApplication/Code"

# Clean Target Folder
Remove-DatabricksDBFSItem -BearerToken $BearerToken -Region $Region -Path $TargetDBFSFolderCode

# Upload files to DBFS
Add-DatabricksDBFSFile -BearerToken $BearerToken -Region $Region -LocalRootFolder $localBinfolder -FilePattern "main.py" -TargetLocation $TargetDBFSFolderCode -Verbose
Add-DatabricksDBFSFile -BearerToken $BearerToken -Region $Region -LocalRootFolder $localBinfolder -FilePattern "*.zip" -TargetLocation $TargetDBFSFolderCode -Verbose

main.py
@@ -0,0 +1,36 @@

import os
import sys
import argparse
from importlib import import_module
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load configuration
parser = argparse.ArgumentParser()
parser.add_argument("job", type=str, nargs='?', default="Job1.MyMethod")
parser.add_argument("slot", type=str, nargs='?', default="2018/11/19")
args = parser.parse_args()
job_module, job_method = args.job.rsplit('.',1)
slot = args.slot


if "local" in spark.sparkContext.master:
dirname = os.path.dirname(__file__)
sys.path.insert(0, (os.path.join(dirname, 'Utils')))
sys.path.insert(0, (os.path.join(dirname, 'Jobs')))
spark.conf.set("ADLS",os.path.join(dirname, 'DataLake'))
else:
spark.sparkContext.addPyFile("dbfs:/MyApplication/Code/scripts.zip")
spark.conf.set("ADLS",'adl://myazuredatalake.azuredatalakestore.net/')
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", dbutils.secrets.get(scope = "SparkADLS - Secrets", key = "clientid"))
spark.conf.set("dfs.adls.oauth2.credential", dbutils.secrets.get(scope = "SparkADLS - Secrets", key = "credential"))
spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/[tenantid]/oauth2/token")


# Execute Job
mod = import_module(job_module)
met = getattr(mod, job_method)
met(slot)
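
Usage note: against a local Spark master (pyspark installed, run from the repository root), main.py adds Utils/ and Jobs/ to sys.path and points the "ADLS" setting at the local DataLake/ folder, so the sample job can be exercised with something like:

    python main.py Job1.MyMethod 2018/11/30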
