In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`io.delta::delta-core:0.5.0`
import $ivy.`io.circe::circe-parser:0.12.3`
import $ivy.`com.lihaoyi::os-lib:0.2.7`
import org.apache.spark.sql._
import io.circe.parser._
import java.time.Instant

os.remove.all(os.pwd / "parquet-example")
os.remove.all(os.pwd / "parquet-example2")


# Working with simple parquet files

Parquet is a great data file format, and combined with Spark it makes our data analysis faster and cheaper. I'm not going to explain everything that makes Parquet so good, but let's see what we can do with it on an ordinary day.

But first, let's introduce the tools that we are going to use:

* First, Apache Spark is one of the most popular big data processing engines. If you want to learn more, contact us for information about our trainings at [habla computing](www.habla.dev) or our consultancy.
* In this notebook we are going to work with a local instance of Spark, so all the data that we generate will live in the local file system. To make it easier to see the generated files, we are going to use [OS lib](https://github.com/lihaoyi/os-lib), a simple, flexible, high-performance Scala interface to common OS filesystem and subprocess APIs. Its only drawback is that it is not compatible with the Hadoop file system, but for the purpose of this notebook it's enough.
* Some of the files that we will create will be in [json](https://www.json.org/json-en.html) format. To show them properly we will use [circe](https://circe.github.io/circe/), another simple library, to parse, manipulate and validate JSON documents.
* Finally, the kernel used in this notebook is called [almond](https://almond.sh/), a Jupyter kernel based on [Ammonite](), a Scala REPL. It lets you configure everything and import all the dependencies as you would with a normal build tool.

So you came for Delta tables, and you will also leave with the basics of two fantastic Scala libraries and a powerful Scala kernel for Jupyter 😄.

One more thing: the notebooks are tested to be re-executed as many times as you want (and need), so if you find any error, just start executing from the beginning.

OK, let's start with some code. First, let's create the Spark session and write a DataFrame to a Parquet file, so we can explain how to read it.

In [2]:
val spark = SparkSession.builder().appName("parquet-test").master("local[1]").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/13 12:37:42 INFO SparkContext: Running Spark version 2.4.5
20/04/13 12:37:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/04/13 12:37:43 INFO SparkContext: Submitted application: parquet-test
20/04/13 12:37:43 INFO SecurityManager: Changing view acls to: jovyan
20/04/13 12:37:43 INFO SecurityManager: Changing modify acls to: jovyan
20/04/13 12:37:43 INFO SecurityManager: Changing view acls groups to: 
20/04/13 12:37:43 INFO SecurityManager: Changing modify acls groups to: 
20/04/13 12:37:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jovyan); groups with view permissions: Set(); users  with modify permissions: Set(jovyan); groups with modify permissions: Set()
20/04/13 12:37:43 INFO Utils: Successfully started service 'sparkDriver' on port 40549.
20

spark: SparkSession = org.apache.spark.sql.SparkSession@21ec5f0a

In [3]:
spark.sparkContext.setLogLevel("ERROR") // log only the essentials

In [4]:
val file = "parquet-example"

file: String = "parquet-example"

In [5]:
spark.range(1,10).write.format("parquet").save(file)

OK, everything is set up: we have a Parquet file, and we can read it in a very simple way.

In [6]:
val readedFile = spark.read.parquet(file)

readedFile: DataFrame = [id: bigint]

As you can see, the DataFrame read from the file already knows the structure of the data. This is one of the first interesting things about Parquet: it writes the metadata into the data file itself.
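
To make the point about embedded metadata more concrete, here is a minimal sketch that writes the same data as CSV and as Parquet and compares the schemas read back. The folder names `schema-demo-csv` and `schema-demo-parquet` are hypothetical and only used for this comparison, and the cell assumes the dependencies loaded at the top of the notebook.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Hypothetical folder names, used only in this sketch.
val csvDemo = "schema-demo-csv"
val parquetDemo = "schema-demo-parquet"

spark.range(1, 10).write.mode(SaveMode.Overwrite).csv(csvDemo)
spark.range(1, 10).write.mode(SaveMode.Overwrite).parquet(parquetDemo)

// CSV carries no schema: without extra options the column comes back as a string.
val csvSchema = spark.read.csv(csvDemo).schema
// Parquet keeps the schema in the file footer, so the column name and type survive.
val parquetSchema = spark.read.parquet(parquetDemo).schema

println(csvSchema.simpleString)
println(parquetSchema.simpleString)
```

With CSV we would have to pass the schema (or ask Spark to infer it) on every read; with Parquet it travels with the data.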

In [7]:
readedFile.printSchema

root
 |-- id: long (nullable = true)



Now we could do anything we want with the DataFrame, but we are going to focus on reading and writing the files, so let's learn how to write one. It is as simple as:

In [8]:
val newfile = "parquet-example2"
readedFile.write.parquet(newfile)

newfile: String = "parquet-example2"

Let's check that we have something in the folder. For this we are going to use OS lib: create a reference to the destination and list it.

In [9]:
val wd = os.pwd / newfile //pwd is the working folder
os.list(wd).foreach(println)

/home/jovyan/work/parquet-example2/._SUCCESS.crc
/home/jovyan/work/parquet-example2/.part-00000-41892a33-685c-4f26-b941-c40bb459ca60-c000.snappy.parquet.crc
/home/jovyan/work/parquet-example2/_SUCCESS
/home/jovyan/work/parquet-example2/part-00000-41892a33-685c-4f26-b941-c40bb459ca60-c000.snappy.parquet


wd: os.package.pwd.ThisType = root/'home/'jovyan/'work/"parquet-example2"

Great. The file that we created is actually a folder, and Spark puts all the needed files inside it.

But imagine that we found an error and want to write the data again. Let's see what happens if we use the same line.

In [10]:
readedFile.write.parquet(newfile)

: 

Okay... a runtime error. This is because Spark uses the save mode `ErrorIfExists` by default: if it finds that the folder already exists, it throws an error.

In our case we want to use the save mode `Overwrite`, which deletes all the previously stored data.
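
As a small aside, the sketch below shows how the default mode's failure can be observed without breaking the notebook, and how `SaveMode.Ignore` behaves in the same situation. The folder name `save-mode-example` is hypothetical, and the cell assumes the dependencies loaded at the top of the notebook.

```scala
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import scala.util.{Failure, Success, Try}

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Hypothetical folder name, used only in this sketch.
val target = "save-mode-example"
val numbers = spark.range(1, 10)

// Make sure the folder exists before trying the other modes.
numbers.write.mode(SaveMode.Overwrite).parquet(target)

// Default mode (ErrorIfExists): writing to an existing path is rejected.
val secondWrite = Try(numbers.write.parquet(target))
secondWrite match {
  case Failure(e: AnalysisException) => println(s"Write rejected: ${e.getMessage}")
  case Failure(other)                => throw other
  case Success(_)                    => println("Write succeeded")
}

// Ignore: the write is skipped when the path already exists, so nothing changes.
numbers.write.mode(SaveMode.Ignore).parquet(target)
val rowsAfterIgnore = spark.read.parquet(target).count()
```

Spark offers four modes in total: `ErrorIfExists`, `Overwrite`, `Append` and `Ignore`.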

In [11]:
readedFile.write.mode(SaveMode.Overwrite).parquet(newfile)

In [12]:
os.list(wd).foreach(println)

/home/jovyan/work/parquet-example2/._SUCCESS.crc
/home/jovyan/work/parquet-example2/.part-00000-21a86f2b-aae4-43d8-b722-b1cf5dbdeacc-c000.snappy.parquet.crc
/home/jovyan/work/parquet-example2/_SUCCESS
/home/jovyan/work/parquet-example2/part-00000-21a86f2b-aae4-43d8-b722-b1cf5dbdeacc-c000.snappy.parquet


Everything was cleaned up and we only find the latest data, as we wanted.

This is a good way to kickstart a new Parquet table, but usually we don't erase all the data and write the new one. What we do is add the new data to the existing table, an operation commonly called `Append`.

In [13]:
spark.read.parquet(newfile).show

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [14]:
spark.range(10,15).write.mode(SaveMode.Append).parquet(newfile)

In [15]:
spark.read.parquet(newfile).show

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
+---+



Nice, it added all the new data that we provided.
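
To see what `Append` does on disk, the following sketch repeats the overwrite and append steps on a separate folder and lists only the data files. The folder name `append-example` is hypothetical, and the cell assumes that Spark and OS lib are loaded as in the first cell of the notebook.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Hypothetical folder name, used only in this sketch.
val appendDir = "append-example"
spark.range(1, 10).write.mode(SaveMode.Overwrite).parquet(appendDir)
spark.range(10, 15).write.mode(SaveMode.Append).parquet(appendDir)

// Keep only the data files; the .crc checksums and the _SUCCESS marker are skipped.
val dataFiles = os.list(os.pwd / appendDir).filter(_.last.endsWith(".parquet"))
dataFiles.foreach(println)

// Reading the folder returns the rows of every part file together.
val totalRows = spark.read.parquet(appendDir).count()
```

The append does not touch the existing part file; it adds new part files next to it, and a read of the folder combines them all.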

But now we have to talk about limitations. All kinds of big data file formats share the same problems:
* Lack of history. As you saw, when we overwrote the file, all the previous data was deleted. As a developer, it's very easy to pick the wrong save mode and delete a production table by mistake 😰.
* Upserting data is not easy. If we want to modify some of the records of our table and keep them in the same table, we have to write the results to an auxiliary table first, and then overwrite the original one. This is because it is never good practice to overwrite a table that is being read in the same process. Also, as you can imagine, while the data is being moved from the auxiliary table to the original one, the table can't be accessed for reading because it is under maintenance.

These problems are related to the lack of [ACID properties](https://en.wikipedia.org/wiki/ACID) in big data files or tables.
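
The auxiliary-table workaround described in the list above can be sketched as follows. This is only an illustration under stated assumptions: the folder names `upsert-example` and `upsert-example-aux`, the `value` column and the sample rows are all hypothetical, and the cell assumes the dependencies loaded at the top of the notebook.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Hypothetical folder names, used only in this sketch.
val table = "upsert-example"
val auxTable = "upsert-example-aux"

// Initial table: ids 1 to 3, all marked "old".
spark.range(1, 4).withColumn("value", lit("old"))
  .write.mode(SaveMode.Overwrite).parquet(table)

// Changes to upsert: id 3 is an update, id 4 is a new row.
val changes = spark.range(3, 5).withColumn("value", lit("new"))

// Keep the rows that are not being changed, then add the changed and new rows.
val current = spark.read.parquet(table)
val merged = current.join(changes, Seq("id"), "left_anti").union(changes)

// 1. Write the merged result to the auxiliary table instead of the one being read.
merged.write.mode(SaveMode.Overwrite).parquet(auxTable)
// 2. Replace the original table with the auxiliary copy.
spark.read.parquet(auxTable).write.mode(SaveMode.Overwrite).parquet(table)

val result = spark.read.parquet(table).orderBy("id")
  .collect().map(row => (row.getLong(0), row.getString(1))).toList
println(result)
```

Between step 1 and the end of step 2 a reader of `upsert-example` may find an empty or partial table, and if step 2 fails halfway the original data is gone. Nothing in plain Parquet makes the two steps atomic.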

But [Databricks](https://databricks.com/) has open sourced a storage layer called [Delta Lake](https://delta.io), which uses Parquet as its persistence format and solves many of the problems that we face in our everyday work.

Go to the [next notebook](./DeltaTableExample.ipynb) to learn about it.