In [2]:
from pyspark.sql import SparkSession, types
import pyspark.sql.functions as F
import os
import logging
import os


# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define explicit schemas for different file types
schema_nb_fer = types.StructType([
    types.StructField("JOUR", types.StringType(), True),
    types.StructField("CODE_STIF_TRNS", types.IntegerType(), True),
    types.StructField("CODE_STIF_RES", types.StringType(), True),
    types.StructField("CODE_STIF_ARRET", types.StringType(), True),
    types.StructField("LIBELLE_ARRET", types.StringType(), True),
    types.StructField("ID_REFA_LDA", types.StringType(), True),
    types.StructField("CATEGORIE_TITRE", types.StringType(), True),
    types.StructField("NB_VALD", types.StringType(), True),
    types.StructField("date", types.StringType(), True)
])

schema_profil = types.StructType([
    types.StructField("CODE_STIF_TRNS", types.IntegerType(), True),
    types.StructField("CODE_STIF_RES", types.StringType(), True),
    types.StructField("CODE_STIF_ARRET", types.StringType(), True),
    types.StructField("LIBELLE_ARRET", types.StringType(), True),
    types.StructField("ID_REFA_LDA", types.StringType(), True),
    types.StructField("CAT_JOUR", types.StringType(), True),
    types.StructField("TRNC_HORR_60", types.StringType(), True),
    types.StructField("pourc_validations", types.StringType(), True),
    types.StructField("date", types.StringType(), True)
])

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("VALIDATION MERGE") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.driver.extraJavaOptions", "-Dhadoop.home.dir=C:/ProgramData/hadoop-3.3.6") \
    .config("spark.executor.extraJavaOptions", "-Dhadoop.home.dir=C:/ProgramData/hadoop-3.3.6") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()
def infer_delimiter(path):
    """Heuristically infer the delimiter of a file by reading the first line."""
    with open(path, 'r', encoding='utf-8') as file:
        first_line = file.readline()
    delimiter = ';' if ';' in first_line else ','
    logging.info(f"Inferred delimiter '{delimiter}' for file: {path}")
    return delimiter

def read_and_label_data(path, file_type, date_pattern):
    """Read data from CSV or TXT, add date column."""
    sep = infer_delimiter(path)  # Dynamically infer the delimiter
    schema = schema_nb_fer if 'NB_FER' in file_type else schema_profil

    # Read the file with the specified delimiter and headers
    df = spark.read.option("header", "true").option("delimiter", sep).schema(schema).csv(path)

    # Debug: Show schema to verify columns are read correctly
    df.printSchema()

    # Extract year from the filename and add as a column
    year = date_pattern.split('_')[0]  # Assuming the year is part of the filename like '2015'
    df = df.withColumn("date", F.lit(year))

    return df

def get_files(directory, pattern):
    """Retrieve files matching the pattern within directories."""
    matched_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if pattern in file:
                matched_files.append((os.path.join(root, file), file))
    logging.info(f"Found {len(matched_files)} files with pattern '{pattern}' in directory: {directory}")
    return matched_files

# Set the base directory to the location of the histo_validation folder
base_dir = r"C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation"

# Retrieve files matching the patterns for 'NB_FER' and 'PROFIL'
nb_fer_files = get_files(base_dir, "NB_FER")
profil_files = get_files(base_dir, "PROFIL")

# Function to merge dataframes of the same type
def merge_files(files, file_type):
    merged_df = None
    for file_path, file_name in files:
        df = read_and_label_data(file_path, file_type, file_name)
        if merged_df is None:
            merged_df = df
        else:
            merged_df = merged_df.unionByName(df)
    return merged_df

# Merge and save the data for 'NB_FER' and 'PROFIL'
nb_fer_df = merge_files(nb_fer_files, 'NB_FER')
profil_df = merge_files(profil_files, 'PROFIL')

nb_fer_df.write.csv(r"D:\\merged_NB_FER.csv", header=True, mode="overwrite")
profil_df.write.csv(r"D:\\merged_PROFIL.csv", header=True, mode="overwrite")

# Stop the Spark session
spark.stop()


2024-05-10 16:55:46,290 - INFO - Found 15 files with pattern 'NB_FER' in directory: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation
2024-05-10 16:55:46,292 - INFO - Found 15 files with pattern 'PROFIL' in directory: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation
2024-05-10 16:55:46,294 - INFO - Inferred delimiter ';' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2015\2015S1_NB_FER.csv
2024-05-10 16:55:46,322 - INFO - Inferred delimiter ';' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2015\2015S2_NB_FER.csv
2024-05-10 16:55:46,351 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2016\2016S1_NB_FER.txt
2024-05-10 16:55:46,381 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\d

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CATEGORIE_TITRE: string (nullable = true)
 |-- NB_VALD: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CATEGORIE_TITRE: string (nullable = true)
 |-- NB_VALD: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable =

2024-05-10 16:55:46,546 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2019\2019_S2_NB_FER.txt
2024-05-10 16:55:46,585 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2020\2020_S1_NB_FER.txt
2024-05-10 16:55:46,625 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2020\2020_S2_NB_FER.txt
2024-05-10 16:55:46,656 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2021\2021_S1_NB_FER.txt
2024-05-10 16:55:46,681 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2021\2021_S2_NB_FER.txt
2024-05-10 16:55:46,708 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CATEGORIE_TITRE: string (nullable = true)
 |-- NB_VALD: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CATEGORIE_TITRE: string (nullable = true)
 |-- NB_VALD: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable =

2024-05-10 16:55:46,763 - INFO - Inferred delimiter ';' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2015\2015S1_PROFIL_FER.csv
2024-05-10 16:55:46,782 - INFO - Inferred delimiter ';' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2015\2015S2_PROFIL_FER.csv
2024-05-10 16:55:46,806 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2016\2016S1_PROFIL_FER.txt
2024-05-10 16:55:46,826 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2016\2016S2_PROFIL_FER.txt
2024-05-10 16:55:46,848 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2017\2017S1_PROFIL_FER.txt
2024-05-10 16:55:46,871 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\T

root
 |-- JOUR: string (nullable = true)
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CATEGORIE_TITRE: string (nullable = true)
 |-- NB_VALD: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CAT_JOUR: string (nullable = true)
 |-- TRNC_HORR_60: string (nullable = true)
 |-- pourc_validations: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: 

2024-05-10 16:55:46,975 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2020\2020_S1_PROFIL_FER.txt
2024-05-10 16:55:47,004 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2020\2020_S2_PROFIL_FER.txt
2024-05-10 16:55:47,044 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2021\2021_S1_PROFIL_FER.txt
2024-05-10 16:55:47,096 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2021\2021_S2_PROFIL_FER.txt
2024-05-10 16:55:47,136 - INFO - Inferred delimiter ',' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2022\2022_S1_PROFIL_FER.txt


root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CAT_JOUR: string (nullable = true)
 |-- TRNC_HORR_60: string (nullable = true)
 |-- pourc_validations: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CAT_JOUR: string (nullable = true)
 |-- TRNC_HORR_60: string (nullable = true)
 |-- pourc_validations: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID

2024-05-10 16:55:47,170 - INFO - Inferred delimiter ';' for file: C:\Users\yskon\Desktop\python_av\Transport_App\ML_module\data\histo_validation\data-rf-2022\2022_S2_PROFIL_FER.txt


root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CAT_JOUR: string (nullable = true)
 |-- TRNC_HORR_60: string (nullable = true)
 |-- pourc_validations: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- CODE_STIF_TRNS: integer (nullable = true)
 |-- CODE_STIF_RES: string (nullable = true)
 |-- CODE_STIF_ARRET: string (nullable = true)
 |-- LIBELLE_ARRET: string (nullable = true)
 |-- ID_REFA_LDA: string (nullable = true)
 |-- CAT_JOUR: string (nullable = true)
 |-- TRNC_HORR_60: string (nullable = true)
 |-- pourc_validations: string (nullable = true)
 |-- date: string (nullable = true)



Py4JJavaError: An error occurred while calling o1058.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
