In [2]:
"""Needed packages to run everything"""
from pyspark.sql import SparkSession
import yaml

"""General methods that are dictate what stable version the file is currently in, how the file is read, created and how the metadata is dumped into it"""

class generalMethods:
    def __init__(self, majorVersion: int, minorVersion : int) -> None:
        """
        Constructor of the class general methods 
        ...
        Arguments
        ----------
        self: 
            variable that calls this class to create an instance
        majorVersion : int
            current major version that is stable for instance
        minorVersion : int
            current minor version that is stable for instance
        ...
        Returns
        ----------
        None
        """
        self.majorVersion = majorVersion
        self.minorVersion = minorVersion
        self.stableVersion = f"{self.majorVersion}.{self.minorVersion}"

    def writeMetadataFile(self, schema: dict, outputMetadataPath: str, fileName: str, outputFileType : str = "yaml") -> None:
        """
        Write generated metadata, with the current stable version
        ...
        Arguments
        ----------
        schema : dict
            dictionary that contains all the data that needs to be inputed inside the .yaml file to structure the metadata
        outputMetadataPath : str
            path of where the metadata files are going to be saved, so that they are structured and easy to find
        fileName : str
            path of where each folder is gonna be grouped based on the type of the file
        stableVersion : str
            latest stable version that the metadata will be generated, which is depended on the global variables majorVersion and minorVersion
        outputFileType : str
            output file extension (type) that we want to create, default value is `yaml` if no value is given for that parameter
        ...
        Returns
        ----------
        None
        """
        with open(f'{outputMetadataPath}/{fileName}/{fileName}%{self.stableVersion}.{outputFileType}', 'w') as file:
            yaml.dump(schema, file, sort_keys=False, version=(self.majorVersion, self.minorVersion))
            

In [3]:

import threading
lock = threading.Lock()

"""Default functions to check if certain values fulfill certain criteria and functions that manipulate with the global "sources" variable"""

"""Mapping string value to boolean so we can use where ever they used"""
strBoolMapping = {
    "y": True,
    "n": False,
    "gdpr": "true"
}

def isNull(nullable : str) -> bool:
    """
    Finding from the dataframe column if that column can have null values (is it required or not)
    ...
    Arguments
    ----------
    nullable : str
        cell value of the row that we are currently checking, on the column "NULLABLE"
    ...
    Returns
    ----------
    Returns a boolean value, based on the variable strBoolMapping
    """
    return strBoolMapping[nullable.lower()]

def isDate(dataType : str) -> bool:
    """
    Function returns extra metadata for column "DATA_TYPE", if that column fulfills a certain condition (is a date type)
    ...
    Arguments
    ----------
    dataType : str
        cell value of the row that we are currently checking, on the column "DATA_TYPE"
    ...
    Returns
    ----------
    Retuns a boolean value, if that specific value has type date
    """
    dateTypeChecked = "date"
    dateBool = dataType.lower() == dateTypeChecked
    return dateBool

def isGdpr(gdprFlag : str) -> bool:
    """
    Function returns extra metadata for column "GDPR_FLAG", if that row fulfills a certain condition (has the cell value "true")
    ...
    Arguments
    ----------
    gdprFlag : str
        cell value of the row that we are currently checking, on the column "GDPR_FLAG"
    ...
    Returns
    ----------
    Boolean value, if the value that we want to compare is equal that to the value that is being compared by 
    """
    gdprBool = gdprFlag.lower() == strBoolMapping["gdpr"]
    return gdprBool


In [4]:
"""Needed packages to run everything in this file"""
from pyspark.sql import DataFrame

"""Functions that generate all the metadata for different parts of a dataframe """
    
def genPrimaryArgsMeta(title : str, sourceSystem : str, description : str = "A metaschema generated from an example dataset.") -> dict:
    """
    Write general info of metadata file
    ...
    Arguments
    ----------
    title : str
        variable that is gonna be used for the title of the metadata file
    sourceSystem : 
    description : str
        variable that is gonna be used for the description of the metadata file, also has a default value which is gonna be used when there is not value give
    ...
    Returns
    ----------
    It returns a dictionary of the values mentioned above which are gonna be used later to build the entires metadata file
    """
    version = 2
    primaryArgSchema = {
        "version": version,
        "source_system": sourceSystem,
        "title": title,
        "description": f"{description}",
        "tables": []
    }
    return primaryArgSchema
    
def genTableArgsMeta(tableName : str, sourceRef : int, inputSchema : list[dict]) -> dict:
    """
    Generating primary attributes for metadata schema
    ...
    Arguments
    ----------
    tableName : str
        name of the table that we are generating metadata for
    sourceSystem : str
        name of the source system that the table column is part of
    inputSchema : list[dict]
        list of dictionaries that are placed in the `columns` attribute
    ...
    Returns
    ----------
    A dictionary of the primary metadata attributes for the table that we are checking 
    """
    tableArgMeta = {
        "name": f"{tableName}",
        "source": sourceRef,
        "columns": inputSchema,
        "primary_key": "unid",
        "cdc_column": "dml_flag",
        "cdc_type": "soft" 
    }
    return tableArgMeta

def genTableArgsGdpr(tableName : str, inputSchema : list[dict]) -> dict:
    """
    Generating primary attributes for gdpr schema
    ...
    Arguments
    ----------
    tableName : str
        name of the table that we are generating metadata for
    inputSchema : list[dict]
        list of dictionaries that are placed in the `personal < hash` attribute
    ...
    Returns
    ----------
    A dictionary of the primary gdpr attributes for the table that we are checking 
    """
    tableArgGdpr = {
        "name": tableName,
        "personal_data": {"hash" : inputSchema}
    }
    return tableArgGdpr

def genColumnSchema(tableDataframe : DataFrame) -> dict:
    """
    Generating the schema metadata for each column on a table
    ...
    Arguments
    ----------
    tableDataframe : DataFrame
        a piece of the original dataframe that has the column "table_name" equal to a specific table name that enables us to see all the columns in that specific piece of dataframe
    ...
    Returns
    ----------
    Function returns a dictionary that contains metadata for each dataframe column (like: name, data_type, is_nullable, date_format etc)
    """
    columnSchema = {
        "columns": [],
        "hash": []
    }
    for row in tableDataframe:
        columnName = row[5].lower()
        dataType = row[6].lower()
        if "number" in dataType:
            dataType = dataType.replace("number", "decimal")
        if "varchar" in dataType:
            dataType = "string"
        column_info = {
            "name": columnName,
            "data_type": dataType
        }
        if isNull(row[10]):
            column_info["is_nullable"] = True
        if isDate(dataType):
            column_info["date_format"] = "yyyy-MM-dd HH:mm:ss"
        if isGdpr(row[14]):
            columnSchema["hash"].append(columnName)
        columnSchema["columns"].append(column_info)
    return columnSchema

In [5]:
"""Needed packages to run everything"""
from pyspark.sql import DataFrame
from concurrent.futures import ThreadPoolExecutor

"""All the functions to generate the schemas are called here and that data is saved in the variable "schema\""""

def datasetTableInfo(dataFrame : DataFrame, metadataTitle : str, description : str = "A metaschema generated from an example dataset.") -> dict:
    """
    All the pieces of the whole schema are generated here and are merged together
    ...
    Arguments
    ----------
    dataFrame : DataFrame
        the dataframe that we are gonna generate all the schema for
    metadataTitle : str
        title of the schema file that we are gonna call it
    description : str
        description about the schema, if this argument is not sent the default value is used
    ...
    Returns
    ----------
    The whole generated schema about this dataframe
    """
    firstRow = dataFrame.head(1)[0]
    source = {
        "load_type": "delta",
        "file_pattern": firstRow.FILE_NAME.lower(),
        "params": {
            "decimal_separator": firstRow.DECIMAL_SEPARATOR,
            "format": firstRow.FILE_TYPE.lower()
        }
    }
    sourceSystem = firstRow.SOURCE_SYSTEM
    completeSchema = {
        "metadata": genPrimaryArgsMeta(metadataTitle, sourceSystem, description),
        "gdpr": []
    }
    separatedDataFrames = {}
    for row in dataFrame.collect():
        table_name = row["table_name"]
        row_values = list(row.asDict().values())
        if table_name in separatedDataFrames:
            separatedDataFrames[table_name].append(row_values)
        else:
            separatedDataFrames[table_name] = [row_values]
            
    def processDataframe(key_value):
        key, value = key_value
        sourceRef = source
        columnSchema = genColumnSchema(value)
        tableArgsMeta = genTableArgsMeta(key, sourceRef, columnSchema["columns"])
        tableArgsGdpr = genTableArgsGdpr(key, columnSchema["hash"])
        return tableArgsMeta, tableArgsGdpr

    def parallelProcessDataframes(dataframes):
        with ThreadPoolExecutor(max_workers=4) as executor:
            results = executor.map(processDataframe, dataframes.items())
            for tableArgsMeta, tableArgsGdpr in results:
                completeSchema["metadata"]["tables"].append(tableArgsMeta)
                completeSchema["gdpr"].append(tableArgsGdpr)
        return completeSchema
    
    return parallelProcessDataframes(separatedDataFrames)
        
    

In [9]:
import time

"""Basic variables that define current stable version of schemas, "SparkSession" object, paths of where to read and save files and calling functions that generate and dump the schema to those files"""
spark = SparkSession.builder.getOrCreate()

"""Gloabl varibles for major and minor stable metadata and gdpr versions"""
majorMetadataVersion = 1
minorMetadataVersion = 6
metadata = generalMethods(majorMetadataVersion, minorMetadataVersion)

majorGdprVersion = 1
minorGdprVersion = 6
stableGdprVersion = generalMethods(majorGdprVersion, minorGdprVersion)

"""Global input metadata path"""
inputDataFramePath = "/app/syntheticDataset100K"

"""Global output metadata path"""
outputSchemaPath = "../Output Metadata/Schema Versions"
metadataFileName = "MetadataSpeedTest"
gdprFileName = "GdprSpeedTest"


"""Saving the dataframe into a variable so that we don't have to call the function readCSV() each time!"""
dataFrame = spark.read.csv(f"{inputDataFramePath}.csv", header=True, sep=';').cache()

"""The variable where we save the time each iteration takes"""
time_list = []

"""Iterate 10 times over the same task, so we can get the average time it takes over 10 tests"""
for _ in range(10):
    """Start the timer, not including the time it takes to read the csv"""
    beginTime = time.time()

    """Generating the metadata from the dataset and saving it into a variable"""
    primaryArgSchema = datasetTableInfo(dataFrame, "TestMetadata")
    metadataSchema = primaryArgSchema["metadata"]
    gdprSchema = primaryArgSchema["gdpr"]

    """Stop the time after also the spark session has stoped"""
    timeElapsed = time.time() - beginTime

    """Append the time to the overall time consumed for each iteration"""
    time_list.append(timeElapsed)

"""Show how much each iteration has taken"""
print(time_list)

def save_time_speeds(file_path: str, time_list: list, algorithm: str, dataset: str) -> None:
    with open(file_path, 'a') as file:
        file.write("\n")
        file.write(f"Algorithm: {algorithm}\n")
        file.write(f"Dataset: {dataset}\n")
        file.write("Time speeds (in seconds):\n")
        for time_elapsed in time_list:
            file.write(f"{time_elapsed}\n")

algorithm = "Parallel PySpark Speed"
dataset = "100000 rows, 1000 tables"
save_time_speeds('../timeSpeeds.txt', time_list, algorithm, dataset)

"""Using generated metadata, output paths and the current latest stable version to dump the metadata into a .yaml schema """
# metadata.writeMetadataFile(metadataSchema, outputSchemaPath, metadataFileName)
# metadata.writeMetadataFile(gdprSchema, outputSchemaPath, gdprFileName)

"""Stop the spark session"""
spark.stop()

[3.4472897052764893, 2.6291465759277344, 2.5403432846069336, 2.7508511543273926, 2.6073222160339355, 2.669354200363159, 2.431424856185913, 2.587740659713745, 2.580059766769409, 2.5330612659454346]
