#### Import the `1_Modeldata` notebook to get the `ModelData` class

In [None]:
%run "./1_Modeldata"

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting https://github.com/eduardhendriksen/PyForge/archive/master.tar.gz
  Using cached https://github.com/eduardhendriksen/PyForge/archive/master.tar.gz
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting flatten_dict
  Using cached flatten_dict-0.4.2-py2.py3-none-any.whl (9.7 kB)
Collecting requests-toolbelt
  Using cached requests_toolbelt-1.0.0-py2.py3-none-any.whl (54 kB)
Building wheels for collected packages: PyForge
  Building wheel for PyForge (setup.py): started
  Building wheel for PyForge (setup.py): finished with status 'done'
  Created wheel for PyForge: filename=PyForge-0.4-py3-none-any.whl size=23839 sha256=3572a47e1f9d9e8b90aefe8b6250812f04b8cf46bb96b10b96f0dfbd8218a7d8
  Stored in directory: /tmp/pip-ephem-wheel-cache-hsjevao5/wheels/36/2b/00/cda886582b7b9dcbd63d645cbaed0ae96c3169e5945a72a4f2


In [None]:
# Create (or reuse, via getOrCreate) the Spark session used to turn the extracted frames
# into Spark DataFrames.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("pandas_to_spark")
    .getOrCreate()
)

## Parameters

In [None]:
# Declare the notebook's text widgets, each defaulting to an empty string; their values
# are read in the next cell.
for widget_name in ("p_project_id", "p_folder_id", "p_model_name", "p_view_name", "p_project_name"):
    dbutils.widgets.text(widget_name, "")

## Read parameter values

In [None]:
# Read the job parameters passed through the widgets declared above.
v_project_id = dbutils.widgets.get("p_project_id")
v_folder_id = dbutils.widgets.get("p_folder_id")
v_model_name = dbutils.widgets.get("p_model_name")
v_view_name = dbutils.widgets.get("p_view_name")
v_project_name = dbutils.widgets.get("p_project_name")

# v_project_name is used as the project folder under the bronze mount. An empty value would
# produce paths like 'mnt/kevee/bronze//<table>.parquet', i.e. tables written straight into the
# bronze root, so stop before any data is written.
if not v_project_name.strip():
    raise ValueError("Widget 'p_project_name' must be set to a non-empty project name.")

## Initialize the `ModelData` class and extract the model's object tree and properties

In [None]:
# Autodesk authentication endpoint and hub used by ModelData.
# NOTE(review): client_id / client_secret are blank in this notebook; they should be loaded
# from a Databricks secret scope (dbutils.secrets.get) rather than typed in here.
AUTH_ADDRESS = 'https://developer.api.autodesk.com/authentication/v2/token'
HUB_ID = '2ee6be14-9d13-4d36-8d21-49bf9ba124cd'

model_data = ModelData(
    auth_address=AUTH_ADDRESS,
    client_id="",
    client_secret='',
    scopes='data:read',
    hub_id=HUB_ID,
    project_name=v_project_name,
    project_id=v_project_id,
    folder_id=v_folder_id,
    model_name=v_model_name,
    view_name=v_view_name,
)

# Model version; later compared with the stored modelVersionNumber column when syncing tables.
model_version = model_data.version_number
model_object_tree, model_object_properties = model_data.get_model_object_tree_properties(
    model_data.token,
    model_data.model_urn,
    model_data.view_name,
)

In [None]:
# process_object_tree yields 4-tuples of (type name, element name, type frame, element frame);
# split them into four parallel lists (frames are presumably pandas DataFrames — they are fed to
# spark.createDataFrame below).
type_names, elem_names, type_frames, elem_frames = [], [], [], []

object_tables = model_data.process_object_tree(model_object_tree, model_object_properties)
for record in object_tables:
    t_name, e_name, t_frame, e_frame = record
    type_names.append(t_name)
    elem_names.append(e_name)
    type_frames.append(t_frame)
    elem_frames.append(e_frame)

Processing tables for Walls
Succesfully created tables for Walls
Processing tables for Structural Columns
Succesfully created tables for StructuralColumns
Processing tables for Floors
Succesfully created tables for Floors
Processing tables for Structural Framing
Succesfully created tables for StructuralFraming
Processing tables for Generic Models
Succesfully created tables for GenericModels
Processing tables for Structural Foundations
Succesfully created tables for StructuralFoundations
Processing tables for Slab Edges
Succesfully created tables for SlabEdges
Processing tables for Columns
Succesfully created tables for Columns
Processing tables for Structural Connections
Succesfully created tables for StructuralConnections
Processing tables for Stairs
Succesfully created tables for Stairs


In [None]:
# Function to create Spark DataFrames
def create_spark_dataframes(spark, frames):
    spark_frames = [spark.createDataFrame(df) for df in frames]
    return spark_frames

In [None]:
# Convert the element frames, then the type frames, to Spark DataFrames.
spark_elem_frames, spark_type_frames = [
    create_spark_dataframes(spark, frames) for frames in (elem_frames, type_frames)
]

In [None]:
from collections import Counter

# Every table to persist, keyed by name. Type tables and element tables share one namespace.
table_names = [*type_names, *elem_names]
table_frames = [*spark_type_frames, *spark_elem_frames]

# dict(zip(...)) would silently keep only the last frame for a repeated name and drop the
# other table, so refuse to continue if any name occurs more than once.
duplicate_names = [name for name, count in Counter(table_names).items() if count > 1]
if duplicate_names:
    raise ValueError(f"Duplicate table names would overwrite each other: {duplicate_names}")

dataframe_object = dict(zip(table_names, table_frames))

In [None]:
# Names of the project folders already present under the bronze mount
# (folder entries are listed as "<name>/", so keep only the part before the slash).
database_name = [entry.name.split('/')[0] for entry in dbutils.fs.ls('mnt/kevee/bronze/')]

In [None]:
def get_dataframe_object(path):

    dataframe_object_adls= {}
    for i in dbutils.fs.ls(path):
        _i = i.name.split('/')[0]
        table_name = _i.split('.')[0]
        dataframe_object_adls[table_name] = spark.read.format("parquet").load(i.path)
    return dataframe_object_adls

In [None]:
# Sync every extracted table into the project's bronze folder as '<table>.parquet':
#   * project folder missing        -> write every table;
#   * table missing                 -> write it;
#   * model_version not yet stored  -> append the freshly extracted rows;
#   * model_version already stored  -> replace that version's rows with the fresh extract.
BRONZE_ROOT = '/mnt/kevee/bronze'
# Spark rejects overwriting a path that the same query reads from (and doing so would delete
# the source mid-write), so version replacements are materialised here first, then copied back.
STAGING_ROOT = '/tmp/kevee_bronze_staging'


def _table_path(table):
    """Return the bronze parquet path of `table` for the current project."""
    return f'{BRONZE_ROOT}/{v_project_name}/{table}.parquet'


def _stored_versions(df):
    """Return the distinct modelVersionNumber values present in `df`."""
    # Plain DataFrame collect instead of .rdd, which is unavailable on some cluster modes.
    return [row[0] for row in df.select("modelVersionNumber").distinct().collect()]


def _replace_version(table, path):
    """Replace the rows of `model_version` stored at `path` with the fresh extract of `table`.

    Returns the DataFrame now stored at `path`.
    """
    staging_path = f'{STAGING_ROOT}/{v_project_name}/{table}.parquet'
    df_updated = (
        spark.read.parquet(path)
        # Keep every row belonging to a different model version ...
        .filter(col("modelVersionNumber") != model_version)
        # ... and add the fresh rows. Match columns by name: union() is positional and the
        # column order of a new extract is not guaranteed to match the stored one.
        .unionByName(dataframe_object[table], allowMissingColumns=True)
    )
    df_updated.write.mode('overwrite').parquet(staging_path)
    spark.read.parquet(staging_path).write.mode('overwrite').parquet(path)
    dbutils.fs.rm(staging_path, True)
    return spark.read.parquet(path)


if v_project_name not in database_name:
    # First run for this project: every table is new, so write each one.
    for table, df in dataframe_object.items():
        df.write.mode('overwrite').parquet(_table_path(table))

else:
    dataframe_object_adls = get_dataframe_object(f'{BRONZE_ROOT}/{v_project_name}')

    for table, df_new in dataframe_object.items():
        path = _table_path(table)

        if table not in dataframe_object_adls:
            df_new.write.mode('overwrite').parquet(path)
            # Track the stored data so later code sees the current state of the table.
            dataframe_object_adls[table] = df_new

        elif model_version not in _stored_versions(dataframe_object_adls[table]):
            # Append the NEW extract (not the already-stored data) as an additional version.
            # NOTE(review): if the extract's columns differ from the stored files, readers may
            # need mergeSchema to see the new columns.
            df_new.write.mode('append').parquet(path)

        else:
            dataframe_object_adls[table] = _replace_version(table, path)

In [None]:
# Finish the notebook and hand "Success" back as its exit value (readable by a caller that
# launched it with dbutils.notebook.run).
dbutils.notebook.exit("Success")