In [0]:
!pip install snowflake-connector-python

In [0]:
import datetime
from delta.tables import DeltaTable
import pyspark.sql.functions as F

In [0]:
import snowflake.connector
import pandas as pd
from pyspark.dbutils import DBUtils

# Initialiser DBUtils
dbutils = DBUtils(spark)

# Récupérer les secrets depuis le scope
user = dbutils.secrets.get(scope="my_snowflake_scope", key="user")
password = dbutils.secrets.get(scope="my_snowflake_scope", key="password")
account = dbutils.secrets.get(scope="my_snowflake_scope", key="account")
warehouse = dbutils.secrets.get(scope="my_snowflake_scope", key="warehouse")
database = dbutils.secrets.get(scope="my_snowflake_scope", key="database")
schema = dbutils.secrets.get(scope="my_snowflake_scope", key="schema")
role = dbutils.secrets.get(scope="my_snowflake_scope", key="role")

# Connexion à Snowflake
connection = snowflake.connector.connect(
    user=user,
    password=password,
    account=account,
    warehouse=warehouse,
    database=database,
    schema=schema,
    role=role
)



In [0]:
from pyspark.dbutils import DBUtils

# Initialiser DBUtils
dbutils = DBUtils(spark)

# Récupérer les secrets depuis le scope
ADLS_STORAGE_ACCOUNT_NAME = dbutils.secrets.get(scope="adls_scope", key="storage_account_name")
ADLS_ACCOUNT_KEY = dbutils.secrets.get(scope="adls_scope", key="account_key")
ADLS_CONTAINER_NAME = dbutils.secrets.get(scope="adls_scope", key="container_name")
ADLS_FOLDER_PATH = dbutils.secrets.get(scope="adls_scope", key="folder_path")

# Construire l'URL Delta Source
DELTA_SOURCE = f"abfss://{ADLS_CONTAINER_NAME}@{ADLS_STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/{ADLS_FOLDER_PATH}"


In [0]:
# Define checkpoint file path
CHECKPOINT_FILE_PATH = "./delta_checkpoint.txt"

In [0]:
# Create Spark session
spark.conf.set(
    f"fs.azure.account.key.{ADLS_STORAGE_ACCOUNT_NAME}.dfs.core.windows.net",
    ADLS_ACCOUNT_KEY,
)
spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")

In [0]:
# Read the checkpoint file if it exists, otherwise set the LAST_READ_TIMESTAMP to 0
delta_table = DeltaTable.forPath(spark, DELTA_SOURCE)
try:
    with open(CHECKPOINT_FILE_PATH, "r") as checkpoint_file:
        LAST_READ_TIMESTAMP = checkpoint_file.read()
    print("Getting Latest timtestamp from checkpoint file: ")
except FileNotFoundError:
    print("Getting Latest timtestamp from delta history: ")
    LAST_READ_TIMESTAMP = (
        delta_table.history().select(F.min("timestamp").alias("timestamp")).collect()
    )
    LAST_READ_TIMESTAMP = str(LAST_READ_TIMESTAMP[0]["timestamp"])
     

Getting Latest timtestamp from delta history: 


In [0]:
print("LAST_READ_TIMESTAMP: ", LAST_READ_TIMESTAMP)

LAST_READ_TIMESTAMP:  2024-12-03 16:27:48


In [0]:
def insert_data(connection, competences, contrat_nom, societe_nom, formation_nom, experience_dur, salaire, type_de_trav):
    try:
        cursor = connection.cursor()

        # Insérer les compétences dans la table, en laissant l'auto-incrémentation gérer l'ID
        for competence_nom in competences:
            cursor.execute(f"""
                INSERT INTO my_project_schema.competence (NOM)
                SELECT '{competence_nom}'
                WHERE NOT EXISTS (
                    SELECT 1 FROM my_project_schema.competence WHERE nom = '{competence_nom}'
                );
            """)

        # Insérer les autres entités (contrat, société, formation, etc.)
        cursor.execute(f"""
            INSERT INTO my_project_schema.contrat (contrat_nom)
            SELECT '{contrat_nom}'
            WHERE NOT EXISTS (
                SELECT 1 FROM my_project_schema.contrat WHERE contrat_nom = '{contrat_nom}'
            );
        """)

        cursor.execute(f"""
            INSERT INTO my_project_schema.entreprise (nom)
            SELECT '{societe_nom}'
            WHERE NOT EXISTS (
                SELECT 1 FROM my_project_schema.entreprise WHERE nom = '{societe_nom}'
            );
        """)

        cursor.execute(f"""
            INSERT INTO my_project_schema.formation (nom)
            SELECT '{formation_nom}'
            WHERE NOT EXISTS (
                SELECT 1 FROM my_project_schema.formation WHERE nom = '{formation_nom}'
            );
        """)

        # Récupérer les IDs des entités
        cursor.execute(f"SELECT entreprise_id FROM my_project_schema.entreprise WHERE nom = '{societe_nom}'")
        entreprise_id = cursor.fetchone()[0]

        cursor.execute(f"SELECT formation_id FROM my_project_schema.formation WHERE nom = '{formation_nom}'")
        formation_id = cursor.fetchone()[0]

        cursor.execute(f"SELECT contrat_id FROM my_project_schema.contrat WHERE contrat_nom = '{contrat_nom}'")
        contrat_id = cursor.fetchone()[0]

        cursor.execute(f"SELECT TYPETRAV_ID FROM my_project_schema.type_trav WHERE nom = '{type_de_trav}'")
        type_trav_id = cursor.fetchone()[0]

        # Insérer l'offre
        query = """
            INSERT INTO my_project_schema.offre_fait (
                entreprise_id, experience_dur, formation_id, contrat_id, salaire, typetrav_id
            )
            SELECT %s, %s, %s, %s, %s, %s
        """
        cursor.execute(query, (entreprise_id, experience_dur, formation_id, contrat_id, salaire, type_trav_id))

        # Récupérer l'ID de l'offre insérée (l'ID le plus récent)
        cursor.execute("SELECT COALESCE(MAX(OFFRE_ID), 0) FROM my_project_schema.offre_fait")
        offre_id = cursor.fetchone()[0]

        # Insérer les associations entre l'offre et les compétences
        for competence_nom in competences:
            # Récupérer l'ID de la compétence
            cursor.execute(f"SELECT competence_id FROM my_project_schema.competence WHERE nom = '{competence_nom}'")
            competence_id = cursor.fetchone()[0]

            # Insérer l'association entre l'offre et la compétence
            cursor.execute(f"""
                INSERT INTO my_project_schema.offre_competence (offre_id, competence_id)
                SELECT {offre_id}, {competence_id}
                WHERE NOT EXISTS (
                    SELECT 1 FROM my_project_schema.offre_comp WHERE offre_id = {offre_id} AND competence_id = {competence_id}
                );
            """)

        # Commit des modifications
        connection.commit()

    except Exception as e:
        print(f"Erreur lors de l'insertion des données : {e}")
        connection.rollback()

    finally:
        cursor.close()


In [None]:
def insert_langue_association(connection, langues):
    try:
        cursor = connection.cursor()

        # Récupérer l'ID de l'offre la plus récemment insérée
        cursor.execute("SELECT COALESCE(MAX(OFFRE_ID), 0) FROM my_project_schema.offre_fait")
        offre_id = cursor.fetchone()[0]

        # Insérer les associations des langues avec l'offre
        for langue_nom in langues:
            cursor.execute(f"""
                SELECT langue_id FROM my_project_schema.langue WHERE nom = '{langue_nom}'
            """)
            langue_id = cursor.fetchone()[0]

            cursor.execute(f"""
                INSERT INTO my_project_schema.langue_offr (offre_id, langue_id)
                SELECT {offre_id}, {langue_id}
                WHERE NOT EXISTS (
                    SELECT 1 FROM my_project_schema.langue_offr WHERE offre_id = {offre_id} AND langue_id = {langue_id}
                );
            """)

        # Commit des modifications
        connection.commit()

    except Exception as e:
        print(f"Erreur lors de l'insertion des associations : {e}")
        connection.rollback()

    finally:
        cursor.close()


In [0]:
def display_delta_changes():
    delta_df = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", LAST_READ_TIMESTAMP)
        .load(DELTA_SOURCE)
    )

    if delta_df.count() != 0:
        excluded_columns = ["_change_type", "_commit_version", "_commit_timestamp"]
        selected_columns = [
            column for column in delta_df.columns if column not in excluded_columns
        ]
        print(f"Colonnes sélectionnées : {selected_columns}")
        
        # Filtrer les colonnes nécessaires et convertir en Pandas DataFrame
        pandas_df = delta_df.select(*selected_columns).toPandas()
        for index, row in pandas_df.iterrows():
    # Assurez-vous que les noms de colonnes existent dans le DataFrame
          # Extraire les paramètres de la ligne
         competences = row['competences']
         experience_demande = row['experience_demande']
         formation_requise = row['formation_requise']
         type_de_contrat = row['type_de_contrat']
         type_offre = row['type']
         salaire = row['salaire']
         societe = row['societe']

    # Imprimer les paramètres
         print("Données à insérer :")
         print(f"Compétences: {competences}, Expérience demandée: {experience_demande}, Formation requise: {formation_requise}, "
          f"Type de contrat: {type_de_contrat}, Type d'offre: {type_offre}, Salaire: {salaire}, Société: {societe}")
         if row['experience_demande']==None:
             row['experience_demande']=0
       
             
    
         insert_data(connection,row['competences'],row['type_de_contrat'],row['societe'],
                     row['formation_requise'], , row['experience_demande'],row['salaire'],row[ 'type'])
         insert_langue_association(connection ,row["langues"])
         updated_timestamp = delta_table.history().select(F.max("timestamp").alias("timestamp")).collect()[0]["timestamp"]
         updated_timestamp += datetime.timedelta(seconds=1)
         updated_timestamp = str(updated_timestamp)
         print("Updated timestamp: ", updated_timestamp)
         with open(CHECKPOINT_FILE_PATH, "w") as file:
            file.write(updated_timestamp)

    else:
        print("Aucune nouvelle donnée dans les fichiers Delta.")
        return None


In [0]:
display_delta_changes()

Colonnes sélectionnées : ['titre_du_poste', 'societe', 'competences', 'lieu', 'type_offre', 'durée', 'type_de_contrat', 'email', 'telephone', 'type', 'langues', 'salaire', 'date_de_debut', 'secteur_dactivite', 'experience_demande', 'formation_requise', 'avantages', 'site_web']
Données à insérer :
Compétences: ['Python' 'SQL' 'Apache Airflow' 'Talend' 'MongoDB' 'Cassandra' 'AWS'
 'Google Cloud'], Expérience demandée: None, Formation requise: Étudiant en informatique ou domaine connexe, Type de contrat: Stage PFE, Type d'offre: stage, Salaire: 1 200 EUR par mois, Société: DataTech Solutions
Erreur lors de l'insertion des données : 100038 (22018): Numeric value '1 200 EUR par mois' is not recognized
