# Saving Data

## First, load some data ...

In [1]:
from pyspark.sql.functions import col, lit
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder.master("local").getOrCreate()
DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), "..", "data"))

In [2]:
users = spark.read.option("header", "true").csv(f"{DATA_DIR}/users.csv")
messages = spark.read.option("header", "true").csv(f"{DATA_DIR}/messages.csv")
users.show(10, False)
messages.show(10, False)
users.printSchema()

+---+---------+--------+----------+
|id |firstname|lastname|dob       |
+---+---------+--------+----------+
|1  |John     |Smith   |2001-01-01|
|2  |Kim      |Melly   |1998-08-28|
+---+---------+--------+----------+

+----------+---------+-----------------------+-------------+----+-----------+
|date      |m_title  |m_body                 |m_attachments|user|recipient  |
+----------+---------+-----------------------+-------------+----+-----------+
|2022-01-03|Title    |Hello World            |null         |2   |p@gmail.com|
|2022-01-02|Title 2  |Hello World            |null         |3   |d@gmail.com|
|2022-01-03|Spark SQL|Let's learn about spark|null         |2   |a@gmail.com|
+----------+---------+-----------------------+-------------+----+-----------+

root
 |-- id: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)



## Write to parquet ...

We take the CSV data read into DataFrames above and add some typing to it: the date strings from the CSV are cast to `DateType`. This shows that the data can hold different types and that parquet will honour those types (whereas CSV won't). You can read more about types [here](https://sparkbyexamples.com/pyspark/pyspark-sql-types-datatype-with-examples/). We also list the output files that were created (**note**: these files are renamed in source control to a consistent filename to make diffs easier to track).

In [3]:
users_path = f"{DATA_DIR}/parquet/users"
messages_path = f"{DATA_DIR}/parquet/messages"

(
    users
    .select("id", "firstname", "lastname", col("dob").cast("date"))
    .write.parquet(users_path, mode="overwrite")
)

(
    messages
    .withColumn("dt", col("date").cast("date"))
    .drop("date")
    .write.parquet(messages_path, mode="overwrite")
)

In [4]:
spark.read.parquet(users_path).printSchema()
spark.read.parquet(messages_path).printSchema()
os.listdir(users_path)

root
 |-- id: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: date (nullable = true)

root
 |-- m_title: string (nullable = true)
 |-- m_body: string (nullable = true)
 |-- m_attachments: string (nullable = true)
 |-- user: string (nullable = true)
 |-- recipient: string (nullable = true)
 |-- dt: date (nullable = true)



['._SUCCESS.crc',
 '.part-00000-d65f96af-1a05-4e78-81d3-716070df6d2b-c000.snappy.parquet.crc',
 '_SUCCESS',
 'part-00000-d65f96af-1a05-4e78-81d3-716070df6d2b-c000.snappy.parquet']
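
An alternative to casting after the read is to supply a schema when reading the CSV, so that `dob` arrives as a date in the first place. The sketch below is not part of the original notebook: the helper name `read_users_typed` is hypothetical, and it assumes the `users.csv` columns shown above with dates in Spark's default `yyyy-MM-dd` format. It relies on `DataFrameReader.schema`, which accepts a DDL-formatted string.

```python
# Assumed column layout, copied from the users table printed earlier.
USERS_SCHEMA = "id STRING, firstname STRING, lastname STRING, dob DATE"


def read_users_typed(spark, path):
    """Read the users CSV with an explicit schema instead of casting afterwards.

    `spark` is an existing SparkSession and `path` points at users.csv.
    """
    return (
        spark.read
        .schema(USERS_SCHEMA)       # DDL string: column names and types
        .option("header", "true")   # first line holds the column names
        .csv(path)
    )
```

Calling `read_users_typed(spark, f"{DATA_DIR}/users.csv").printSchema()` should then report `dob` as a date without any `cast`.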

## Writing parquet into partitions

Here we take all our data and write it out in partitions. The data I work with is often partitioned by date, so there is one folder per date. Let's see how that looks in a temporary folder (that won't be in version control).

In [5]:
partition_path = f"{DATA_DIR}/tmp/messages"
messages = spark.read.parquet(messages_path)
messages.write.partitionBy("dt").mode("overwrite").parquet(partition_path)
os.listdir(partition_path)


['._SUCCESS.crc', '_SUCCESS', 'dt=2022-01-02', 'dt=2022-01-03']
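
The listing shows the layout Spark produces: one `dt=<value>` directory per distinct value of the partition column, next to the `_SUCCESS` marker files. As a rough sketch in plain Python (the helper `partition_values` is a hypothetical name, not a Spark API), the partition values can be recovered from such a listing like this:

```python
def partition_values(entries, column="dt"):
    """Return the sorted values of `column` found in Hive-style directory names."""
    prefix = f"{column}="
    return sorted(
        name[len(prefix):]
        for name in entries
        if name.startswith(prefix)  # skips _SUCCESS and .crc marker files
    )


listing = ["._SUCCESS.crc", "_SUCCESS", "dt=2022-01-02", "dt=2022-01-03"]
print(partition_values(listing))  # ['2022-01-02', '2022-01-03']
```

In the notebook, `partition_values(os.listdir(partition_path))` would give the dates that currently have data.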

In [6]:
# then to read a single partition
spark.read.parquet(f"{DATA_DIR}/tmp/messages/dt=2022-01-02").show()

+-------+-----------+-------------+----+-----------+
|m_title|     m_body|m_attachments|user|  recipient|
+-------+-----------+-------------+----+-----------+
|Title 2|Hello World|         null|   3|d@gmail.com|
+-------+-----------+-------------+----+-----------+
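
Notice that the `dt` column is missing from this output: its value lives in the directory name, and pointing the reader at the partition directory itself means Spark does not treat `dt=2022-01-02` as a partition. One way to keep the column is Spark's `basePath` read option, sketched below. The helper name `read_partition` is hypothetical, and `spark` is assumed to be an existing session.

```python
def read_partition(spark, base_path, column, value):
    """Read one partition directory but keep the partition column in the result."""
    return (
        spark.read
        .option("basePath", base_path)  # root that partition discovery starts from
        .parquet(f"{base_path}/{column}={value}")
    )
```

For example, `read_partition(spark, partition_path, "dt", "2022-01-02").show()` should include `dt`. Reading the root folder and filtering on `dt` is another way to get the same rows.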



In [7]:
# read from multiple partitions
count = spark.read.parquet(f"{DATA_DIR}/tmp/messages").count()
assert count == 3

## Watch out for overwrite

When writing to partitions (here by `dt`), we need to be careful not to overwrite all partitions.
Currently we have data in 2 `dt` partitions. Suppose I load a single date's data and write it like this:

```
messages.write.partitionBy("dt").mode("overwrite").parquet(write_path)
```

What would happen?

In [8]:
single_dt = messages.filter(col("dt") == "2022-01-02")
single_dt.write.partitionBy("dt").mode("overwrite").parquet(f"{DATA_DIR}/tmp/messages")
df = spark.read.parquet(f"{DATA_DIR}/tmp/messages")
df.show() # we've lost the other partition!


+-------+-----------+-------------+----+-----------+----------+
|m_title|     m_body|m_attachments|user|  recipient|        dt|
+-------+-----------+-------------+----+-----------+----------+
|Title 2|Hello World|         null|   3|d@gmail.com|2022-01-02|
+-------+-----------+-------------+----+-----------+----------+



We can imagine a situation where:
* we have decided to partition by date/dt
* we want to be able to rewrite data for a particular date

To achieve this, we will

1. reload the original data
2. write it again so we have 2 dt partitions
3. adapt the data for a given dt partition and then rewrite it to the specific partition
4. confirm that we have not lost the other partition's data

In [9]:
# step 1
data = spark.read.parquet(messages_path)
assert data.count() == 3
data.write.partitionBy("dt").mode("overwrite").parquet(f"{DATA_DIR}/tmp/messages") # step 2
partitioned_data = spark.read.parquet(f"{DATA_DIR}/tmp/messages")
partitioned_data.show()

+---------+--------------------+-------------+----+-----------+----------+
|  m_title|              m_body|m_attachments|user|  recipient|        dt|
+---------+--------------------+-------------+----+-----------+----------+
|    Title|         Hello World|         null|   2|p@gmail.com|2022-01-03|
|Spark SQL|Let's learn about...|         null|   2|a@gmail.com|2022-01-03|
|  Title 2|         Hello World|         null|   3|d@gmail.com|2022-01-02|
+---------+--------------------+-------------+----+-----------+----------+



In [10]:
# step 3
new_row = (
    messages.filter(col("dt") == "2022-01-02")
    .drop("m_title")
    .withColumn("m_title", lit("Updated title"))
)
new_row.show()

+-----------+-------------+----+-----------+----------+-------------+
|     m_body|m_attachments|user|  recipient|        dt|      m_title|
+-----------+-------------+----+-----------+----------+-------------+
|Hello World|         null|   3|d@gmail.com|2022-01-02|Updated title|
+-----------+-------------+----+-----------+----------+-------------+



In [11]:
(
    new_row
    .write.mode("overwrite")
    .parquet(f"{DATA_DIR}/tmp/messages/dt=2022-01-02")
)

In [12]:
# The record for 2022-01-02 should have an m_title of "Updated title"
spark.read.parquet(f"{DATA_DIR}/tmp/messages").show()

+--------------------+-------------+----+-----------+----------+-------------+
|              m_body|m_attachments|user|  recipient|        dt|      m_title|
+--------------------+-------------+----+-----------+----------+-------------+
|         Hello World|         null|   3|d@gmail.com|2022-01-02|Updated title|
|         Hello World|         null|   2|p@gmail.com|2022-01-03|        Title|
|Let's learn about...|         null|   2|a@gmail.com|2022-01-03|    Spark SQL|
+--------------------+-------------+----+-----------+----------+-------------+
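
Writing straight into a partition directory works, but the path has to be built by hand. Spark (2.3 and later) also offers a dynamic partition overwrite mode, in which an `overwrite` write replaces only the partitions present in the DataFrame being written. The sketch below is an alternative to the steps above, not what this notebook ran. The helper name `overwrite_touched_partitions` is hypothetical, and it restores the previous setting afterwards.

```python
OVERWRITE_MODE_KEY = "spark.sql.sources.partitionOverwriteMode"


def overwrite_touched_partitions(spark, df, path, partition_col="dt"):
    """Overwrite only the partitions that appear in `df`, leaving the rest alone."""
    # Remember the current mode; Spark's default is "static".
    previous = spark.conf.get(OVERWRITE_MODE_KEY, "static")
    spark.conf.set(OVERWRITE_MODE_KEY, "dynamic")
    try:
        df.write.partitionBy(partition_col).mode("overwrite").parquet(path)
    finally:
        # Put the session back the way we found it.
        spark.conf.set(OVERWRITE_MODE_KEY, previous)
```

With the notebook's variables this would be `overwrite_touched_partitions(spark, new_row, f"{DATA_DIR}/tmp/messages")`, which should replace only `dt=2022-01-02` because that is the only date in `new_row`.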



## Writing to other formats



In [13]:
# save as ... 
# json
json_path = f"{DATA_DIR}/tmp/messages-json"
messages.write.format("json").mode("overwrite").save(json_path)
# csv
csv_path = f"{DATA_DIR}/tmp/messages-csv"
messages.write.format("csv").mode("overwrite").save(csv_path)

The JSON output looks like this (note how the null attachments were not written):

```
{"m_title":"Title","m_body":"Hello World","user":"2","recipient":"p@gmail.com","dt":"2022-01-03"}
{"m_title":"Title 2","m_body":"Hello World","user":"3","recipient":"d@gmail.com","dt":"2022-01-02"}
{"m_title":"Spark SQL","m_body":"Let's learn about spark","user":"2","recipient":"a@gmail.com","dt":"2022-01-03"}
```

and the CSV output looks like this (here the null attachments appear as `""`):

```
Title,Hello World,"",2,p@gmail.com,2022-01-03
Title 2,Hello World,"",3,d@gmail.com,2022-01-02
Spark SQL,Let's learn about spark,"",2,a@gmail.com,2022-01-03

```
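
Two things stand out in the CSV output: there is no header row, and the file carries no type information. The plain-Python sketch below uses only the standard library `csv` module, with the column list copied from the schema printed earlier. It shows what a downstream reader sees: every value, including the date and the former null, comes back as a string.

```python
import csv
import io

COLUMNS = ["m_title", "m_body", "m_attachments", "user", "recipient", "dt"]

# The three lines Spark wrote, pasted in for illustration.
csv_text = '''Title,Hello World,"",2,p@gmail.com,2022-01-03
Title 2,Hello World,"",3,d@gmail.com,2022-01-02
Spark SQL,Let's learn about spark,"",2,a@gmail.com,2022-01-03
'''

# No header in the file, so the column names are supplied by hand.
rows = list(csv.DictReader(io.StringIO(csv_text), fieldnames=COLUMNS))

print(rows[0]["m_attachments"] == "")  # True: the null is now an empty string
print(type(rows[0]["dt"]).__name__)    # str: the date type is gone
```

Passing `.option("header", "true")` to the CSV writer would at least add the column names as a first line.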

If we want to force nulls to be written to JSON, we need to set the `ignoreNullFields` option to `false`:

In [14]:
messages.write.format("json").mode("overwrite").option("ignoreNullFields", "false").save(json_path)

```
{"m_title":"Title","m_body":"Hello World","m_attachments":null,"user":"2","recipient":"p@gmail.com","dt":"2022-01-03"}
{"m_title":"Title 2","m_body":"Hello World","m_attachments":null,"user":"3","recipient":"d@gmail.com","dt":"2022-01-02"}
{"m_title":"Spark SQL","m_body":"Let's learn about spark","m_attachments":null,"user":"2","recipient":"a@gmail.com","dt":"2022-01-03"}

```
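
For a consumer, the difference between the two JSON outputs is whether the key exists at all. A small sketch with Python's standard `json` module, using the first line from each output above:

```python
import json

without_nulls = '{"m_title":"Title","m_body":"Hello World","user":"2","recipient":"p@gmail.com","dt":"2022-01-03"}'
with_nulls = '{"m_title":"Title","m_body":"Hello World","m_attachments":null,"user":"2","recipient":"p@gmail.com","dt":"2022-01-03"}'

dropped = json.loads(without_nulls)
kept = json.loads(with_nulls)

print("m_attachments" in dropped)  # False: the key was never written
print("m_attachments" in kept)     # True
print(kept["m_attachments"])       # None: JSON null becomes Python None

# .get() hides the difference, which is often what a reader wants.
print(dropped.get("m_attachments") == kept.get("m_attachments"))  # True
```

Code that indexes with `record["m_attachments"]` would raise `KeyError` on the first form, so writing the nulls explicitly can be the safer choice when the readers are not under your control.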