In [None]:
# Name of the file to validate. Azure Data Factory passes it in through the
# 'fileName' notebook widget.
fileName = dbutils.widgets.get("fileName")
# For an interactive run without the widget, assign a literal instead,
# e.g. fileName = 'ProductData.csv'


In [None]:
import json

import pyspark.sql.functions as F
from pyspark.sql import types as T
#from datetime import datetime as dt

# Azure Blob Storage account whose containers are mounted under /mnt/<container>.
storageAccount = 'iot12colinstorageaccount'
storageContainers = ['input', 'staging', 'rejected']

# Secret scope and secret key used to look up the storage SAS token via
# dbutils.secrets.get.
databricksScopeName = 'azurekeyvault'
stgAccountSASTokenKey = 'storage-sas'

In [None]:
# dbutils.fs.mounts()
# dbutils.fs.unmount('/mnt/input')

/mnt/input has been unmounted.


True

In [None]:
# Mount every container in storageContainers at /mnt/<container>, authenticating
# with the SAS token stored in the configured secret scope. Containers that are
# already mounted are skipped, so the cell can be re-run safely.
sasKey = dbutils.secrets.get(
    scope=databricksScopeName,
    key=stgAccountSASTokenKey
)

# List the existing mounts once instead of once per container; the set is kept
# current below so a repeated container name is still reported as mounted.
mountedPoints = {mount.mountPoint for mount in dbutils.fs.mounts()}

for container in storageContainers:
    mountPoint = '/mnt/{}'.format(container)
    if mountPoint in mountedPoints:
        print('Mount point {} already mounted'.format(mountPoint))
        continue

    dbutils.fs.mount(
        source='wasbs://{}@{}.blob.core.windows.net'.format(container, storageAccount),
        mount_point=mountPoint,
        # The SAS token is supplied under the per-container configuration key.
        extra_configs={
            'fs.azure.sas.{}.{}.blob.core.windows.net'.format(container, storageAccount): sasKey
        }
    )
    mountedPoints.add(mountPoint)
    print('Mounted {} successfully'.format(mountPoint))


Mounted /mnt/input successfully
Mount point /mnt/staging already mounted
Mount point /mnt/rejected already mounted


In [None]:
# Expected date pattern for each date column of the 'Product' file.
# Rule 2 iterates over these rows and reads 'ColumnName' and 'ColumnDateFormat'.
productDateFormats = [
    ('StartDate', 'MM-dd-yyyy'),
    ('EndDate', 'MM/dd/yyyy'),
    ('CreateDate', 'MM/dd/yyyy'),
    ('ModifiedDate', 'MM/dd/yyyy'),
]
df_formats = spark.createDataFrame([
    {'FileName': 'Product', 'ColumnName': columnName, 'ColumnDateFormat': dateFormat}
    for columnName, dateFormat in productDateFormats
])

In [None]:
def get_csv_schema(*args):
    """Build a Spark StructType from field specifications.

    Each positional argument is an iterable that is unpacked into the
    positional arguments of ``T.StructField`` (for example
    ``("ProductId", T.StringType(), False)``).

    Returns:
        A ``T.StructType`` holding one field per argument, in the order given.
    """
    fields = []
    for field_spec in args:
        fields.append(T.StructField(*field_spec))
    return T.StructType(fields)
def read_csv(fname, schema):
    """Read a CSV file with a header row into a DataFrame using an explicit schema.

    Args:
        fname: Path of the CSV file to read.
        schema: Iterable of field specifications; it is unpacked into
            ``get_csv_schema`` to build the StructType applied to the file.

    Returns:
        The DataFrame produced by ``spark.read.csv``.
    """
    csv_schema = get_csv_schema(*schema)
    return spark.read.csv(path=fname, header=True, schema=csv_schema)

In [None]:
# Validation state: the rule cells set errorFlag and add to errorMessage
# when a check fails.
errorFlag, errorMessage = False, ''

# Date columns are read as plain strings; Rule 2 parses them with to_date
# against the expected patterns.
productSchema = [
    ("ProductId", T.StringType(), False),
    ("Price", T.DecimalType(10,4), True),
    ("guid", T.StringType(), True),
    ("StartDate", T.StringType(), True),
    ("EndDate", T.StringType(), True),
    ("CreateDate", T.StringType(), True),
    ("ModifiedDate", T.StringType(), True),
]
df = read_csv('/mnt/input/' + fileName, schema=productSchema)

debug {}.test [REDACTED]


In [None]:
# Rule 1: the file must not contain duplicate rows, i.e. the number of
# distinct rows has to equal the total number of rows.
totalcount = df.count()
print(totalcount)
distinctCount = df.distinct().count()
print(distinctCount)

if totalcount != distinctCount:
    errorFlag, errorMessage = True, 'Duplication Found. Rule 1 Failed'
print(errorMessage)

500
500



In [None]:
# Rule 2: every row must parse with the expected date pattern for each
# configured date column. A column fails when the number of rows that
# to_date can parse differs from the total row count.
for formatRow in df_formats.collect():
    colName, colFormat = formatRow['ColumnName'], formatRow['ColumnDateFormat']
    print(colName, colFormat)
    parsedDate = F.to_date(colName, colFormat)
    formatCount = df.filter(parsedDate.isNotNull()).count()
    if formatCount == totalcount:
        print('All rows are good for ', colName)
    else:
        errorFlag = True
        errorMessage += ' DateFormat is incorrect for {} '.format(colName)
print(errorMessage)



StartDate yyyy-dd-MM
All rows are good for  StartDate
EndDate yyyy/dd/MM
All rows are good for  EndDate
CreateDate yyyy-dd-MM
All rows are good for  CreateDate
ModifiedDate yyyy-dd-MM
All rows are good for  ModifiedDate



In [None]:
# Route the input file according to the validation outcome, then return the
# result to the caller as the notebook exit value (a JSON string).
sourcePath = '/mnt/input/' + fileName
if errorFlag:
    dbutils.fs.mv(sourcePath, '/mnt/rejected/' + fileName)
    # json.dumps adds the surrounding quotes and escapes quotes, backslashes
    # and control characters, so the exit value stays valid JSON whatever the
    # message contains. For plain ASCII messages the output is unchanged.
    dbutils.notebook.exit(
        '{"errorFlag": "true", "errorMessage":' + json.dumps(errorMessage) + '}'
    )
else:
    dbutils.fs.mv(sourcePath, '/mnt/staging/' + fileName)
    dbutils.notebook.exit('{"errorFlag": "false", "errorMessage":"No error"}')

