# Extract data to a Spark DataFrame

In [3]:
import os
from getpass import getpass

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

# Load the gzip-compressed, tab-separated products file. The first row is used as
# the header and column types are inferred from the data; the result is marked
# for caching.
# NOTE(review): with schema inference the `code` column comes out as `double`
# (see the DataFrame repr printed further down) — if it holds barcodes, leading
# zeros are lost; confirm whether it should be read as a string instead.
df = (
    spark.read
    .csv(
        "data/en.openfoodfacts.org.products.csv.gz",
        sep="\t",
        header=True,
        inferSchema=True,
    )
    .cache()
)
# df.show()

# Generate a table in a CSV with statistical descriptions of the DataFrame

In [2]:
# Compute the summary statistics with Spark, bring them to the driver as a
# pandas DataFrame, then write them to a CSV file without the index column.
summary_pdf = df.describe().toPandas()
summary_pdf.to_csv("describe_summary.csv", index=False)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

# Take a sample of the DF

In [4]:
# Fraction of rows passed to DataFrame.sample() below
FRACTION_SIZE = 1e-5

In [5]:
# Columns selected for the sample taken below
columnsToSample = [
    "quantity",
    "serving_size",
    "serving_quantity",
    "product_quantity",
]

In [6]:
# Keep only the columns listed in `columnsToSample` and discard rows in which
# every one of them is null; the result is marked for caching.
samples_not_clean_df = (
    df.select(columnsToSample)
    .na.drop(how='all')
    .cache()
)

In [7]:
# Fixed seed so the random sample (and every output derived from it) is the same
# on each run; without an explicit seed the sample changes between executions.
SAMPLE_SEED = 42

# Sample rows without replacement. `fraction` is the expected share of rows to
# keep, so the number of rows returned is approximate rather than exact.
# The result is marked for caching.
samples_df = samples_not_clean_df.sample(
    withReplacement=False,
    fraction=FRACTION_SIZE,
    seed=SAMPLE_SEED,
).cache()

In [8]:
# Number of rows in the sampled DataFrame
samples_df.count()

15

In [9]:
# Print the sampled rows as a text table
samples_df.show()

+--------+-----------------+----------------+----------------+
|quantity|     serving_size|serving_quantity|product_quantity|
+--------+-----------------+----------------+----------------+
|   32 oz|             NULL|            NULL|       907.18474|
|    NULL|     2 ONZ (57 g)|            57.0|            NULL|
|   200 g|            100 g|           100.0|           200.0|
|    NULL|  0.5 cup (125 g)|           125.0|            NULL|
|    NULL|   0.5 cup (85 g)|            85.0|            NULL|
|    NULL|38.3 g (1.35 ONZ)|            38.3|            NULL|
|   6 lbs|             NULL|            NULL|          6000.0|
|   540ml|             NULL|            NULL|           540.0|
|   400 g|             NULL|            NULL|           400.0|
|    NULL|            77.0g|            77.0|            NULL|
|   400 g|             NULL|            NULL|           400.0|
|    150g|             NULL|            NULL|           150.0|
|    NULL|            100 g|           100.0|          

# Drop null rows

In [10]:
# Keep only the rows that have a non-null value in `categories_tags`.
# NOTE(review): despite its name, `dropped_rows` holds the rows that remain
# after the drop, not the rows that were removed.
dropped_rows = df.na.drop(subset=['categories_tags'])

# Extract unique strings in a column (ingredients analysis tags in this example)

In [11]:
def flatten_list(li):
    """Concatenate the items of every row in ``li`` into one flat list.

    Args:
        li: An iterable whose elements are themselves iterable (e.g. a list of lists).

    Returns:
        A new list containing the items of each row, in their original order.
    """
    return [item for row in li for item in row]

def make_list_unique(li):
    """Remove duplicates from ``li`` while keeping first-occurrence order.

    Args:
        li: An iterable of hashable items.

    Returns:
        A list with each distinct item once, ordered by first appearance.
    """
    # A dict preserves insertion order, so its keys give the de-duplicated sequence.
    first_seen = {}
    for item in li:
        first_seen.setdefault(item, None)
    return list(first_seen)

def split_string_list_elements(li, sep):
    """Split every string in ``li`` on ``sep``.

    Args:
        li: An iterable of strings.
        sep: The separator passed to ``str.split``.

    Returns:
        A list with one list of pieces per input string, in input order.
    """
    pieces_per_string = []
    for text in li:
        pieces_per_string.append(text.split(sep))
    return pieces_per_string

def column_to_list(col):
    """Collect the values of a DataFrame into a flat Python list.

    Args:
        col: An object exposing ``.rdd`` (the call site in this notebook passes a
            single-column Spark DataFrame).

    Returns:
        The list produced by flattening every row's fields and collecting them.
    """
    # Each row is iterable over its fields, so flat-mapping the identity
    # function turns the rows into a stream of plain values.
    flattened_rdd = col.rdd.flatMap(lambda row: row)
    return flattened_rdd.collect()

In [12]:
# Pull the non-null `ingredients_analysis_tags` values to the driver, split each
# comma-separated string into individual tags, flatten the result, and remove
# duplicates.
# NOTE(review): the variable is called `categories_list` but it holds ingredient
# analysis tags.
tags_column_df = df.select('ingredients_analysis_tags').dropna()
raw_tag_strings = column_to_list(tags_column_df)
tag_lists = split_string_list_elements(raw_tag_strings, ",")
categories_list = make_list_unique(flatten_list(tag_lists))

In [13]:
# Display the unique tags extracted from `ingredients_analysis_tags`
categories_list

['en:palm-oil-free',
 'en:non-vegan',
 'en:vegetarian-status-unknown',
 'en:palm-oil-content-unknown',
 'en:vegan-status-unknown',
 'en:may-contain-palm-oil',
 'en:vegetarian',
 'en:vegan',
 'en:non-vegetarian',
 'en:palm-oil',
 'en:maybe-vegan',
 'en:maybe-vegetarian']

# Drop unnecessary columns

In [14]:
# Columns to keep for the database export, grouped by theme (order is preserved)
kept_columns = [
    # identification and quantity
    "code", "product_quantity",
    # energy and fats
    "energy-kcal_100g",
    "fat_100g", "saturated-fat_100g", "unsaturated-fat_100g",
    "monounsaturated-fat_100g", "polyunsaturated-fat_100g", "trans-fat_100g",
    # carbohydrates, fiber and proteins
    "carbohydrates_100g", "sugars_100g", "added-sugars_100g", "starch_100g",
    "fiber_100g", "proteins_100g",
    # allergens
    "allergens", "traces",
    # vitamins
    "vitamin-a_100g", "vitamin-c_100g", "vitamin-d_100g", "vitamin-e_100g",
    "vitamin-k_100g", "vitamin-b1_100g", "vitamin-b2_100g", "vitamin-b6_100g",
    "vitamin-b9_100g", "vitamin-b12_100g",
    # minerals
    "calcium_100g", "iron_100g", "magnesium_100g", "potassium_100g", "zinc_100g",
    # food group and serving information
    "food_groups_tags", "serving_size", "serving_quantity",
    # other nutrition facts
    "cholesterol_100g", "salt_100g", "glycemic-index_100g",
]

In [15]:
# Project the DataFrame down to the columns listed in `kept_columns` and mark
# the narrower result for caching.
df_kept_columns = df.select(*kept_columns).cache()

# Release the cache held by the full-width DataFrame. Being the last expression
# of the cell, the DataFrame returned by unpersist() is echoed as the cell output.
df.unpersist()

DataFrame[code: double, url: string, creator: string, created_t: int, created_datetime: timestamp, last_modified_t: int, last_modified_datetime: timestamp, last_modified_by: string, last_updated_t: int, last_updated_datetime: timestamp, product_name: string, abbreviated_product_name: string, generic_name: string, quantity: string, packaging: string, packaging_tags: string, packaging_en: string, packaging_text: string, brands: string, brands_tags: string, categories: string, categories_tags: string, categories_en: string, origins: string, origins_tags: string, origins_en: string, manufacturing_places: string, manufacturing_places_tags: string, labels: string, labels_tags: string, labels_en: string, emb_codes: string, emb_codes_tags: string, first_packaging_code_geo: string, cities: string, cities_tags: string, purchase_places: string, stores: string, countries: string, countries_tags: string, countries_en: string, ingredients_text: string, ingredients_tags: string, ingredients_analysis_

# Write DF to a database

## Get user's database credentials (not necessary)

In [16]:
# properties = {}
# url = {}
# for text in ["database URL", "table name"]:
#     url[text] = input("Enter " + text + ": ")


# for text in ["user", "password", "driver"]:
#     properties[text] = input("Enter " + text + ": ")

## Write to the database using JDBC Driver

In [18]:
# Connection settings are read from the environment so that credentials are not
# stored in the notebook. When MYSQL_PASSWORD is unset the password is prompted
# for interactively (input is not echoed).
jdbc_url = os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://mysql:3306/pyspark_db")

properties = {
    "user": os.environ.get("MYSQL_USER", "user"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Append the selected columns to the `products` table.
# NOTE: the account must have been granted privileges on the target database;
# otherwise MySQL rejects the connection with "Access denied ... to database".
df_kept_columns.write.jdbc(url=jdbc_url, table="products", mode="append", properties=properties)

Py4JJavaError: An error occurred while calling o137.jdbc.
: java.sql.SQLSyntaxErrorException: Access denied for user 'user'@'%' to database 'pyspark_db'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:160)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:156)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


# GPT help