## Access Data Lake Gen2 Through Access Key & SAS Token

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

class SparkDataLoader:

    """
    Configures Spark authentication for an Azure Storage Account and reads data from it.

    Parameters:
        spark (SparkSession): The SparkSession whose runtime configuration is updated.
        storage_account_name (str): The name of the Azure Storage Account.
        token_type (str): Which credential is supplied: 'access' (Access Key) or 'sas' (SAS Token).
        token (str): The Access Key or SAS Token matching token_type.
    """
    def __init__(self, spark: SparkSession, storage_account_name: str, token_type: str, token: str):
        self.spark, self.storage_account_name = spark, storage_account_name
        self.token_type, self.token = token_type, token

    def set_config(self):

        """
        Registers the credential on the Spark session for this storage account.

        For 'access' a single account-key entry is set. For 'sas' the auth type,
        the fixed-SAS token provider and the token itself are set, in that order.

        Raises:
            ValueError: If the token_type is neither 'access' nor 'sas'.
        """
        # Reject unknown credential kinds before touching the Spark configuration.
        if self.token_type != "access" and self.token_type != "sas":
            raise ValueError("Invalid token_type. Use 'access' or 'sas'.")

        account = self.storage_account_name
        if self.token_type == "access":
            settings = {
                f"fs.azure.account.key.{account}.dfs.core.windows.net": self.token,
            }
        else:
            # Dict insertion order keeps the entries applied in the listed sequence.
            settings = {
                f"fs.azure.account.auth.type.{account}.dfs.core.windows.net": "SAS",
                f"fs.azure.sas.token.provider.type.{account}.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
                f"fs.azure.sas.fixed.token.{account}.dfs.core.windows.net": self.token,
            }

        for conf_key, conf_value in settings.items():
            self.spark.conf.set(conf_key, conf_value)

    def list_files(self, path: str):

        """
        Returns the directory listing for the given path.

        Parameters:
            path (str): The path to the Azure Storage Account container or directory.

        Returns:
            The result of dbutils.fs.ls(path); each entry exposes a `path` attribute.
        """

        # `dbutils` is not imported here; it is expected to exist as a notebook global.
        entries = dbutils.fs.ls(path)
        return entries

    def display_files(self, path: str):

        """
        Prints the path of every entry found under the given path, one per line.

        Parameters:
            path (str): The path to the Azure Storage Account container or directory.
        """

        for entry in self.list_files(path):
            print(entry.path)

    def read_csv(self, path: str, header: bool = True) -> DataFrame:

        """
        Loads a CSV file into a DataFrame.

        Parameters:
            path (str): The path to the CSV file in Azure Storage Account.
            header (bool, optional): Whether the CSV file has a header row. Defaults to True.

        Returns:
            DataFrame: The DataFrame containing the CSV data.
        """

        reader = self.spark.read
        return reader.csv(path, header=header)


In [0]:
# Target storage account and CSV file (the values here appear to be masked placeholders).
storage_account_name = "xxxxxxxxxxxxxxx"
file_name = 'xxxxxxxxxxxxxxxxxxx.csv'

# Obtain the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("AccessDataLakeGen2")
    .getOrCreate()
)

## Access Data Lake Gen2 Through Access Key

In [0]:
class AccessTokenHandler:

    """
    A class to handle Access Key configuration and data operations.

    Parameters:
        spark (SparkSession): The SparkSession object.
        storage_account_name (str): The name of the Azure Storage Account.
        access_key (str): The Access Key for authentication.
    """
    
    def __init__(self, spark: SparkSession, storage_account_name: str, access_key: str):
        self.data_handler = SparkDataLoader(spark, storage_account_name, "access", access_key)

    def execute(self, container: str = "demo"):

        """
        Applies the Access Key configuration, reads the CSV file and prints its first rows.

        The file read is the notebook-level `file_name`, located in `container`
        of the storage account this handler was created for.

        Parameters:
            container (str, optional): The container holding the CSV file. Defaults to "demo".
        """

        self.data_handler.set_config()
        # The credential is registered per storage account, so the URL must point at
        # that same account; a hard-coded account name would not match the config key.
        account = self.data_handler.storage_account_name
        csv_path = f"abfss://{container}@{account}.dfs.core.windows.net/{file_name}"
        df_access = self.data_handler.read_csv(csv_path, header=True)
        df_access.show()

# Access Key Configuration
# Fetch the key from the Databricks secret scope so it never appears in the notebook.
access_key = dbutils.secrets.get(scope="EncSecretScope", key="EncStorageAccessKey")
access_token_handler = AccessTokenHandler(spark, storage_account_name, access_key)
# execute() prints the rows itself and returns nothing, so there is no value to display().
access_token_handler.execute()


+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156|08-11-2016|11-11-2016|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      Kentucky|      42420|  South|FUR-BO-10001798|   

## Access Data Lake Gen2 Through SAS Token

In [0]:

class SASTokenHandler:

    """
    A class to handle SAS Token configuration and data operations.

    Parameters:
        spark (SparkSession): The SparkSession object.
        storage_account_name (str): The name of the Azure Storage Account.
        sas_token (str): The SAS Token for authentication.
    """

    def __init__(self, spark: SparkSession, storage_account_name: str, sas_token: str):
        self.data_handler = SparkDataLoader(spark, storage_account_name, "sas", sas_token)

    def execute(self, container: str = "demo"):
        """
        Applies the SAS Token configuration, reads the CSV file and prints its first rows.

        The file read is the notebook-level `file_name`, located in `container`
        of the storage account this handler was created for.

        Parameters:
            container (str, optional): The container holding the CSV file. Defaults to "demo".
        """
        self.data_handler.set_config()
        # The SAS settings are registered per storage account, so the URL must point at
        # that same account; a hard-coded account name would not match the config keys.
        account = self.data_handler.storage_account_name
        csv_path = f"abfss://{container}@{account}.dfs.core.windows.net/{file_name}"
        df_sas = self.data_handler.read_csv(csv_path, header=True)
        df_sas.show()


# SAS Token run: read the token from the secret scope, then configure Spark and preview the CSV.
sas_token = dbutils.secrets.get(
    scope="EncSecretScope",
    key="EncStorageAccessSASToken",
)
sas_token_handler = SASTokenHandler(
    spark=spark,
    storage_account_name=storage_account_name,
    sas_token=sas_token,
)
sas_token_handler.execute()

+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156|08-11-2016|11-11-2016|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      Kentucky|      42420|  South|FUR-BO-10001798|   