In [1]:
'''.
           @@@@@@@@@@
       @@@@..........@@@@
    @@@         .        @@@
  @@.           .         . @@
 @  .     _     .         .   @
@........| |...................@    *********************************************
@      . | |   _____  .        @
@      . | |  |  __ \ .        @    La Data Web
@      . | |__| |  | |.   ***  @
@........|____| |  | |...*   *.@    Copyright © 2024 Ignacio Barrau
@   .       . | |__| |. *     *@
@   .       . |_____/ . *     *@    *********************************************
@   .       .         . *     *@
@   .       .         . *******@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
'''

from notebookutils import mssparkutils
from pyspark.sql.functions import col
import os
import time
import pandas as pd

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 3, Finished, Available)

#### Step 1

In [2]:
# Get mounts of session
mssparkutils.fs.mounts()
# Dataflow staging lake abfss of Tables
lake_source = "abfss://3e10dd2f-e450-496f-b1cc-2db2a2bb3d07@onelake.dfs.fabric.microsoft.com/5b1e13c2-059f-4bcb-b48c-f7af4fab7037/Tables/"
# Lakehouse File destination folder
lake_destination = "abfss://3e10dd2f-e450-496f-b1cc-2db2a2bb3d07@onelake.dfs.fabric.microsoft.com/021a9683-640d-4155-af13-cf64c7ee1ad5/Files/Bronze/"
# Dataflow json metadata as models$[dataflowid]
dataflow = "Files/models$50a92467_002D7193_002D4445_002D8ac5_002D00143959ff98/*.json"

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 55faf449-f6a5-4998-871b-13cc8f0bf286)

#### Step 2

In [3]:
# Generate a list tables snapshots at the mounted path
tables = os.listdir('/lakehouse/default/Tables/')
print(tables)

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 5, Finished, Available)

['5ac4c12adf204c28bb513cbefe9e5a33_InternetSales_002Eparquet', '5ac4c12adf204c28bb513cbefe9e5a33_Product_002Eparquet', '6b208846f20c4c6690506e99f1a33e9e_InternetSales_002Eparquet', '6b208846f20c4c6690506e99f1a33e9e_Product_002Eparquet', 'b865240d8d674f28bb0261f7ba64c7cf_Customers_002Eparquet', 'd74d1261e005426a84e9718c7cda53b7_Customers_002Eparquet']


#### Step 3

In [4]:
# Generate lists with real table name at staging, legible table name and last refreshed dates of each table snapshot
tables_times = [ [snapshot, snapshot.split("_")[1], time.ctime(os.stat('/lakehouse/default/Tables/'+snapshot).st_mtime)] for snapshot in tables]
print(tables_times)

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 6, Finished, Available)

[['5ac4c12adf204c28bb513cbefe9e5a33_InternetSales_002Eparquet', 'InternetSales', 'Wed Nov 15 19:44:49 2023'], ['5ac4c12adf204c28bb513cbefe9e5a33_Product_002Eparquet', 'Product', 'Wed Nov 15 19:44:49 2023'], ['6b208846f20c4c6690506e99f1a33e9e_InternetSales_002Eparquet', 'InternetSales', 'Wed Nov 15 20:12:12 2023'], ['6b208846f20c4c6690506e99f1a33e9e_Product_002Eparquet', 'Product', 'Wed Nov 15 20:12:13 2023'], ['b865240d8d674f28bb0261f7ba64c7cf_Customers_002Eparquet', 'Customers', 'Tue Dec  5 17:45:33 2023'], ['d74d1261e005426a84e9718c7cda53b7_Customers_002Eparquet', 'Customers', 'Mon Dec  4 19:12:16 2023']]


#### Step 4

In [6]:
# Convert lists to pandas dataframe to keep it simple to loop or query
df = pd.DataFrame(tables_times, columns=["table", "table_name", "modified_date"]).sort_values(by=["table_name", "modified_date"],ascending=False)
df.head(10)

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 8, Finished, Available)

Unnamed: 0,table,table_name,modified_date
3,6b208846f20c4c6690506e99f1a33e9e_Product_002Ep...,Product,Wed Nov 15 20:12:13 2023
1,5ac4c12adf204c28bb513cbefe9e5a33_Product_002Ep...,Product,Wed Nov 15 19:44:49 2023
2,6b208846f20c4c6690506e99f1a33e9e_InternetSales...,InternetSales,Wed Nov 15 20:12:12 2023
0,5ac4c12adf204c28bb513cbefe9e5a33_InternetSales...,InternetSales,Wed Nov 15 19:44:49 2023
4,b865240d8d674f28bb0261f7ba64c7cf_Customers_002...,Customers,Tue Dec 5 17:45:33 2023
5,d74d1261e005426a84e9718c7cda53b7_Customers_002...,Customers,Mon Dec 4 19:12:16 2023


#### Step 5

In [7]:
# Get tables metadata of last refresh
meta = spark.read.option("multiline", "true").json(dataflow).sort("modifiedTime", ascending=False).limit(1)
metadata = []
for i in meta.select(col("entities")).first()[0]:
    metadata.append([ i[2], [ columna[1] for columna in i[1] ] ])

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 9, Finished, Available)

#### Step 6

In [11]:
# Loop table names in order to take the first row of each one with the most recent refresh date and move it to bronze with overwrite mode
for item in metadata:    
    # Get Table full name at DataflowsStaging Lakehouse
    df_values = df.loc[df['table_name'] == item[0]].sort_values(by=['modified_date'], ascending=False).head(1)
    # Create spark frame with the table source
    sframe = spark.read.format("parquet").load(lake_source + df_values["table"].iloc[0])
    # Set columnas name from json metadata to spark frame for destination
    sframe = sframe.toDF(*item[1])
    # Write spark frame on destination
    sframe.write.format("parquet").mode("overwrite").save(lake_destination + item[0] )

StatementMeta(, 90da0dcb-9ddb-42aa-991a-cca7edf24103, 13, Finished, Available)