### Mounting_processes

In [0]:
# Fetch the storage account key from the secret scope instead of hard-coding it in the notebook.
secrete_adls_key = dbutils.secrets.get(
    scope="faizan-key-vault",
    key="faizan-adls-store-key",
)

In [0]:
# Mount the "landing-zone" blob container at /mnt/landing_zone unless it is already mounted.
#
# Every setting is defined once, up front, so the existence check and the mount
# call can never disagree about which mount point they are talking about.
storage_account_name = "faizanadlsstore"
storage_account_key = secrete_adls_key  # fetched from the secret scope in the first cell
container_name = "landing-zone"
mount_point = "/mnt/landing_zone"

# True when any existing mount already uses this mount point.
already_mounted = any(
    mount.mountPoint == mount_point for mount in dbutils.fs.mounts()
)
print("Already mounted:", already_mounted)

if already_mounted:
    print("It is already mounted")
else:
    # Account-key authentication for the wasbs:// driver.
    configs = {
        f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
    }

    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs,
    )
    already_mounted = True
    print("Mounting done successfully")


Already mounted: True
It is already mounted


In [0]:
# Mount the "lakehouse" blob container at /mnt/lakehouse unless it is already mounted.
#
# Every setting is defined once, up front, so the existence check and the mount
# call can never disagree about which mount point they are talking about.
storage_account_name = "faizanadlsstore"
storage_account_key = secrete_adls_key  # fetched from the secret scope in the first cell
container_name = "lakehouse"
mount_point = "/mnt/lakehouse"

# True when any existing mount already uses this mount point.
already_mounted = any(
    mount.mountPoint == mount_point for mount in dbutils.fs.mounts()
)
print("Already mounted:", already_mounted)

if already_mounted:
    print("It is already mounted")
else:
    # Account-key authentication for the wasbs:// driver.
    configs = {
        f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
    }

    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs,
    )
    already_mounted = True
    print("Mounting done successfully")


Already mounted: True
It is already mounted


## Drop existing tables and database

In [0]:

# Database whose tables, and finally the database itself, are removed so the
# rest of the notebook rebuilds everything from scratch.
database_name = "growth_lakehouse"

# `SHOW TABLES IN <db>` raises when the database is missing (for example on the
# very first run), so only enumerate tables once the database is known to exist.
database_present = spark.sql(f"SHOW DATABASES LIKE '{database_name}'").count() > 0

if database_present:
    tables_df = spark.sql(f"SHOW TABLES IN {database_name}")

    for row in tables_df.collect():
        # SHOW TABLES also lists session temp views; they do not belong to the
        # database, so there is nothing to drop for them here.
        if row.isTemporary:
            continue
        table_name = row.tableName
        print(f"Dropping table: {table_name}")
        spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table_name}")
else:
    print(f"Database {database_name} does not exist; no tables to drop")

# IF EXISTS makes this safe whether or not the database was found above.
print(f"Dropping database: {database_name}")
spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")


Dropping table: growth_marpho_gold
Dropping table: growth_morpho_silver
Dropping table: growth_yield_gold
Dropping table: iot_bronze
Dropping table: iot_gold
Dropping table: iot_silver
Dropping table: onprem_sql_bronze
Dropping table: phy_fst_bronze
Dropping table: phy_fst_gold
Dropping table: phy_fst_silver
Dropping table: phy_scnd_bronze
Dropping table: phy_scnd_gold
Dropping table: phy_scnd_silver
Dropping table: root_fst_bronze
Dropping table: root_fst_gold
Dropping table: root_fst_silver
Dropping table: root_scnd_bronze
Dropping table: root_scnd_gold
Dropping table: root_scnd_silver
Dropping table: yield_and_yield_attribute_silver
Dropping database: growth_lakehouse


DataFrame[]

## Create Database

In [0]:
# Database that holds every bronze / silver / gold table of this notebook.
database_name = "growth_lakehouse"

# Collect the names of all databases currently visible to this session.
databases = spark.sql("SHOW DATABASES").collect()
known_database_names = {db.databaseName for db in databases}
database_exists = database_name in known_database_names

# Report the existing database, otherwise create it.
if database_exists:
    print(f"Database '{database_name}' already exists.")
else:
    spark.sql(f"CREATE DATABASE {database_name}")
    print(f"Database '{database_name}' created.")


Database 'growth_lakehouse' created.


## Create bronze layer folder

In [0]:
def create_directory_if_not_exists(directory_path, created_message="bronze layer folder created"):
    """Create ``directory_path`` with ``dbutils.fs.mkdirs`` unless it can already be listed.

    Existence is probed by listing the path with ``dbutils.fs.ls``. A listing
    failure whose text contains ``java.io.FileNotFoundException`` is treated as
    "directory missing" and triggers the creation.

    Args:
        directory_path: Path to check and, if missing, create.
        created_message: Text printed after the directory has been created.
            Defaults to the bronze-layer wording, so existing one-argument
            calls print exactly what they printed before.

    Raises:
        Exception: Any listing failure that is not a missing-path error is
            re-raised unchanged.
    """
    try:
        dbutils.fs.ls(directory_path)
    except Exception as e:
        if 'java.io.FileNotFoundException' not in str(e):
            # Not a "missing directory" problem: re-raise with the original traceback.
            raise
        print(f"Directory {directory_path} does not exist. Creating it.")
        dbutils.fs.mkdirs(directory_path)
        print(created_message)
    else:
        print(f"Directory {directory_path} already exists.")

# Make sure the bronze-layer root folder exists before any bronze table is written.
directory_path = "/mnt/lakehouse/bronze_layer"
create_directory_if_not_exists(directory_path=directory_path)


Directory /mnt/lakehouse/bronze_layer does not exist. Creating it.
bronze layer folder created


## 1. create bronze layer tables

In [0]:
def _save_as_bronze(df, delta_path, table_name):
    """Overwrite ``table_name`` with ``df``, written as Delta at ``delta_path`` after repartitioning to one partition."""
    single_partition_df = df.repartition(1)
    delta_writer = single_partition_df.write.format("delta").mode("overwrite")
    delta_writer.option("path", delta_path).saveAsTable(table_name)


# 1. IoT data: every JSON file found under the landing-zone IOT-input folder.
iot_hub_df = spark.read.option("inferSchema", True).json("dbfs:/mnt/landing_zone/IOT-input/*")
_save_as_bronze(iot_hub_df, "/mnt/lakehouse/bronze_layer/iot_hub", "growth_lakehouse.iot_bronze")

# 2. On-prem SQL export: a delimited text file with a header row.
onprem_sql_df = spark.read.csv(
    "dbfs:/mnt/landing_zone/onprem-sql-data/dbo.growth_parsed.txt",
    header=True,
    inferSchema=True,
)
_save_as_bronze(onprem_sql_df, "/mnt/lakehouse/bronze_layer/onprem_sql", "growth_lakehouse.onprem_sql_bronze")

# 3. Public-access data: four CSV files, each with a header row.
phy_fst_df = spark.read.csv("dbfs:/mnt/landing_zone/public-data/phy_fst.csv", header=True, inferSchema=True)
phy_scnd_df = spark.read.csv("dbfs:/mnt/landing_zone/public-data/Phy_snd.csv", header=True, inferSchema=True)
root_fst_df = spark.read.csv("dbfs:/mnt/landing_zone/public-data/Root_fst.csv", header=True, inferSchema=True)
root_scnd_df = spark.read.csv("dbfs:/mnt/landing_zone/public-data/Root_scd.csv", header=True, inferSchema=True)

_save_as_bronze(phy_fst_df, "/mnt/lakehouse/bronze_layer/public_access/phy_fst", "growth_lakehouse.phy_fst_bronze")
_save_as_bronze(phy_scnd_df, "/mnt/lakehouse/bronze_layer/public_access/phy_scnd", "growth_lakehouse.phy_scnd_bronze")
_save_as_bronze(root_fst_df, "/mnt/lakehouse/bronze_layer/public_access/root_fst", "growth_lakehouse.root_fst_bronze")
_save_as_bronze(root_scnd_df, "/mnt/lakehouse/bronze_layer/public_access/root_scnd", "growth_lakehouse.root_scnd_bronze")


## create silver layer folder 

In [0]:
def create_directory_if_not_exists(directory_path, created_message="silver_layer folder created"):
    """Create ``directory_path`` with ``dbutils.fs.mkdirs`` unless it can already be listed.

    Existence is probed by listing the path with ``dbutils.fs.ls``. A listing
    failure whose text contains ``java.io.FileNotFoundException`` is treated as
    "directory missing" and triggers the creation.

    Args:
        directory_path: Path to check and, if missing, create.
        created_message: Text printed after the directory has been created.
            Defaults to the silver-layer wording, so existing one-argument
            calls print exactly what they printed before.

    Raises:
        Exception: Any listing failure that is not a missing-path error is
            re-raised unchanged.
    """
    try:
        dbutils.fs.ls(directory_path)
    except Exception as e:
        if 'java.io.FileNotFoundException' not in str(e):
            # Not a "missing directory" problem: re-raise with the original traceback.
            raise
        print(f"Directory {directory_path} does not exist. Creating it.")
        dbutils.fs.mkdirs(directory_path)
        print(created_message)
    else:
        print(f"Directory {directory_path} already exists.")

# Make sure the silver-layer root folder exists before any silver table is written.
directory_path = "/mnt/lakehouse/silver_layer"
create_directory_if_not_exists(directory_path=directory_path)

Directory /mnt/lakehouse/silver_layer does not exist. Creating it.
silver_layer folder created


## required libraries

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *
import re
import random
from pyspark.sql.window import Window

### silver_table IOT_Hub data

In [0]:

# Load the bronze IoT Delta table from its storage path (no explicit schema is supplied).
delta_reader = spark.read.format("delta")
iot_df = delta_reader.load("dbfs:/mnt/lakehouse/bronze_layer/iot_hub/")

# Register the frame under the name the projection below selects from.
iot_df.createOrReplaceTempView("iot_df")

# Keep only the identifiers, sensor readings and event timestamps used downstream.
iot_projection_sql = """
SELECT
    messageId,
    deviceId,
    temperature,
    humidity,
    soil_ph,
    soil_moisture,
    irrigation_requirement,
    co2_percentage,
    light_intensity,
    EventProcessedUtcTime,
    EventEnqueuedUtcTime
FROM iot_df
"""
result_df = spark.sql(iot_projection_sql)

# Function to convert camel case to snake case
def camel_to_snake(name):
    """Convert a camelCase / PascalCase identifier to lower-case snake_case.

    Two regex passes insert the underscores: first before any capitalised word
    (a capital followed by lower-case letters) that has a character in front of
    it, then between a lower-case letter or digit and the capital after it.
    The result is lower-cased.
    """
    with_word_breaks = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', with_word_breaks)
    return fully_split.lower()

# Pair every projected column with its snake_case equivalent.
current_columns = result_df.columns
new_column_names = list(map(camel_to_snake, current_columns))
rename_dict = dict(zip(current_columns, new_column_names))

# Re-project each column under its new name; backticks keep unusual identifiers intact.
alias_expressions = []
for old, new in rename_dict.items():
    alias_expressions.append(f"`{old}` AS `{new}`")
renamed_df = result_df.selectExpr(alias_expressions)

# Columns whose name mentions "date" or "time" are cast to timestamp and
# rewritten as "yyyy-MM-dd" text.
temporal_columns = [name for name in renamed_df.columns if "date" in name or "time" in name]
for column_name in temporal_columns:
    as_timestamp = col(column_name).cast(TimestampType())
    day_text = date_format(from_utc_timestamp(as_timestamp, "UTC"), "yyyy-MM-dd")
    renamed_df = renamed_df.withColumn(column_name, day_text)

# Publish the result as the IoT silver table.
iot_silver_writer = renamed_df.repartition(1).write.format("delta").mode("overwrite")
iot_silver_writer.option("path", "/mnt/lakehouse/silver_layer/iot_silver").saveAsTable("growth_lakehouse.iot_silver")


## silver tables of onprem_sql data

In [0]:

# Load the bronze on-prem SQL Delta table from its storage path (no explicit schema is supplied).
delta_reader = spark.read.format("delta")
onprem_sql_silver_df = delta_reader.load("dbfs:/mnt/lakehouse/bronze_layer/onprem_sql/")
# ----------------------------------------------------------

# Function to convert camel case to snake case
def camel_to_snake(name):
    """Return ``name`` converted from camelCase / PascalCase to lower-case snake_case.

    The two substitutions run in order: the first puts an underscore in front of
    each capitalised word that is preceded by some character, the second
    separates a lower-case letter or digit from a capital that follows it.
    """
    for boundary_pattern in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        name = re.sub(boundary_pattern, r'\1_\2', name)
    return name.lower()

# Pair every bronze column with its snake_case equivalent.
current_columns = onprem_sql_silver_df.columns
new_column_names = list(map(camel_to_snake, current_columns))
rename_dict = dict(zip(current_columns, new_column_names))

# Re-project each column under its new name; backticks keep unusual identifiers intact.
alias_expressions = []
for old, new in rename_dict.items():
    alias_expressions.append(f"`{old}` AS `{new}`")
renamed_df = onprem_sql_silver_df.selectExpr(alias_expressions)
# ----------------------------------------------------------

# Both projections below select from this temp view.
renamed_df.createOrReplaceTempView("growth_renamed")

# Split 1: genotype plus its yield and yield-attribute measurements.
yield_and_yield_attribute_df = spark.sql(""" 
                                         SELECT 
                                        genotype_name, plant_height_30_dat_cm, plant_height_60_dat_cm, plant_height_90_dat_cm, 
nu_primery_branches__num, nu_fruits_per_plant, avg_fruit_weight_gram, days_flower_initiation_days, 
fruit_tss__brix, fruit_circumference_mm, fruit_length_cm, fruit_yield_per_plant__kg, yield_status
from growth_renamed
                                         """)

# Split 2: genotype plus its morphological descriptors.
growth_morpho_df = spark.sql(""" 
                                         SELECT 
                               genotype_name, stem_anthocyanin, anthocyanin_intensity, stem_pubescence, stem_pubescence_intensity, 
leaf_blade_colour, leaf_blade_colour_intensity, colour_of_vein, intensity_of_colour8, spineon_leaf, 
flower_colour, fruiting_pattern, fruit_colour, intensityof_colour13, stripes, stripes_density, 
fruit_patches, spines_in_calyx, density_of_spininess, fruit_shape
from growth_renamed
                                         """)

# Strip the numeric suffixes from the two "intensity of colour" columns.
morpho_column_renames = {
    "intensityof_colour13": "intensity_of_colour",
    "intensity_of_colour8": "intensity_of_colour_z",
}
for suffixed_name, clean_name in morpho_column_renames.items():
    growth_morpho_df = growth_morpho_df.withColumnRenamed(suffixed_name, clean_name)

# Publish both splits as silver tables.
yield_silver_writer = yield_and_yield_attribute_df.repartition(1).write.format("delta").mode("overwrite")
yield_silver_writer.option("path", "/mnt/lakehouse/silver_layer/yield_and_yield_attribute_silver").saveAsTable("growth_lakehouse.yield_and_yield_attribute_silver")

morpho_silver_writer = growth_morpho_df.repartition(1).write.format("delta").mode("overwrite")
morpho_silver_writer.option("path", "/mnt/lakehouse/silver_layer/growth_morpho_silver").saveAsTable("growth_lakehouse.growth_morpho_silver")


## silver table for public access data

In [0]:


# Function to convert camel case to snake case
def camel_to_snake(name: str) -> str:
    """Convert a camelCase / PascalCase identifier to lower-case snake_case.

    Args:
        name: Identifier to convert.

    Returns:
        ``name`` with an underscore inserted before each capitalised word that
        follows another character, and between a lower-case letter or digit and
        the capital after it, then lower-cased.
    """
    word_separated = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    snake_mixed_case = re.sub('([a-z0-9])([A-Z])', r'\1_\2', word_separated)
    return snake_mixed_case.lower()

# Function to rename DataFrame columns from camel case to snake case
def rename_columns_to_snake_case(df: DataFrame) -> DataFrame:
    """Return ``df`` with every column renamed through ``camel_to_snake``, in the original column order."""
    snake_names = map(camel_to_snake, df.columns)
    return df.toDF(*snake_names)

def _save_public_silver(df, delta_path, table_name):
    """Overwrite ``table_name`` with ``df``, written as Delta at ``delta_path`` after repartitioning to one partition."""
    delta_writer = df.repartition(1).write.format("delta").mode("overwrite")
    delta_writer.option("path", delta_path).saveAsTable(table_name)


# Load each public-access bronze table and normalise its column names to snake_case.
bronze_reader = spark.read.format("delta")
phy_fst_silver_df = rename_columns_to_snake_case(
    bronze_reader.load("dbfs:/mnt/lakehouse/bronze_layer/public_access/phy_fst/")
)
phy_scnd_silver_df = rename_columns_to_snake_case(
    bronze_reader.load("dbfs:/mnt/lakehouse/bronze_layer/public_access/phy_scnd/")
)
root_fst_silver_df = rename_columns_to_snake_case(
    bronze_reader.load("dbfs:/mnt/lakehouse/bronze_layer/public_access/root_fst/")
)
root_scnd_silver_df = rename_columns_to_snake_case(
    bronze_reader.load("dbfs:/mnt/lakehouse/bronze_layer/public_access/root_scnd/")
)

# ----------------------------------------------------------
# Publish the silver tables. The "public_acess" spelling is kept as-is because
# the gold-layer cell reads from these exact paths.
_save_public_silver(phy_fst_silver_df, "/mnt/lakehouse/silver_layer/public_acess/phy_fst_silver", "growth_lakehouse.phy_fst_silver")
_save_public_silver(phy_scnd_silver_df, "/mnt/lakehouse/silver_layer/public_acess/phy_scnd_silver", "growth_lakehouse.phy_scnd_silver")
_save_public_silver(root_fst_silver_df, "/mnt/lakehouse/silver_layer/public_acess/root_fst_silver", "growth_lakehouse.root_fst_silver")
_save_public_silver(root_scnd_silver_df, "/mnt/lakehouse/silver_layer/public_acess/root_scnd_silver", "growth_lakehouse.root_scnd_silver")


## creating gold layer

## Create gold layer folder

In [0]:
def create_directory_if_not_exists(directory_path, created_message="gold_layer folder created"):
    """Create ``directory_path`` with ``dbutils.fs.mkdirs`` unless it can already be listed.

    Existence is probed by listing the path with ``dbutils.fs.ls``. A listing
    failure whose text contains ``java.io.FileNotFoundException`` is treated as
    "directory missing" and triggers the creation.

    Args:
        directory_path: Path to check and, if missing, create.
        created_message: Text printed after the directory has been created.
            Defaults to the gold-layer wording, so existing one-argument
            calls print exactly what they printed before.

    Raises:
        Exception: Any listing failure that is not a missing-path error is
            re-raised unchanged.
    """
    try:
        dbutils.fs.ls(directory_path)
    except Exception as e:
        if 'java.io.FileNotFoundException' not in str(e):
            # Not a "missing directory" problem: re-raise with the original traceback.
            raise
        print(f"Directory {directory_path} does not exist. Creating it.")
        dbutils.fs.mkdirs(directory_path)
        print(created_message)
    else:
        print(f"Directory {directory_path} already exists.")

# Make sure the gold-layer root folder exists before any gold table is written.
directory_path = "/mnt/lakehouse/gold_layer"
create_directory_if_not_exists(directory_path=directory_path)

Directory /mnt/lakehouse/gold_layer does not exist. Creating it.
gold_layer folder created


## gold table for growth yield and yield attributes

In [0]:
# Start from the yield / yield-attribute silver table.
silver_reader = spark.read.format('delta')
growth_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/yield_and_yield_attribute_silver")

#-------------------------------------------------------
# Rounding
# Measurement columns that are rounded to two decimal places.
columns_to_round = [
    'plant_height_30_dat_cm',
    'plant_height_60_dat_cm',
    'plant_height_90_dat_cm',
    'avg_fruit_weight_gram',
    'days_flower_initiation_days',
    'fruit_tss__brix',
    'nu_primery_branches__num',
    'nu_fruits_per_plant',
    'fruit_circumference_mm',
    'fruit_length_cm',
    'fruit_yield_per_plant__kg',
]

# `round` and `col` are the pyspark.sql.functions versions pulled in by the
# wildcard import cell, not the Python builtins.
for column in columns_to_round:
    rounded_value = round(col(column), 2)
    growth_df = growth_df.withColumn(column, rounded_value)

#-------------------------------------------------------
# adding polyhouse_number for reference as (randomly generated without any background verification)

# Function to generate random polyhouse numbers
def generate_random_polyhouse():
    """Return a random polyhouse label from "G01" to "G10" (the number is zero-padded to two digits)."""
    polyhouse_index = random.randint(1, 10)
    return "G{:02d}".format(polyhouse_index)

# Wrap the generator as a Spark UDF. Spark treats UDFs as deterministic by
# default and may then drop or repeat calls while optimising the plan; this one
# returns a fresh random value per call, so it is explicitly marked
# nondeterministic.
generate_random_polyhouse_udf = udf(generate_random_polyhouse, StringType()).asNondeterministic()

# Attach a randomly assigned polyhouse label to every row (reference data only).
growth_df = growth_df.withColumn("polyhouse_number", generate_random_polyhouse_udf())

#-------------------------------------------------------
# Surrogate key

# A window with ordering but no partitioning numbers the whole table as one sequence.
# NOTE(review): rows sharing a genotype_name have no further tie-breaker, so
# their relative SK1 order is not guaranteed to be stable between runs.
window_spec = Window.orderBy("genotype_name")

# SK1 = 1, 2, 3, ... in genotype_name order.
growth_gold_df = growth_df.withColumn("SK1", row_number().over(window_spec))

#-------------------------------------------------------
# Write the gold Delta table
growth_gold_df.repartition(1).write.mode("overwrite").format("delta").option("path", "/mnt/lakehouse/gold_layer/growth_yield_gold").saveAsTable("growth_lakehouse.growth_yield_gold")



## Gold table for growth morpho attributes

In [0]:
from pyspark.sql.window import Window

# Start from the morphological-attributes silver table.
silver_reader = spark.read.format("delta")
growth_marpho_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/growth_morpho_silver")

#-------------------------------------------------------
# Surrogate key: number all rows as one sequence, ordered by genotype_name.
window_spec = Window.orderBy("genotype_name")
surrogate_key = row_number().over(window_spec)
growth_marpho_gold_df = growth_marpho_df.withColumn("SK1", surrogate_key)

#-------------------------------------------------------
# Write the gold Delta table
marpho_gold_writer = growth_marpho_gold_df.repartition(1).write.format("delta").mode("overwrite")
marpho_gold_writer.option("path", "/mnt/lakehouse/gold_layer/growth_marpho_gold").saveAsTable("growth_lakehouse.growth_marpho_gold")

## gold table for IOT_data

In [0]:
# Start from the IoT silver table.
silver_reader = spark.read.format("delta")
IOT_DATA_DF = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/iot_silver")

#-------------------------------------------------------
# Rounding

# Sensor readings that are rounded to two decimal places.
columns = ['temperature', 'humidity']

renamed_df = IOT_DATA_DF
for sensor_column in columns:
    rounded_value = round(col(sensor_column), 2)
    renamed_df = renamed_df.withColumn(sensor_column, rounded_value)

# Attach a random polyhouse label to every row, using the UDF registered in the
# growth-yield gold cell (that cell must have run first).
IOT_gold_df = renamed_df.withColumn("polyhouse_number", generate_random_polyhouse_udf())

#-------------------------------------------------------
# Write the gold Delta table
iot_gold_writer = IOT_gold_df.repartition(1).write.format("delta").mode("overwrite")
iot_gold_writer.option("path", "/mnt/lakehouse/gold_layer/IOT_gold").saveAsTable("growth_lakehouse.IOT_gold")


## gold table for physiological and root data

In [0]:
def _save_as_gold(df, delta_path, table_name):
    """Overwrite ``table_name`` with ``df``, written as Delta at ``delta_path`` after repartitioning to one partition."""
    delta_writer = df.repartition(1).write.format("delta").mode("overwrite")
    delta_writer.option("path", delta_path).saveAsTable(table_name)


# Load the four public-access silver tables. The "public_acess" spelling matches
# the folder the silver-layer cell writes to.
silver_reader = spark.read.format("delta")
phy_fst_gold_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/public_acess/phy_fst_silver")
phy_scnd_gold_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/public_acess/phy_scnd_silver")
root_fst_gold_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/public_acess/root_fst_silver")
root_scnd_gold_df = silver_reader.load("dbfs:/mnt/lakehouse/silver_layer/public_acess/root_scnd_silver")

#-------------------------------------------------------
# Surrogate key: number the rows of each table as one sequence, ordered by genotype_name.
window_spec = Window.orderBy("genotype_name")
surrogate_key = row_number().over(window_spec)

phy_fst_gold_df = phy_fst_gold_df.withColumn("SK1", surrogate_key)
phy_scnd_gold_df = phy_scnd_gold_df.withColumn("SK1", surrogate_key)
root_fst_gold_df = root_fst_gold_df.withColumn("SK1", surrogate_key)
root_scnd_gold_df = root_scnd_gold_df.withColumn("SK1", surrogate_key)

# Publish the gold tables.
_save_as_gold(phy_fst_gold_df, "/mnt/lakehouse/gold_layer/phy_fst_gold", "growth_lakehouse.phy_fst_gold")
_save_as_gold(phy_scnd_gold_df, "/mnt/lakehouse/gold_layer/phy_scnd_gold", "growth_lakehouse.phy_scnd_gold")
_save_as_gold(root_fst_gold_df, "/mnt/lakehouse/gold_layer/root_fst_gold", "growth_lakehouse.root_fst_gold")
_save_as_gold(root_scnd_gold_df, "/mnt/lakehouse/gold_layer/root_scnd_gold", "growth_lakehouse.root_scnd_gold")