In [None]:
from delta.tables import DeltaTable
from notebookutils import mssparkutils
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, TimestampType, BooleanType, ShortType, DateType
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import logging
import pandas as pd
import sys
import re
import json
import datetime
import pytz
import random
import io
import urllib.request

# Named logger used for all log output produced by the classes in this file.
logger = logging.getLogger('OEA')

class OEA:
    """ OEA (Open Education Analytics) framework simplifies the process of working with large data sets within the context of a lakehouse architecture.

    """
    def __init__(self, storage_account='', salt='', timezone='US/Eastern', logging_level=logging.INFO):
        """ Sets up the OEA instance: storage account, key vault names, stage paths and logging.

            storage_account: name of the data lake storage account. When empty, the name is derived from the
                synapse workspace name (based on OEA naming convention).
            salt: value concatenated to column values before they are hashed in pseudonymize.
            timezone: timezone name used by create_run_date when no date string is passed in.
            logging_level: level applied to the root logger.
        """
        spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true") # more info here: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/optimize-write-for-apache-spark
        workspace_name = mssparkutils.env.getWorkspaceName()
        if storage_account:
            self.storage_account = storage_account
            # There is no OEA id to derive a key vault name from in this case, so define the attribute explicitly
            # instead of leaving it unset (reading it used to raise AttributeError).
            self.keyvault = None
        else:
            oea_id = workspace_name[8:] # extracts the OEA id for this OEA instance from the synapse workspace name (based on OEA naming convention)
            self.storage_account = 'stoea' + oea_id # sets the name of the storage account based on OEA naming convention
            self.keyvault = 'kv-oea-' + oea_id
        self.keyvault_linked_service = 'LS_KeyVault_OEA'
        self.serverless_sql_endpoint = workspace_name + '-ondemand.sql.azuresynapse.net'
        self._initialize_stage_paths()
        self._initialize_logger(logging_level)
        self.salt = salt
        self.timezone = timezone

        # todo: decide if this is needed (maybe it's something we should introduce again later)
        #self.framework_path = 'abfss://synapse-workspace@' + self.storage_account + '.dfs.core.windows.net/oea_framework'
        # Initialize framework db
        #spark.sql(f"CREATE DATABASE IF NOT EXISTS oea")
        #spark.sql(f"CREATE TABLE IF NOT EXISTS oea.env (name string not null, value string not null, description string) USING DELTA LOCATION '{self.framework_path}/db/env'")
        #df = spark.sql("select value from oea.env where name='storage_account'")
        #if df.first(): spark.sql(f"UPDATE oea.env set value='{self.storage_account}' where name='storage_account'")
        #else: spark.sql(f"INSERT INTO oea.env VALUES ('storage_account', '{self.storage_account}', 'The name of the data lake storage account for this OEA instance.')")
        #spark.sql(f"CREATE TABLE IF NOT EXISTS OEA.watermark (source string not null, entity string not null, watermark timestamp not null) USING DELTA LOCATION '{self.framework_path}/db/watermark'")

        logger.info("OEA initialized.")

    def _initialize_stage_paths(self):
        self.stage1 = 'abfss://stage1@' + self.storage_account + '.dfs.core.windows.net'
        self.stage2 = 'abfss://stage2@' + self.storage_account + '.dfs.core.windows.net'
        self.stage3 = 'abfss://stage3@' + self.storage_account + '.dfs.core.windows.net'

    def _initialize_logger(self, logging_level):
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        for handler in logging.getLogger().handlers:
            handler.setFormatter(formatter)           
        # Customize log level for all loggers
        logging.getLogger().setLevel(logging_level)        

    def use_workspace(self, workspace_name):
        """ Redirects stage1, stage2 and stage3 to folders under the given workspace in the "workspace" container.
            (eg, with workspace_name of Jon, stage1 becomes workspace/Jon/stage1 instead of the stage1 container)
        """
        workspace_root = f'abfss://workspace@{self.storage_account}.dfs.core.windows.net/{workspace_name}'
        self.stage1 = workspace_root + '/stage1'
        self.stage2 = workspace_root + '/stage2'
        self.stage3 = workspace_root + '/stage3'
        logger.info(f'Now using workspace: {workspace_name}')

    def stop_using_workspace(self): 
        """ Resets OEA to use the standard stage1, stage2, stage3 paths instead of the workspace lake (see use_workspace) """
        self._initialize_stage_paths()

    def to_url(self, path):
        """ Converts the given path into a valid url.
            eg, convert_path('stage1/contoso_sis/student') # returns abfss://stage1@storageaccount.dfs.core.windows.net/contoso_sis/student
            [Note that if "use_sandbox" has been invoked, the url returned will be something like abfss://dev@storageaccount.dfs.core.windows.net/sandbox1/stage1/contoso_sis/student]
        """
        if path.startswith('abfss://'): return path # if a url is given, just return that same url (allows to_url to be invoked just in case translation may be needed)
        path_args = path.split('/')
        stage = path_args.pop(0)
        if stage == 'stage1': stage = self.stage1
        elif stage == 'stage2': stage = self.stage2
        elif stage == 'stage3': stage = self.stage3
        else: raise ValueError("Path must begin with either 'stage1', 'stage2', or 'stage3'")
        url = f"{stage}/{'/'.join(path_args)}"
        logger.debug(f'to_url: {url}')
        return url

    def parse_path(self, path):
        """ Parses a path that looks like one of the following:
                stage1/Transactional/ms_insights/v0.1
                stage1/Transactional/ms_insights/v0.1/students
            (the path must either be the path to a specific entity, or the path to the parent folder containing entities)
            and returns a dictionary like one of the following:
                {'stage': 'stage1', 'stage_num': '1', 'category': 'Transactional', 'source_system': 'contoso_sis', 'entity': None, 'entity_list': ['studentattendance'], 'entity_path': None, 'entity_parent_path': 'stage1/Transactional/contoso_sis/v0.1'}
                {'stage': 'stage1', 'stage_num': '1', 'category': 'Transactional', 'source_system': 'contoso_sis', 'entity': 'studentattendance', 'entity_list': None, 'entity_path': 'stage1/Transactional/contoso_sis/v0.1/studentattendance', 'entity_parent_path': 'stage1/Transactional/contoso_sis/v0.1'}

            This method assumes the standard OEA data lake, in which paths have this structure: <stage number>/<category>/<source system>/<optional version and partitioning>/<entity>/<either batch_data folder or _delta_log>
        """
        if type(path) is dict: return path # this means the path was already parsed (allows this method to be called liberally)
        ar = path.split('/')
        path_dict = {'stage':ar[0], 'stage_num':ar[0][-1], 'category':ar[1], 'source_system':ar[2], 'entity':None, 'entity_list':None, 'entity_path':None, 'entity_parent_path':None}

        folders = self.get_folders(self.to_url(path))
        print(folders)
        # Identify an entity folder by the presence of the "_delta_log" folder in stage2 and stage3
        if (path_dict['stage_num'] == '1' and ('additive_batch_data' in folders[0] or 'delta_batch_data' in folders[0] or 'snapshot_batch_data' in folders[0])) or ((path_dict['stage_num'] == '2' or path_dict['stage_num'] == '3') and '_delta_log' in folders[0]):
            path_dict['entity'] = ar[-1]
            path_dict['entity_path'] = path
            path_dict['entity_parent_path'] = '/'.join(ar[0:-1])
        else:
            path_dict['entity_list'] = folders
            path_dict['entity_parent_path'] = path

        path_dict['between_path'] = '/'.join(path_dict['entity_parent_path'].split('/')[2:]) # strip off the first 2 args in the entity parent path (eg, stage1/Transactional)
        #print(path_dict)
        return path_dict

    def rm_if_exists(self, path, recursive_remove=True):
        """ Remove a folder if it exists (defaults to use of recursive removal).
            This is a best-effort operation: any error raised while removing is logged at debug level and not re-raised.
        """
        try:
            mssparkutils.fs.rm(self.to_url(path), recursive_remove)
        except Exception as e:
            # Deliberately swallowed so callers can invoke this unconditionally; the reason is kept in the debug log.
            logger.debug(f'rm_if_exists: nothing was removed at {path} ({e})')

    def ls(self, path):
        """ Lists the given path and returns a tuple of (folder names, file names).
            If the listing fails, a warning is logged and whatever was collected so far (usually two empty lists) is returned.
        """
        url = self.to_url(path)
        folders, files = [], []
        try:
            for entry in mssparkutils.fs.ls(url):
                if entry.isFile:
                    files.append(entry.name)
                elif entry.isDir:
                    folders.append(entry.name)
        except Exception:
            logger.warning("[OEA] Could not peform ls on specified path: " + path + "\nThis may be because the path does not exist.")
        return (folders, files)

    def path_exists(self, path):
        """ Checks whether the given path can be listed; returns True if so and False otherwise (never raises).
            eg, path_exists('stage1/mytest/v1.0')
        """
        try:
            mssparkutils.fs.ls(self.to_url(path))
        except Exception:
            # A missing path surfaces as a generic Py4JJavaError, so any exception is treated as "does not exist".
            return False
        else:
            return True

    def get_stage_num(self, path):
        """ Returns the single digit (as a string) that follows "stage" in a "stage<digit>/" segment of the path.
            Raises ValueError if the path contains no such segment.
        """
        stage_match = re.match(r'.*stage(\d)/.*', path)
        if stage_match is None:
            raise ValueError("Path must begin with either 'stage1', 'stage2', or 'stage3'")
        return stage_match.group(1)

    def get_folders(self, path):
        """ Returns the names of the folders directly inside the given path.
            If the path can't be listed, a warning is logged and the folders collected so far (usually none) are returned.
        """
        dirs = []
        try:
            listing = mssparkutils.fs.ls(self.to_url(path))
            dirs.extend(entry.name for entry in listing if entry.isDir)
        except Exception:
            logger.warning("[OEA] Could not get list of folders in specified path: " + path + "\nThis may be because the path does not exist.")
        return dirs

    def get_latest_folder(self, path):
        """ Gets the last folder listed in the given path. """
        folders = self.get_folders(path)
        if len(folders) > 0: return folders[-1]
        else: return None

    def contains_batch_folder(self, path):
        for name in self.get_folders(self.to_url(path)):
            if name == 'additive_batch_data' or name == 'snapshot_batch_data' or name == 'delta_batch_data':
                return True
        return False

    def get_batch_info(self, source_path):
        """ Given a source data path, returns a tuple with the batch type (based on the name of the folder) and file type (based on a file extension) 
            eg, get_batch_info('stage1/Transactional/sis/v1.0/students') # returns ('snapshot', 'csv')
        """
        url = self.to_url(source_path)
        source_folder_name = self.get_latest_folder(url) #expects to find one of: additivie_batch_data, snapshot_batch_data, delta_batch_data
        batch_type = source_folder_name.split('_')[0]

        rundate_dir = self.get_latest_folder(f'{url}/{source_folder_name}')
        data_files = self.ls(f'{url}/{source_folder_name}/{rundate_dir}')[1]
        file_extension = data_files[0].split('.')[1]
        return batch_type, file_extension        

    def load(self, path):
        """ Reads the delta table at the given path and returns it as a dataframe. """
        return spark.read.format('delta').load(self.to_url(path))

    def display(self, path, limit=4):
        """ Reads the delta table at the given path, displays up to `limit` rows, and returns the full dataframe. """
        df = self.load(path)
        preview = df.limit(limit)
        display(preview)  # resolves to the global display function, not this method
        return df

    def show(self, path, limit=4):
        """ Reads the delta table at the given path, prints up to `limit` rows via df.show, and returns the full dataframe. """
        df = self.load(path)
        df.show(limit)
        return df

    def fix_column_names(self, df):
        """ Returns the dataframe with each run of characters that are invalid in Parquet column names replaced by an underscore. """
        invalid_chars = re.compile("[ ,;{}()\n\t=]+")
        renamed_columns = [F.col(name).alias(invalid_chars.sub("_", name)) for name in df.columns]
        return df.select(renamed_columns)

    def to_spark_schema(self, schema):#: list[list[str]]):
        """ Builds a spark StructType from a schema given in the OEA schema format (rows of [column name, data type, operation]).
            Example:
            schemas['Person'] = [['Id','string','hash'],
                                    ['CreateDate','timestamp','no-op'],
                                    ['LastModifiedDate','timestamp','no-op']]
            to_spark_schema(schemas['Person'])
            Each data type is mapped to the module-level class named <Capitalized type>Type (eg, 'string' -> StringType), and every field is nullable.
        """
        def as_field(col_name, dtype):
            spark_type = globals()[dtype.lower().capitalize() + "Type"]
            return StructField(col_name, spark_type(), True)

        return StructType([as_field(col_name, dtype) for col_name, dtype, op in schema])

    def get_text_from_url(self, url):
        """ Retrieves the text doc at the given url and returns it decoded as utf-8.
            eg: get_text_from_url("https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/modules/module_catalog/Student_and_School_Data_Systems/metadata.csv")
        """
        # The context manager closes the response (and its connection) even if reading or decoding fails.
        with urllib.request.urlopen(url) as response:
            text = response.read().decode('utf-8')
        return text

    def parse_metadata_from_csv(self, csv_str):
        """ Parses out metadata from a csv string and returns the metadata dictionary.

            The first row that isn't skipped is treated as the header and ignored.
            A row with a value in the first column starts a new entity; the rest of that row is ignored.
            A row with a blank first column and a value in the second column is an attribute of the current entity,
            and is stored as the list of its remaining column values.
            Returns a dict that maps each entity name to its list of attribute rows.
            Rows are split on commas only (quoted values containing commas are not supported).
        """
        metadata = {}
        current_entity = None
        header_seen = False
        only_commas = re.compile(r'^,+$')
        for raw_line in csv_str.splitlines():
            line = raw_line.strip()
            # skip empty lines, lines that start with # (because these are comments), and lines with only commas (which is what happens if someone uses excel and leaves a row blank)
            if len(line) == 0 or line.startswith('#') or only_commas.match(line): continue

            if not header_seen:
                header_seen = True # the header row carries no metadata, so it is just skipped
                continue

            ar = line.split(',')
            # check for the start of a new entity definition
            if ar[0] != '':
                current_entity = ar[0]
                metadata[current_entity] = []
            # an attribute row must have an attribute name in the second column, and must follow an entity row
            elif len(ar[1]) > 0 and current_entity is not None:
                metadata[current_entity].append(ar[1:]) # remove the first element because it will be blank
            else:
                logger.info('Invalid metadata row: ' + line)
        return metadata

    def write(self, data_str, destination_path_and_filename):
        """ Writes the given data string to the given file in the data lake (the path is translated with to_url). """
        # NOTE(review): per the mssparkutils docs the last argument of fs.put is the overwrite flag - confirm
        mssparkutils.fs.put(self.to_url(destination_path_and_filename), data_str, True)

    def get_metadata_from_url(self, url):
        csv_str = self.get_text_from_url(url)
        metadata = self.parse_metadata_from_csv(csv_str)
        return metadata        

    def create_run_date(self, date_str=None):
        """ Creates a datetime string in a format like 2022-09-30T14-51-02
            You can pass in a date_str like 2022-09-30 and it will return a datetime string (with a time of 00-00-00).
            If date_str is already in the full datetime format, it is returned as is (allows this method to be called liberally).
            If no date_str is passed in, then the datetime string returned will be the current datetime for the default timezone.
            Leading and trailing whitespace in date_str is ignored.
            Raises ValueError if date_str is in neither of the supported formats.
        """
        if not date_str:
            rundate = datetime.datetime.now(pytz.timezone(self.timezone))
        else:
            # Strip once up front so that the value that was validated is also the value that is returned or parsed.
            date_str = date_str.strip()
            if re.match(r'^\d\d\d\d-\d\d-\d\dT\d\d-\d\d-\d\d$', date_str):
                return date_str
            rundate = datetime.datetime.strptime(date_str, '%Y-%m-%d')
        return rundate.strftime('%Y-%m-%dT%H-%M-%S') # Path names can't have a colon - https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/introduction.md#path-names

    def land_data(self, data, destination_path, destination_filename, rundate=None):
        """ Lands data in the given destination_path, adding a rundate folder.
        """
        rundate = self.create_run_date(rundate)
        destination_path = f'{destination_path}/rundate={rundate}/{destination_filename}'
        self.write(data, destination_path)
        return destination_path

    def land_data_from_url(self, url, destination_path, rundate=None):
        """ Pulls data from the given url and lands it in the specified destination path.
            eg, land_data_from_url('https://contoso.com/testdata/students.csv', 'stage1/Transactional/contoso_sis/v0.1/students')
        """
        filename = url.split('/')[-1] # assumes the last element in the url is the name of the file (eg, myfile.csv)
        data = self.get_text_from_url(url)
        destination_path = self.land_data(data, destination_path, filename, rundate)
        return destination_path

    def upsert(self, df, destination_path, primary_key='id'):
        """ Merges the given dataframe into the delta table at destination_path, matching rows on the primary_key column:
            matching rows are updated and new rows are inserted.
            If there is no delta table found in the destination_path, one will be created from the dataframe.
            Column names are cleaned with fix_column_names before writing.
        """
        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)
        if not DeltaTable.isDeltaTable(spark, destination_url):
            logger.info('No existing delta table found. Creating delta table.')
            df.write.format('delta').save(destination_url)
            return

        merge_condition = f'sink.{primary_key} = updates.{primary_key}'
        sink_table = DeltaTable.forPath(spark, destination_url).alias('sink')
        merge_builder = sink_table.merge(df.alias('updates'), merge_condition)
        merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    def overwrite(self, df, destination_path):
        """ Replaces the contents of the delta table at destination_path with the given dataframe.
            If there is no delta table found in the destination_path, one will be created.
            Column names are cleaned with fix_column_names before writing.
        """
        target_url = self.to_url(destination_path)
        cleaned_df = self.fix_column_names(df)
        # https://docs.delta.io/latest/delta-batch.html#overwrite
        delta_writer = cleaned_df.write.format('delta').mode('overwrite')
        delta_writer.save(target_url)

    def append(self, df, destination_path):
        """ Adds the rows of the given dataframe to the delta table at destination_path.
            If there is no delta table found in the destination_path, one will be created.
            Column names are cleaned with fix_column_names before writing.
        """
        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)
        table_exists = DeltaTable.isDeltaTable(spark, destination_url)
        if not table_exists:
            logger.info('No existing delta table found. Creating delta table.')
        delta_writer = df.write.format('delta')
        if table_exists:
            delta_writer = delta_writer.mode('append')  # https://docs.delta.io/latest/delta-batch.html#append
        delta_writer.save(destination_url)

    def process(self, source_path, foreach_batch_function, format='delta'):
        """ This simplifies the process of using structured streaming when processing transformations.
            Provide a source_path and a function that receives a dataframe to work with (which will be a dataframe with data from the given source_path).
            Use it like this...
            def refine_contoso_dataset(df_source):
                metadata = oea.get_metadata_from_url('https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/gene/v0.7dev/modules/module_catalog/Student_and_School_Data_Systems/metadata.csv')
                df_pseudo, df_lookup = oea.pseudonymize(df_source, metadata['studentattendance'])
                oea.upsert(df_pseudo, 'stage2/Refined/contoso_sis/v0.1/studentattendance/general')
                oea.upsert(df_lookup, 'stage2/Refined/contoso_sis/v0.1/studentattendance/sensitive')
            oea.process('stage2/Ingested/contoso_sis/v0.1/studentattendance', refine_contoso_dataset)

            source_path: path (or url) of the data to stream from; the streaming checkpoint is kept in a _checkpoints folder inside it.
            foreach_batch_function: function invoked with the dataframe of each batch.
            format: format of the source data; csv sources are read with a header row.
            This call blocks until the streaming query terminates.
        """
        def wrapped_function(df, batch_id):
            df.persist() # cache the df so it doesn't get read in multiple times when we write to multiple destinations. See: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
            try:
                foreach_batch_function(df)
            finally:
                df.unpersist() # release the cached batch even when the batch function raises

        spark.sql("set spark.sql.streaming.schemaInference=true")
        if format == 'csv':
            #todo: add the option of specifying whether or not there's a header in the csv file
            streaming_df = spark.readStream.load(self.to_url(source_path), format=format, header='true')
        else:
            streaming_df = spark.readStream.load(self.to_url(source_path), format=format)

        # for more info on append vs complete vs update modes for structured streaming: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts
        query = streaming_df.writeStream.outputMode("append").trigger(once=True).option("checkpointLocation", self.to_url(source_path) + '/_checkpoints').foreachBatch(wrapped_function).start()
        query.awaitTermination()   # block until query is terminated, with stop() or with error; A StreamingQueryException will be thrown if an exception occurs.
        logger.info(query.lastProgress)

    def ingest(self, source_path, primary_key='id'):
        """ Ingests all the entities in the given source_path, or ingests the data for the entity if given an entity path. 
            eg, ingest('stage1/Transactional/contoso_sis/v0.1') # ingests all entities found in that path
            eg, ingest('stage1/Transactional/contoso_sis/v0.1/studentattendance') # ingests the batch data for the single entity in this path
        """
        source_dict = self.parse_path(source_path)
        if source_dict['entity']:
            self._ingest_entity(source_dict, primary_key)
        else:
            for name in source_dict['entity_list']:
                self._ingest_entity(source_dict, primary_key)        

    def _ingest_entity(self, source_path, primary_key='id'):
        """ Ingests the incoming batch data of a single stage1 entity into a delta table under stage2/Ingested.
            Snapshot batches overwrite the table (using only the latest snapshot folder), additive batches are appended,
            and delta batches are upserted on primary_key. The resulting table is then registered with add_to_lake_db.
            Raises ValueError if the batch type found is not snapshot, additive or delta.
        """
        source_dict = self.parse_path(source_path)
        entity_path = source_dict['entity_path']
        destination_path = f'stage2/Ingested/{source_dict["between_path"]}/{source_dict["entity"]}'
        batch_type, source_data_format = self.get_batch_info(entity_path)
        source_url = self.to_url(f'{entity_path}/{batch_type}_batch_data')

        if batch_type == 'snapshot':
            # only the most recent snapshot is relevant
            source_url = f'{source_url}/{self.get_latest_folder(source_url)}'

        logger.info(f'Processing {batch_type} data from: {source_url} and writing out to: {destination_path}')
        batch_writers = {
            'snapshot': lambda df: self.overwrite(df, destination_path),
            'additive': lambda df: self.append(df, destination_path),
            'delta': lambda df: self.upsert(df, destination_path, primary_key),
        }
        if batch_type not in batch_writers:
            raise ValueError("No valid batch folder was found at that path (expected to find a single folder with one of the following names: snapshot_batch_data, additive_batch_data, or delta_batch_data). Are you sure you have the right path?")

        self.process(source_url, batch_writers[batch_type], source_data_format)
        self.add_to_lake_db(destination_path)

    def load_csv(self, source_path, schema=None, has_header=True):
        """ Reads the csv data at the given path into a dataframe.
            The schema is passed to the reader only when one is given, and has_header says whether the first row is a header.
        """
        reader_options = {'format': 'csv', 'header': 'true' if has_header else 'false'}
        if schema:
            reader_options['schema'] = schema
        return spark.read.load(self.to_url(source_path), **reader_options)

    def pseudonymize(self, df, metadata): #: list[list[str]]):
        """ Pseudonymizes the given dataframe as directed by the metadata (rows of [column name, data type, operation] in the OEA format).
            Returns a tuple of 2 dataframes: (df_pseudo, df_lookup)
            eg, df_pseudo, df_lookup = oea.pseudonymize(df, metadata)
            The operations are applied per column as follows:
                hash-no-lookup (hnl): df_pseudo gets the salted sha2-256 hash in <column>_pseudonym; the column is dropped from df_lookup
                hash (h): df_pseudo gets the salted hash in <column>_pseudonym; df_lookup keeps the original column and adds <column>_pseudonym
                mask (m): the column value is replaced with '*' in df_pseudo; df_lookup keeps the original value
                partition-by: the column is left unchanged in both dataframes so it can be used for partitioning
                no-op (x): the column is left unchanged in df_pseudo and dropped from df_lookup
            The lookup table should be written to a "sensitive" folder in the data lake.
            [More info on this approach here: https://learn.microsoft.com/en-us/azure/databricks/security/privacy/gdpr-delta#pseudonymize-data]
        """
        def salted_hash(name):
            return F.sha2(F.concat(F.col(name), F.lit(self.salt)), 256)

        def with_pseudonym(frame, name):
            # hash the column in place, then rename it to mark it as a pseudonym
            return frame.withColumn(name, salted_hash(name)).withColumnRenamed(name, name + "_pseudonym")

        df_pseudo, df_lookup = df, df
        for col_name, dtype, op in metadata:
            if op in ("hash-no-lookup", "hnl"):
                # the lookup can be performed against a different table, so no lookup column is kept here
                df_pseudo = with_pseudonym(df_pseudo, col_name)
                df_lookup = df_lookup.drop(col_name)
            elif op in ("hash", "h"):
                df_pseudo = with_pseudonym(df_pseudo, col_name)
                df_lookup = df_lookup.withColumn(col_name + "_pseudonym", salted_hash(col_name))
            elif op in ("mask", "m"):
                df_pseudo = df_pseudo.withColumn(col_name, F.lit('*'))
            elif op in ("no-op", "x"):
                df_lookup = df_lookup.drop(col_name)
            # "partition-by" (like any other operation value) makes no changes for this column
        return (df_pseudo, df_lookup)

    def add_to_lake_db(self, source_entity_path):
        """ Registers the entity at the given path as a table in a lake db, creating the db and the table only if they don't exist yet.
            The db is named ldb_s<stage number>_<source system> and the table is named after the entity.
            eg: add_to_lake_db('stage2/Ingested/contoso_sis/v0.1/students')

            Note that a spark db that points to source data in the delta format can't be queried via SQL serverless pool. More info here: https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/resources-self-help-sql-on-demand#delta-lake
        """
        source_dict = self.parse_path(source_entity_path)
        db_name = f'ldb_s{source_dict["stage_num"]}_{source_dict["source_system"]}'
        spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
        table_name = source_dict['entity']
        table_url = self.to_url(source_dict['entity_path'])
        spark.sql(f"create table if not exists {db_name}.{table_name} using DELTA location '{table_url}'")

    def drop_lake_db(self, db_name):
        """ Drops the given lake db if it exists (with CASCADE), then logs and returns a confirmation message. """
        spark.sql(f'DROP DATABASE IF EXISTS {db_name} CASCADE')
        confirmation = "Database dropped: " + db_name
        logger.info(confirmation)
        return confirmation

    def create_sql_db(self, source_path):
        """ Prints out the sql script needed for creating a sql serverless db and set of views. """
        source_dict = self.parse_path(source_path)
        db_name = f's{source_dict["stage_num"]}_{source_dict["source_system"]}'
        cmd = '-- Create a new sql script then execute the following in it:\n'
        cmd += f"IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '{db_name}')\nBEGIN\n  CREATE DATABASE {db_name};\nEND;\nGO\n"
        cmd += f"USE {db_name};\nGO\n\n"
        cmd += self.create_sql_views(source_dict['entity_parent_path'])
        print(cmd)

    def create_sql_views(self, source_path):
        cmd = ''      
        dirs = self.get_folders(source_path)
        for table_name in dirs:
            cmd += f"CREATE OR ALTER VIEW {table_name} AS\n  SELECT * FROM OPENROWSET(BULK '{self.to_url(source_path)}/{table_name}', FORMAT='delta') AS [r];\nGO\n"
        return cmd 

    def drop_sql_db(self, db_name):
        """ Prints the sql script for dropping the given sql db. Nothing is executed. """
        script_lines = [
            '-- Create a new sql script then execute the following in it. Alternatively, you can click on the menu next to the SQL db and select "Delete"\n',
            '-- [Note that this does not affect the data in the data lake - this will only delete the sql db that points to that data.]\n\n',
            f'DROP DATABASE {db_name}',
        ]
        print(''.join(script_lines))

class DataLakeWriter:
    """ Appends string data to files located under a fixed root destination. """

    def __init__(self, root_destination):
        # root that every path given to write() is resolved against
        self.root_destination = root_destination

    def write(self, path_and_filename, data_str, format='csv'):
        """ Appends data_str to <root_destination>/<path_and_filename>.
            The format argument is accepted but not used.
        """
        target = f"{self.root_destination}/{path_and_filename}"
        # Set the last parameter as True to create the file if it does not exist
        mssparkutils.fs.append(target, data_str, True)
