## 1. Mount ADLS Gen2 Storage

In [None]:
# Mount the "data-import" container of the "faizanstore" storage account at
# /mnt/sales, unless something is already mounted at that path.
#
# The mount point is defined once so the existence check and the mount call
# always refer to the same path.
storage_account_name = "faizanstore"
container_name = "data-import"
mount_point = "/mnt/sales"

# The mount point is taken if any existing mount uses that path.
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())

# Print whether it's already mounted
print("Already mounted:", already_mounted)

if not already_mounted:
    # NOTE(review): `secrete_adls_key` is not defined in this cell; it is
    # presumably set elsewhere (e.g. from a secret scope) — confirm.
    storage_account_key = secrete_adls_key

    # Account-key configuration for the Blob endpoint used by wasbs://.
    configs = {
        f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
    }

    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs,
    )
    already_mounted = True
    print("Mounting done successfully")
else:
    print("It is already mounted")

## 2. Create Database

In [None]:
# Define the database name
database_name = "growth_lakehouse"

# Check if the database exists
databases = spark.sql("SHOW DATABASES").collect()
database_exists = any(db.databaseName == database_name for db in databases)

# Create the database if it doesn't exist
if not database_exists:
    spark.sql(f"CREATE DATABASE {database_name}")
    print(f"Database '{database_name}' created.")
else:
    print(f"Database '{database_name}' already exists.")


## 3. Drop Database

In [None]:
# Define the database name
database_name = "growth_lakehouse"

# Get the list of tables in the database
tables_df = spark.sql(f"SHOW TABLES IN {database_name}")

# Drop each table in the database
for row in tables_df.collect():
    table_name = row['tableName']
    print(f"Dropping table: {table_name}")
    spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table_name}")

# Drop the database
print(f"Dropping database: {database_name}")
spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")

## 4. Create Folder

In [None]:
def create_directory_if_not_exists(directory_path):
    """Create ``directory_path`` with dbutils unless it already exists.

    Existence is probed with ``dbutils.fs.ls``. An error whose text contains
    ``java.io.FileNotFoundException`` is taken to mean the directory is
    missing, and it is then created with ``dbutils.fs.mkdirs``. Any other
    error is re-raised unchanged.

    Args:
        directory_path: Path (e.g. under ``/mnt``) that should exist afterwards.
    """
    try:
        dbutils.fs.ls(directory_path)
    except Exception as e:
        # dbutils surfaces the JVM error, so the check matches on its message.
        if 'java.io.FileNotFoundException' not in str(e):
            raise
        print(f"Directory {directory_path} does not exist. Creating it.")
        dbutils.fs.mkdirs(directory_path)
        # Report the path actually created rather than a fixed layer name.
        print(f"Directory {directory_path} created.")
    else:
        print(f"Directory {directory_path} already exists.")


# Example usage
directory_path = "/mnt/lakehouse/bronze_layer"
create_directory_if_not_exists(directory_path)

## 5. Flatten Nested JSON Data

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import *

def child_struct(nested_df):
    """Flatten every (nested) struct column of ``nested_df`` into top-level columns.

    Each leaf field is selected by its full path and aliased with the path
    components joined by ``_`` (e.g. ``a.b.c`` -> ``a_b_c``). Non-struct
    columns, including arrays, are kept as single columns under that name.

    Args:
        nested_df: Spark DataFrame that may contain struct columns.

    Returns:
        A DataFrame selected from ``nested_df`` with no struct-typed columns.
    """

    def _quote(name):
        # Backtick-quote one path component so names containing dots, spaces
        # or backticks are not misparsed as nested field references.
        return "`" + name.replace("`", "``") + "`"

    # Stack (LIFO list) of (tuple of parent field names, DataFrame at that level).
    list_schema = [((), nested_df)]

    # Final flattened column expressions, in output order.
    flat_columns = []

    while list_schema:
        parents, df = list_schema.pop()

        for name, dtype in df.dtypes:
            path = parents + (name,)
            if dtype.startswith("struct"):
                # Project the struct's children so a later iteration can list them.
                list_schema.append((path, df.select(_quote(name) + ".*")))
            else:
                flat_columns.append(
                    col(".".join(_quote(p) for p in path)).alias("_".join(path))
                )

    # Returning the flattened DataFrame with all columns
    return nested_df.select(flat_columns)



from pyspark.sql.functions import explode_outer

def master_array(df: DataFrame) -> DataFrame:
    """Fully flatten ``df`` by exploding array columns and flattening structs.

    Structs are flattened first (via ``child_struct``), which exposes arrays
    nested inside structs. Array columns are then exploded with
    ``explode_outer``, which keeps rows whose array is null or empty. The
    result is flattened again, repeating until no array columns remain.

    Note: exploding several arrays of the same row multiplies rows (one output
    row per combination of elements).

    Args:
        df: Spark DataFrame, e.g. read from nested JSON.

    Returns:
        A DataFrame with no struct or array columns.
    """
    # Flatten up front so struct-only inputs are flattened too and arrays that
    # sit inside structs become top-level columns that the loop below sees.
    df = child_struct(df)
    array_cols = [name for name, dtype in df.dtypes if dtype.startswith("array")]

    while array_cols:
        for c in array_cols:
            # Quote the name so columns containing dots are not read as paths.
            quoted = "`" + c.replace("`", "``") + "`"
            df = df.withColumn(c, explode_outer(col(quoted)))

        # Exploded elements may themselves be structs. Flattening them can
        # surface further array columns for the next pass.
        df = child_struct(df)
        array_cols = [name for name, dtype in df.dtypes if dtype.startswith("array")]

    return df


# NOTE(review): `df` is not defined in this cell; presumably loaded earlier.
final_output = master_array(df)
display(final_output)

## 6. Metadata backup

In [None]:
from multiprocessing.pool import ThreadPool

# Directory (written via dbutils) that receives one bkp_<database>.sql per database.
DDL_BACKUP_DIR = "/temp/ddls/"
dbutils.fs.mkdirs(DDL_BACKUP_DIR)


def create_metadata_DDL(database):
    """Back up the CREATE statement of every table in ``database``.

    The ``SHOW CREATE TABLE`` output of each non-temporary table is terminated
    with ``;`` and a newline. All statements are written, overwriting any
    previous backup, to ``/temp/ddls/bkp_<database>.sql`` via
    ``dbutils.fs.put``.

    Args:
        database: Name of the database whose tables are backed up.
    """
    ddl_statements = []
    for t in spark.catalog.listTables(database):
        # Temporary views are session-scoped and not part of the database,
        # so SHOW CREATE TABLE database.<view> would fail for them.
        if t.isTemporary:
            continue
        ddl = spark.sql(f"SHOW CREATE TABLE `{database}`.`{t.name}`").first()[0]
        ddl_statements.append(ddl + ";\n")

    # Write through dbutils, the same filesystem that mkdirs used above. A
    # plain open("/temp/ddls/...") targets the driver's local disk, where this
    # directory was never created.
    dbutils.fs.put(
        f"{DDL_BACKUP_DIR}bkp_{database}.sql", "".join(ddl_statements), overwrite=True
    )


# Get a list of all databases. The first column is read by position because
# its name differs between Spark versions.
DB_List = [row[0] for row in spark.sql("SHOW DATABASES").collect()]

# Back up databases in parallel. The context manager shuts the pool down
# after map() has returned all results.
with ThreadPool(4) as processes:
    processes.map(create_metadata_DDL, DB_List)

## 7. Capture File Name through ADF parameter

##### Pass an ADF pipeline parameter into the Databricks notebook.
##### A storage event trigger exposes the name of the file that fired it through the expression below; map it to the notebook's `filename` parameter.
#### @triggerBody().fileName

In [None]:
# Name of the triggering file, expected as the notebook's "filename" parameter
# (e.g. @triggerBody().fileName from an ADF storage event trigger).
filename = dbutils.widgets.get('filename')

# NOTE: "<file_path>" is a placeholder for the landing folder; replace it.
base_path = "<file_path>"

# PERMISSIVE parsing is selected with the "mode" option; "permissive" is not a
# CSV option. badRecordsPath sends unparseable rows to a per-file folder.
df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("badRecordsPath", f"{base_path}/badRecords/{filename}")
    .csv(f"{base_path}/{filename}", header=True)
)

## 8. Dynamic SQL Execution

In [None]:
# Build a SELECT from a table name and a WHERE clause held in variables, run
# it, and print the result.
# NOTE(review): both values are spliced into the SQL text verbatim, so they
# must never come from untrusted input.
table_name = "my_table"
filter_condition = "column1 = 'value'"

query = "SELECT * FROM {} WHERE {}".format(table_name, filter_condition)
result_df = spark.sql(query)
result_df.show()

## 9. Integration with External Systems

In [None]:
import requests

API_URL = "https://api.example.com/data"
# requests has no default timeout; without one, an unresponsive host can
# block this cell indefinitely.
REQUEST_TIMEOUT_S = 30

response = requests.get(API_URL, timeout=REQUEST_TIMEOUT_S)
if response.status_code == 200:
    print("Data fetched successfully")
    data = response.json()
    # Process the data
else:
    # Include the status code so failures can be diagnosed from the output.
    print(f"Failed to fetch data (HTTP {response.status_code})")

## 10. Dynamic Loading of Data

In [None]:
data_source = "s3://my-bucket/data/"

# Load different datasets based on some condition.
# NOTE(review): `input_value` is not defined in this cell; presumably a widget
# or parameter set elsewhere. Confirm.
if input_value == "dataset1":
    df = spark.read.csv(data_source + "dataset1.csv")
elif input_value == "dataset2":
    df = spark.read.parquet(data_source + "dataset2.parquet")
else:
    # Fail loudly. Otherwise `df` silently keeps whatever an earlier cell
    # assigned (or stays undefined), and later cells process the wrong data.
    raise ValueError(f"Unknown input_value: {input_value!r}")

## 11. Dynamic Partitioning and Bucketing

In [None]:
partition_column = "date"
bucket_column = "user_id"
num_buckets = 10

# bucketBy works for file-source tables such as Parquet but not for Delta,
# the default table format on Databricks, so the format is set explicitly.
# The default save mode still raises if "my_table" already exists.
(
    df.write.format("parquet")
    .partitionBy(partition_column)
    .bucketBy(num_buckets, bucket_column)
    .saveAsTable("my_table")
)

## 12. Dynamic Configuration of Spark Settings

In [None]:
# Dynamically adjust shuffle partitions based on input size
input_size_gb = 50  # This could be calculated or passed as an argument
shuffle_partitions = max(10, input_size_gb * 2)

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

## 13. Parameter-Driven Data Pipelines

In [None]:
# Run one pipeline stage, or every stage when the "stage" widget is "all".
dbutils.widgets.text("stage", "all", "Pipeline Stage")
stage = dbutils.widgets.get("stage")

valid_stages = ("ingest", "transform", "load", "all")
if stage not in valid_stages:
    # A typo would otherwise match no branch, and the run would "succeed"
    # without doing any work.
    raise ValueError(f"Unknown stage {stage!r}; expected one of {valid_stages}")

if stage in ("ingest", "all"):
    # Ingest data
    print("Ingesting data...")
if stage in ("transform", "all"):
    # Transform data
    print("Transforming data...")
if stage in ("load", "all"):
    # Load data
    print("Loading data into the warehouse...")

## 14. Dynamic Error Handling and Logging

In [None]:
import logging

logging.basicConfig(level=logging.INFO)

try:
    df = spark.read.csv("data.csv")
    df.show()
except Exception as e:
    # logging.exception logs at ERROR level and appends the traceback. Lazy
    # %-style arguments leave message formatting to the logging framework.
    logging.exception("Error processing data: %s", e)
    # NOTE(review): the error is swallowed here, so a `df` from an earlier
    # cell may still be used by later cells.
    # Implement dynamic retry logic or alternative processing

## 15. Dynamic Notification and Alerts

In [None]:
from pyspark.sql.functions import col

# Exit the notebook with an error message if any row has error_flag == True.
# take(1) stops at the first matching row, whereas count() would scan the
# whole DataFrame just to compare the total with zero.
# `== True` builds a Column comparison (it is not a Python bool test), so it
# is kept deliberately to preserve the original filter semantics.
if df.filter(col("error_flag") == True).take(1):  # noqa: E712
    # Trigger an alert
    dbutils.notebook.exit("Error detected in data processing")
else:
    print("Data processing started")