Notebook permettant de charger un fichier parquet dans une table delta.

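Initialization: true / false, permet de choisir entre un init et un run du chargement. Un init (true) supprime la table destination dans silver ainsi que le paramètre qui stocke le timestamp du dernier chargement réussi, puis recharge tous les répertoires. Un run (false) ne charge que les répertoires postérieurs au dernier chargement réussi.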

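Input Path: chemin abfss du répertoire racine des fichiers parquet source (le motif de répertoires year=/month=/day=/time= du paramètre « Folder structure » y est ajouté pour la lecture).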

Output Path: chemin abfss de la table delta destination.

Pool: nom du pool utilisé par spark. Les pools sont définis au niveau du cluster dans Advanced Options -> Spark Config (= spark.scheduler.allocation.file /dbfs/FileStore/fairscheduler.xml).

Pipeline Name: Nom du pipeline exécutant le notebook.

Pipeline Run ID: ID du run du pipeline.

Data Factory Name: Nom de la ressource Data Factory exécutant le pipeline.

In [0]:
%run ../Utils/COMMON

In [0]:
from datetime import datetime

# Text widgets exposed by this notebook, as (name, default value, label).
# They are registered in the order listed here.
widget_specs = (
  ('input_file_path', 'abfss://bronze@stdatalake01dev.dfs.core.windows.net/QAD/', 'Input Path'),
  ('output_delta_table', 'abfss://silver@stdatalake01dev.dfs.core.windows.net/QAD/', 'Output Path'),
  ('pool_name', 'byod', 'Pool'),
  ('init', 'false', 'Initialization'),
  ('pipeline_run_id', '0', 'Pipeline Run ID'),
  ('pipeline_name', '', 'Pipeline Name'),
  ('data_factory_name', '', 'Data Factory Name'),
  ('folder_partition', 'year=*/month=*/day=*/time=*', 'Folder structure'),
)
for widget_name, widget_default, widget_label in widget_specs:
  dbutils.widgets.text(widget_name, widget_default, widget_label)

# Disabled widget declarations, kept for reference:
# dbutils.widgets.dropdown("encoding", "UTF-8", ["UTF-8", "ISO-8859-1"], "Input File Encoding")
# dbutils.widgets.text('folder_partition', 'year=2021/month=09/day=15/time=1400', 'Folder structure')

# Timestamp taken before any processing starts (naive datetime expressed in UTC).
start_date = datetime.utcnow()

read_widget = dbutils.widgets.get

# Only the exact string 'true' switches the notebook to initialization mode.
init: bool = read_widget('init') == 'true'
folder_partition: str = read_widget('folder_partition')

# Destination: split the abfss URL into its parts, then call the ADLS Gen2
# connector configuration helper for that storage account (service principal mode).
output_full_path: str = read_widget('output_delta_table')
output_container, output_storage_account_name, output_file_path = LIN_parse_datalake_url(output_full_path)
LIN_config_adls_gen2_connector(output_storage_account_name, use_service_principal=True)

# Source: same treatment as the destination.
input_full_path: str = read_widget('input_file_path')
input_container, input_storage_account_name, input_file_path = LIN_parse_datalake_url(input_full_path)
LIN_config_adls_gen2_connector(input_storage_account_name, use_service_principal=True)

# Scheduler pool and pipeline identification values.
pool_name: str = read_widget('pool_name')
#encoding: str = dbutils.widgets.get('encoding')
pipeline_run_id: str = read_widget('pipeline_run_id')
pipeline_name: str = read_widget('pipeline_name')
data_factory_name: str = read_widget('data_factory_name')

# Monitoring is currently disabled:
#monitor= LIN_Monitor(f'CSV_TO_DELTA', data_factory_name, pipeline_run_id, pipeline_name, LIN_get_source(input_full_path), LIN_Stage.SILVER)

In [0]:
if init:
  # Initialization run: remove the destination Delta table in silver, then
  # delete the stored "last load" parameter so that everything is reloaded.
  print(output_full_path)
  try:
    # The listing acts as an existence probe: it is expected to raise when
    # the destination path is missing, which skips the recursive delete.
    dbutils.fs.ls(output_full_path)
    dbutils.fs.rm(output_full_path, True)
  except Exception as error:
    # Catch Exception rather than everything so that KeyboardInterrupt and
    # SystemExit still stop the notebook. The original message is kept, and
    # the real error is printed as well because this handler also runs for
    # failures other than a missing folder.
    # NOTE(review): a failed delete is still tolerated here and the load goes
    # on to append to whatever remains - confirm this is intended.
    print(f"folder {output_full_path} doesn't exist")
    print(f"error detail: {error}")
  # Delete the last-load parameter so that every folder is loaded again.
  LIN_delete_loading_parameter(input_storage_account_name, input_full_path)

In [0]:
try:
  print("start reading")

  # Glob selecting every parquet file under the requested partition folders.
  parquet_glob = input_full_path + folder_partition + '/*.parquet'
  print("parquet path:" + parquet_glob)

  # Read the files and add the filePath and folderDatetime columns.
  # folderDatetime is built from the year/month/day/time columns:
  # year*10000 + month*100 + day gives yyyyMMdd, and `time` is zero-padded to
  # six digits and parsed as HHmmss.
  # NOTE(review): if folders carry a 4-digit HHmm value (e.g. time=1400), the
  # padding yields '001400', which parses as 00:14:00 rather than 14:00 -
  # confirm the folder naming convention.
  df = (
      spark.read.option('basePath', input_full_path)
        .format("parquet").load(parquet_glob)
        .withColumn('filePath', f.input_file_name())
        .withColumn('folderDatetime', f.to_timestamp(f.concat((f.col('year') * 10000 + f.col('month') * 100 + f.col('day')).cast('string'), f.lit(' '), f.format_string("%06d", f.col('time'))), 'yyyyMMdd HHmmss'))
  )

  df.printSchema()
  # Debug output; note that show() runs a Spark job of its own.
  df.show()
  print("end reading")

  print("start load loading parameters ")
  # Fetch the date of the last load and keep only the folders that are
  # strictly more recent; without a stored value everything is loaded.
  last_load = LIN_get_loading_parameter(input_storage_account_name, input_full_path)
  if last_load:
    df = df.filter(f.col('folderDatetime') > last_load.strftime('%Y-%m-%d %H:%M:%S'))
  print("end load loading parameters ")

  # Collect the distinct source file names in order to compute the total size
  # of the processed files. The scheduler pool is set first so that the
  # collect job runs in it.
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
  input_files = [r['filePath'] for r in df.select('filePath').distinct().collect()]
  # Only consumed by the (currently disabled) monitoring call below.
  input_files_size: int = sum(LIN_get_dbfs_path_size(af) for af in input_files)

  # Drop the technical columns that were only needed for filtering and sizing.
  data_to_load = df.drop('filePath', 'year', 'month', 'day', 'time', 'folderDatetime')

  # Add the metadata columns, then remove spaces from all column names.
  data_to_load = LIN_add_metadata_cols(data_to_load)
  data_to_load = data_to_load.select([f.col(col).alias(col.replace(' ', '')) for col in data_to_load.columns])

  # Count rows with an accumulator instead of running a separate count.
  # The value is only populated once the write below has executed.
  # NOTE(review): Spark may apply accumulator updates made inside a
  # transformation more than once when tasks are re-executed, so the count
  # can be overstated.
  accumulator_count = spark.sparkContext.accumulator(0)

  def LIN_add_to_accu(x):
    """Increment the row accumulator and return the row unchanged."""
    accumulator_count.add(1)
    return x

  # Route every row through the counting function and rebuild a DataFrame
  # with the same schema.
  currated_df = spark.createDataFrame(data_to_load.rdd.map(LIN_add_to_accu), data_to_load.schema)

  # Table size before and after the write; the difference is meant for the
  # (currently disabled) monitoring call below.
  before_write_table_size: int = LIN_get_dbfs_path_size(output_full_path)

  # Select the scheduler pool, then append to the Delta table.
  # TODO (from the original code): add logging around a failed write.
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
  currated_df.write.format("delta").mode("append").partitionBy("IngestionDate").save(output_full_path)

  after_write_table_size: int = LIN_get_dbfs_path_size(output_full_path)

  print("start set loading parameters ")
  # Reached only when the load raised no error: update the stored parameter
  # with the start time of this run.
  LIN_set_loading_parameter(input_storage_account_name, input_full_path, start_date)
  print("end set loading parameters ")

  #monitor.log_loading_metrics(LIN_Status.SUCCESS, input_full_path, output_full_path, start_date, datetime.utcnow(), '', accumulator_count.value, input_file_size=input_files_size, output_file_size=(after_write_table_size-before_write_table_size))

except Exception:
  # Placeholder for failure monitoring; the error is always propagated so
  # that the calling pipeline sees the failure.
  #monitor.log_loading_metrics(LIN_Status.FAILURE, input_full_path, output_full_path, start_date, datetime.utcnow(), LIN_get_error_message())
  raise