# Practica 2
***
Rodriguez Nuñez Diego Eduardo

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, trim, regexp_replace, udf, row_number, lit, coalesce
from pyspark.sql.types import IntegerType, FloatType, StringType, DateType
from pyspark.sql.window import Window
import os
import shutil
import tempfile
import glob

## Paso 1
Cargar los datos en un DataFrame de Spark

In [2]:
# Creamos la sesion de Spark
spark = SparkSession.builder.appName("Limpieza de datos").getOrCreate()

In [3]:
# Cargamos el dataset
df = spark.read.option("header", "true").csv("datos_sucios.csv", inferSchema=True)

In [4]:
df.show()

+----+------+----------+---------------+----------+-------------+
|  ID|   Nam|       Age|          Email| Join Date|       Salary|
+----+------+----------+---------------+----------+-------------+
| 1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|
| 2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|
| 3.0|  NULL|        30|           NULL|06-08-2021|         4000|
| 4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|
| 5.0|   Ana|        22|   ana@mail.com|      NULL|         NULL|
| 5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|
|NULL|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|
| 7.0|M4nuel|        35|manuel@mail.com|01-01-2021|         3.5K|
| 8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|
| 9.0|   123|        29|      luis@mail|2023-03-25|          NaN|
+----+------+----------+---------------+----------+-------------+



## Paso 2
Renombrar columnas con errores tipográficos

In [5]:
# Obtenemos los nombres de las columnas
columnas = df.columns
print("Columnas originales: ",columnas)

Columnas originales:  ['ID', 'Nam', 'Age', 'Email', 'Join Date', 'Salary']


In [6]:
columnas_corregidas = {
    'Nam' : 'Name'
}

In [7]:
for col_err, col_corr in columnas_corregidas.items():
    if col_err in columnas:
        df = df.withColumnRenamed(col_err, col_corr)

In [8]:
df.show()

+----+------+----------+---------------+----------+-------------+
|  ID|  Name|       Age|          Email| Join Date|       Salary|
+----+------+----------+---------------+----------+-------------+
| 1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|
| 2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|
| 3.0|  NULL|        30|           NULL|06-08-2021|         4000|
| 4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|
| 5.0|   Ana|        22|   ana@mail.com|      NULL|         NULL|
| 5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|
|NULL|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|
| 7.0|M4nuel|        35|manuel@mail.com|01-01-2021|         3.5K|
| 8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|
| 9.0|   123|        29|      luis@mail|2023-03-25|          NaN|
+----+------+----------+---------------+----------+-------------+



## Paso 3
Eliminar duplicados

In [9]:
# Contar nulos por fila
df = df.withColumn("null-count", sum([when(col(c).isNull(), 1).otherwise(0) for c in df.columns]))
df.show()

+----+------+----------+---------------+----------+-------------+----------+
|  ID|  Name|       Age|          Email| Join Date|       Salary|null-count|
+----+------+----------+---------------+----------+-------------+----------+
| 1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|         0|
| 2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|         0|
| 3.0|  NULL|        30|           NULL|06-08-2021|         4000|         2|
| 4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|         1|
| 5.0|   Ana|        22|   ana@mail.com|      NULL|         NULL|         2|
| 5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|         0|
|NULL|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|         1|
| 7.0|M4nuel|        35|manuel@mail.com|01-01-2021|         3.5K|         0|
| 8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|         0|
| 9.0|   123|        29|      luis@mail|2023-03-25|          NaN|         0|

In [10]:
# Ordenar por ID, menor cantidad de nulos y priorizando ell ID mas bajo en caso de correos repetidos
windowSpec = Window.partitionBy("Email").orderBy(col("null-count"), col("ID"))

# Asignar un numero de fila a cada registro
df = df.withColumn("row_number", row_number().over(windowSpec))

# Filtrar solo los registros con row_number = 1
df = df.filter(col("row_number") == 1).drop("row_number", "null-count")

df.show()

+----+------+----------+---------------+----------+-------------+
|  ID|  Name|       Age|          Email| Join Date|       Salary|
+----+------+----------+---------------+----------+-------------+
| 3.0|  NULL|        30|           NULL|06-08-2021|         4000|
| 5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|
|NULL|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|
| 1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|
| 8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|
| 7.0|M4nuel|        35|manuel@mail.com|01-01-2021|         3.5K|
| 2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|
| 4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|
+----+------+----------+---------------+----------+-------------+



## Paso 4
Reemplazar valores incorrectos

### Primero tratamos el ID faltante

In [11]:
# Obtener el ID maximo
df = df.orderBy("ID").withColumn("ID", when(col("ID").isNull(), lit(6)).otherwise(col("ID")))

df = df.orderBy("ID")

df.show()

+---+------+----------+---------------+----------+-------------+
| ID|  Name|       Age|          Email| Join Date|       Salary|
+---+------+----------+---------------+----------+-------------+
|1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|
|2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|
|3.0|  NULL|        30|           NULL|06-08-2021|         4000|
|4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|
|5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|
|6.0|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|
|7.0|M4nuel|        35|manuel@mail.com|01-01-2021|         3.5K|
|8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|
+---+------+----------+---------------+----------+-------------+



### Tratamos el nombre M4nuel

In [12]:
df = df.withColumn("Name", regexp_replace(col("Name"), "M4nuel", "Manuel"))

df.show()

+---+------+----------+---------------+----------+-------------+
| ID|  Name|       Age|          Email| Join Date|       Salary|
+---+------+----------+---------------+----------+-------------+
|1.0|  Juan|        25|  juan@mail.com|2021-06-15|         3000|
|2.0| Maria|veintiséis| maria@mail.com|2022-07-20|         2500|
|3.0|  NULL|        30|           NULL|06-08-2021|         4000|
|4.0| Pedro|      NULL|     pedro@mail|2020/05/30|         5000|
|5.0|   Ana|        22|   ana@mail.com|2022-07-20|         2500|
|6.0|Carlos|        40|carlos@mail.com|2019-12-01|Four thousand|
|7.0|Manuel|        35|manuel@mail.com|01-01-2021|         3.5K|
|8.0|  Luis|   treinta|      luis@mail|2023-03-25|         6000|
+---+------+----------+---------------+----------+-------------+



### Llenamos los NULL con desconocido

In [13]:
df = df.fillna("Desconocido")

df.show()

+---+-----------+-----------+---------------+----------+-------------+
| ID|       Name|        Age|          Email| Join Date|       Salary|
+---+-----------+-----------+---------------+----------+-------------+
|1.0|       Juan|         25|  juan@mail.com|2021-06-15|         3000|
|2.0|      Maria| veintiséis| maria@mail.com|2022-07-20|         2500|
|3.0|Desconocido|         30|    Desconocido|06-08-2021|         4000|
|4.0|      Pedro|Desconocido|     pedro@mail|2020/05/30|         5000|
|5.0|        Ana|         22|   ana@mail.com|2022-07-20|         2500|
|6.0|     Carlos|         40|carlos@mail.com|2019-12-01|Four thousand|
|7.0|     Manuel|         35|manuel@mail.com|01-01-2021|         3.5K|
|8.0|       Luis|    treinta|      luis@mail|2023-03-25|         6000|
+---+-----------+-----------+---------------+----------+-------------+



### Tratamos las edades

In [14]:
df = df.withColumn("Age", when(col("Age") == 'veintiséis', lit(26)).when(col("Age") == 'treinta', lit(30)).otherwise(col("Age")))

df.show()

+---+-----------+-----------+---------------+----------+-------------+
| ID|       Name|        Age|          Email| Join Date|       Salary|
+---+-----------+-----------+---------------+----------+-------------+
|1.0|       Juan|         25|  juan@mail.com|2021-06-15|         3000|
|2.0|      Maria|         26| maria@mail.com|2022-07-20|         2500|
|3.0|Desconocido|         30|    Desconocido|06-08-2021|         4000|
|4.0|      Pedro|Desconocido|     pedro@mail|2020/05/30|         5000|
|5.0|        Ana|         22|   ana@mail.com|2022-07-20|         2500|
|6.0|     Carlos|         40|carlos@mail.com|2019-12-01|Four thousand|
|7.0|     Manuel|         35|manuel@mail.com|01-01-2021|         3.5K|
|8.0|       Luis|         30|      luis@mail|2023-03-25|         6000|
+---+-----------+-----------+---------------+----------+-------------+



In [15]:
df = df.withColumn("Age", when(col("Age")== 'Desconocido', None).otherwise(col("Age")))

df.show()

+---+-----------+----+---------------+----------+-------------+
| ID|       Name| Age|          Email| Join Date|       Salary|
+---+-----------+----+---------------+----------+-------------+
|1.0|       Juan|  25|  juan@mail.com|2021-06-15|         3000|
|2.0|      Maria|  26| maria@mail.com|2022-07-20|         2500|
|3.0|Desconocido|  30|    Desconocido|06-08-2021|         4000|
|4.0|      Pedro|NULL|     pedro@mail|2020/05/30|         5000|
|5.0|        Ana|  22|   ana@mail.com|2022-07-20|         2500|
|6.0|     Carlos|  40|carlos@mail.com|2019-12-01|Four thousand|
|7.0|     Manuel|  35|manuel@mail.com|01-01-2021|         3.5K|
|8.0|       Luis|  30|      luis@mail|2023-03-25|         6000|
+---+-----------+----+---------------+----------+-------------+



### Ahora tratamos la columna Salary para unificarlas en un mismo formato 🙏

In [16]:
df = df.withColumn("Salary", regexp_replace(col("Salary"), "Four thousand", "4000"))

df = df.withColumn("Salary", regexp_replace(col("Salary"), "3.5K", "3500"))

df.show()

+---+-----------+----+---------------+----------+------+
| ID|       Name| Age|          Email| Join Date|Salary|
+---+-----------+----+---------------+----------+------+
|1.0|       Juan|  25|  juan@mail.com|2021-06-15|  3000|
|2.0|      Maria|  26| maria@mail.com|2022-07-20|  2500|
|3.0|Desconocido|  30|    Desconocido|06-08-2021|  4000|
|4.0|      Pedro|NULL|     pedro@mail|2020/05/30|  5000|
|5.0|        Ana|  22|   ana@mail.com|2022-07-20|  2500|
|6.0|     Carlos|  40|carlos@mail.com|2019-12-01|  4000|
|7.0|     Manuel|  35|manuel@mail.com|01-01-2021|  3500|
|8.0|       Luis|  30|      luis@mail|2023-03-25|  6000|
+---+-----------+----+---------------+----------+------+



## Paso 5
Convertir tipos de datos

In [17]:
# Mostramos el esquema
df.printSchema()

root
 |-- ID: double (nullable = true)
 |-- Name: string (nullable = false)
 |-- Age: string (nullable = true)
 |-- Email: string (nullable = false)
 |-- Join Date: string (nullable = false)
 |-- Salary: string (nullable = false)



In [18]:
# Convertimos los tipos de datos
df = df.withColumn("ID", col("ID").cast(IntegerType()))
df = df.withColumn("Age", col("Age").cast(IntegerType()))
df = df.withColumn("Salary", col("Salary").cast(FloatType()))

df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Email: string (nullable = false)
 |-- Join Date: string (nullable = false)
 |-- Salary: float (nullable = true)



## Paso 6 
Normalizar formatos de fecha

In [19]:
# Probar diferentes formatos de fecha y convertir a tipo DateType
df = df.withColumn('Join Date', coalesce(
    to_date(col('Join Date'), 'yyyy-MM-dd'),
    to_date(col('Join Date'), 'dd-MM-yyyy'),
    to_date(col('Join Date'), 'yyyy/MM/dd')
))

# Imprimir los tipos de datos actuales de cada columna
df.printSchema()


root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Email: string (nullable = false)
 |-- Join Date: date (nullable = true)
 |-- Salary: float (nullable = true)



In [20]:
df.show()

+---+-----------+----+---------------+----------+------+
| ID|       Name| Age|          Email| Join Date|Salary|
+---+-----------+----+---------------+----------+------+
|  1|       Juan|  25|  juan@mail.com|2021-06-15|3000.0|
|  2|      Maria|  26| maria@mail.com|2022-07-20|2500.0|
|  3|Desconocido|  30|    Desconocido|2021-08-06|4000.0|
|  4|      Pedro|NULL|     pedro@mail|2020-05-30|5000.0|
|  5|        Ana|  22|   ana@mail.com|2022-07-20|2500.0|
|  6|     Carlos|  40|carlos@mail.com|2019-12-01|4000.0|
|  7|     Manuel|  35|manuel@mail.com|2021-01-01|3500.0|
|  8|       Luis|  30|      luis@mail|2023-03-25|6000.0|
+---+-----------+----+---------------+----------+------+



## Paso 7
Cargar los datos limpios en “datos_limpios.csv”

In [21]:
# Guardar el dataset limpio
df.write.mode("overwrite").csv("datos_limpios.csv", header=True)

print("Datos limpios y unificados 🙏 guardados en datos_limpios.csv")

Py4JJavaError: An error occurred while calling o174.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:860)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [28]:
# 12. Usar un directorio temporal seguro en C:/temp
temp_output_dir = 'C:/tmp/temp_datos_limpios'

# Eliminar la carpeta temporal si existe
if os.path.exists(temp_output_dir):
    shutil.rmtree(temp_output_dir)

# Crear el directorio temporal sin cambiar permisos explícitamente
os.makedirs(temp_output_dir, exist_ok=True)

# Guardar el archivo CSV en un solo archivo (coalesce(1)) con encabezado en un directorio temporal
df.coalesce(1).write.mode('overwrite').option('header', True).csv(temp_output_dir)

# Encontrar el archivo generado por Spark
generated_file = glob.glob(temp_output_dir + '/part-*.csv')[0]

# Ruta final para el archivo CSV sin espacios en la ruta
final_output_path = 'C:/Users/dern9/Desktop/Big-Data/Practica_2/datos_limpios.csv'

# Crear la carpeta de destino si no existe
os.makedirs(os.path.dirname(final_output_path), exist_ok=True)

# Mover y renombrar el archivo CSV
shutil.move(generated_file, final_output_path)

# Eliminar el directorio temporal
shutil.rmtree(temp_output_dir)

print(f'Datos limpios guardados en {final_output_path}')

Py4JJavaError: An error occurred while calling o209.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:860)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
