# Mount an Azure Blob Container as a directory in the linux file system (in the Worker node)
This has to be done once - the mounting is remembered between cluster restarts.

The mount point is in a distributed file system called DBUTILS -- a product of Databricks.

See the documentation at  https://docs.databricks.com/files/index.html  to understand the difference between this mount point and using the local storage of a cluster

This file is part of https://github.com/cnoam/iem_teachinglab.git in 'databricks' folder

In [None]:
storage_account = "ddscoursedatastorage"  
container = "dds-students"  

# Generate an "Access key" for the storage account. 
# Once used, ROTATE the key in the portal, making it void
secret = "base64 access key =="
mount_point   = "/mnt/{storage_account}/{container}".format(storage_account = storage_account, container = container)  

if mount_point not in [m.mountPoint for m in dbutils.fs.mounts()]:  
                dbutils.fs.mount(  
                   source        = "wasbs://{container}@{storage_account}.blob.core.windows.net/".format(container = container, storage_account = storage_account),  
                   mount_point   = mount_point,  
                   extra_configs = {"fs.azure.account.key.{storage_account}.blob.core.windows.net".format(storage_account = storage_account) : secret}  
                 )  
else:
    print(f"{mount_point} is already mounted")
  



/mnt/ddscoursedatastorage/dds-students is already mounted


In [None]:
dbutils.fs.ls("/mnt/ddscoursedatastorage/dds-students") 

Out[2]: [FileInfo(path='dbfs:/mnt/ddscoursedatastorage/dds-students/readme.md', name='readme.md', size=59, modificationTime=1684250570000),
 FileInfo(path='dbfs:/mnt/ddscoursedatastorage/dds-students/test.csv', name='test.csv', size=21, modificationTime=1684252823000)]

In [None]:
try: open("dbfs:/mnt/ddscoursedatastorage/dds-students/readme.md", "r")
except: print("calling open() will fail!")

calling open() will fail!


In [None]:
dbutils.fs.ls("/mnt/ddscoursedatastorage/fwm-stb-data/demographic") 

Out[5]: [FileInfo(path='dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/demographic/SintecMedia.rpt_demodata.date_2015-12-31.2016-01-01.pd.gz', name='SintecMedia.rpt_demodata.date_2015-12-31.2016-01-01.pd.gz', size=9227110, modificationTime=1677512271000)]

In [None]:
# list the mount points
dbutils.fs.mounts()
#dbutils.fs.unmount('/mnt/ddscoursedatastorage/fwm-stb-data/')

Out[6]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/ddscoursedatastorage/dds-students', source='wasbs://dds-students@ddscoursedatastorage.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/mnt/ddscoursedatastorage/fwm-stb-data', source='wasbs://fwm-stb-data@ddscoursedatastorage.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='')]

In [None]:
%fs
ls /mnt/ddscoursedatastorage/fwm-stb-data/refxml

path,name,size,modificationTime
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/.listing,.listing,41729,1677512270000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-01.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-01.2016-11-21.xml.gz,5465767,1677512265000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-02.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-02.2016-11-21.xml.gz,5465767,1677512266000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-03.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-03.2016-11-21.xml.gz,5465767,1677512264000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-04.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-04.2016-11-21.xml.gz,5465767,1677512260000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-05.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-05.2016-11-21.xml.gz,5465767,1677512253000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-06.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-06.2016-11-21.xml.gz,5465767,1677512260000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-07.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-07.2016-11-21.xml.gz,5465767,1677512266000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-08.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-08.2016-11-21.xml.gz,5465767,1677512264000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-09.2016-11-21.xml.gz,SintecMedia.rpt_refxml.date_2015-01-09.2016-11-21.xml.gz,5465767,1677512260000


## Read XML file

An external library is needed to be installed in the cluster.

Follow the instructions in https://learn.microsoft.com/en-us/azure/databricks/libraries/cluster-libraries. <br>
The library name is `com.databricks:spark-xml_2.12:0.16.0`

and can be downloaded from `https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.16.0/spark-xml_2.12-0.16.0.jar`

or installed directly using the Maven coordinates

In [None]:
fname = "dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/SintecMedia.rpt_refxml.date_2015-01-01.2016-11-21.xml.gz"
df = spark.read.format("xml").option("compression","gzip").option("rowTag", "mapping").load(fname)

In [None]:
df.show(5)

+------------+------+---------+-------------+---------------+------------+--------+
|  _device-id|  _dma|_dma-code|_household-id|_household-type|_system-type|_zipcode|
+------------+------+---------+-------------+---------------+------------+--------+
|00000008354e|Toledo|      547|      1522829|         FWM-ID|           H|   43434|
|0000009ef08e|Toledo|      547|      1522829|         FWM-ID|           H|   43434|
|0000037bb803|Toledo|      547|      1522829|         FWM-ID|           H|   43434|
|0000006610c1|Toledo|      547|      1438798|         FWM-ID|           H|   43460|
|00000066807b|Toledo|      547|      1438798|         FWM-ID|           H|   43460|
+------------+------+---------+-------------+---------------+------------+--------+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- _device-id: string (nullable = true)
 |-- _dma: string (nullable = true)
 |-- _dma-code: long (nullable = true)
 |-- _household-id: long (nullable = true)
 |-- _household-type: string (nullable = true)
 |-- _system-type: string (nullable = true)
 |-- _zipcode: long (nullable = true)



In [None]:
from pyspark.sql.functions import window, column, desc, col
df.filter(col('_system-type') != 'H').show(4)

+------------+--------+---------+-------------+---------------+------------+--------+
|  _device-id|    _dma|_dma-code|_household-id|_household-type|_system-type|_zipcode|
+------------+--------+---------+-------------+---------------+------------+--------+
|000001610ea7|Bend, OR|      821|       376958|         FWM-ID|           T|   97701|
|000001e31e63|Bend, OR|      821|       376958|         FWM-ID|           T|   97701|
|000004351013|Bend, OR|      821|       376958|         FWM-ID|           T|   97701|
|000013fb006b|Bend, OR|      821|       376958|         FWM-ID|           T|   97701|
+------------+--------+---------+-------------+---------------+------------+--------+
only showing top 4 rows



In [None]:
df.count()

Out[11]: 577186

In [None]:
# how many records generated from each of the system types?
df.groupBy("_system-type").count().show()

+------------+------+
|_system-type| count|
+------------+------+
|           T|113082|
|       H,T,V|403803|
|         H,T|  4218|
|           H| 56083|
+------------+------+

