## Importation des librairies et configuration de la session

In [2]:
import pyspark

In [3]:
import os


# Point vers l'interpréteur Python actif de ton environnement Anaconda
python_path = os.path.join(os.environ["CONDA_PREFIX"], "python.exe")

# Assure que Spark utilise la même version pour driver et worker
os.environ["PYSPARK_PYTHON"] = python_path
os.environ["PYSPARK_DRIVER_PYTHON"] = python_path



In [4]:
from pyspark.sql import SparkSession
mongo_uri = "mongodb://mongodb://admin:admin@localhost:27007"

spark = SparkSession.builder \
    .appName("LectureParquet") \
    .master("local[1]") \
    .config("spark.driver.memory", "6g") \
    .config("spark.mongodb.output.uri", mongo_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()



In [5]:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", 512)


In [6]:
df = spark.read.parquet("donnees_petit.parquet")

# Afficher les premières lignes

# Afficher le schéma
df.printSchema()

root
 |-- additives_n: integer (nullable = true)
 |-- additives_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- allergens_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- brands_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- brands: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- categories_properties: struct (nullable = true)
 |    |-- ciqual_food_code: integer (nullable = true)
 |    |-- agribalyse_food_code: integer (nullable = true)
 |    |-- agribalyse_proxy_food_code: integer (nullable = true)
 |-- checkers_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ciqual_food_name_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cities_tags: array (nullable = true)
 |    |-- element: string (containsNul

## Data cleaning

#### Première des chose on supprime les lignes sans ingrédients qui jouent un rôle très important

In [9]:
from pyspark.sql.functions import size, col

# Supposons que ton DataFrame s'appelle df et que la colonne est 'ingredients_text'
df_cleaned = df.filter(size(col("ingredients_text")) > 0)

#### calculer le pourcentage des missings value du dataset

In [11]:
from pyspark.sql.functions import col, count, when

total_rows = df_cleaned.count()
nan_percentages = df_cleaned.select([
    (count(when(col(c).isNull(), c)) / total_rows).alias(c)
    for c in df_cleaned.columns
])
nan_percentages.show(vertical=True)



-RECORD 0------------------------------------------------------
 additives_n                            | 6.075062760758668E-4 
 additives_tags                         | 9.82730740710961E-5  
 allergens_tags                         | 0.0                  
 brands_tags                            | 0.15901476776285814  
 brands                                 | 0.15931852090089607  
 categories                             | 0.10650121054559424  
 categories_tags                        | 0.10641187138734778  
 categories_properties                  | 1.78678316492902E-5  
 checkers_tags                          | 0.0                  
 ciqual_food_name_tags                  | 0.3527377985044625   
 cities_tags                            | 0.5681255751208312   
 code                                   | 0.0                  
 compared_to_category                   | 0.1733894383247121   
 complete                               | 0.0                  
 completeness                           

#### supprimer les colonnes dont le pourcentage de null supérieur de 0.6 

In [13]:
from pyspark.sql.functions import col

# Suppression des colonnes avec plus de 90% de valeurs nulles
total_rows = df_cleaned.count()
seuil = 0.6

cols_to_drop = []
for c in df_cleaned.columns:
    null_ratio = df_cleaned.filter(col(c).isNull()).count() / total_rows
    if null_ratio > seuil:
        cols_to_drop.append(c)

df_cleaned = df_cleaned.drop(*cols_to_drop)
print("Colonnes supprimées pour cause de +60% NaN :", cols_to_drop)

Colonnes supprimées pour cause de +60% NaN : ['ecoscore_score', 'editors', 'new_additives_n', 'owner_fields', 'owner', 'packagings_complete', 'packaging_tags', 'packaging', 'photographers', 'with_non_nutritive_sweeteners', 'with_sweeteners']


#### supprimer les lignes qui ont country vide

In [15]:
from pyspark.sql.functions import col, size

df_cleaned= df_cleaned.filter(
    (size(col("countries_tags")) > 0)
)



#### traiter les miss values pour les integer et string 


###### traitement des additifs

In [18]:
from pyspark.sql.functions import col, when

df_cleaned = df_cleaned.withColumn(
    "additives_n",
    when(col("additives_n").isNull(), 0).otherwise(col("additives_n")).cast("int")
)

from pyspark.sql.functions import udf, array
from pyspark.sql.types import ArrayType, StringType

# Remplacer les nulls par tableau vide
df_cleaned = df_cleaned.withColumn(
    "additives_tags",
    when(col("additives_tags").isNull(), array().cast(ArrayType(StringType())))
    .otherwise(col("additives_tags"))
)


##### traitement des brands

In [20]:
from pyspark.sql.functions import col, when

df_cleaned = df_cleaned.withColumn(
    "brands",
    when(col("brands").isNull(), "").otherwise(col("brands"))
)
from pyspark.sql.functions import when, col, array

df_cleaned = df_cleaned.withColumn(
    "brands_tags",
    when(col("brands_tags").isNull(), array()).otherwise(col("brands_tags"))
)


##### traitement de categories

In [22]:
from pyspark.sql.functions import explode, col, count, desc, when, array, lit

# 1. On aplatit les listes pour compter les tags
most_common_tag = (
    df_cleaned
    .filter((col("categories_tags").isNotNull()) & (col("categories_tags") != array(lit("en:null"))))
    .select(explode(col("categories_tags")).alias("tag"))
    .groupBy("tag")
    .agg(count("*").alias("count"))
    .orderBy(desc("count"))
    .first()
)

# 2. Récupération du tag le plus fréquent (si trouvé)
default_tag = most_common_tag["tag"] if most_common_tag else "en:unknown"

# 3. Remplacement des NULLs et [en:null] par [default_tag]
df_cleaned = df_cleaned.withColumn(
    "categories_tags",
    when(
        (col("categories_tags").isNull()) | (col("categories_tags") == array(lit("en:null"))),
        array(lit(default_tag))
    ).otherwise(col("categories_tags"))
)


In [23]:
from pyspark.sql.functions import trim

df_cleaned = df_cleaned.withColumn(
    "categories",
    when((col("categories").isNull()) | (trim(col("categories")) == "null") | (trim(col("categories")) == ""), lit("Snacks"))
    .otherwise(col("categories"))
)


In [24]:
# Ta liste de colonnes à conserver
cols_to_gard = [
    "additives_n", "allergens_tags", "brands", "categories",
    "code", "compared_to_category", "countries_tags",
    "ingredients_analysis_tags", "ingredients_from_palm_oil_n",
    "ingredients_n", "ingredients_percent_analysis",
    "ingredients_tags", "ingredients_text", "ingredients_with_specified_percent_n",
    "ingredients_with_unspecified_percent_n", "ingredients", "known_ingredients_n",
    "lang", "nutrient_levels_tags", "nutriments", "nutrition_data_per", "product_name",
    "product_quantity_unit", "quantity", "serving_quantity", "serving_size",
    "unknown_ingredients_n"
]

# Filtrer les colonnes existantes (certaines peuvent ne pas être présentes dans le DataFrame)
cols_present = [col for col in cols_to_gard if col in df.columns]

# Sélectionner seulement ces colonnes
df_cleaned = df_cleaned.select(*cols_present)


In [25]:
df_cleaned[['countries_tags']].show(20,truncate=False)

+------------------+
|countries_tags    |
+------------------+
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:germany]      |
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:germany]      |
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
|[en:united-states]|
+------------------+
only showing top 20 rows



##### traiter les donnés avec ==1

In [27]:
df_cleaned = df_cleaned.filter(df.ingredients_percent_analysis == 1)


##### Traitement des ingrédients

In [29]:
from pyspark.sql.functions import regexp_extract, col, when,expr
df_cleaned = df_cleaned.withColumn(
    "ingredients_text",
    expr("""
        element_at(
            filter(ingredients_text, x -> x.lang = 'main'),
            1
        ).text
    """)
)

# Gérer les cas où aucun élément avec lang = "main" n'existe
df_cleaned = df_cleaned.withColumn(
    "ingredients_text",
    when(col("ingredients_text").isNotNull(), col("ingredients_text")).otherwise("")
)

In [30]:
df_cleaned[['ingredients_n']].show(truncate=False)

+-------------+
|ingredients_n|
+-------------+
|1            |
|11           |
|8            |
|8            |
|9            |
|6            |
|8            |
|1            |
|1            |
|3            |
|3            |
|2            |
|12           |
|1            |
|4            |
|3            |
|2            |
|4            |
|15           |
|11           |
+-------------+
only showing top 20 rows



In [31]:
df_cleaned[['ingredients_text']].show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ingredients_text                                                                                                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|SHAVE GRASS.                                                                                                                                                                                                 |
|RYE FLOUR, WATER, WHEAT, FLOUR, MALT, MOLASSES, SUGAR, ONION, YEAST, CARAWAY SEEDS, SALT.                                                                              

In [32]:
df_cleaned.show(3)

+-----------+------------------+--------------------+--------------------+-------------+--------------------+------------------+-------------------------+---------------------------+-------------+----------------------------+--------------------+--------------------+------------------------------------+--------------------------------------+--------------------+-------------------+----+--------------------+--------------------+------------------+--------------------+---------------------+--------+-----------------+------------------+---------------------+
|additives_n|    allergens_tags|              brands|          categories|         code|compared_to_category|    countries_tags|ingredients_analysis_tags|ingredients_from_palm_oil_n|ingredients_n|ingredients_percent_analysis|    ingredients_tags|    ingredients_text|ingredients_with_specified_percent_n|ingredients_with_unspecified_percent_n|         ingredients|known_ingredients_n|lang|nutrient_levels_tags|          nutriments|nutriti

In [33]:
df_cleaned[['nutrient_levels_tags']].show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------+
|nutrient_levels_tags                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------+
|[en:fat-in-low-quantity, en:salt-in-low-quantity]                                                                         |
|[en:fat-in-moderate-quantity, en:saturated-fat-in-low-quantity, en:sugars-in-high-quantity, en:salt-in-high-quantity]     |
|[en:fat-in-moderate-quantity, en:saturated-fat-in-high-quantity, en:sugars-in-high-quantity, en:salt-in-moderate-quantity]|
|[en:fat-in-moderate-quantity, en:sugars-in-moderate-quantity, en:salt-in-moderate-quantity]                               |
|[en:fat-in-high-quantity, en:saturated-fat-in-high-quantity, en:sugars-in-high-quantity, en:salt-in-low-quantity]         |


In [34]:
from pyspark.sql.functions import col, lower, trim, regexp_replace, array, transform, lit

def clean_countries_tags(df):

    return df.withColumn(
        "countries_tags",
        transform(col("countries_tags"), lambda x: regexp_replace(x, "^en:", ""))
    )

df_cleaned = clean_countries_tags(df_cleaned)


In [35]:
from pyspark.sql.functions import expr


def extract_main_product_name(df):
    return df.withColumn(
        "product_name",
        expr("""
            filter(product_name, x -> x.lang = 'main')[0].text
        """)
    )

df_cleaned = extract_main_product_name(df_cleaned)

In [36]:
df_cleaned[['lang']].show(truncate=False)

+----+
|lang|
+----+
|en  |
|en  |
|en  |
|de  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
|en  |
+----+
only showing top 20 rows



In [37]:
from pyspark.sql.functions import expr

def extract_nutrient_levels(df):
    df = df.withColumn("nutrient_levels_tags",
        transform(col("nutrient_levels_tags"), lambda x: regexp_replace(x, "^en:", ""))
    )
    
    return df \
        .withColumn("fat_level", expr("""
            filter(nutrient_levels_tags, x -> x like 'fat-in-%')[0]
        """)) \
        .withColumn("sugar_level", expr("""
            filter(nutrient_levels_tags, x -> x like 'sugars-in-%')[0]
        """)) \
        .withColumn("salt_level", expr("""
            filter(nutrient_levels_tags, x -> x like 'salt-in-%')[0]
        """)) \
        .withColumn("saturated_fat_level", expr("""
            filter(nutrient_levels_tags, x -> x like 'saturated-fat-in-%')[0]
        """)) \
        .withColumn("fat_level", regexp_replace("fat_level", "fat-in-|\\-quantity", "")) \
        .withColumn("sugar_level", regexp_replace("sugar_level", "sugars-in-|\\-quantity", "")) \
        .withColumn("salt_level", regexp_replace("salt_level", "salt-in-|\\-quantity", "")) \
        .withColumn("saturated_fat_level", regexp_replace("saturated_fat_level", "saturated-fat-in-|\\-quantity", ""))

df_cleaned=extract_nutrient_levels(df_cleaned)


In [38]:
df_cleaned[['allergens_tags']].show()

+--------------------+
|      allergens_tags|
+--------------------+
|                  []|
|         [en:gluten]|
|  [en:eggs, en:milk]|
|[en:milk, de:whey...|
|[en:milk, en:soyb...|
|                  []|
|           [en:milk]|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|    [en:crustaceans]|
|  [en:eggs, en:nuts]|
+--------------------+
only showing top 20 rows



In [39]:
df_cleaned= df_cleaned.fillna({
    "fat_level": "moderate",
    "sugar_level": "moderate",
    "salt_level": "moderate",
    "saturated_fat_level": "moderate"
})


In [40]:
from pyspark.sql.functions import col, lower, regexp_replace, split, element_at

df_cleaned= df_cleaned.withColumn("product_name", lower(col("product_name")))
df_cleaned= df_cleaned.withColumn("ingredients_text", lower(col("ingredients_text")))
df_cleaned= df_cleaned.withColumn("categories", lower(col("categories")))
df_cleaned= df_cleaned.withColumn("ingredients_text", regexp_replace(col("ingredients_text"), "[^a-z0-9, ]", ""))
df_cleaned= df_cleaned.withColumn("categories", regexp_replace(col("categories"), "[^a-z0-9, ]", ""))
df_cleaned= df_cleaned.withColumn("countries_tags", element_at(col("countries_tags"), 1))


In [41]:
#feature engineering
df_cleaned= df_cleaned.withColumn("ingredients_list", split(col("ingredients_text"), ", "))
df_cleaned= df_cleaned.withColumn("categories_list", split(col("categories"), ", "))
df_cleaned= df_cleaned.withColumn("has_gluten", when(col("ingredients_text").contains("gluten|wheat"), 1).otherwise(0))
df_cleaned= df_cleaned.withColumn("has_nuts", when(col("ingredients_text").contains("nut|peanut|almond"), 1).otherwise(0))

In [42]:
df_cleaned[['ingredients_list']].show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ingredients_list                                                                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[shave grass]                                                                                                                                                                                             |
|[rye flour, water, wheat, flour, malt, molasses, sugar, onion, yeast, caraway seeds, salt]                                                                                         

In [43]:
from pyspark.sql.functions import when, col

df_cleaned = df_cleaned.withColumn("fat_level_num", 
    when(col("fat_level") == "low", 0)
    .when(col("fat_level") == "moderate", 1)
    .when(col("fat_level") == "high", 2)
    .otherwise(None)
)

df_cleaned = df_cleaned.withColumn("sugar_level_num", 
    when(col("sugar_level") == "low", 0)
    .when(col("sugar_level") == "moderate", 1)
    .when(col("sugar_level") == "high", 2)
    .otherwise(None)
)

df_cleaned = df_cleaned.withColumn("salt_level_num", 
    when(col("salt_level") == "low", 0)
    .when(col("salt_level") == "moderate", 1)
    .when(col("salt_level") == "high", 2)
    .otherwise(None)
)

df_cleaned = df_cleaned.withColumn("saturated_fat_level_num", 
    when(col("saturated_fat_level") == "low", 0)
    .when(col("saturated_fat_level") == "moderate", 1)
    .when(col("saturated_fat_level") == "high", 2)
    .otherwise(None)
)


In [44]:
df_cleaned[['saturated_fat_level_num']].show()

+-----------------------+
|saturated_fat_level_num|
+-----------------------+
|                      1|
|                      0|
|                      2|
|                      1|
|                      2|
|                      1|
|                      2|
|                      0|
|                      0|
|                      0|
|                      0|
|                      0|
|                      0|
|                      0|
|                      0|
|                      0|
|                      1|
|                      1|
|                      1|
|                      1|
+-----------------------+
only showing top 20 rows



In [45]:
from pyspark.ml.feature import Tokenizer, Word2Vec, StringIndexer

sugar_indexer = StringIndexer(inputCol="sugar_level", outputCol="sugar_level_index", handleInvalid="keep")
fat_indexer = StringIndexer(inputCol="fat_level", outputCol="fat_level_index", handleInvalid="keep")
sat_fat_indexer = StringIndexer(inputCol="saturated_fat_level", outputCol="saturated_fat_level_index", handleInvalid="keep")
salt_indexer = StringIndexer(inputCol="salt_level", outputCol="salt_level_index", handleInvalid="keep")
country_indexer = StringIndexer(inputCol="countries_tags", outputCol="country_index", handleInvalid="keep")
category_indexer = StringIndexer(inputCol="categories", outputCol="category_index", handleInvalid="keep")

In [46]:
# Vectoriser ingredients_text
ingredient_tokenizer = Tokenizer(inputCol="ingredients_text", outputCol="ingredient_words")
ingredient_word2vec = Word2Vec(vectorSize=100, minCount=0, inputCol="ingredient_words", outputCol="ingredient_vector")


In [47]:
# Pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    sugar_indexer, fat_indexer, sat_fat_indexer, salt_indexer,
    country_indexer, category_indexer,
    ingredient_tokenizer, ingredient_word2vec
])
model = pipeline.fit(df_cleaned)

In [48]:
df_cleaned= model.transform(df_cleaned)


In [49]:
from pyspark.ml.linalg import Vectors

def combine_features(ingredient_vector, sugar_level_index, fat_level_index, saturated_fat_level_index, salt_level_index, country_index, category_index):
    ingredient_weight = 0.6
    nutrition_weight = 0.05
    country_weight = 0.15
    category_weight = 0.15
    return Vectors.dense(
        [x * ingredient_weight for x in ingredient_vector.toArray().tolist()] +
        [sugar_level_index * nutrition_weight, fat_level_index * nutrition_weight,
         saturated_fat_level_index * nutrition_weight, salt_level_index * nutrition_weight,
         country_index * country_weight, category_index * category_weight]
    )
from pyspark.ml.linalg import VectorUDT

combine_features_udf = udf(combine_features, VectorUDT())
df_cleaned= df_cleaned.withColumn("combined_features", combine_features_udf(
    col("ingredient_vector"), col("sugar_level_index"), col("fat_level_index"),
    col("saturated_fat_level_index"), col("salt_level_index"), col("country_index"), col("category_index")
))


In [50]:
df_cleaned[['ingredients']].show()

+--------------------+
|         ingredients|
+--------------------+
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":9...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":1...|
|[{"percent_max":7...|
+--------------------+
only showing top 20 rows



In [51]:
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 pyspark-shell"


In [52]:
 
# mongo_uri = "mongodb://admin:admin@localhost:27007"
# db_name = "mini-rag"
# collection_name = "assets"

# if not check_mongodb_connection(mongo_uri):
#     raise Exception("Impossible de se connecter à MongoDB. Vérifiez que le serveur est en cours d'exécution.")

# import os
# os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 pyspark-shell"

# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("OpenFoodFactsPipelineTextInput") \
#     .config("spark.mongodb.input.uri", f"{mongo_uri}/{db_name}.{collection_name}") \
#     .config("spark.mongodb.output.uri", f"{mongo_uri}/{db_name}.{collection_name}") \
#     .getOrCreate()


# df = spark.read.format("mongo").load()
# df.show()

# df_test.write.format("mongo").mode("overwrite").save()


# df_test = spark.createDataFrame(
#     [(1, "pomme"), (2, "banane")],
#     ["id", "fruit"]
# )

# try:
#     df = spark.read \
#         .format("com.mongodb.spark.sql.DefaultSource") \
#         .option("database", db_name) \
#         .option("collection", collection_name) \
#         .load()

#     df.show()
#     logger.info("Écriture dans MongoDB réussie")
# except Exception as e:
#     logger.error(f"Erreur lors de l'écriture dans MongoDB : {e}")
#     raise


# try:
#     df_test.write \
#         .format("mongodb") \
#         .mode("overwrite") \
#         .option("spark.mongodb.write.connection.uri", f"{mongo_uri}/{db_name}") \
#         .option("spark.mongodb.write.database", db_name) \
#         .option("spark.mongodb.write.collection", collection_name) \
#         .save()
#     logger.info("Écriture dans MongoDB réussie")
# except Exception as e:
#     logger.error(f"Erreur lors de l'écriture dans MongoDB : {e}")
#     raise


In [53]:
print(spark.version)

3.5.5


In [54]:
df_cleaned[['ingredients_text']].show()

+--------------------+
|    ingredients_text|
+--------------------+
|         shave grass|
|rye flour, water,...|
|bart  judys propr...|
|molkenproteinkonz...|
|milk chocolate su...|
|bittersweet choco...|
|semisweet chocola...|
|      romaine hearts|
|          green leaf|
|fuji apple cider,...|
|apple cider, natu...|
|pink lady apple c...|
|sugar, corn syrup...|
|         fuji apples|
|cranberries, suga...|
|honey crisp apple...|
|pure raw honey, r...|
|caperberries, wat...|
|seasoned flour wh...|
|sugar, almonds 30...|
+--------------------+
only showing top 20 rows



In [55]:
# ATTENTION : ça peut consommer beaucoup de mémoire si ton dataset est volumineux !
df_cleaned.write \
    .mode("append") \
    .parquet("../export_parquet")



Py4JJavaError: An error occurred while calling o1898.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 377.0 failed 1 times, most recent failure: Lost task 0.0 in stage 377.0 (TID 503) (host.docker.internal executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/em/anaconda_projects/tensorflow/export_parquet.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 30 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/em/anaconda_projects/tensorflow/export_parquet.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 30 more


In [None]:
df_cleaned[['ingredients_text']]

In [None]:
import sys
print(sys.version)


In [56]:
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType
import platform

def get_python_version():
    import sys
    return sys.version

get_python_version_udf = udf(get_python_version, StringType())

df = spark.range(1)  # un petit DataFrame
df_with_version = df.withColumn("python_version", get_python_version_udf())

df_with_version.show(truncate=False)


Py4JJavaError: An error occurred while calling o1919.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 378.0 failed 1 times, most recent failure: Lost task 0.0 in stage 378.0 (TID 505) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 26 more


In [62]:
df.limit(10).toPandas().to_parquet("C:/Users/em/anaconda_projects/tensorflow/export_parquet/test.parquet")


In [64]:
df_cleaned.printSchema()

root
 |-- additives_n: integer (nullable = true)
 |-- allergens_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- brands: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- code: string (nullable = true)
 |-- compared_to_category: string (nullable = true)
 |-- countries_tags: string (nullable = true)
 |-- ingredients_analysis_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ingredients_from_palm_oil_n: integer (nullable = true)
 |-- ingredients_n: integer (nullable = true)
 |-- ingredients_percent_analysis: integer (nullable = true)
 |-- ingredients_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ingredients_text: string (nullable = true)
 |-- ingredients_with_specified_percent_n: integer (nullable = true)
 |-- ingredients_with_unspecified_percent_n: integer (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- known_ingredients_n: integer (nullable = t

In [68]:
df1=df_cleaned

In [70]:
df1.limit(1).toPandas()


Py4JJavaError: An error occurred while calling o1943.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 383.0 failed 1 times, most recent failure: Lost task 0.0 in stage 383.0 (TID 510) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 26 more


In [74]:
df_cleaned.coalesce(1).write.mode("overwrite").parquet("/content/tmp_parquet")


Py4JJavaError: An error occurred while calling o1958.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 385.0 failed 1 times, most recent failure: Lost task 0.0 in stage 385.0 (TID 512) (host.docker.internal executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/tmp_parquet.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/tmp_parquet.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 31 more
