# Computing the total storage size of a folder in Azure Data Lake with PySpark

<div style="float:right"><img src="https://github.com/Alexkuva/The-Rougon-Macquart-project/blob/dev/Architecture/Azure/Data%20Lake/computing-total-storage/_img/compute-total-storage-architecture.png?raw=true" width="500"/></div>

## Data Lake API configuration

In [4]:
#Load libraries
#--the azure-datalake-store package (module azure.datalake.store) must be installed on your cluster first
from azure.datalake.store import core, lib

In [5]:
#Connection settings -- fill these in before running the notebook
adls_name = ""        #Name of the Data Lake store (passed as store_name)
directory_id = ""     #Passed as tenant_id when authenticating
application_id = ""   #Passed as client_id when authenticating
application_key = ""  #Passed as client_secret when authenticating; do not commit a real value

In [6]:
#Authenticate with the directory / application identifiers declared earlier
adls_credentials = lib.auth(
  tenant_id=directory_id,
  client_id=application_id,
  client_secret=application_key,
)
#Build the file-system client bound to the target store
adls_client = core.AzureDLFileSystem(adls_credentials, store_name=adls_name)

In [7]:
#Show which store is targeted
print("Configuration API ADLS :", adls_name, sep="\n")
#Listing with no path argument doubles as a connectivity check
#(presumably lists the store root -- confirm against the library docs)
print(adls_client.listdir())

## Define functions

In [9]:
#Load libraries
#Enable Arrow-based data exchange. SET needs the key=value form: without the '='
#Spark does not treat the statement as an assignment, so the option stays unset.
sql("set spark.sql.execution.arrow.enabled=true")
from pyspark.sql.functions import concat, col, lit, pandas_udf

In [10]:
#Total size for a path
def recursiveDirSize(path):
  """Return the summed 'length' of every non-directory entry found under path.

  Each entry returned by adls_client.listdir(detail=True) is either descended
  into (type 'DIRECTORY') or counted through its 'length' field (presumably a
  size in bytes -- confirm against the library docs).
  """
  entries = adls_client.listdir(path=path, detail=True)
  return sum(
    recursiveDirSize(entry['name']) if entry['type'] == 'DIRECTORY' else entry['length']
    for entry in entries
  )
#UDF
#Declare the return type explicitly: udf() defaults to StringType, which would
#turn the numeric total into a string column (lexicographic sort, no arithmetic).
from pyspark.sql.types import LongType
udfRecursiveDirSize = udf(recursiveDirSize, LongType())

In [11]:
#Number of files for a path
def recursiveNbFile(path):
  """Return how many non-directory entries exist under path, at any depth.

  Entries of type 'DIRECTORY' are descended into; every other entry counts
  as one file.
  """
  entries = adls_client.listdir(path=path, detail=True)
  return sum(
    recursiveNbFile(entry['name']) if entry['type'] == 'DIRECTORY' else 1
    for entry in entries
  )
#UDF
#Declare the return type explicitly: udf() defaults to StringType, which would
#turn the file count into a string column.
from pyspark.sql.types import LongType
udfrecursiveNbFile = udf(recursiveNbFile, LongType())

In [12]:
#Number of folders for a path
def recursiveNbFolder(path):
  """Return how many directories exist under path, at any depth.

  Every entry of type 'DIRECTORY' counts as one folder plus whatever folders
  it contains itself; other entries are ignored. The starting path is not
  counted.
  """
  entries = adls_client.listdir(path=path, detail=True)
  return sum(
    1 + recursiveNbFolder(entry['name'])
    for entry in entries
    if entry['type'] == 'DIRECTORY'
  )
#UDF
#Declare the return type explicitly: udf() defaults to StringType, which would
#turn the folder count into a string column.
from pyspark.sql.types import LongType
udfrecursiveNbFolder = udf(recursiveNbFolder, LongType())

## Load your dataframe

In [14]:
#Run parameters -- fill these in before running the notebook
environment = ""   #Label written to the 'env' column of every row
DataLakePath = ""  #Folder whose direct children are listed, one row each

In [15]:
#Define schema
#These types are not imported by any other visible cell, so bring them into scope here
from pyspark.sql.types import StructType, StructField, StringType

#Three nullable string columns; used for the empty-folder fallback dataframe
dfSchema = StructType([
  StructField("datasource", StringType(), True),
  StructField("path", StringType(), True),
  StructField("env", StringType(), True),
])

In [16]:
#Init dataframe
#List the target folder once and reuse the result: a single API call, and the
#emptiness check and the dataframe are guaranteed to see the same listing.
dir_listing = adls_client.listdir(path=DataLakePath, detail=True, invalidate_cache=True)
if dir_listing:
  #One row per direct child of DataLakePath, tagged with the environment label
  df = (spark.createDataFrame(dir_listing)
    .withColumn('env', lit(environment))
    .select(col("pathSuffix").alias("datasource"), col("name").alias("path"), col("env"))
  )
else:
  #Empty folder: fall back to an empty dataframe exposing the same three columns
  df = spark.createDataFrame(sc.emptyRDD(), dfSchema)
display(df)

In [17]:
#Add storage information to the dataframe
#Keep the result and display it: withColumn only builds a lazy plan, so without
#an action the UDFs never run and no totals are produced.
dfStorage = (
  df
    .withColumn('size', udfRecursiveDirSize(col('path')))
    .withColumn('nbFiles', udfrecursiveNbFile(col('path')))
    .withColumn('nbFolder', udfrecursiveNbFolder(col('path')))
)
display(dfStorage)