# Load Data From External Location to Unity Catalog

**Import Libraries**

In [0]:
from functools import reduce # load library to remove specific caracters
from pyspark.sql.types import DecimalType, IntegerType

**Read Data From External Location**

In [0]:
df = spark.read.format("csv").option("delimiter",";").option("header", "true") \
          .load("abfss://electric@adlsviguyonv.dfs.core.windows.net/raw/consommation/consommation.csv")

In [0]:
df = df.drop("code_iris", "nom_iris", "indice_de_repetition", "nom_commune", "tri_des_adresses")

charsToReplace = ('.','_'),(' ',''),('(',''),(')',''),('{',''),('}',''),('/','')

def renameColumn(column):
    return reduce(lambda string, kv : string.replace(*kv), charsToReplace, column)

for columnName in df.schema.names:
    df = df.withColumnRenamed(columnName, renameColumn(columnName))

In [0]:
df = df.withColumn("numero_de_voie", df["numero_de_voie"].cast(IntegerType()))  \
.withColumn("nombre_de_logements", df["nombre_de_logements"].cast(IntegerType()))  \
.withColumn("consommation_annuelle_totale_de_l_adresse_mwh", df["consommation_annuelle_totale_de_l_adresse_mwh"].cast(DecimalType()))  \
.withColumn("consommation_annuelle_moyenne_par_site_de_l_adresse_mwh", df["consommation_annuelle_moyenne_par_site_de_l_adresse_mwh"].cast(DecimalType()))  \
.withColumn("consommation_annuelle_moyenne_de_la_commune_mwh", df["consommation_annuelle_moyenne_de_la_commune_mwh"].cast(DecimalType()))

**Write File to Unity Catalog**

In [0]:
df.write.mode("overwrite").saveAsTable("electric.default.consommations")

In [0]:
%sql 

SELECT * FROM electric.default.consommations;

annee,numero_de_voie,type_de_voie,libelle_de_voie,code_commune,segment_de_client,nombre_de_logements,consommation_annuelle_totale_de_l_adresse_mwh,consommation_annuelle_moyenne_par_site_de_l_adresse_mwh,consommation_annuelle_moyenne_de_la_commune_mwh,adresse
2019,2,SQUARE,BUGEAUD,92048,RESIDENTIEL,35,63,2,3,2 SQUARE BUGEAUD
2019,16,RUE,CHARLES INFROIT,92048,RESIDENTIEL,37,136,4,3,16 RUE CHARLES INFROIT
2019,12,RUE,CHARLES INFROIT,92048,RESIDENTIEL,37,67,2,3,12 RUE CHARLES INFROIT
2019,44,RUE,D ARTHELON,92048,RESIDENTIEL,30,39,1,3,44 RUE D ARTHELON
2019,42,RUE,D ARTHELON,92048,RESIDENTIEL,21,5,0,3,42 RUE D ARTHELON
2019,10,AVENUE,DE CELLE,92048,RESIDENTIEL,23,29,1,3,10 AVENUE DE CELLE
2019,5,AVENUE,DE CELLE,92048,RESIDENTIEL,20,47,2,3,5 AVENUE DE CELLE
2019,69,RUE,DE LA BELGIQUE,92048,RESIDENTIEL,11,19,2,3,69 RUE DE LA BELGIQUE
2019,19,RUE,DE LA BOURGOGNE,92048,RESIDENTIEL,13,35,3,3,19 RUE DE LA BOURGOGNE
2019,52,ALLEE,DE LA FORET,92048,RESIDENTIEL,19,27,1,3,52 ALLEE DE LA FORET


# Test Overwrite On Files

## Test Overwrite

Let's see what happens if I load same data twice in the same table 

In [0]:
%sql 

USE CATALOG electric;
DROP TABLE IF EXISTS consommations_filtered;

In [0]:
df_filtered = spark.read.table("electric.default.consommations")

df_filtered.filter(df_filtered.annee == 2019).write.mode("overwrite").saveAsTable("electric.default.consommations_filtered")

In [0]:
df_filtered.filter(df_filtered.annee == 2019).count()

Out[8]: 399791

In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_filtered;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2023-07-12T07:42:04.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql 

SELECT DISTINCT annee, COUNT(1) FROM consommations_filtered GROUP BY annee; -- 399 791

annee,count(1)
2019,399791


In [0]:
df_filtered = spark.read.table("electric.default.consommations")

df_filtered.filter(df_filtered.annee == 2019).write.mode("overwrite").saveAsTable("electric.default.consommations_filtered")

In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_filtered;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2023-07-12T07:45:23.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 399791, numOutputBytes -> 8535606)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T07:42:04.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql 

SELECT DISTINCT annee, COUNT(1) FROM consommations_filtered GROUP BY annee; -- 399 791

annee,count(1)
2019,399791


In [0]:
df_filtered = spark.read.table("electric.default.consommations")

df_filtered.filter(df_filtered.annee == 2020).write.mode("overwrite").saveAsTable("electric.default.consommations_filtered")

In [0]:
%sql 

SELECT DISTINCT annee, COUNT(1) FROM consommations_filtered GROUP BY annee; -- 405 605

annee,count(1)
2020,405605


In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_filtered;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2023-07-12T07:47:01.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,1.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
1,2023-07-12T07:45:23.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 399791, numOutputBytes -> 8535606)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T07:42:04.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
df_timestampOf =  spark.read.option("timestampAsOf", "2023-07-12T07:42:05.000+0000").table("electric.default.consommations_filtered")

In [0]:
df_timestampOf.count()

Out[20]: 399791

In [0]:
%sql 

USE CATALOG electric;
OPTIMIZE electric.default.consommations_filtered;

path,metrics
abfss://unity@adlsviguyonv.dfs.core.windows.net/353d6a9c-be25-42c0-90b5-9e78e1fad292/tables/6f9fc80a-c5ff-46bb-bacc-a07585d19b98,"List(1, 3, List(8275412, 8275412, 8275412.0, 1, 8275412), List(1430576, 4084266, 2901818.3333333335, 3, 8705455), 0, null, 1, 3, 0, true, 0, 0, 1689148169611, 1689148177681, 8, 1, null, List(0, 0), 11, 11, 2735)"


In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_filtered;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2023-07-12T07:49:35.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(2536255942183608),0710-142838-jmag6q3d,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 8705455, p25FileSize -> 8275412, numDeletionVectorsRemoved -> 0, minFileSize -> 8275412, numAddedFiles -> 1, maxFileSize -> 8275412, p75FileSize -> 8275412, p50FileSize -> 8275412, numAddedBytes -> 8275412)",,Databricks-Runtime/12.2.x-scala2.12
2,2023-07-12T07:47:01.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,1.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
1,2023-07-12T07:45:23.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 399791, numOutputBytes -> 8535606)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T07:42:04.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12



##Vacuum

To query a previous table version, you must retain both the log and the data files for that version.

Data files are deleted when VACUUM runs against a table. Delta Lake manages log file removal automatically after checkpointing table versions.

Because most Delta tables have VACUUM run against them regularly, point-in-time queries should respect the retention threshold for VACUUM, which is 7 days by default.

In [0]:
%sql 
VACUUM electric.default.consommations_filtered;

path
abfss://unity@adlsviguyonv.dfs.core.windows.net/353d6a9c-be25-42c0-90b5-9e78e1fad292/tables/6f9fc80a-c5ff-46bb-bacc-a07585d19b98



##Restore

In [0]:
%sql

RESTORE TABLE consommations_filtered TO VERSION AS OF 0;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
8986831,7,1,7,8275412,8986831


In [0]:
%sql 

SELECT COUNT(1) FROM consommations_filtered;

count(1)
399791


In [0]:
%sql 

DESCRIBE HISTORY consommations_filtered;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
6,2023-07-12T07:58:00.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,RESTORE,"Map(version -> 0, timestamp -> null)",,List(2536255942183608),0710-142838-jmag6q3d,5.0,Serializable,False,"Map(numRestoredFiles -> 7, removedFilesSize -> 8275412, numRemovedFiles -> 1, restoredFilesSize -> 8986831, numOfFilesAfterRestore -> 7, tableSizeAfterRestore -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12
5,2023-07-12T07:52:37.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,VACUUM END,Map(status -> COMPLETED),,List(2536255942183608),0710-142838-jmag6q3d,4.0,SnapshotIsolation,True,"Map(numDeletedFiles -> 0, numVacuumedDirectories -> 1)",,Databricks-Runtime/12.2.x-scala2.12
4,2023-07-12T07:52:34.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,VACUUM START,"Map(retentionCheckEnabled -> true, defaultRetentionMillis -> 604800000)",,List(2536255942183608),0710-142838-jmag6q3d,3.0,SnapshotIsolation,True,"Map(numFilesToDelete -> 0, sizeOfDataToDelete -> 0)",,Databricks-Runtime/12.2.x-scala2.12
3,2023-07-12T07:49:35.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(2536255942183608),0710-142838-jmag6q3d,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 8705455, p25FileSize -> 8275412, numDeletionVectorsRemoved -> 0, minFileSize -> 8275412, numAddedFiles -> 1, maxFileSize -> 8275412, p75FileSize -> 8275412, p50FileSize -> 8275412, numAddedBytes -> 8275412)",,Databricks-Runtime/12.2.x-scala2.12
2,2023-07-12T07:47:01.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,1.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
1,2023-07-12T07:45:23.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 399791, numOutputBytes -> 8535606)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T07:42:04.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


## Conclusion : 

If I load the same data with overwrite, I still twice data stored, but with only one version of the data. The last one.  

# Test Append data 

In [0]:
%sql 

USE CATALOG electric;
DROP TABLE IF EXISTS consommations_append;

In [0]:
df_src = spark.read.table("electric.default.consommations")

df_src.filter(df_src.annee == 2019).write.mode("append").saveAsTable("electric.default.consommations_append")

In [0]:
%sql 

SELECT DISTINCT annee, COUNT(1) FROM consommations_append GROUP BY annee;

annee,count(1)
2019,399791


In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_append;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2023-07-12T08:02:10.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,True,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
df_src.filter(df_src.annee == 2020).write.mode("append").saveAsTable("electric.default.consommations_append")

In [0]:
%sql 

SELECT DISTINCT annee, COUNT(1) FROM consommations_append GROUP BY annee;

annee,count(1)
2020,405605
2019,399791


In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_append;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2023-07-12T08:02:38.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T08:02:10.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,True,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
df_countOne = spark.read.option("timestampOf", "2023-07-12T08:02:10.000+0000").table("electric.default.consommations_append")

df_countOne.count()

Out[35]: 805396

In [0]:
df_countOne = spark.read.option("versionOf", "0").table("electric.default.consommations_append")

df_countOne.count()

Out[36]: 805396

In [0]:
%sql 

USE CATALOG electric;
DESCRIBE HISTORY consommations_append;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2023-07-11T20:04:26.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-11T20:04:11.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,True,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql 

USE CATALOG electric;

OPTIMIZE consommations_append;
DESCRIBE HISTORY consommations_append;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2023-07-12T08:08:39.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(2536255942183608),0710-142838-jmag6q3d,1.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 10, numRemovedBytes -> 17692286, p25FileSize -> 15942296, numDeletionVectorsRemoved -> 0, minFileSize -> 15942296, numAddedFiles -> 1, maxFileSize -> 15942296, p75FileSize -> 15942296, p50FileSize -> 15942296, numAddedBytes -> 15942296)",,Databricks-Runtime/12.2.x-scala2.12
1,2023-07-12T08:02:38.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 405605, numOutputBytes -> 8705455)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T08:02:10.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,True,"Map(numFiles -> 7, numOutputRows -> 399791, numOutputBytes -> 8986831)",,Databricks-Runtime/12.2.x-scala2.12


## Conclusion : 

If I load data to a dataframe with append, two versions are created, and data is added below, but version does not work.

Also, reoptimizing the dataframe once the append is done changes the storage.

# Test Merge Data

In [0]:
%sql 

USE CATALOG electric;
DROP TABLE IF EXISTS consommations_merge;
DROP TABLE IF EXISTS consommations_merge_temp;

In [0]:
from pyspark.sql.functions import sum

df_original = spark.read.table("electric.default.consommations") 

df_selected_columns = df_original.drop("numero_de_voie","libelle_de_voie","libelle_de_voie","segment_de_client","nombre_de_logements","consommation_annuelle_moyenne_par_site_de_l_adresse_mwh","consommation_annuelle_moyenne_de_la_commune_mwh", "adresse")

df_selected_columns.filter(df_selected_columns.annee == 2019).groupBy("annee", "type_de_voie","code_commune").agg(sum("consommation_annuelle_totale_de_l_adresse_mwh").alias("sum_conso")) \
  .write.mode("overwrite").saveAsTable("electric.default.consommations_merge")


# Ajouter une colonne d'insert date + modified date (premier run elles sont égales)

In [0]:
%sql 

SELECT COUNT(1) FROM consommations_merge

count(1)
18212


In [0]:
df_original = spark.read.table("electric.default.consommations") 

df_selected_columns = df_original.drop("numero_de_voie","libelle_de_voie","libelle_de_voie","segment_de_client","nombre_de_logements","consommation_annuelle_moyenne_par_site_de_l_adresse_mwh","consommation_annuelle_moyenne_de_la_commune_mwh", "adresse")

df_selected_columns.filter(df_selected_columns.annee == 2020).groupBy("annee", "type_de_voie","code_commune").agg(sum("consommation_annuelle_totale_de_l_adresse_mwh").alias("sum_conso")) \
  .write.mode("overwrite").saveAsTable("electric.default.consommations_merge_temp")


# idem, préparer la suite

In [0]:
%sql 

SELECT COUNT(1) FROM consommations_merge_temp

count(1)
18957


In [0]:
%sql 


SELECT * FROM consommations_merge_temp

annee,type_de_voie,code_commune,sum_conso
2020,VILLA,92004,395
2020,RUE,25031,3243
2020,AVENUE,8043,65
2020,AVENUE,92009,5887
2020,SENTIER,93032,172
2020,RUE,94017,18623
2020,RUE,78160,1980
2020,RUE,68056,981
2020,SQUARE,78118,403
2020,RUE,31100,44


In [0]:
%sql 
--DESCRIBE EXTENDED electric.default.consommations_merge
--DESCRIBE DETAIL electric.default.consommations_merge

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,94208b6b-5039-46d2-ba99-15cb79145035,electric.default.consommations_merge,,abfss://unity@adlsviguyonv.dfs.core.windows.net/353d6a9c-be25-42c0-90b5-9e78e1fad292/tables/1567c331-ab80-4810-b16e-2b66030c3889,2023-07-11T20:34:27.960+0000,2023-07-11T20:34:29.000+0000,List(),1,137821,Map(),1,2,"List(appendOnly, invariants)",Map()


In [0]:
#from delta.tables import *

original = spark.read.table("electric.default.consommations_merge")
updates  = spark.read.table("electric.default.consommations_merge_temp")

dfUpdates = updates.toDF()

dfUpdates.show()

#original.alias('source').merge(
#    dfUpdates.alias('updates'),
#    'source.type_de_voie = updates.type_de_voie AND source.code_commune = updates.code_commune'
#  ) \
#  .whenMatchedUpdate(set =
#    {
#      "annee": "updates.annee",
#      "type_de_voie": "updates.type_de_voie",
#      "code_commune": "updates.code_commune",
#      "sum_conso":  "updates.sum_conso"
#    }
#  ) \
#  .whenNotMatchedInsert(values =
#    {
#      "annee": "updates.annee",
#      "type_de_voie": "updates.type_de_voie",
#      "code_commune": "updates.code_commune",
#      "sum_conso":  "updates.sum_conso"
#    }
#  ) \
#  .execute()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-132560150536797>:6[0m
[1;32m      3[0m original [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mtable([38;5;124m"[39m[38;5;124melectric.default.consommations_merge[39m[38;5;124m"[39m)
[1;32m      4[0m updates  [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mtable([38;5;124m"[39m[38;5;124melectric.default.consommations_merge_temp[39m[38;5;124m"[39m)
[0;32m----> 6[0m dfUpdates [38;5;241m=[39m updates[38;5;241m.[39mtoDF()
[1;32m      8[0m dfUpdates[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m-

In [0]:
%sql

MERGE INTO electric.default.consommations_merge AS src
USING electric.default.consommations_merge_temp AS upd
  ON src.type_de_voie = upd.type_de_voie 
  AND src.code_commune = upd.code_commune
WHEN MATCHED THEN
  UPDATE SET
    annee = upd.annee,
    sum_conso = upd.sum_conso
    -- update la date de mise à jour 
WHEN NOT MATCHED THEN 
  INSERT (annee, type_de_voie, code_commune, sum_conso)
  VALUES (
    upd.annee,
    upd.type_de_voie,
    upd.code_commune,
    upd.sum_conso
    -- insert les deux dates 
  );



num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
18957,18504,0,453


In [0]:
%sql 

SELECT COUNT(1) FROM consommations_merge

count(1)
19678


In [0]:
%sql 

DESCRIBE HISTORY consommations_merge;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2023-07-12T08:20:37.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,MERGE,"Map(predicate -> [""((type_de_voie#14513 = type_de_voie#14521) AND (code_commune#14514 = code_commune#14522))""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2536255942183608),0710-142838-jmag6q3d,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 453, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 148204, numTargetBytesRemoved -> 142566, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 18504, executionTimeMs -> 3356, numTargetRowsInserted -> 453, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 2060, numTargetRowsUpdated -> 18504, numOutputRows -> 19410, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 18957, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1111)",,Databricks-Runtime/12.2.x-scala2.12
1,2023-07-12T08:19:08.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,MERGE,"Map(predicate -> [""((type_de_voie#13373 = type_de_voie#13377) AND (code_commune#13374 = code_commune#13378))""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2536255942183608),0710-142838-jmag6q3d,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 721, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 151168, numTargetBytesRemoved -> 137821, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 17491, executionTimeMs -> 5407, numTargetRowsInserted -> 1466, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 3395, numTargetRowsUpdated -> 17491, numOutputRows -> 19678, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 18957, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1420)",,Databricks-Runtime/12.2.x-scala2.12
0,2023-07-12T08:10:30.000+0000,2438481468773153,admin@mngenvmcap135687.onmicrosoft.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2536255942183608),0710-142838-jmag6q3d,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 18212, numOutputBytes -> 137821)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
df_countOne = spark.read.option("versionOf", "0").table("electric.default.consommations_merge")

df_countOne.count()

Out[50]: 20131


# Write In external Tables

In [0]:
%sql 

USE CATALOG electric;

CREATE TABLE consommation_external
LOCATION 'abfss://electric@adlsviguyonv.dfs.core.windows.net/enriched/consommations_external'
AS SELECT * FROM consommations 

num_affected_rows,num_inserted_rows
