# Extract Data and Metadata from XML




In [0]:
# parse <Element> and root entries
# root is ProjectCollection


from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, BooleanType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Standalone root entries: their text is stored directly under "_config".
config_tags = {
    "UseRelativePaths",
    "SubmitToGrid",
    "QueueSubmitsForServer",
    "ActionOnError",
    "ApplicationOverrides",
    "ExploreDataList",
}
# Root entries whose own children are flattened into a tag -> text map.
section_tags = ("Element", "MetaDataInfo", "DecisionManager", "Parameters", "Containers")

d = {"Element": {}, "_config": {}, "MetaDataInfo": {}, "DecisionManager":{}, "Parameters": {}, "Containers": {}}
for branch in root:
    if branch.tag in config_tags:
        d["_config"][branch.tag] = branch.text
    elif branch.tag in section_tags:
        d[branch.tag].update((child.tag, child.text) for child in branch)
# The whole root is represented by a single record.
parsed_data = [d]

# Define the schema: one string -> string map column per key of the record
schema = StructType(
    [StructField(name, MapType(StringType(), StringType()), True) for name in d]
)

# Convert parsed data to Spark DataFrame
rows = [Row(**record) for record in parsed_data]
rootDF = spark.createDataFrame(rows, schema=schema)

display(rootDF)

In [0]:

# parse <DataList>
# root is ProjectCollection
# branch is DataList
# each child of DataList is one data entry
# Element and Data are the children of each data entry

import xml.etree.ElementTree as ET
from pyspark.sql import Row

def parse_deepest(var):
    """Return a ``{tag: text}`` dict built from the direct children of *var*.

    When a tag occurs more than once, the text of the last occurrence wins.
    """
    return {node.tag: node.text for node in var}

def parse_data(data):
    """
    Parse one entry of the SAS XML <DataList>.

    The ``Element`` child is flattened into a tag -> text map. Inside the
    ``Data`` child only ``ShortCutList`` and ``DataModel`` are kept, each
    flattened the same way. All other children are ignored.
    """
    nested_tags = ("ShortCutList", "DataModel")
    record = {}

    for node in data:
        if node.tag == "Element":
            record[node.tag] = parse_deepest(node)
        elif node.tag == "Data":
            record.update(
                (item.tag, parse_deepest(item))
                for item in node
                if item.tag in nested_tags
            )
    return record


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# One record per child of every <DataList> branch directly under the root.
parsed_data = [parse_data(entry) for entry in root.findall("DataList/*")]

# Convert parsed data to Spark DataFrame (schema is inferred from the rows)
rows = [Row(**record) for record in parsed_data]
datalistDF = spark.createDataFrame(rows)

display(datalistDF)

In [0]:
# parse <ExternalFileList>
# root is ProjectCollection
# branch is ExternalFileList
# each child of ExternalFileList is one external-file entry
# Element and ExternalFile are the children of each external-file entry

def parse_deepest(var):
    """Flatten the direct children of *var* into a tag -> text dict.

    A repeated tag keeps the text of its last occurrence.
    """
    return dict((node.tag, node.text) for node in var)

def parse_data(externalfile):
    """
    Parse one entry of the SAS XML <ExternalFileList>.

    The ``Element`` child is flattened into a tag -> text map. Every child of
    ``ExternalFile`` is stored by its tag: ``ShortCutList`` as a flattened
    map, anything else as its plain text.
    """
    record = {}

    for node in externalfile:
        if node.tag == "Element":
            record[node.tag] = parse_deepest(node)
        elif node.tag == "ExternalFile":
            for item in node:
                record[item.tag] = (
                    parse_deepest(item) if item.tag == "ShortCutList" else item.text
                )
    return record


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# One record per child of every <ExternalFileList> branch directly under the root.
parsed_data = [parse_data(entry) for entry in root.findall("ExternalFileList/*")]

# Convert parsed data to Spark DataFrame (schema is inferred from the rows)
rows = [Row(**record) for record in parsed_data]
externalFileListDF = spark.createDataFrame(rows)

display(externalFileListDF)

In [0]:
# parse <Elements> of type PFD
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# Element, ContainerElement and PFD are the expected children of a PFD-type element
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Return a ``{tag: text}`` dict built from the direct children of *var*.

    When a tag occurs more than once, the text of the last occurrence wins.
    """
    return {node.tag: node.text for node in var}

def parse_data(outerelement):
    """
    Parse one PFD-type entry of the SAS XML <Elements>.

    Every child other than ``PFD`` is flattened into a tag -> text map. The
    ``PFD`` child becomes a list with one dict per ``Process`` child, holding
    the flattened ``Element`` and ``Depencies`` children of that process.
    """
    # NOTE(review): "Depencies" looks like a misspelling of "Dependencies" -
    # confirm against the tag actually used in the XML.
    process_tags = ("Element", "Depencies")
    record = {}

    for section in outerelement:
        if section.tag != "PFD":
            record[section.tag] = parse_deepest(section)
            continue
        record[section.tag] = [
            {part.tag: parse_deepest(part) for part in process if part.tag in process_tags}
            for process in section
            if process.tag == "Process"
        ]
    return record


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.PFD.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.PFD":
        continue
    d = parse_data(outerelement)
    # Give the record a value for every schema column, even if the XML omits it.
    for column, empty in (("Element", {}), ("ContainerElement", {}), ("PFD", [])):
        d.setdefault(column, empty)
    parsed_data.append(d)

# Define the schema
schema = StructType(
    [
        StructField(name, MapType(StringType(), StringType()), True)
        for name in ("Element", "ContainerElement")
    ]
    + [StructField("PFD", ArrayType(MapType(StringType(), StringType())), True)]
)

# Convert parsed data to Spark DataFrame
rows = [Row(**record) for record in parsed_data]
PFDelementsDF = spark.createDataFrame(rows, schema=schema)

display(PFDelementsDF)

In [0]:
# parse <Elements> of type ShortCutToData
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# every child of a ShortCutToData-type element is flattened into a tag -> text map
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Flatten the direct children of *var* into a tag -> text dict.

    A repeated tag keeps the text of its last occurrence.
    """
    return {node.tag: node.text for node in var}

def parse_data(outerelement):
    """
    Parse one entry of the SAS XML <Elements>.

    Returns a dict that maps the tag of each direct child to that child's
    flattened tag -> text map.
    """
    return {section.tag: parse_deepest(section) for section in outerelement}


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.ShortCutToData.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.ShortCutToData":
        continue
    d = parse_data(outerelement)
    d.setdefault("Element", {})
    parsed_data.append(d)

# Convert parsed data to Spark DataFrame (schema is inferred from the rows)
rows = [Row(**record) for record in parsed_data]
SCTDelementsDF = spark.createDataFrame(rows)

display(SCTDelementsDF)

In [0]:
# parse <Elements> of type ShortCutToFile
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# Element, SHORTCUT and ShortCutToFile are the expected children of a ShortCutToFile-type element
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Return a ``{tag: text}`` dict built from the direct children of *var*.

    When a tag occurs more than once, the text of the last occurrence wins.
    """
    return {node.tag: node.text for node in var}

def parse_data(outerelement):
    """
    Parse one entry of the SAS XML <Elements>.

    Args:
        outerelement: the XML element whose direct children are parsed.

    Returns:
        dict mapping the tag of each direct child to that child's flattened
        tag -> text map (as produced by ``parse_deepest``). A childless
        element gives an empty dict.
    """
    # Iterating an Element only ever yields Element objects, never None, so
    # the former "if child is not None else {}" fallback could not trigger.
    return {child.tag: parse_deepest(child) for child in outerelement}


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.ShortCutToFile.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.ShortCutToFile":
        continue
    d = parse_data(outerelement)
    # Give the record a value for every schema column, even if the XML omits it.
    for column in ("Element", "SHORTCUT", "ShortCutToFile"):
        d.setdefault(column, {})
    parsed_data.append(d)

# Define the schema: three string -> string map columns
schema = StructType(
    [
        StructField(name, MapType(StringType(), StringType()), True)
        for name in ("Element", "SHORTCUT", "ShortCutToFile")
    ]
)

# Convert parsed data to Spark DataFrame
rows = [Row(**record) for record in parsed_data]
SCTFelementsDF = spark.createDataFrame(rows, schema=schema)

display(SCTFelementsDF)

In [0]:
# parse <Elements> of type Log
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# every child of a Log-type element is flattened into a tag -> text map
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Flatten the direct children of *var* into a tag -> text dict.

    A repeated tag keeps the text of its last occurrence.
    """
    return dict((node.tag, node.text) for node in var)

def parse_data(outerelement):
    """
    Parse one entry of the SAS XML <Elements>.

    Returns a dict that maps the tag of each direct child to that child's
    flattened tag -> text map.
    """
    return {section.tag: parse_deepest(section) for section in outerelement}


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.Log.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.Log":
        continue
    d = parse_data(outerelement)
    d.setdefault("Element", {})
    parsed_data.append(d)

# Convert parsed data to Spark DataFrame (schema is inferred from the rows)
rows = [Row(**record) for record in parsed_data]
LOGelementsDF = spark.createDataFrame(rows)

display(LOGelementsDF)

In [0]:
# parse <Elements> of type Code
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# every child of a Code-type element is flattened into a tag -> text map
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Return a ``{tag: text}`` dict built from the direct children of *var*.

    When a tag occurs more than once, the text of the last occurrence wins.
    """
    return {node.tag: node.text for node in var}

def parse_data(outerelement):
    """
    Parse one entry of the SAS XML <Elements>.

    Returns a dict that maps the tag of each direct child to that child's
    flattened tag -> text map.
    """
    return {section.tag: parse_deepest(section) for section in outerelement}


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.Code.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.Code":
        continue
    d = parse_data(outerelement)
    d.setdefault("Element", {})
    parsed_data.append(d)

# Convert parsed data to Spark DataFrame (schema is inferred from the rows)
rows = [Row(**record) for record in parsed_data]
CODEelementsDF = spark.createDataFrame(rows)

display(CODEelementsDF)

In [0]:
# parse <Elements> of type ImportTask
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# Element, SubmitableElement, EGTask and ImportTask are the expected children of an ImportTask-type element
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var, prefix=""):
    """Flatten the direct children of *var* into a ``{key: text}`` dict.

    Args:
        var: XML element whose direct children are read.
        prefix: string prepended to every child tag to form the key.

    Returns:
        dict mapping ``prefix + child.tag`` to ``child.text``. On a call with
        an empty prefix, an ``OutputDataList`` child is not stored under its
        own tag; its children are merged in under ``OutputDataList_``-prefixed
        keys instead.
    """
    d = {}
    for child in var:
        if prefix == "" and child.tag == "OutputDataList":
            # The recursive result used to be thrown away, which silently
            # dropped every nested OutputDataList entry; merge it instead.
            d.update(parse_deepest(child, "OutputDataList_"))
        else:
            d[prefix + child.tag] = child.text
    return d

def _merge_children(target, parent, nested_tag, prefix):
    """Copy the children of *parent* into *target* as tag -> text.

    A child tagged *nested_tag* is flattened through ``parse_deepest`` with
    *prefix* and merged into *target*, so entries already collected from its
    siblings are kept rather than replaced.
    """
    for var in parent:
        if var.tag == nested_tag:
            target.update(parse_deepest(var, prefix))
        else:
            target[var.tag] = var.text


def parse_data(data):
    """
    Parse one ImportTask-type entry of the SAS XML <Elements>.

    Args:
        data: the XML element whose children are parsed.

    Returns:
        dict that always has the keys ``Element``, ``SubmitableElement``,
        ``ExpectedOutputDataList``, ``JobRecipe``, ``EGTask`` (tag -> text
        dicts) and ``ImportTask`` (list with one flattened dict per
        ``ImportTask`` child). Any other child of *data* is stored under its
        own tag as a flattened dict.
    """
    d = {
        "Element": {},
        "SubmitableElement": {},
        "ExpectedOutputDataList": {},
        "JobRecipe": {},
        "EGTask": {},
        "ImportTask": []
    }

    for child in data:
        if child.tag == "SubmitableElement":
            for el in child:
                if el.tag == "ExpectedOutputDataList":
                    _merge_children(
                        d["ExpectedOutputDataList"], el, "DataDescriptor", "DataDescriptor_"
                    )
                elif el.tag == "JobRecipe":
                    # JobRecipe wraps one inner element that is also called
                    # JobRecipe; the values live in that inner element. An
                    # empty wrapper is skipped instead of raising IndexError.
                    inner = next(iter(el), None)
                    if inner is not None:
                        _merge_children(
                            d["JobRecipe"], inner, "OutputDataList", "OutputDataList_"
                        )
                else:
                    # Must be the tail of one if/elif chain: as a separate
                    # "if", ExpectedOutputDataList also landed here and was
                    # stored as a bogus SubmitableElement entry.
                    d["SubmitableElement"][el.tag] = el.text
        elif child.tag == "ImportTask":
            d["ImportTask"].append(parse_deepest(child))
        else:
            d[child.tag] = parse_deepest(child)
    return d


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.ImportTask.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.ImportTask":
        continue
    d = parse_data(outerelement)
    d.setdefault("Element", {})
    parsed_data.append(d)

# Define the schema
# "ImportTask" holds a list of tag -> text dicts (one per ImportTask child),
# so it is declared as an array of maps, like the "PFD" column of the PFD
# cell. Declared as ArrayType(StringType()), each dict was reduced to a string.
schema = StructType([
    StructField("Element", MapType(StringType(), StringType()), True),
    StructField("SubmitableElement", MapType(StringType(), StringType()), True),
    StructField("ExpectedOutputDataList", MapType(StringType(), StringType()), True),
    StructField("JobRecipe", MapType(StringType(), StringType()), True),
    StructField("EGTask", MapType(StringType(), StringType()), True),
    StructField("ImportTask", ArrayType(MapType(StringType(), StringType())), True)
])

# Convert parsed data to Spark DataFrame
rows = [Row(**record) for record in parsed_data]
IMPTelementsDF = spark.createDataFrame(rows, schema=schema)

display(IMPTelementsDF)

In [0]:
# parse <Elements> of type ShortCutToFile (this cell repeats the earlier ShortCutToFile cell)
# root is ProjectCollection
# branch is Elements
# each child of Elements carries a Type attribute
# Element, SHORTCUT and ShortCutToFile are the expected children of a ShortCutToFile-type element
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
import xml.etree.ElementTree as ET

def parse_deepest(var):
    """Flatten the direct children of *var* into a tag -> text dict.

    A repeated tag keeps the text of its last occurrence.
    """
    return {node.tag: node.text for node in var}

def parse_data(outerelement):
    """
    Parse one entry of the SAS XML <Elements>.

    Args:
        outerelement: the XML element whose direct children are parsed.

    Returns:
        dict mapping the tag of each direct child to that child's flattened
        tag -> text map (as produced by ``parse_deepest``). A childless
        element gives an empty dict.
    """
    # Iterating an Element only ever yields Element objects, never None, so
    # the former "if child is not None else {}" fallback could not trigger.
    return {child.tag: parse_deepest(child) for child in outerelement}


# Load and parse the XML
xml_path = "/Volumes/janesun/default/sas_xml/project_1.xml"
tree = ET.parse(xml_path)
root = tree.getroot()

# Keep only the children of <Elements> whose Type is SAS.EG.ProjectElements.ShortCutToFile.
parsed_data = []
for outerelement in root.findall("Elements/*"):
    if outerelement.attrib["Type"] != "SAS.EG.ProjectElements.ShortCutToFile":
        continue
    d = parse_data(outerelement)
    # Give the record a value for every schema column, even if the XML omits it.
    for column in ("Element", "SHORTCUT", "ShortCutToFile"):
        d.setdefault(column, {})
    parsed_data.append(d)

# Define the schema: three string -> string map columns
schema = StructType(
    [
        StructField(name, MapType(StringType(), StringType()), True)
        for name in ("Element", "SHORTCUT", "ShortCutToFile")
    ]
)

# Convert parsed data to Spark DataFrame
rows = [Row(**record) for record in parsed_data]
SCTFelementsDF = spark.createDataFrame(rows, schema=schema)

display(SCTFelementsDF)

Write the DataFrames to Delta tables (existing tables are overwritten)


In [0]:
%python
# Target table names, one per DataFrame that gets persisted
datalist_table_name = "JG_Start_C7_Managing_DataList"
external_file_list_table_name = "JG_Start_C7_Managing_externalFileList"
elements_table_name = "JG_Start_C7_Managing_elements"

# Fully qualified location of the tables: <catalog>.<database>
catalog_name = "janesun"
database_name = "sas_data"
library_name_fixed = f"{catalog_name}.{database_name}"

# Create the database if needed
spark.sql(f"CREATE DATABASE IF NOT EXISTS {library_name_fixed}")


def _qualified_table(table_name):
    """Return *table_name* under ``library_name_fixed`` with '/' and ' ' replaced by '_'."""
    return f"{library_name_fixed}.{table_name.replace('/', '_').replace(' ', '_')}"


# Each DataFrame is written as a Delta table, replacing any existing table.
datalistDF.write.format("delta").mode("overwrite").saveAsTable(
    _qualified_table(datalist_table_name)
)

externalFileListDF.write.format("delta").mode("overwrite").saveAsTable(
    _qualified_table(external_file_list_table_name)
)

# NOTE(review): elementsDF is not defined in any cell visible here - confirm
# which DataFrame is meant before running this cell.
elementsDF.write.format("delta").mode("overwrite").saveAsTable(
    _qualified_table(elements_table_name)
)

display(datalistDF)

# Convert SAS Code to pyspark or spark SQL




# Migrate Data Workflows


# Migrate Analytics and Reporting



# Optimize performance

