# 2.1 Expérimentation avec l’algorithme approximative Nearest Neighbors (plus proches voisins approximatifs)

### 2.1.1 Faire une synthèse de l’article avec au minimum 2 pages et un maximum de 3 pages

non fait ici

### 2.1.2 Construire un classeur binaire capable de classer les tweets en deux classes : positive et négative, selon les 4 scénarios suivants du dataset :


In [1]:
data_file_csv = "./data/source.csv"

# creation session scpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Sentiment140_Load").getOrCreate()

##### Premiere partie chargement des datas et pretraitment

1: Chargement et on nettoie

les fonctions:

In [2]:
from pyspark.sql.functions import col, regexp_replace

#On charger le CSV
def load_csv(file_path):
    df = spark.read.csv(file_path,header=False,inferSchema=True)
    return df

def nettoie_df(df,aff=False):
    # on nomme les colones
    df = df.toDF("label", "id", "date", "flag", "user", "text")

    #on allege:
    df = df.select("id","label", "text")

    #on filtre pour
    #Nettoyer les données : suppression des mots non-pertinents, articles, urls.  
    df = df.filter(col("label").isin(0, 4))
    df = df.withColumn("text", regexp_replace(col("text"), r"http\S+", ""))
    df = df.withColumn("text", regexp_replace(col("text"), r"@\w+", ""))
    df = df.withColumn("text", regexp_replace(col("text"), r"[^a-z\s]", ""))
    df = df.withColumn("text", regexp_replace(col("text"), r"\s+", " "))
    if(aff):
        df.show(5, truncate=False)
    return df

execution:

In [3]:
df = load_csv(data_file_csv)
df_propre = nettoie_df(df)

ensuite on creer une version du df avec des labels en binaire:

fonction:

In [4]:
from pyspark.sql.functions import when

def build_df_label_bin(df):
    df = df.withColumn("label_bin", when(col("label") == 4, 1).otherwise(0)) # pour passer de 0-4 a 0-1
    return df.select("id","label_bin", "text")

execution:

In [5]:
df_label_bin = build_df_label_bin(df_propre)

2) On creer les scenarios

fonctions:

In [16]:
from pyspark.sql.functions import col, concat, explode, collect_list,coalesce, array
from pyspark.ml.feature import Tokenizer, NGram, HashingTF

num_features = 1<<18

def make_df_Tokenizer(df_label_bin):
    tok = Tokenizer(inputCol="text", outputCol="tokens") #conversion en tokens
    df_tok = tok.transform(df_label_bin)
    return df_tok

def make_df_NGram(df,ngram_n=2,outputCol="ngrams",inputCol="tokens"):
    ng = NGram(n=ngram_n, inputCol=inputCol, outputCol=outputCol)#creation des n grams
    df_ng = ng.transform(df)
    return df_ng

def make_s1_words(df):
    df_tok = make_df_Tokenizer(df)
    tf = HashingTF(inputCol="tokens", outputCol="features", numFeatures=num_features, binary=True)
    return tf.transform(df_tok).select("id","label_bin", "features")

def make_s2_ngrams(df, ngram_n=2):
    df_tok = make_df_Tokenizer(df)
    df_ng = make_df_NGram(df_tok,ngram_n)
    tf = HashingTF(inputCol="ngrams", outputCol="features", numFeatures=num_features, binary=True)
    return tf.transform(df_ng).select("id","label_bin", "features")

def make_s3_patterns(df, ngram_n=2, topN=200000):
    df_tok = make_df_Tokenizer(df)
    df_pat0 = make_df_NGram(df_tok, ngram_n, "patterns")

    top = (df_pat0 #on creer le top
        .select(explode("patterns").alias("p"))
        .groupBy("p").count()
        .orderBy(col("count").desc())
        .limit(topN)
        .select("p")
    )

    df_pat = (df_pat0 #on filtre selon le top
        .withColumn("p", explode("patterns"))
        .join(top, on="p", how="left_semi")
        .groupBy("id", "label_bin")
        .agg(collect_list("p").alias("patterns_f"))
    )

    tf = HashingTF(inputCol="patterns_f", outputCol="features", numFeatures=num_features, binary=True)
    return tf.transform(df_pat).select("id","label_bin", "features")

def make_s4_combo(df, ngram_n=2, topN=200000):
    #on refait les 3 senarios 
    df_tok = make_df_Tokenizer(df)

    df_ng = make_df_NGram(df_tok, ngram_n, "ngrams")

    df_pat0 = make_df_NGram(df_tok, ngram_n, "patterns")

    top = (df_pat0
        .select(explode("patterns").alias("p"))
        .groupBy("p").count()
        .orderBy(col("count").desc())
        .limit(topN)
        .select("p")
    )

    df_pat = (df_pat0
        .select("id", "label_bin", "patterns")
        .withColumn("p", explode("patterns"))
        .join(top, on="p", how="left_semi")
        .groupBy("id", "label_bin")
        .agg(collect_list("p").alias("patterns_f"))
    )

    df_all = (df_ng
        .select("id", "label_bin", "tokens", "ngrams")
        .join(df_pat.select("id", "patterns_f"), on="id", how="left")
        .withColumn("patterns_f", coalesce(col("patterns_f"), array()))
    )

    df_combo = df_all.withColumn("combo", concat(col("tokens"), col("ngrams"), col("patterns_f")))

    tf = HashingTF(inputCol="combo", outputCol="features", numFeatures=num_features, binary=True)
    return tf.transform(df_combo).select("id","label_bin", "features")



3) Construction d’un classifieur binaire pour la classification des tweets

L’objectif est de construire un classifieur binaire capable de distinguer les tweets positifs et négatifs.
Dans cette section, nous définissons l’ensemble des fonctions utilitaires qui seront utilisées dans les parties 2.1.2 à 2.1.9. Ces fonctions permettront de structurer le travail, de factoriser le code et de faciliter les expérimentations ultérieures.

Nous commençons par définir une fonction permettant de séparer le jeu de données en ensembles d’entraînement et de test.

Nous ajoutons également une fonction permettant de réduire la taille du dataset,pour la futur question 2.1.7 scalabilité

In [7]:
# on split
def split_train_test(df_feat, train_ratio=0.8, cache=True):
    train_df, test_df = df_feat.randomSplit([train_ratio, 1 - train_ratio])
    if cache:
        train_df = train_df.cache()
        test_df  = test_df.cache()
        train_df.count()
        test_df.count()
    return train_df, test_df

#on limite
def limit_dataset_size(df, ratio=1.0):
    if ratio >= 1.0:
        return df
    ratio = max(0,ratio)
    return df.sample(withReplacement=False, fraction=ratio)

ensuite on creer les fonctions qui vont faire l'Entraînement (MinHashLSH):

In [8]:
from pyspark.ml.feature import MinHashLSH
import time
# pour les focntions suivantes
#  On recupere la construction dans l'exemple: LSHMinHash_datasetNetFlix.ipynb

def fit_lsh(train_df, numHashTables, measure_time=False):
    mh = MinHashLSH(
        inputCol="features",
        outputCol="hashes",
        numHashTables=numHashTables
    )
    if(measure_time):
        start = time.perf_counter()
    
    lsh_model = mh.fit(train_df)
    lsh_model.transform(train_df).count() # test pas final pour voir si ca change le temps d'execution
    
    if measure_time: #c'est dans les cas plus tard ou on voudra le temps 
        end = time.perf_counter()
        return lsh_model, float(end - start)
    return lsh_model

def train_models_for_scenario(df_feat, numHashTables_list=(128, 250), train_ratio=0.8, cache=True, measure_time=False):
    train_df, test_df = split_train_test(df_feat, train_ratio=train_ratio, cache=cache)
    models = {}
    times  = {}
    for nht in numHashTables_list:
        if measure_time:
            model, t = fit_lsh(train_df, numHashTables=int(nht), measure_time=True)
            models[int(nht)] = model
            times[int(nht)]  = t
        else:
            models[int(nht)] = fit_lsh(train_df, numHashTables=int(nht), measure_time=False)

    if measure_time:
            return models, train_df, test_df, times
    return models, train_df, test_df

Fonctions pour prédiction AkNN + vote:

In [19]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#Prédit en batch pour tout test_df via aevc vote majoritaire.
def predict_knn(lsh_model, train_df, test_df, k, threshold=1.0):
    pairs = lsh_model.approxSimilarityJoin(
        test_df,
        train_df,
        threshold=threshold,
        distCol="JaccardDist"
    )

    neigh = pairs.select(
        F.col("datasetA.id").alias("test_id"),
        F.col("datasetA.label_bin").alias("true_label"),
        F.col("datasetB.label_bin").alias("neighbor_label"),
        F.col("JaccardDist")
    )

    w = Window.partitionBy("test_id").orderBy(F.col("JaccardDist").asc())
    topk = neigh.withColumn("rank", F.row_number().over(w)).filter(F.col("rank") <= k)

    votes = topk.groupBy("test_id").agg(
        F.first("true_label").alias("true_label"),
        F.avg("neighbor_label").alias("p_positive")
    )

    pred_df = votes.withColumn("prediction", (F.col("p_positive") >= 0.5).cast("int"))
    return pred_df


def evaluate_accuracy(pred_df):
    """
    Calcule l'accuracy de pred_df (test_id, true_label, prediction).
    """
    df_ok = pred_df.filter(F.col("true_label").isNotNull() & F.col("prediction").isNotNull())

    row = (df_ok
        .select((F.col("prediction") == F.col("true_label")).cast("int").alias("ok"))
        .agg(F.avg("ok").alias("acc"))
        .first()
    )
    acc = row["acc"]
    return float(acc) if acc is not None else 0.0

def evaluate_k_grid(lsh_model, train_df, test_df, k_list=(50, 100, 150, 200), threshold=1.0):
    """
    Évalue l'accuracy pour plusieurs valeurs de k.
    """
    results = {}
    for k in k_list:
        pred_df = predict_knn(lsh_model, train_df, test_df, k=k, threshold=threshold)
        results[k] = evaluate_accuracy(pred_df)
    return results

Ensuite pour la futur question 2.1.5 on va vouloir un tableau pour comparer a l'article donc

In [10]:
import pandas as pd

def collect_results_row(scenario_name, numHashTables, k, accuracy):
    return {"scenario": scenario_name,"numHashTables": int(numHashTables),"k": int(k),"accuracy": float(accuracy)}

def build_results_table(results_rows, as_pandas=True):
    """
    Construit le tableau de résultats.
    """
    df = pd.DataFrame(results_rows)
    if as_pandas:
        return df
    return spark.createDataFrame(df)

def format_table4_like(df_results):
    """
    Met en forme le tableau
    """
    return (df_results.pivot_table(index=["scenario", "numHashTables"],columns="k",values="accuracy").sort_index())

Fonction pour l'Histogramme pour la question 2.1.6

In [11]:
import matplotlib.pyplot as plt

def plot_training_histogram(df_results, group_by=("scenario","numHashTables"), value_col="train_time_seconds"):
    dfp = df_results.copy()
    dfp["group"] = dfp[list(group_by)].astype(str).agg(" | ".join, axis=1)

    agg = dfp.groupby("group")[value_col].mean().sort_values(ascending=False)

    plt.figure()
    agg.plot(kind="bar")
    plt.xlabel("Modèle (scénario | numHashTables)")
    plt.ylabel("Temps d'entraînement (s)")
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    plt.show()

Ensuite les fonctions pour la 2.1.7, la scalabilité

In [12]:
import matplotlib.pyplot as plt
import pandas as pd

def benchmark_scalability_vs_fraction(df_feat,scenario_name,numHashTables,F_list=(0.2, 0.4, 0.6, 0.8, 1.0),train_ratio=0.8,cache=True):
    rows = []
    for F in F_list:
        df_F = limit_dataset_size(df_feat, F=F)

        train_df, _ = split_train_test(df_F, train_ratio=train_ratio, cache=cache)

        _,t = fit_lsh(train_df, numHashTables=numHashTables,measure_time=True)

        rows.append({
            "scenario": scenario_name,
            "numHashTables": int(numHashTables),
            "F": float(F),
            "time_seconds": t
        })

    return pd.DataFrame(rows)

#Plot Figure 3
def plot_scalability_fraction(df_scaling, hue="scenario"):
    plt.figure()

    if hue in df_scaling.columns:
        for key, g in df_scaling.groupby(hue):
            g2 = g.groupby("F")["time_seconds"].mean().sort_index()
            plt.plot(g2.index, g2.values, marker="o", label=str(key))
        plt.legend()
    else:
        g2 = df_scaling.groupby("F")["time_seconds"].mean().sort_index()
        plt.plot(g2.index, g2.values, marker="o")

    plt.xlabel("F (fraction de la taille originale)")
    plt.ylabel("Temps d'entraînement (s)")
    plt.tight_layout()
    plt.show()

Pour la question 2.1.9, focntion simple:

In [13]:
def add_signatures(lsh_model, df_feat):
    """
    Ajoute la colonne 'hashes' (signature MinHash) au DataFrame de features.
    """
    return lsh_model.transform(df_feat)

def show_signature_examples(df_with_hashes, n=5, cols=("id", "label_bin", "hashes")):
    df_with_hashes.select(*cols).show(n, truncate=False)


### 2.1.3 Entrainer simplement chacun des 4 classeurs sans validation croisée. Le réglage des paramètres de l’algorithme approxNearestNeighbos sera uniquement sur le nombre de fonctions de hachages « numHashTables » du MinHash. Utiliser une plage de 2 valeurs maximum, par exemple {128, 250} afin de trouver la bonne longueur de la meilleure signature (réduction de dimensionnalité) 

In [None]:
NUM_HASH_LIST = (128, 250)

# quand on est en local pour pas detruire mon ordi
df_small = limit_dataset_size(df_label_bin, ratio=0.001)

#Features (les 4 scénarios)
df_s1 = make_s1_words(df_small)
df_s2 = make_s2_ngrams(df_small)
df_s3 = make_s3_patterns(df_small)
df_s4 = make_s4_combo(df_small)

#Entraînement
models_s1, train_s1, test_s1 = train_models_for_scenario(df_s1, numHashTables_list=NUM_HASH_LIST, train_ratio=0.8, cache=True)
models_s2, train_s2, test_s2 = train_models_for_scenario(df_s2, numHashTables_list=NUM_HASH_LIST, train_ratio=0.8, cache=True)
models_s3, train_s3, test_s3 = train_models_for_scenario(df_s3, numHashTables_list=NUM_HASH_LIST, train_ratio=0.8, cache=True)
models_s4, train_s4, test_s4 = train_models_for_scenario(df_s4, numHashTables_list=NUM_HASH_LIST, train_ratio=0.8, cache=True)

print("OK: modèles entraînés pour S1..S4 avec numHashTables =", NUM_HASH_LIST)

Py4JJavaError: An error occurred while calling o2756.filter.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:122)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:119)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:160)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:141)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$catalog$1(BaseSessionStateBuilder.scala:163)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:129)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:129)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.databaseExists(SessionCatalog.scala:335)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isPersistentFunction(SessionCatalog.scala:1975)
	at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.functionExists(V2SessionCatalog.scala:489)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$17.applyOrElse(Analyzer.scala:2090)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$17.applyOrElse(Analyzer.scala:2079)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1257)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1256)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:683)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1259)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1256)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:683)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDownWithPruning$1(QueryPlan.scala:185)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:226)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:226)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:238)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$5(QueryPlan.scala:249)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:312)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:249)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDownWithPruning(QueryPlan.scala:185)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsWithPruning(QueryPlan.scala:156)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressionsDownWithPruning$1.applyOrElse(AnalysisHelper.scala:307)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressionsDownWithPruning$1.applyOrElse(AnalysisHelper.scala:306)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:200)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:200)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:198)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:194)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:100)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:97)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsDownWithPruning(AnalysisHelper.scala:306)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsDownWithPruning$(AnalysisHelper.scala:303)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressionsDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning(AnalysisHelper.scala:278)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning$(AnalysisHelper.scala:276)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressionsWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:2079)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:2075)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset.<init>(Dataset.scala:250)
	at org.apache.spark.sql.classic.Dataset.<init>(Dataset.scala:262)
	at org.apache.spark.sql.classic.Dataset$.apply(Dataset.scala:99)
	at org.apache.spark.sql.classic.Dataset.withSameTypedPlan(Dataset.scala:2273)
	at org.apache.spark.sql.classic.Dataset.filter(Dataset.scala:926)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
		at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
		at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:122)
		at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:119)
		at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:160)
		at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:141)
		at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$catalog$1(BaseSessionStateBuilder.scala:163)
		at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:129)
		at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:129)
		at org.apache.spark.sql.catalyst.catalog.SessionCatalog.databaseExists(SessionCatalog.scala:335)
		at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isPersistentFunction(SessionCatalog.scala:1975)
		at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.functionExists(V2SessionCatalog.scala:489)
		at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$17.applyOrElse(Analyzer.scala:2090)
		at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$17.applyOrElse(Analyzer.scala:2079)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
		at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1257)
		at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1256)
		at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:683)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
		at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1259)
		at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1256)
		at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:683)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDownWithPruning$1(QueryPlan.scala:185)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:226)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:226)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:238)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$5(QueryPlan.scala:249)
		at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:312)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:249)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDownWithPruning(QueryPlan.scala:185)
		at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsWithPruning(QueryPlan.scala:156)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressionsDownWithPruning$1.applyOrElse(AnalysisHelper.scala:307)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressionsDownWithPruning$1.applyOrElse(AnalysisHelper.scala:306)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:200)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:200)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:198)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:194)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:100)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:97)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsDownWithPruning(AnalysisHelper.scala:306)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsDownWithPruning$(AnalysisHelper.scala:303)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressionsDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning(AnalysisHelper.scala:278)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning$(AnalysisHelper.scala:276)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressionsWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:2079)
		at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:2075)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
		at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 20 more
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:601)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:622)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:645)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:742)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1954)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1912)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1885)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$install$1(ShutdownHookManager.scala:194)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.Option.fold(Option.scala:263)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:195)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:55)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:53)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:159)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala:63)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:250)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:379)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:961)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:204)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:96)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1132)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1141)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:521)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:492)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:569)
	... 27 more


### 2.1.4 Tester les classeurs obtenus et mesurer l’exactitude « accuracy » selon les différentes valeurs de k plus proches voisins : {50, 100, 150, 200}

In [None]:
K_LIST = (50, 100, 150, 200)

def eval_scenario(models, train_df, test_df, scenario_name):
    rows = []
    for nht, model in models.items():
        acc_by_k = evaluate_k_grid(model, train_df, test_df, k_list=K_LIST)
        for k, acc in acc_by_k.items():
            rows.append(collect_results_row(scenario_name, nht, k, acc))
    print(f"OK: {scenario_name} évalué")
    return rows

results_rows = []
results_rows += eval_scenario(models_s1, train_s1, test_s1, "S1_words")
results_rows += eval_scenario(models_s2, train_s2, test_s2, "S2_ngrams")
results_rows += eval_scenario(models_s3, train_s3, test_s3, "S3_patterns")
results_rows += eval_scenario(models_s4, train_s4, test_s4, "S4_combo")


Py4JJavaError: An error occurred while calling o2585.collectToPython.
: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] User defined function (`LSHModel$$Lambda$6516/0x00000155354f1bb8`: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>) failed due to: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.. SQLSTATE: 39000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:195)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:469)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:182)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:64)
	at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	... 3 more


### 2.1.5 Tracer le tableau des performances de classification, à l’image de la table 4 de l’article,selon les différents modèles de classeurs obtenus

In [None]:
df_results = build_results_table(results_rows)     
df_table4  = format_table4_like(df_results)
df_table4