# Data formats (CSV vs. JSON vs. Parquet vs. ORC)


### Overview
This lab compares data formats for Spark DataFrames. Starting from CSV data, we will evaluate the JSON, Parquet, and ORC formats.

Background reading:
- [Spark data frames](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- JSON format
    - [Wikipedia](https://en.wikipedia.org/wiki/JSON)
    - [json.org](http://json.org/)
- Parquet format
    - [Parquet project](https://parquet.apache.org/)
    - [Parquet GitHub](https://github.com/Parquet/parquet-format)
    - [presentation](http://www.slideshare.net/larsgeorge/parquet-data-io-philadelphia-2013)
- ORC format
    - [ORC project](https://orc.apache.org/)
    - [ORC explained](http://www.semantikoz.com/blog/orc-intelligent-big-data-file-format-hadoop-hive/)
    - [ORC performance](http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.3/bk_performance_tuning/content/hive_perf_best_pract_use_orc_file_format.html)

### Depends On 
None

### Run time
20-30 mins


## Step-1: Verify Data

We will use the transaction data generated in an earlier lab.

We assume it is under the `data/transactions/csv` directory; the code below reads it through the relative path `../data/transactions/csv`.

If you don't have this data, follow the **data-generation** labs to generate it, for example with the script `03-data-generator/datagen-tx-medium.py`.

### Benchmarking tips

You can start with 1 million rows of data.  
To get accurate benchmark readings, use at least **100 million rows** of data.



## Step-2: Benchmarking Spreadsheet
Download and inspect **dataformats-benchmark.xlsx** (from this directory).

**We will be filling out the values in this spreadsheet**

## Step-3: Init Spark

In [1]:
try:
    spark
except NameError:
    import findspark
    findspark.init()  # uses SPARK_HOME
    print("Spark found in : ", findspark.find())

    import pyspark
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # use a unique temp dir as the warehouse dir, so we can run multiple Spark sessions in one dir
    import tempfile
    tmpdir = tempfile.TemporaryDirectory()

    config = ( SparkConf()
             .setAppName("TestApp")
             .setMaster("local[*]")
             .set('spark.executor.memory', '2g')
             .set('spark.sql.warehouse.dir', tmpdir.name)
             .set("some_property", "some_value") # another example
             )

    spark = SparkSession.builder.config(conf=config).getOrCreate()
    sc = spark.sparkContext

print('Spark UI running on port ' + spark.sparkContext.uiWebUrl.split(':')[2])

Spark found in :  /home/sujee/apps/spark
Spark UI running on port 4040


## Step-4: Monitoring processes

Use a process monitor such as `top` or `atop` to watch resource usage while the data conversions run.

If you are on a Linux-based system:

- open a terminal
- invoke `top` or `atop`

## Step-5: Load Transaction data (CSV)

- **==> While the import is running, take a look at the `atop` terminal.  Which of the resources (CPU / memory / disk) are we maxing out?**  


In [3]:
import time

t1 = time.perf_counter()

df_csv = spark.read.csv("../data/transactions/csv", header=True, inferSchema=True)

t2 = time.perf_counter()
print ("Loaded CSV in {:,.2f} ms ".format( (t2-t1)*1000))
print(df_csv)

Loaded CSV in 1,314.26 ms 
DataFrame[id: string, timestamp: string, mti: int, card_number: bigint, amount_customer: double, merchant_type: int, merchant_id: int, merchant_address: string, ref_id: string, amount_merchant: double, response_code: int]
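With `inferSchema=True`, Spark makes an extra pass over the CSV files to guess the column types, and that pass is part of the load time measured above. The following is a minimal sketch of supplying the schema up front instead, assuming the column names and types printed above. The names `TX_COLUMNS`, `TX_SCHEMA_DDL`, and `load_csv_with_schema` are hypothetical helpers for illustration, not part of the lab.

```python
# Column names and types copied from the DataFrame printed above.
TX_COLUMNS = [
    ("id", "STRING"),
    ("timestamp", "STRING"),
    ("mti", "INT"),
    ("card_number", "BIGINT"),
    ("amount_customer", "DOUBLE"),
    ("merchant_type", "INT"),
    ("merchant_id", "INT"),
    ("merchant_address", "STRING"),
    ("ref_id", "STRING"),
    ("amount_merchant", "DOUBLE"),
    ("response_code", "INT"),
]

# Build a DDL-formatted schema string; backticks guard names such as `timestamp`.
TX_SCHEMA_DDL = ", ".join(
    "`{}` {}".format(name, dtype) for name, dtype in TX_COLUMNS
)


def load_csv_with_schema(spark, path):
    """Read the CSV files with a fixed schema, skipping schema inference."""
    return spark.read.csv(path, header=True, schema=TX_SCHEMA_DDL)


# Usage (assumes the `spark` session created in Step-3):
# df_csv = load_csv_with_schema(spark, "../data/transactions/csv")
```

Timing the load both ways gives a rough idea of how much of the CSV load time goes to schema inference rather than to the format itself.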


## Step-6: Save Data in Other formats

Our original data is in CSV.  We will save this data in json, parquet and orc formats.

As we save in each format, measure the time taken, and record it in the spreadsheet.
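Each cell in this lab repeats the same `t1` / `t2` timing pattern. As an optional convenience, here is a small sketch of a context manager that wraps that pattern and can also collect the readings in a dictionary for the spreadsheet. The name `timer` and its `results` parameter are hypothetical, introduced only for this example.

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(label, results=None):
    """Print elapsed wall-clock time in ms; optionally store it in `results`."""
    t1 = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - t1) * 1000
        if results is not None:
            results[label] = elapsed_ms
        print("{} in {:,.2f} ms".format(label, elapsed_ms))


# Usage (assumes `df_csv` from Step-5):
# write_times = {}
# with timer("Saved as json", write_times):
#     df_csv.write.json("../data/transactions/json/", mode="overwrite")
```

The cells below keep the explicit `t1` / `t2` form, so either style can be used.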


In [5]:
## JSON

import time

t1 = time.perf_counter()

df_csv.write.json('../data/transactions/json/', "overwrite")

t2 = time.perf_counter()
print ("Saved as json in {:,.2f} ms ".format( (t2-t1)*1000))


Saved as json in 727.32 ms 


In [14]:
## parquet

import time

t1 = time.perf_counter()

df_csv.write.parquet('../data/transactions/parquet/', "overwrite")

t2 = time.perf_counter()
print ("Saved as parquet in {:,.2f} ms ".format( (t2-t1)*1000))


Saved as parquet in 1,052.35 ms 


In [17]:
## orc

import time

t1 = time.perf_counter()

df_csv.write.orc('../data/transactions/orc/', "overwrite")

t2 = time.perf_counter()
print ("Saved as orc in {:,.2f} ms ".format( (t2-t1)*1000))


Saved as orc in 809.51 ms 


## Step-6 (continued): Read all the data formats back

As we read each format, record the time it takes to load the data in the spreadsheet.

In [10]:
## read CSV

import time

t1 = time.perf_counter()

df_csv = spark.read.csv("../data/transactions/csv", header=True, inferSchema=True)

t2 = time.perf_counter()
print ("Loaded CSV in {:,.2f} ms ".format( (t2-t1)*1000))


Loaded CSV in 513.01 ms 


In [12]:
## JSON

import time

t1 = time.perf_counter()

df_json = spark.read.json("../data/transactions/json")

t2 = time.perf_counter()
print ("Loaded json in {:,.2f} ms ".format( (t2-t1)*1000))

Loaded json in 420.41 ms 


In [16]:
## Parquet

import time

t1 = time.perf_counter()

df_parquet = spark.read.parquet("../data/transactions/parquet/")

t2 = time.perf_counter()
print ("Loaded parquet in {:,.2f} ms ".format( (t2-t1)*1000))

Loaded parquet in 64.44 ms 


In [19]:
## ORC

import time

t1 = time.perf_counter()

df_orc = spark.read.orc("../data/transactions/orc/")

t2 = time.perf_counter()
print ("Loaded orc in {:,.2f} ms ".format( (t2-t1)*1000))

Loaded orc in 114.80 ms 
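These load times are not directly comparable. Reading CSV with `inferSchema=True` and reading JSON both scan the data to work out the schema, while Parquet and ORC store the schema in the files, so their "load" mostly reflects reading metadata; the rows are read later, when an action runs. A sketch of timing a full read instead, assuming Spark 3.0 or later, where the built-in `noop` sink is available. The helper name `time_full_scan` is hypothetical.

```python
import time


def time_full_scan(df):
    """Return the ms needed to read the whole of `df`.

    Assumes Spark 3.0 or later, where the built-in "noop" sink reads
    the DataFrame and discards the rows without writing any files.
    """
    t1 = time.perf_counter()
    df.write.format("noop").mode("overwrite").save()
    return (time.perf_counter() - t1) * 1000


# Usage (assumes the DataFrames loaded in the cells above):
# for name, df in [("csv", df_csv), ("json", df_json),
#                  ("parquet", df_parquet), ("orc", df_orc)]:
#     print("Full scan of {} in {:,.2f} ms".format(name, time_full_scan(df)))
```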


## Step-7: Query

- **==> Find the max value of `amount_customer`**
- **==> Note the time it took to run the query, and record it in spreadsheet**
- **==> While the query is running, check `atop`**

The query is essentially:

```python
from pyspark.sql.functions import max  # note: shadows Python's built-in max

df.agg(max('amount_customer')).show()
```

In [27]:
## CSV
import time

from pyspark.sql.functions import max


t1 = time.perf_counter()
df_csv.agg(max('amount_customer')).show()
t2 = time.perf_counter()

print ("MAX in csv in {:,.2f} ms ".format( (t2-t1)*1000))

+--------------------+
|max(amount_customer)|
+--------------------+
|              1009.9|
+--------------------+

MAX in csv in 266.31 ms 


In [28]:
## JSON
import time

from pyspark.sql.functions import max


t1 = time.perf_counter()
df_json.agg(max('amount_customer')).show()
t2 = time.perf_counter()

print ("MAX in json in {:,.2f} ms ".format( (t2-t1)*1000))

+--------------------+
|max(amount_customer)|
+--------------------+
|              1009.9|
+--------------------+

MAX in json in 325.14 ms 


In [29]:
## Parquet

import time

from pyspark.sql.functions import max


t1 = time.perf_counter()
df_parquet.agg(max('amount_customer')).show()
t2 = time.perf_counter()

print ("MAX in parquet in {:,.2f} ms ".format( (t2-t1)*1000))

+--------------------+
|max(amount_customer)|
+--------------------+
|              1009.9|
+--------------------+

MAX in parquet in 301.98 ms 


In [30]:
## ORC

import time

from pyspark.sql.functions import max


t1 = time.perf_counter()
df_orc.agg(max('amount_customer')).show()
t2 = time.perf_counter()

print ("MAX in orc in {:,.2f} ms ".format( (t2-t1)*1000))

+--------------------+
|max(amount_customer)|
+--------------------+
|              1009.9|
+--------------------+

MAX in orc in 253.23 ms 
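The four query cells above differ only in the DataFrame they use. A sketch of running the same query over all of them in one loop and collecting the timings, which may make filling in the spreadsheet easier. The helper `time_max_query` is hypothetical; it uses the dictionary form of `agg()` so that nothing named `max` needs to be imported.

```python
import time


def time_max_query(frames, column):
    """Run max(column) on each DataFrame; return {name: (max_value, ms)}."""
    results = {}
    for name, df in frames.items():
        t1 = time.perf_counter()
        # The dict form of agg() avoids importing (and shadowing) `max`.
        max_value = df.agg({column: "max"}).collect()[0][0]
        results[name] = (max_value, (time.perf_counter() - t1) * 1000)
    return results


# Usage (assumes the DataFrames from the earlier steps):
# frames = {"csv": df_csv, "json": df_json,
#           "parquet": df_parquet, "orc": df_orc}
# for name, (value, ms) in time_max_query(frames, "amount_customer").items():
#     print("MAX in {} = {} in {:,.2f} ms".format(name, value, ms))
```

The first query in a fresh session can include warm-up cost, so consider running the loop more than once before recording numbers.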


## Step-8: Compare Data Sizes

**==> Record the byte sizes in the spreadsheet**  

In [32]:
# human-readable sizes (-s: one total per directory, -h: K/M/G units)
!  du -sh ../data/transactions/csv  \
    ../data/transactions/json \
    ../data/transactions/parquet  \
    ../data/transactions/orc

175M	../data/transactions/csv
296M	../data/transactions/json
88M	../data/transactions/parquet
83M	../data/transactions/orc
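The `du` output above is rounded to human-readable units. For exact byte counts to put in the spreadsheet, the sizes can also be summed in Python. This is a minimal sketch; `dir_size_bytes` and `size_report` are hypothetical helper names, and the result sums file lengths, so it can differ slightly from the disk usage that `du` reports.

```python
import os


def dir_size_bytes(path):
    """Sum the sizes of all regular files under `path`, in bytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for fname in files:
            total += os.path.getsize(os.path.join(root, fname))
    return total


def size_report(base, formats=("csv", "json", "parquet", "orc")):
    """Return {format: bytes} for the sub-directories of `base` that exist."""
    return {
        fmt: dir_size_bytes(os.path.join(base, fmt))
        for fmt in formats
        if os.path.isdir(os.path.join(base, fmt))
    }


# Usage:
# for fmt, size in size_report("../data/transactions").items():
#     print("{:8s} {:>15,d} bytes".format(fmt, size))
```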


## Step-9: Run this benchmarking on a Hadoop cluster


## Step-10: Discussion

Discuss your findings with the class.