In [34]:
import os
import re
import shutil #directory controls
import json
import yaml
import shutil
import fnmatch
import errno
import itertools

In [35]:
# Path and naming configuration for the dbt -> Dataform conversion.

# dbt project inputs
dbt_sources_file_path = "../../dbt/square_dbt/models/sources/"  # dbt source YAML files
dbt_models_file_path = "../../dbt/square_dbt/models/"  # dbt model .sql files and schema.yml files
dbt_snapshots_file_path = "../../dbt/square_dbt/snapshots/"  # dbt snapshot .sql files

# The models path was originally bound to this doubled name (a paste slip), which
# left `dbt_models_file_path` undefined on a fresh kernel. The old name is kept
# as an alias so nothing that referenced it breaks.
dbt_models_file_pathdbt_models_file_path = dbt_models_file_path

# Dataform project outputs
dataform_output_sources_path = "../_dataform_output/definitions/sources"
dataform_output_models_path = "../_dataform_output/definitions"

dataform_root_path = "../_dataform_output"
# schema written into the `source` entry of generated snapshot (scd) files
target_schema = 'DATAFORM_SQUARE'
# column written as the `timestamp` of generated snapshot (scd) files; the
# embedded double quotes are part of the emitted text
dlh_timestamp_field = '"MD_ELT_UPDATED_DTS"'


In [36]:
# Recursively locate files below a directory by base-name pattern
def find_files(directory, pattern):
    """Lazily yield the path of each file under ``directory`` matching ``pattern``.

    ``directory`` is walked recursively with ``os.walk``; ``pattern`` is an
    ``fnmatch``-style wildcard that is tested against the file's base name only.
    Each yielded path is the walked folder joined with the matching file name.
    """
    for current_dir, _subdirs, names in os.walk(directory):
        yield from (
            os.path.join(current_dir, name)
            for name in names
            if fnmatch.fnmatch(name, pattern)
        )

In [37]:
# YAML loader
def read_yaml_file(filename):
    """Parse the YAML file at ``filename`` and return the loaded object.

    If parsing raises ``yaml.YAMLError`` the error is printed instead of being
    propagated, and the function then falls through and returns ``None``.
    """
    with open(filename, 'r') as yaml_stream:
        try:
            # NOTE(review): FullLoader is kept as-is; consider yaml.safe_load if
            # these files can ever come from an untrusted source.
            return yaml.load(yaml_stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as parse_error:
            print(parse_error)

# YAML -> SQLX converter
def create_sqlx_json_source_file(source_file_path):
    """Write one Dataform ``declaration`` .sqlx file per table of each dbt source.

    Args:
        source_file_path: iterable of paths to dbt source YAML files. Each file
            must contain a top-level ``sources`` list.

    For every table a file named ``<schema>_<table>.sqlx`` is written into
    ``dataform_output_sources_path`` containing ``config { type, database,
    schema, name }`` with unquoted keys.

    ``database`` is left out of the declaration when the source does not define
    it, and ``schema`` falls back to the source ``name`` when absent (both are
    optional in dbt source files).
    """
    # exist_ok avoids the race of a separate "exists?" check before creating
    os.makedirs(dataform_output_sources_path, exist_ok=True)

    for yaml_path in source_file_path:
        # only the `sources` entry of the YAML file is used
        sources = read_yaml_file(yaml_path)["sources"]

        for source in sources:
            database = source.get("database")
            schema = source["schema"] if "schema" in source else source["name"]

            for table in source.get("tables") or []:
                table_name = table['name']

                # Build the declaration as a dict and let json.dumps do the
                # quoting, so names containing quotes or backslashes cannot
                # produce invalid JSON.
                declaration = {"type": "declaration"}
                if database is not None:
                    declaration["database"] = str(database)
                declaration["schema"] = str(schema)
                declaration["name"] = str(table_name)

                parsed = json.dumps(declaration, sort_keys=False, indent=2)
                # SQLX config blocks use bare keys: strip the quotes around them
                for key in declaration:
                    parsed = parsed.replace(f'"{key}":', f'{key}:')

                # the `with` block closes the file; no explicit close() needed
                with open(f'{dataform_output_sources_path}/{schema}_{table_name}.sqlx', "w") as jmodel_file:
                    jmodel_file.write("config "+parsed)


In [38]:
def _convert_model_sql(filedata):
    """Return the text of a dbt model rewritten with Dataform SQLX syntax."""
    # The first `{{ ... }}` span is treated as the dbt config header.
    header_old = filedata[filedata.find("{{"):filedata.find("}}")+2]
    header = header_old

    header_replace_dict = {"{{":"", "}}":"}", "materialized":"type","=":":","(":"{",")":"","'":'"',"unique_key":'uniqueKey'}

    # replacements are applied in the dictionary's insertion order
    for key in header_replace_dict.keys():
        header = header.replace(key, header_replace_dict[key])

    # swap the converted header into the file content
    new_file = filedata.replace(header_old,header)
    # remove the transient parameter from the header
    new_file = re.sub(r'(transient)[ ]{0,9}[\:][ ]{0,9}(true|false|True|False|TRUE|FALSE)','',new_file)
    # change {{ref / {{source to ${ref
    new_file = re.sub(r"[{][{][ ]{0,6}(ref|REF|Ref|SOURCE|source|Source)[ ]{0,9}[(][ ]{0,9}[']?",'${ref("', new_file)

    # leave exactly one space between `config` and its opening brace
    new_file = re.sub(r'(config)[ ]{0,40}[{]{0,40}','config {',new_file)
    # close sources and refs with ") instead of ')
    new_file = re.sub(r"(['][)][ ]{0,9}[}])",'")',new_file)
    # remove a lone comma at the start of the config block
    new_file = re.sub(r"({)\n[ ]{0,100}[,]",'{',new_file)
    # change macro references from {{ to ${
    new_file = re.sub(r"(?<!\')({{)[ ]{0,20}[.]{0,20}",'${',new_file)
    new_file = re.sub(r"([\$].+)[ ]{0,9}(?<=\()(')",r'\1"',new_file)
    # change ' to " in source references
    new_file = re.sub(r"([\$].+)((?<=[0-9A-Za-z])[ ]{0,9}[_'][.]{0,10}[ ]{0,20}[,][.]{0,20}[ ]{0,20}[.]{0,20}['])",r'\1","',new_file)
    # change invocation_id to the Snowflake uuid_string() function
    new_file = re.sub(r"('{{invocation_id}}'|{{invocation_id}})",'uuid_string()',new_file)
    # change the incremental-load macro
    new_file = re.sub(r"({[ ]{0,10}%[ ]{0,10}if[ ]{0,10}is_incremental.*)((\s*)(.*)\s*)({%[ ]{0,10}endif..*})",r'${ when(incremental(), \n`\4`) }',new_file)
    new_file = re.sub(r"((?:\${[ ]{0,10}when.*(\s.*)))(\${this\}\})",r'\1${self()}',new_file)
    return new_file


def create_sqlx_models_files(model_files_path):
    """Convert dbt model .sql files into Dataform .sqlx files.

    Args:
        model_files_path: iterable of paths to dbt model files. The directory
            part of each path must contain the text ``models``.

    Each model is written below ``dataform_output_models_path``, in the same
    sub-directory it had below the dbt ``models`` folder, with its extension
    changed to ``.sqlx``. A "Generated file" line is printed per file.
    """
    for filename in model_files_path:
        # Keep everything after the first "models" so a sub-folder whose name
        # also contains "models" is not cut off.
        relative_dir = os.path.dirname(filename).split('models', 1)[1]
        destination_path = dataform_output_models_path + relative_dir + '/'
        # Replace only the extension; a plain "sql" -> "sqlx" text replacement
        # would also rewrite "sql" inside the file name itself.
        base_name, _extension = os.path.splitext(os.path.basename(filename))
        destination_full_path = destination_path + base_name + '.sqlx'

        # create the destination directory if it does not exist yet
        os.makedirs(destination_path, exist_ok=True)

        with open(filename, 'r') as source_file:
            filedata = source_file.read()

        # Convert before opening the target so a failure while converting
        # cannot leave a truncated output file behind.
        new_file = _convert_model_sql(filedata)

        with open(destination_full_path, 'w') as file:
            try:
                file.write(new_file)
                print('Generated file: '+destination_full_path)
            except IOError as e:
                print ("I/O error({0}): {1}".format(e.errno, e.strerror))
        


In [39]:
def create_sqlx_snapshot_files(snapshots_file_path):
    """Convert dbt snapshot .sql files into JavaScript files that call ``scd``.

    Args:
        snapshots_file_path: iterable of paths to dbt snapshot files. The
            directory part of each path must contain the text ``snapshots``.

    Each output is written below ``dataform_output_models_path + '/snapshots'``
    with a ``.js`` extension. It starts with a ``require("dataform-scd")`` line
    followed by an ``scd("<file name>", {...})`` call. The call is built from
    the snapshot's config, ``dlh_timestamp_field``, ``target_schema`` and the
    table named in the snapshot's ``ref``.

    Raises:
        ValueError: if no ``{{ ref('...') }}`` can be found in a snapshot file.
    """
    for filename in snapshots_file_path:
        single_file_name = os.path.basename(filename)
        # strip only the extension, not every ".sql" inside the name
        file_name, _extension = os.path.splitext(single_file_name)
        # keep everything after the first "snapshots" in the directory path
        relative_dir = os.path.dirname(filename).split('snapshots', 1)[1]
        destination_path = dataform_output_models_path+'/snapshots'+relative_dir+'/'
        destination_full_path = destination_path + file_name + '.js'

        # create the destination directory if it does not exist yet
        os.makedirs(destination_path, exist_ok=True)

        with open(filename, 'r') as source_file:
            filedata = source_file.read()

        # the snapshot's source table is the quoted text inside ref('...')
        ref_match = re.search(r"({{)[ ]{0,9}(ref).*((?<=').*(?='))",filedata)
        if ref_match is None:
            raise ValueError("No {{ ref('...') }} found in snapshot file: " + filename)
        table = ref_match.group(3)

        # drop the {% snapshot %} / {% endsnapshot %} tags
        filedata = re.sub(r'({%)[ ]{0,10}(endsnapshot|snapshot).*(})','',filedata)
        # check_cols becomes the configured timestamp column
        filedata = re.sub(r"(check_cols.*[\]]{1,1}|check_cols.*[\"]{1,1})",'timestamp: '+dlh_timestamp_field,filedata)
        # drop the remaining snapshot-only settings
        filedata = re.sub(r'(target_database.*,|target_schema.*,|strategy.*|invalidate_hard_deletes.*|check_cols.*)','',filedata)
        # keep only the config block: first "{" through one char past the first "}"
        filedata=filedata[filedata.find("{"):filedata.find("}")+2]

        # the closing ")" of config(...) becomes the source entry plus the end of the scd call
        filedata = filedata.replace(')', 'source: {\n schema: "{{schema}}",\n name: "{{table}}",\n}, \n});')
        filedata = filedata.replace('{{schema}}',target_schema).replace('{{table}}',table)
        replace_dict = {"updated_at":"timestamp","unique_key":"  uniqueKey", "config":'scd("{{source_data_scd}}", ',"'":'"',"{{":"","}}":"","=":": "}

        # Order matters: the scd placeholder is filled in on every pass, i.e.
        # before the later "{{" / "}}" keys are stripped.
        for key in replace_dict.keys():
            filedata = filedata.replace(key, replace_dict[key]).replace('{{source_data_scd}}',file_name)
            filedata = re.sub(r'(scd\(.*)(\()',r'\1{',filedata)

        new_file = re.sub(r'(strategy)[ ]{0,9}[\:][ ]{0,9}.*(,)','',filedata)
        new_file = re.sub(r"[ ]{2,999}",'',new_file)
        # re.MULTILINE used to be passed positionally, which made it the `count`
        # argument: the flag was never applied and substitutions were capped at 8.
        new_file = re.sub(r"^\s*$",'',new_file,flags=re.MULTILINE)
        new_file = re.sub(r'\n\s*\n','\n',new_file)
        # re-indent the keys after the whitespace clean-up above
        new_file = new_file.replace('uniqueKey:','  uniqueKey:').replace('timestamp:','  timestamp:').replace('source:','  source:').replace('schema:','   schema:').replace('name:','   name:').replace('},','  },')

        with open(destination_full_path, 'w') as file:
            try:
                file.write('const scd = require("dataform-scd");\n\n')
                file.write(new_file)
                print('Generated file: '+destination_full_path)
            except IOError as e:
                print ("I/O error({0}): {1}".format(e.errno, e.strerror))

In [52]:
def dataform_install_configuration(dataform_root_path):
    """Recreate the Dataform project directory and copy the credentials into it.

    Args:
        dataform_root_path: directory of the Dataform project. It is deleted
            (if present) and re-created by ``dataform init snowflake``.

    Steps: remove the directory, run ``dataform init snowflake <path>``, copy
    ``../.df-credentials.json`` into it, then read ``package.json``. A failure
    of the removal or of the ``dataform`` command is printed, not raised.
    """
    import subprocess  # local import: only this step launches an external program

    try:
        shutil.rmtree(dataform_root_path)
    except OSError as err:
        print(err)

    # Pass the arguments as a list (no shell) so a path containing spaces or
    # shell metacharacters reaches the CLI as a single argument.
    dataform_cli = shutil.which("dataform") or "dataform"
    try:
        init_result = subprocess.run([dataform_cli, "init", "snowflake", dataform_root_path])
    except OSError as err:
        print(err)
    else:
        if init_result.returncode != 0:
            print("dataform init exited with status {0}".format(init_result.returncode))

    shutil.copy("../.df-credentials.json",dataform_root_path)

    packages_file = dataform_root_path+ '/' +'package.json'

    # The content is currently only read (this fails if the file is missing);
    # the code that would modify it is disabled below.
    with open(packages_file, 'r') as file:
        packages_file_data = file.read()

    # TODO: adding the scd package to package.json is disabled:
    #with open(packages_file, 'w') as file:
        #packages_file_data = packages_file_data.replace('    }','    "    dataform-scd": "git+https://github.com/dataform-co/dataform-scd.git#0.1"\n    }')
        #try:
        #    file.write(packages_file_data)
        #except IOError as e:
        #    print ("I/O error({0}): {1}".format(e.errno, e.strerror))

In [53]:
def dataform_assertions_documentation(dbt_models_file_path,schema_files):
    """Add descriptions and assertions from dbt schema files to generated .sqlx files.

    Args:
        dbt_models_file_path: not used by this function; kept for callers.
        schema_files: iterable of paths to dbt schema YAML files. Each file
            must contain a top-level ``models`` list.

    For every model in a schema file, the .sqlx file with the same base name
    under ``dataform_output_models_path`` gets these entries appended to its
    config block (only the ones that apply):
      * ``description`` from the model description,
      * ``columns`` from the column descriptions,
      * ``assertions`` with ``uniqueKey`` / ``nonNull`` from the ``unique`` and
        ``not_null`` column tests.

    Running this twice on the same output appends the entries again.
    """
    for schema_path in schema_files:
        models = read_yaml_file(schema_path)["models"]

        # Per schema file: model name -> collected values, in YAML order.
        tables_list = []
        unique_dict = {}
        not_null_dict = {}
        description_dict = {}
        table_description_dict = {}

        for model in models:
            table = model["name"]
            tables_list.append(table)

            model_description = model.get("description")
            if model_description is not None:
                table_description_dict.setdefault(table, []).append(model_description)

            # a model may have no columns, and a column may have no tests
            for column in model.get("columns") or []:
                column_name = column.get('name')
                tests = column.get('tests') or []
                column_description = column.get('description')

                if 'unique' in tests:
                    unique_dict.setdefault(table, []).append(column_name)

                if 'not_null' in tests:
                    not_null_dict.setdefault(table, []).append(column_name)

                if column_description is not None:
                    # the leading "|" is turned into a newline when rendered below
                    description_dict.setdefault(table, []).append('|    '+column_name+': '+"'"+column_description+"'")

        # every generated .sqlx file whose base name is a model of this schema file
        for sqlx_path in find_files(dataform_output_models_path, '*.sqlx'):
            file_name = os.path.basename(sqlx_path).replace('.sqlx','')
            if file_name not in tables_list:
                continue

            # Each section starts empty for every file so nothing is left
            # undefined or carried over from the previously processed file.
            # NOTE(review): the text is rendered from list reprs, so quotes or
            # brackets inside a description are altered, and no comma is added
            # between the existing config entries and the appended ones -
            # confirm the result is valid for the configs being converted.
            jmodel_tables_description = ''
            if file_name in table_description_dict:
                jmodel_tables_description='  description: '+str(table_description_dict[file_name]).replace("'",'"').replace('[','').replace(']','')+',\n'

            jmodel_description = ''
            if file_name in description_dict:
                jmodel_description = '  columns: {\n'
                jmodel_description += str(description_dict[file_name]).replace('"','').replace("'",'"').replace('[','').replace(']','').replace('|','\n')
                jmodel_description += '\n  }\n'

            has_unique = file_name in unique_dict
            # the original condition tested unique_dict twice, so models with
            # only not_null tests never got their assertions block
            has_not_null = file_name in not_null_dict

            if not (jmodel_tables_description or jmodel_description or has_unique or has_not_null):
                continue  # nothing to add: leave the file untouched

            # The header's closing brace is stripped below, so this part must
            # always end with the "}" that closes the config block again.
            jmodel = '}'
            if has_unique or has_not_null:
                jmodel='  assertions: {\n'
                if has_unique:
                    jmodel+='    uniqueKey: '+str(unique_dict[file_name]).replace("'",'"')+',\n'
                if has_not_null:
                    jmodel+='    nonNull: '+str(not_null_dict[file_name]).replace("'",'"')+'\n'
                jmodel+='\n  }\n}'

            # read completely and close before the same file is rewritten
            with open(sqlx_path, 'r') as sqlx_file:
                filedata = sqlx_file.read()

            # header: first "{" through one character past the first "}"
            header_old=filedata[filedata.find("{"):filedata.find("}")+2]
            if not header_old:
                # no config block found; replacing an empty string would insert
                # the new text between every character of the file
                continue
            header = header_old.replace('}','')+jmodel_tables_description+jmodel_description+jmodel

            with open(sqlx_path, "w") as jmodel_file:
                jmodel_file.write(filedata.replace(header_old,header))


In [54]:
# Collect the conversion inputs as lists. find_files returns a one-shot
# generator, so without list() re-running only the converter cell would
# iterate over already-exhausted inputs and silently convert nothing.

# all dbt model .sql files
model_files = list(find_files(dbt_models_file_path, '*.sql'))
# all dbt source .yml files
source_files = list(find_files(dbt_sources_file_path, '*.yml'))
# all dbt snapshot .sql files
snapshot_files = list(find_files(dbt_snapshots_file_path, '*.sql'))
# schema.yml files holding model tests and documentation
schema_files = list(find_files(dbt_models_file_path, 'schema.yml'))

def dbt_dataform_converter(dataform_root_path,source_files,model_files,snapshot_files,schema_files):
    """Run every stage of the dbt -> Dataform conversion, in order.

    Args:
        dataform_root_path: directory of the Dataform project to (re)create.
        source_files: paths of the dbt source YAML files.
        model_files: paths of the dbt model .sql files.
        snapshot_files: paths of the dbt snapshot .sql files.
        schema_files: paths of the dbt schema YAML files.

    The last stage also reads the module-level ``dbt_models_file_path``.
    """
    # 1. set up the Dataform project
    dataform_install_configuration(dataform_root_path)

    # 2. source declarations
    create_sqlx_json_source_file(source_files)

    # 3. models
    create_sqlx_models_files(model_files)

    # 4. snapshots
    create_sqlx_snapshot_files(snapshot_files)

    # 5. tests and documentation added to the generated models
    dataform_assertions_documentation(dbt_models_file_path, schema_files)

    
    

In [55]:
# Run the end-to-end conversion using the root path and file collections defined above.
dbt_dataform_converter(dataform_root_path,source_files,model_files,snapshot_files,schema_files)

Writing project files...

[32mDirectories successfully created:[0m
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output/definitions
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output/includes
[32mFiles successfully written:[0m
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output/dataform.json
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output/package.json
  /Users/guilhermealcantara/OneDrive/brf consulting/dbt - dataform converter/_dataform_output/.gitignore
[32mNPM packages successfully installed.[0m
Generated file: ../_dataform_output/definitions/staging/CUSTOMERS/V_CUSTOMER_STG.sqlx
Generated file: ../_dataform_output/definitions/staging/LOCATION/V_MERCHANT_LOCATION_STG.sqlx
Generated file: ../_dataform_output/definitio