# Dummy data generator for Lake Databases, generated and managed from the Synapse workspace modeller
## Attention: As designed, the data existing in that database folders will be **deleted**

## Prerequisites
A Synapse Workspace with an **existing Lake Database**. Creating that database from the lake database templates is a simple manner to generate a good starting point for a dummy environment. 

A Spark pool with the **faker python library**
 installed (simplest method is to have a **requirements.txt** with a line of text with the library name “faker” loaded in the cluster configuration). For detailed information https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-manage-python-packages#install-python-packages


## Configuration
You must introduce the name of the lake database in the notebook first cell. Then we can execute the cells of the notebook. The first code cell contains configuration and libraries. The second code cell contains the python class definition, the creation of one instance of the generator and its execution

In [None]:
import random, string
import datetime as dtime
import re   
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import pandas as pd
import requests, json
import faker
import math

# Please fill the name of the database in the following line
database = 'mydatabase'
tables = spark.sql('show tables in ' + database)
print("Tables number: " + str(tables.count()))

pd.set_option('display.max_rows', 20)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)
seed = math.floor(time.time())%20000

In [None]:
# Here onwards we define a class with everything needed to populate lake DB.
# Last 3 lines of the cell creates an instance of the class and executes the generation for the whole database.
class templateDataGenerator():
    databasename = ""
    def __init__(self,database):
        self.databasename = database
        self.enrichTablesData()
        return
    def enrichTablesData(self):
        entries = []
        baseurl="https://"+mssparkutils.env.getWorkspaceName()+".dev.azuresynapse.net"
        headers = {"Authorization": "Bearer "+mssparkutils.credentials.getToken("Synapse")}
        constructed_url = baseurl + "/databases/"+database+"/tables?api-version=2021-04-01"
        request = requests.get(constructed_url, headers=headers)
        response = request.json()
        results = []
        for item in response['items']:
            if item['type'] == 'TABLE':
                tdict = {}
                tdict['tableName'] = item['name']
                tdict['database'] = database
                columns=[]
                for column in item['properties']['StorageDescriptor']['Columns']:
                    cdict = {}                   
                    cdict['colname'] = column['Name']
                    cdatatype=column['OriginDataTypeName']
                    cdict['type']=cdatatype['TypeName']
                    cdict['length'] = cdatatype['Length']
                    columns.append(cdict)
                tdict['columns'] = columns
                results.append(tdict)
        self.schemaDic = results
        for table in self.schemaDic:
            tname = table['tableName']
            if tname.__contains__('Transaction') or tname.__contains__('Order'):
                sizeorder = 100
            elif tname.__contains__('Type') or tname.__contains__('Method') or tname.__contains__('Channel') or tname.__contains__('Class') :
                sizeorder = 1
            else:
                sizeorder = 10
            # This case/swithc like looks for some string in the name to identify the nature of some tables. 
            # The transaction like should be high freq events. 
            # type and categories should be like 100 times smaller. 
            # Other tables will be in between
            colcount,skcount = 0,0
            pkname = ""
            sknames = []
            for column in table['columns']:
                colcount += 1
                name = column['colname']
                if name.endswith('Id'):
                    if name == (tname + 'Id'):
                        pkname = name
                    else:
                        sknames.append(name)
                        skcount += 1
            table['skcount'] = skcount, 
            table['sizeorder'] = sizeorder
        return
    def calculatedecimal(self,datatype): 
        matches = re.findall(r'decimal(\b,\b)', datatype)
        if len(matches) == 2:
            precision = matches[0]
            scale = matches[1]
        else:
            precision = 4
            scale = 2
        return precision,scale

    def createtypes(self, dtype, table, name, dlength, dfix):
        global seed
        # dtype = Data type for the column
        # table = tablename
        # name = column name
        # dfix = Dataframe filled with ids with the size expected.
        aseed = random.randint(0,seed)
        seed = (seed + aseed)%20003
        faker.Faker.seed(aseed)
        if dtype == 'integer' or dtype == 'long':
            if name == (table + 'Id'):
                # this column must be primary key, then we fill it with integer sequence == id, which is a sequence
                valuesdf = dfix.select("id", col("id").alias(name))
            elif name.endswith('Id'):
                # This is a secondary key.
                # some of the secondary tables can exist or not
                # we try/except to check if exists.
                try:
                    mydf = self.tablesDf
                    sksize = mydf.filter(mydf['tablename'] == name[:-2]).first()['nrows']
                    valuesdf = dfix.select("id", floor(rand(seed=aseed)*sksize).alias(name))
                except:
                    # in case we do not know the foreign table size we generate a simple sequence == primary key
                    valuesdf = dfix.select("id", col("id").alias(name))
                # creates a dataframe with the id columsn and a column identified with 'name'
            else:
                valuesdf = dfix.select("id", floor(rand(seed=aseed)*256).alias(name))
                #random values from 0 to 256 integers
        elif dtype == 'string':
            if name.__contains__('Url'):
                fake = faker.Faker()
                udf_fake = udf(fake.image_url)
                valuesdf = dfix.select('id',udf_fake().alias(name))
            elif name.__contains__('Email'):
                fake = faker.Faker()
                udf_fake = udf(fake.email)
                valuesdf = dfix.select('id',udf_fake().alias(name))
            elif name.__contains__('IpAddress'):
                fake = faker.Faker()
                udf_fake = udf(fake.ipv4)
                valuesdf = dfix.select('id',udf_fake().alias(name))
            elif name.__contains__('EmployeeName'):
                fake = faker.Faker()
                udf_fake = udf(fake.name)
                valuesdf = dfix.select('id',udf_fake().alias(name))
            else:
                nrows = dfix.count()
                fake = faker.Faker()
                def mycharfake():
                    return random.choice(string.ascii_lowercase)
                def mytextfake():
                    return fake.text(max_nb_chars=dlength)
                def mylettersfake():
                    return fake.random_letters(length=dlength)
                if dlength == 1:
                    udf_fake = udf(mycharfake)
                elif dlength > 5:
                    udf_fake = udf(mytextfake)
                else:
                    udf_fake = udf(mylettersfake)
                valuesdf = dfix.select("id",udf_fake().alias(name))
        elif dtype == 'binary':
            valuesdf = dfix.select("id", (rand(seed=aseed)))
            valuesdf = valuesdf.rdd.map(lambda x: (x[0], bytearray(str(x[1]),"UTF-8"))).toDF(["id",name])
        elif dtype == 'date':
            valuesdf = dfix.select("id", floor(rand(seed=aseed)*500)) # random integer 0-500
            today = dtime.date.today()
            valuesdf = valuesdf.rdd.map(lambda x: (x[0], today-dtime.timedelta(days=x[1]))).toDF(["id",name])
        elif dtype == 'bigint':
            valuesdf = dfix.select("id", floor(rand(seed=aseed)*3650000000).alias(name)) # random integers
        elif dtype == 'timestamp':
            dtoday =  time.time()
            valuesdf = dfix.select("id", floor(rand(seed=aseed)*15000000))
            valuesdf = valuesdf.rdd.map(lambda x: (x[0], dtoday-x[1])).toDF(["id","unixtime"])
            valuesdf = valuesdf.select("id",to_timestamp(from_unixtime(col("unixtime"))).alias(name))
        elif dtype == 'boolean':
            valuesdf = dfix.select("id", (rand(seed=aseed) > 0.5 ).alias(name))
        elif 'decimal' in dtype:
            precision,scale = self.calculatedecimal(dtype)
            intrange = 10^(precision-scale)-1
            valuesdf = dfix.select("id", round(rand(seed=aseed)*intrange*2-intrange,scale).alias(name))
        else:
            print('ERROR: '+ dtype +' is a datatype not conforming')
        return valuesdf

    def calculate_db_tables(self, size=5000):
        for table in self.schemaDic:
            rows = math.floor(table['sizeorder']*(size*(.9+random.random()/5)))
            table['nrows'] = rows
            self.populate_table_df(table,rows)
            print('Table: ' + table['tableName'] + ' -- Rows: ' + str(table['nrows']))
        return

    def populate_table_df(self, tabledict, size = 5000):
        emptydf = sqlContext.range(size)
        mydf = sqlContext.range(size)
        for column in tabledict['columns']:
            columndf=self.createtypes(column['type'], tabledict['tableName'], column['colname'], column['length'], emptydf)
            mydf=mydf.join(columndf,'id','left')
            # column[1] is tablename, column[0] is columname
        output = mydf
        output = output.drop('id')
        # We delete the original id column, although it could be the PK, we just avoid it, as there might not be PK in a table
        # coallesce to write a limited minimum file size which is fine for most tables with aprox 50K rows per file/worker
        if size < 80000:
            output.repartition(1).write.insertInto(database+'.'+tabledict['tableName'],overwrite = True)
        elif size < 800000:
            output.repartition(5).write.insertInto(database+'.'+tabledict['tableName'],overwrite = True)
        else:
            output.repartition(50).write.insertInto(database+'.'+tabledict['tableName'],overwrite = True)
        return


mydatagen=templateDataGenerator(database)
#display(mydatagen.tablesDf)
mydatagen.calculate_db_tables(size = 10000)