## Tip of the Week: Staging Tables

Staging tables are an alternative way of checkpointing data in Spark, in which the data is written out as a named Hive table in a database, rather than to the checkpointing location.

### A quick recap of persisting

Persisting in Spark means storing the data at an intermediate point in the code, either in memory or on disk. This is generally done with `.cache()` (to store the data in memory) or `.checkpoint()` (to write it to disk). If you are not familiar with the concept of persisting, please read the [Persisting in Spark notebook](http://np2rvlapxx507/DAP_CATS/Training/more-spark/blob/master/more_spark/notebooks/material/persist.ipynb), which explains these concepts in more detail.
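
As a quick reminder of what these two calls look like, here is a minimal sketch. The function name `persist_both_ways` and the `checkpoint_dir` argument are hypothetical and exist only for illustration; `spark` and `df` are assumed to be an existing Spark session and DataFrame. In real code you would normally choose one method rather than both.

```python
def persist_both_ways(spark, df, checkpoint_dir):
    """Illustrative only: persist df with .cache() and with .checkpoint()."""
    # Checkpointing needs a directory to write to; checkpoint_dir is a
    # placeholder path supplied by the caller
    spark.sparkContext.setCheckpointDir(checkpoint_dir)

    # .cache() marks the DataFrame to be stored the next time an action runs
    cached = df.cache()

    # .checkpoint() writes the data to the checkpoint directory and returns
    # a new DataFrame whose lineage starts from those files
    checkpointed = df.checkpoint()
    return cached, checkpointed
```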

Importantly, if your code is short and simple, it is unlikely that you will need any form of persisting. If your code is long and complex, however, sensible persisting can make it more efficient.

### Staging tables: the concept

You can write a staging table to HDFS with `df.write.mode("overwrite").saveAsTable(table_name, format="parquet")` or `df.write.insertInto(table_name, overwrite=True)` (if using `.insertInto()`, you will need to create the table first). You can then read the table back in with `spark.read.table()`. As with checkpointing, this breaks the lineage of the DataFrame, so staging tables can be useful in large, complex pipelines, or those that involve processes in a loop. As Spark reads tables more efficiently than CSV files, another use case is staging CSV files as tables at the start of your code, before doing any complex calculations.
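
The write-then-read pattern can be wrapped in a small helper. This is only a sketch: the name `stage_table` is hypothetical, and it assumes `spark` is a session with Hive support and that you have permission to write to the database named in `table_name`.

```python
def stage_table(spark, df, table_name):
    """Write df to a Hive table, then read it back so the lineage is cut."""
    # Overwrite, so the same staging table can be reused on every run
    df.write.mode("overwrite").saveAsTable(table_name, format="parquet")
    # The returned DataFrame's plan starts from a scan of the staged table
    return spark.read.table(table_name)


# Possible usage in a pipeline (the table name is a placeholder):
# df = stage_table(spark, df, "my_database.my_staging_table")
```

Returning the re-read DataFrame, rather than the original, is what makes later steps start from the staged data instead of the full upstream plan.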

Staging has some advantages over checkpointing:
- The same table can be overwritten, so there is no need to clean up old checkpointed files
- The data is stored in a location that is easier to access than the checkpointing folder, which can help with debugging and testing changes to the code
- The table can be re-used elsewhere
- If `.insertInto()` is used, you can take advantage of the table schema, as an exception will be raised if the DataFrame cannot be written to that schema (for example, if the number of columns differs)
- It is more efficient for Spark to read Hive tables than CSV files as the underlying format is Parquet, so if your data are delivered as CSV files you may want to stage them as Hive tables first. For more information see the [Storing as a Parquet tip](http://np2rvlapxx507/DAP_CATS/troubleshooting/tip-of-the-week/blob/master/tip_18_parquet.ipynb).

There are also some disadvantages:
- The code takes longer to write
- The code is more difficult to maintain, especially if `.insertInto()` is used, as you will have to alter the table if the DataFrame structure changes
- They are easy to overuse: as with any method of persisting data, make sure you are not using them unnecessarily

The examples here use PySpark, but the same principles apply to R users who are using sparklyr in DAP.

### Example

Our example will be very simple: read a CSV file, perform some basic data cleansing, stage the result as a Hive table, and then read it back in as a DataFrame.

Staging tables are usually most useful in large, complex pipelines; our example is deliberately kept small so that each step is easy to follow.

First, import the relevant modules and create a Spark session:

In [1]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (
    SparkSession.builder.appName("staging-tables-tip")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", 1)
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", 3)
    .config("spark.sql.shuffle.partitions", 12)
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.ui.showConsoleProgress", "false")
    .enableHiveSupport()
    .getOrCreate()
)

Now read in the CSV:

In [2]:
df = spark.read.csv("/training/animal_rescue.csv", header=True)

Then do some preparation: drop and rename some columns, convert `DateTimeOfCall` to a timestamp, then sort by `IncidentNumber`.

Note that saving as a Hive table imposes some stricter rules, including:
- Some characters aren't allowed in column names, including `£`
- The table won't load in the HUE browser if it contains a date column, but timestamp columns are accepted

We then preview the DataFrame with `.toPandas()` (remember to use `.limit()` when looking at data in this way):

In [3]:
df = (
    df.drop(
        "WardCode",
        "BoroughCode",
        "Easting_m",
        "Northing_m",
        "Easting_rounded",
        "Northing_rounded",
    )
    # Rename columns; the names containing £ would not be allowed in a Hive table
    .withColumnRenamed("PumpCount", "EngineCount")
    .withColumnRenamed("FinalDescription", "Description")
    .withColumnRenamed("HourlyNotionalCost(£)", "HourlyCost")
    .withColumnRenamed("IncidentNotionalCost(£)", "TotalCost")
    .withColumnRenamed("OriginofCall", "OriginOfCall")
    .withColumnRenamed("PumpHoursTotal", "JobHours")
    .withColumnRenamed("AnimalGroupParent", "AnimalGroup")
    # Store the call date as a timestamp rather than a date, so the table loads in HUE
    .withColumn(
        "DateTimeOfCall", F.to_timestamp(F.col("DateTimeOfCall"), "dd/MM/yyyy")
    )
    .orderBy("IncidentNumber")
)

df.limit(5).toPandas()

|   | IncidentNumber | DateTimeOfCall | CalYear | FinYear | TypeOfIncident | EngineCount | JobHours | HourlyCost | TotalCost | Description | AnimalGroup | OriginOfCall | PropertyType | PropertyCategory | SpecialServiceTypeCategory | SpecialServiceType | Ward | Borough | StnGroundName | PostcodeDistrict |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 000014-03092018M | 2018-09-03 | 2018 | 2018/19 | Special Service | 2.0 | 3.0 | 333 | 999.0 |  | Unknown - Heavy Livestock Animal | Other FRS | Animal harm outdoors | Outdoor | Other animal assistance | Animal harm involving livestock | CARSHALTON SOUTH AND CLOCKHOUSE | SUTTON | Wallington | CR8 |
| 1 | 000099-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 2.0 | 326 | 652.0 | DOG WITH HEAD STUCK IN RAILINGS CALLED BY OWNER | Dog | Person (mobile) | Railings | Outdoor Structure | Other animal assistance | Assist trapped domestic animal | BROMLEY TOWN | BROMLEY | Bromley | BR2 |
| 2 | 000260-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 1.0 | 326 | 326.0 | BIRD TRAPPED IN NETTING BY THE 02 SHOP AND NEA... | Bird | Person (land line) | Single shop | Non Residential | Animal rescue from height | Animal rescue from height - Bird | Fairfield | CROYDON | Croydon | CR0 |
| 3 | 000375-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 2.0 | 326 | 652.0 | DOG STUCK IN HULL OF DERELICT BOAT - WATER RES... | Dog | Person (mobile) | Barge | Boat | Animal rescue from water | Animal rescue from water - Domestic pet | BRENTFORD | HOUNSLOW | Chiswick | TW8 |
| 4 | 000477-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 1.0 | 326 | 326.0 | DEER TRAPPED IN RAILINGS JUNCTION WITH DENNIS ... | Deer | Person (mobile) | Animal harm outdoors | Outdoor | Other animal assistance | Animal assistance involving wild animal - Othe... | STANMORE PARK | HARROW | Stanmore | HA7 |
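
The renames in the cell above are written out by hand. For wider datasets, one rough alternative is to clean every column name in one go. The sketch below is an assumption rather than an official rule: it keeps only letters, digits and underscores, which is stricter than Hive requires, and the helper names `clean_column_names` and `clean_columns` are hypothetical.

```python
import re


def clean_column_names(names):
    """Strip every character that is not a letter, digit or underscore."""
    cleaned = [re.sub(r"[^0-9A-Za-z_]", "", name) for name in names]
    # Two different names could collapse to the same cleaned name
    if len(set(cleaned)) != len(cleaned):
        raise ValueError(f"Cleaned column names are not unique: {cleaned}")
    return cleaned


def clean_columns(df):
    """Return df with all columns renamed to their cleaned form."""
    # DataFrame.toDF(*cols) renames all columns by position
    return df.toDF(*clean_column_names(df.columns))
```

For example, `clean_column_names(["HourlyNotionalCost(£)"])` gives `["HourlyNotionalCost"]`. Explicit renames, as used in this tip, remain the better choice when you also want more readable names.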


Let's look at the plan with `df.explain()`. This displays exactly what Spark will do once an action is called (Spark uses *lazy evaluation*, so transformations only run when an action needs them). This is a simple example, but in long pipelines the plan can get complicated. Writing to a staging table splits the plan in two, which is referred to as *cutting the lineage*.

In [4]:
df.explain()

== Physical Plan ==
*(2) Sort [IncidentNumber#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(IncidentNumber#10 ASC NULLS FIRST, 12)
   +- *(1) Project [IncidentNumber#10, cast(unix_timestamp(DateTimeOfCall#11, dd/MM/yyyy, Some(Etc/UTC)) as timestamp) AS DateTimeOfCall#229, CalYear#12, FinYear#13, TypeOfIncident#14, PumpCount#15 AS EngineCount#82, PumpHoursTotal#16 AS JobHours#187, HourlyNotionalCost(£)#17 AS HourlyCost#124, IncidentNotionalCost(£)#18 AS TotalCost#145, FinalDescription#19 AS Description#103, AnimalGroupParent#20 AS AnimalGroup#208, OriginofCall#21 AS OriginOfCall#166, PropertyType#22, PropertyCategory#23, SpecialServiceTypeCategory#24, SpecialServiceType#25, Ward#27, Borough#29, StnGroundName#30, PostcodeDistrict#31]
      +- *(1) FileScan csv [IncidentNumber#10,DateTimeOfCall#11,CalYear#12,FinYear#13,TypeOfIncident#14,PumpCount#15,PumpHoursTotal#16,HourlyNotionalCost(£)#17,IncidentNotionalCost(£)#18,FinalDescription#19,AnimalGroupParent#20,OriginofCall#21,P

Now save the DataFrame as a table, using `mode("overwrite")`, which overwrites the existing table if there is one. The first time you create a staging table this option is redundant, but on subsequent runs of the code you will get an error without it, as the table will already exist.

In [5]:
username = os.getenv('HADOOP_USER_NAME') 
table_name = f"train_tmp.staging_example_{username}"

df.write.mode("overwrite").saveAsTable(table_name, format="parquet")

Now read the data in again and preview:

In [6]:
df = spark.read.table(table_name)
df.limit(5).toPandas()

|   | IncidentNumber | DateTimeOfCall | CalYear | FinYear | TypeOfIncident | EngineCount | JobHours | HourlyCost | TotalCost | Description | AnimalGroup | OriginOfCall | PropertyType | PropertyCategory | SpecialServiceTypeCategory | SpecialServiceType | Ward | Borough | StnGroundName | PostcodeDistrict |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 000014-03092018M | 2018-09-03 | 2018 | 2018/19 | Special Service | 2.0 | 3.0 | 333 | 999.0 |  | Unknown - Heavy Livestock Animal | Other FRS | Animal harm outdoors | Outdoor | Other animal assistance | Animal harm involving livestock | CARSHALTON SOUTH AND CLOCKHOUSE | SUTTON | Wallington | CR8 |
| 1 | 000099-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 2.0 | 326 | 652.0 | DOG WITH HEAD STUCK IN RAILINGS CALLED BY OWNER | Dog | Person (mobile) | Railings | Outdoor Structure | Other animal assistance | Assist trapped domestic animal | BROMLEY TOWN | BROMLEY | Bromley | BR2 |
| 2 | 000260-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 1.0 | 326 | 326.0 | BIRD TRAPPED IN NETTING BY THE 02 SHOP AND NEA... | Bird | Person (land line) | Single shop | Non Residential | Animal rescue from height | Animal rescue from height - Bird | Fairfield | CROYDON | Croydon | CR0 |
| 3 | 000375-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 2.0 | 326 | 652.0 | DOG STUCK IN HULL OF DERELICT BOAT - WATER RES... | Dog | Person (mobile) | Barge | Boat | Animal rescue from water | Animal rescue from water - Domestic pet | BRENTFORD | HOUNSLOW | Chiswick | TW8 |
| 4 | 000477-01012017 | 2017-01-01 | 2017 | 2016/17 | Special Service | 1.0 | 1.0 | 326 | 326.0 | DEER TRAPPED IN RAILINGS JUNCTION WITH DENNIS ... | Deer | Person (mobile) | Animal harm outdoors | Outdoor | Other animal assistance | Animal assistance involving wild animal - Othe... | STANMORE PARK | HARROW | Stanmore | HA7 |


The DataFrame has the same structure as before, but the plan from `df.explain()` is now much shorter: a single Parquet file scan, with no projection, exchange or sort steps. This is an example of cutting the lineage, and it can be useful when you have complex plans.

In [7]:
df.explain()

== Physical Plan ==
*(1) FileScan parquet train_tmp.staging_example_princa[IncidentNumber#310,DateTimeOfCall#311,CalYear#312,FinYear#313,TypeOfIncident#314,EngineCount#315,JobHours#316,HourlyCost#317,TotalCost#318,Description#319,AnimalGroup#320,OriginOfCall#321,PropertyType#322,PropertyCategory#323,SpecialServiceTypeCategory#324,SpecialServiceType#325,Ward#326,Borough#327,StnGroundName#328,PostcodeDistrict#329] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://dnt01/training/train_tmp/hive/staging_example_princa], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<IncidentNumber:string,DateTimeOfCall:timestamp,CalYear:string,FinYear:string,TypeOfInciden...


### Using `.insertInto()`

Another method is to create an empty table and then use `.insertInto()`; here we will just use a small number of columns as an example:

In [8]:
small_table = f"train_tmp.staging_small_{username}"

spark.sql(f"""
    CREATE TABLE {small_table} (
        IncidentNumber STRING,
        CalYear INT,
        EngineCount INT,
        AnimalGroup STRING
    )
    STORED AS PARQUET
    """)

DataFrame[]

Note that the columns will be inserted by position, not by name, so it's a good idea to select the columns in the same order as the table before inserting:

In [9]:
col_order = spark.read.table(small_table).columns
df.select(col_order).write.insertInto(small_table, overwrite=True)
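
To see why the order matters, the sketch below pairs two lists of column names by position, in the same way that `.insertInto()` pairs DataFrame columns with table columns. The helper `positional_mismatches` is hypothetical, and comparing names case-insensitively is an assumption based on Spark's default settings.

```python
def positional_mismatches(df_columns, table_columns):
    """Return (table_column, df_column) pairs that differ at the same position."""
    if len(df_columns) != len(table_columns):
        raise ValueError("DataFrame and table have different numbers of columns")
    return [
        (table_col, df_col)
        for table_col, df_col in zip(table_columns, df_columns)
        # Assumption: column names are compared without regard to case
        if table_col.lower() != df_col.lower()
    ]


table_cols = ["IncidentNumber", "CalYear", "EngineCount", "AnimalGroup"]
wrong_order = ["IncidentNumber", "EngineCount", "CalYear", "AnimalGroup"]

# Without re-selecting, EngineCount values would land in the CalYear column
print(positional_mismatches(wrong_order, table_cols))
```

An empty list means the DataFrame columns already line up with the table, so a check like this could be run just before the insert.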

This can then be read in as before:

In [10]:
df = spark.read.table(small_table)
df.show(5)

+----------------+-------+-----------+--------------------+
|  IncidentNumber|CalYear|EngineCount|         AnimalGroup|
+----------------+-------+-----------+--------------------+
|000014-03092018M|   2018|          2|Unknown - Heavy L...|
| 000099-01012017|   2017|          1|                 Dog|
| 000260-01012017|   2017|          1|                Bird|
| 000375-01012017|   2017|          1|                 Dog|
| 000477-01012017|   2017|          1|                Deer|
+----------------+-------+-----------+--------------------+
only showing top 5 rows



Finally we will drop the tables used in this example, which we can do with the `DROP` SQL statement. This is much easier than deleting a checkpointed file.

Of course, with staging tables you generally want to keep the table, but just overwrite the data each time, so this step often won't be needed.

Always be very careful when using `DROP` as this will delete the table without warning!

In [11]:
spark.sql(f"DROP TABLE {table_name}")
spark.sql(f"DROP TABLE {small_table}")

DataFrame[]
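
Given that `DROP` gives no warning, one possible safeguard is to route drops through a helper that refuses any table outside an agreed naming pattern. This is a sketch only: the function name `drop_staging_table` and the default prefix `train_tmp.staging_` (chosen to match the table names used in this tip) are assumptions, not an established convention.

```python
def drop_staging_table(spark, table_name, required_prefix="train_tmp.staging_"):
    """Drop table_name, but only if it looks like a staging table."""
    # Refuse anything that does not follow the assumed naming convention
    if not table_name.startswith(required_prefix):
        raise ValueError(f"Refusing to drop non-staging table: {table_name}")
    # IF EXISTS avoids an error when the table has already been dropped
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")


# Possible usage with the tables from this example:
# drop_staging_table(spark, table_name)
# drop_staging_table(spark, small_table)
```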

## Further Resources

Notebooks:
- [Persisting in Spark](http://np2rvlapxx507/DAP_CATS/Training/more-spark/blob/master/more_spark/notebooks/material/persist.ipynb)
- [Storing as a Parquet](http://np2rvlapxx507/DAP_CATS/troubleshooting/tip-of-the-week/blob/master/tip_18_parquet.ipynb)

Functions:
- [df.write.insertInto()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.insertInto)
- [df.write.saveAsTable()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.saveAsTable)
- [spark.read.csv()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)
- [spark.read.table()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.table)

Other material:
- <a href="https://en.wikipedia.org/wiki/Staging_(data)">Staging (data) article on Wikipedia</a>