## Review

https://github.com/microsoft/semantic-link-labs/blob/main/notebooks/Capacity%20Migration.ipynb for info on Capacity Migration and ideas

1. Attach a lakehouse a schema enabled Lakehouse to the Notebook
2. Execute all cells
3. Execute again after changes to any Fabric item
3. Review changes to SCDs


In [13]:
!pip install --upgrade semantic-link --q #upgrade to most recent semantic-link...not necessary for Fabric runtime 1.3

StatementMeta(, ca15c2f6-4dc9-4a7c-802c-2fd3dff43365, 15, Finished, Available, Finished)

In [221]:
import sempy.fabric as fabric
import pandas as pd
from pyspark.sql.functions import *
from datetime import datetime

current_datetime = datetime.now()
#change this to hide result of API loads
show_results = False

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 223, Finished, Available, Finished)

In [222]:
%%sql
CREATE SCHEMA IF NOT EXISTS stage

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 224, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

## Fetch data for Capacities, Workspaces and Items from API

Using sempy and logged in user

In [223]:
capacity_list = fabric.list_capacities()
capacity_list = capacity_list.rename(columns={'Id': 'CapacityId','Display Name': 'CapacityName'})
capacity_list['EffectiveDate'] = current_datetime
spark_capacity_list=spark.createDataFrame(capacity_list)
spark_capacity_list = spark_capacity_list.withColumn('row_checksum', md5(concat_ws('CapacityId','CapacityName','SKU','Region','State')))
spark_capacity_list.write.mode("overwrite").format("delta").saveAsTable("stage.stage_capacity_list")
if show_results:
    display(spark_capacity_list)

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 225, Finished, Available, Finished)

In [224]:
ws_list = fabric.list_workspaces()
ws_list = ws_list.rename(columns={'Id': 'WorkspaceId','Display Name': 'CapacityName', 'Capacity Id': 'CapacityId', 'Is Read Only':  'IsReadOnly',
            'Is On Dedicated Capacity': 'IsOnDedicatedCapacity','Default Dataset Storage Format': 'DefaultDatasetStorageFormat',
            'Name' : 'WorkspaceName'})
ws_list['EffectiveDate'] = current_datetime
spark_ws_list=spark.createDataFrame(ws_list)
spark_ws_list = spark_ws_list.withColumn('row_checksum', md5(concat_ws('WorkspaceId','IsReadOnly','isOnDedicatedCapacity','CapacityId','DefaultDatasetStorageFormat','Type', 'WorkspaceName')))
spark_ws_list.write.mode("overwrite").format("delta").saveAsTable("stage.stage_workspace_list")
if show_results:
    display(spark_ws_list)

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 226, Finished, Available, Finished)

In [225]:
ws_item_list = pd.concat([fabric.list_items(workspace=ws) for ws in fabric.list_workspaces().query('`Is On Dedicated Capacity` == True').Id], ignore_index=True)
ws_item_list = ws_item_list.rename(columns={'Id': 'ItemId','Display Name': 'ItemName', 'Workspace Id': 'WorkspaceId'})
ws_item_list['EffectiveDate'] = current_datetime
spark_ws_item_list=spark.createDataFrame(ws_item_list)
spark_ws_item_list = spark_ws_item_list.withColumn('row_checksum', md5(concat_ws('ItemId','ItemName','Description','Type','WorkspaceId')))
spark_ws_item_list.write.mode("overwrite").format("delta").saveAsTable("stage.stage_workspace_item_list")
if show_results:
    display(spark_ws_item_list)

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 227, Finished, Available, Finished)

## Create tables for history if they don't exist

In [226]:
%%sql
create table if not exists dbo.capacity_list
    (CapacityId	string
    , CapacityName	string
    , Sku	string
    , Region	string
    , State	string
    , EffectiveDate	timestamp
    , row_checksum string
    , current_row boolean
    , EndDate timestamp
    )

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 228, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [227]:
%%sql
create table if not exists dbo.workspace_list
    (WorkspaceId	string
    , IsReadOnly	boolean
    , IsOnDedicatedCapacity	boolean
    , CapacityId	string
    , DefaultDatasetStorageFormat	string
    , Type	string
    , WorkspaceName	string
    , EffectiveDate	timestamp
    , row_checksum	string
    , current_row boolean
    , EndDate timestamp
    )

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 229, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [228]:
%%sql
create table if not exists dbo.workspace_item_list
    (ItemId	string
    , ItemName	string
    , Description	string
    , Type	string
    , WorkspaceId	string
    , EffectiveDate	timestamp
    , row_checksum	string
    , current_row boolean
    , EndDate timestamp
    )

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 230, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

## Populate slowly changing dimensions

In [229]:
  %%sql
--scd type 2 for capacity list
MERGE INTO dbo.capacity_list
USING (
  SELECT s.CapacityId as mergeKey, s.*
  FROM stage.stage_capacity_list s
  UNION ALL
  SELECT NULL as mergeKey, s.*
  FROM stage.stage_capacity_list s JOIN capacity_list t ON s.CapacityId = t.CapacityId 
  WHERE s.row_checksum <> t.row_checksum and t.current_row = TRUE
    ) staged_updates ON capacity_list.CapacityId = mergeKey
WHEN MATCHED AND capacity_list.current_row = TRUE AND capacity_list.row_checksum <> staged_updates.row_checksum THEN  
  UPDATE SET endDate = staged_updates.EffectiveDate, current_row = FALSE    
WHEN NOT MATCHED THEN 
  INSERT(CapacityId, CapacityName, Sku, Region, State, row_checksum, EffectiveDate,current_row, EndDate) 
  VALUES(staged_updates.CapacityId, staged_updates.CapacityName, staged_updates.Sku, staged_updates.Region
  , staged_updates.State, staged_updates.row_checksum, staged_updates.EffectiveDate, TRUE, make_date(2099,12,31))
WHEN NOT MATCHED BY SOURCE and capacity_list.current_row = TRUE THEN 
    UPDATE SET EndDate = CURRENT_TIMESTAMP(), current_row= false

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 231, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [230]:
%%sql
--scd type 2 for workspace list
MERGE INTO dbo.workspace_list
USING ( 
  SELECT s.WorkspaceId as mergeKey, s.*
  FROM stage.stage_workspace_list s
  UNION ALL
  SELECT NULL as mergeKey, s.*
  FROM stage.stage_workspace_list s JOIN workspace_list t ON s.WorkspaceId = t.WorkspaceId 
  WHERE s.row_checksum <> t.row_checksum and t.current_row = TRUE
  ) staged_updates
ON workspace_list.WorkspaceId = mergeKey
WHEN MATCHED AND workspace_list.current_row = TRUE AND workspace_list.row_checksum <> staged_updates.row_checksum THEN  
  UPDATE SET endDate = staged_updates.EffectiveDate , current_row = FALSE
WHEN NOT MATCHED THEN 
  INSERT(WorkspaceId, IsReadOnly, IsOnDedicatedCapacity, CapacityId, DefaultDatasetStorageFormat, Type, WorkspaceName, EffectiveDate,row_checksum,current_row, EndDate) 
  VALUES(staged_updates.WorkspaceId, staged_updates.IsReadOnly, staged_updates.IsOnDedicatedCapacity, staged_updates.CapacityId
    , staged_updates.DefaultDatasetStorageFormat, staged_updates.Type
    , staged_updates.WorkspaceName, staged_updates.EffectiveDate,staged_updates.row_checksum, TRUE, make_date(2099,12,31))
WHEN NOT MATCHED BY SOURCE and workspace_list.current_row = TRUE THEN 
    UPDATE SET EndDate = CURRENT_TIMESTAMP(), current_row= false

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 232, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [231]:
%%sql
--scd type 2 for workspace_item_list 
MERGE INTO dbo.workspace_item_list
USING ( 
  SELECT s.ItemId as mergeKey, s.*
  FROM stage.stage_workspace_item_list s
  UNION ALL
  SELECT NULL as mergeKey, s.*
  FROM stage.stage_workspace_item_list s JOIN dbo.workspace_item_list t ON s.ItemId = t.ItemId 
  WHERE s.row_checksum <> t.row_checksum and t.current_row = TRUE
  ) staged_updates
ON workspace_item_list.ItemId  = mergeKey
WHEN MATCHED AND workspace_item_list.current_row = TRUE AND workspace_item_list.row_checksum <> staged_updates.row_checksum THEN  
  UPDATE SET endDate = staged_updates.EffectiveDate, current_row = FALSE
WHEN NOT MATCHED THEN 
  INSERT(ItemId, ItemName, Description, Type, WorkspaceId, EffectiveDate, row_checksum, current_row, EndDate) 
  VALUES(staged_updates.ItemId, staged_updates.ItemName, staged_updates.Description
    , staged_updates.Type, staged_updates.WorkspaceId, staged_updates.EffectiveDate, staged_updates.row_checksum, TRUE, make_date(2099,12,31))
WHEN NOT MATCHED BY SOURCE and workspace_item_list.current_row = TRUE THEN 
    UPDATE SET EndDate = CURRENT_TIMESTAMP(), current_row= false

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 233, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [238]:
%%sql
select * from dbo.capacity_list
where EffectiveDate > dateadd(current_timestamp(), -3)
order by CapacityName, EffectiveDate

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 240, Finished, Available, Finished)

<Spark SQL result set with 6 rows and 9 fields>

In [237]:
%%sql
--changes from the last day, note does not include items where workspace is deleted
select c.CapacityName, w.WorkspaceName, i.* 
from dbo.workspace_item_list i
    join workspace_list w on i.WorkspaceId = w.WorkspaceId and w.current_row = True
    join capacity_list c on w.CapacityId =c.CapacityId and c.current_row = True
where ItemId in (
    SELECT ItemId FROM dbo.workspace_item_list 
    where current_row = false and EndDate > date_add(current_timestamp,-1)
    )
order by ItemId, EffectiveDate

StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 239, Finished, Available, Finished)

<Spark SQL result set with 3 rows and 11 fields>

In [234]:
%%sql
--show workspaces by capacity
select 
    c.CapacityId, c.CapacityName, c.Region
    , w.WorkspaceId, w.WorkspaceName
from
dbo.capacity_list c
left join dbo.workspace_list w on c.CapacityId = w.CapacityId and c.current_row = True and w.current_row = True
order by 
    CapacityName, WorkspaceName


StatementMeta(, 51fbb84b-59e1-4d32-b370-0df15fdf910b, 236, Finished, Available, Finished)

<Spark SQL result set with 16 rows and 5 fields>