# Big Data Optimisation in Object Storage Using Different File Formats

Efficient data access is one of the most important factors in a well-performing data processing pipeline, and the layout of data values in the filesystem has a fundamental impact on it. This notebook shows how data layout affects the performance of data access. We compare columnar and row-oriented file formats and explain how to use them efficiently to store data. We then demonstrate best practices for storing datasets, including the concepts of partitioning and bucketing.
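To make the difference between the two layouts concrete, here is a minimal pure-Python sketch. It is a toy model only, not how CSV or Parquet are actually laid out on disk, and the helper names `values_touched_row` and `values_touched_columnar` are made up for illustration. It counts how many values have to be scanned to read a single column in each layout.

```python
# Toy model of the two layouts; not how Parquet or CSV are implemented on disk.
rows = [
    {"userId": 1, "movieId": 2, "rating": 3.5},
    {"userId": 1, "movieId": 29, "rating": 3.5},
    {"userId": 1, "movieId": 32, "rating": 4.0},
]

# Row layout: the values of one record sit next to each other.
row_layout = [value for record in rows for value in record.values()]

# Columnar layout: the values of one column sit next to each other.
column_layout = {name: [record[name] for record in rows] for name in rows[0]}


def values_touched_row(layout, n_columns, column_index):
    """Read one column from the row layout; the whole stream is scanned."""
    touched = len(layout)
    column = layout[column_index::n_columns]
    return column, touched


def values_touched_columnar(layout, name):
    """Read one column from the columnar layout; only that column is scanned."""
    column = layout[name]
    return column, len(column)


ratings_row, touched_row = values_touched_row(row_layout, 3, 2)
ratings_col, touched_col = values_touched_columnar(column_layout, "rating")

# Same answer from both layouts, but a different amount of data scanned.
print(ratings_row == ratings_col, touched_row, touched_col)
```

Both layouts return the same ratings, but the row layout scans every value of every record while the columnar layout scans only the requested column. The gap grows with the number of columns in the dataset.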

The main objective is to find a cost-effective and efficient way of working with big data in cloud object storage (S3, IBM COS), reducing capital expense and increasing operating efficiency.

This notebook is a guide to optimising big data queries using denormalisation and different file formats such as CSV, JSON and Parquet. It is expected that you will run this notebook in a local Jupyter environment or from DSX.

We use the MovieLens 20M dataset (https://grouplens.org/datasets/movielens/20m/) as the data source for the analysis.

The main tests are:

- Test 1: Time how long it takes to read in two CSV datasets, join them and then do a count on the resulting (denormalised) dataset.
   - Note: The denormalised dataset from Test 1 is saved to cloud object storage as CSV, partitioned Parquet and JSON and used in the subsequent tests.
- Test 2: Time how long it takes to read the denormalised CSV dataset from COS. This should be quicker than Test 1 because the join has already been done and CSV is a simple format that can be read directly. Selecting, modifying and manipulating the data at query time adds to the cost of each call.
- Test 3: Time how long it takes to read the denormalised JSON dataset from COS. This should be slower than Test 2 because the JSON file is larger than the CSV file, so more data has to be read from disk.
- Test 4: Time how long it takes to read the denormalised Parquet dataset from COS. This should be much quicker than Tests 1, 2 and 3 because Parquet provides columnar compression, which saves storage space and allows individual columns to be read instead of entire files.
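The notebook does not show how the timings were taken. One simple way to collect them is a small wall-clock helper like the sketch below. The name `timed` is hypothetical, and the workload is a stand-in so that the sketch runs without Spark. Because Spark evaluates lazily, the thing being timed should be an action such as `count()` or `show()`, not a transformation such as `join()`.

```python
import time


def timed(label, action):
    """Run action(), print the elapsed wall-clock time, return (result, seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f} s")
    return result, elapsed


# Stand-in workload so the sketch runs without Spark. In the notebook the
# lambda would wrap a Spark action, for example: lambda: df_data_4.count()
total, seconds = timed("sum of 1..1000000", lambda: sum(range(1, 1_000_001)))
```

In a Jupyter environment the `%%time` cell magic is an alternative that needs no helper function.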


### Load the main data sets

In this section we load two datasets, ratings.csv and movies.csv.

In [13]:
# Load ratings.csv from Object Storage into a Spark DataFrame (data from https://grouplens.org/datasets/movielens/20m/)
import ibmos2spark

credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'api_key': 'fYpsl9yOVZc4nCZkGsO8c4EVFwIA9chGtafX6bva8vRn',
    'service_id': 'iam-ServiceId-8312f186-2ed5-4d25-a8b9-744a2f5b0434',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

configuration_name = 'os_3269765585d54624a3f9e9ca7b7a831e_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_data_1 = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('ratings.csv', 'optimisationofbigdataperformanc264b8c876f09466290ceff9606e7dd66'))
df_data_1.take(5)

[Row(userId='1', movieId='2', rating='3.5', timestamp='1112486027'),
 Row(userId='1', movieId='29', rating='3.5', timestamp='1112484676'),
 Row(userId='1', movieId='32', rating='3.5', timestamp='1112484819'),
 Row(userId='1', movieId='47', rating='3.5', timestamp='1112484727'),
 Row(userId='1', movieId='50', rating='3.5', timestamp='1112484580')]
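The rows above show every column as a string, because the CSV file is read without a schema. A possible refinement is to pass an explicit schema to the reader, sketched below. The column names come from the output above, but the column types, the constant `RATINGS_COLUMNS` and the helpers `ddl_schema` and `read_ratings` are assumptions for illustration. The sketch also assumes a Spark version whose `DataFrameReader.schema` accepts a DDL-formatted string; on older versions a `StructType` would be needed instead.

```python
# Column names are taken from the ratings.csv output; the types are an assumption.
RATINGS_COLUMNS = [
    ("userId", "INT"),
    ("movieId", "INT"),
    ("rating", "DOUBLE"),
    ("timestamp", "BIGINT"),
]


def ddl_schema(columns):
    """Build a DDL schema string, quoting names so keywords like timestamp are safe."""
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns)


def read_ratings(spark, path):
    """Hypothetical helper: read ratings.csv with typed columns instead of strings."""
    return (spark.read
            .schema(ddl_schema(RATINGS_COLUMNS))
            .option("header", "true")
            .csv(path))


print(ddl_schema(RATINGS_COLUMNS))
```

Supplying the schema up front also avoids the extra pass over the file that `inferSchema` requires.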

In [14]:
df_data_1.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
|     1|    293|   4.0|1112484703|
|     1|    296|   4.0|1112484767|
|     1|    318|   4.0|1112484798|
|     1|    337|   3.5|1094785709|
|     1|    367|   3.5|1112485980|
|     1|    541|   4.0|1112484603|
|     1|    589|   3.5|1112485557|
|     1|    593|   3.5|1112484661|
|     1|    653|   3.0|1094785691|
|     1|    919|   3.5|1094785621|
+------+-------+------+----------+
only showing top 20 rows



In [15]:
# Load movies.csv from Object Storage into a Spark DataFrame (data from https://grouplens.org/datasets/movielens/20m/)

df_data_3 = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('movies.csv', 'optimisationofbigdataperformanc264b8c876f09466290ceff9606e7dd66'))
df_data_3.take(5)


[Row(movieId='1', title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId='2', title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId='3', title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId='4', title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId='5', title='Father of the Bride Part II (1995)', genres='Comedy')]

In [16]:
df_data_3.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

### Test 1 - Join datasets at query time

In this section we measure how long it takes to load two separate datasets and join them.

Denormalise the two datasets df_data_1 and df_data_3 using a join on movieId.

In [17]:
df_data_4 = df_data_1.join(df_data_3,"movieId")
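Conceptually, the inner join on `movieId` behaves like the pure-Python sketch below. This is only an illustration of what denormalisation produces: Spark's planner chooses its own join strategy, and the function `inner_join` and the sample rows marked as made up are not part of the notebook.

```python
ratings = [
    {"userId": "1", "movieId": "2", "rating": "3.5"},       # row from ratings.csv
    {"userId": "7", "movieId": "1", "rating": "4.0"},       # made-up row
    {"userId": "7", "movieId": "999999", "rating": "2.0"},  # made-up row, no matching movie
]
movies = [
    {"movieId": "1", "title": "Toy Story (1995)"},
    {"movieId": "2", "title": "Jumanji (1995)"},
]


def inner_join(left, right, key):
    """Join two lists of dicts on key, keeping only rows that match on both sides."""
    lookup = {}
    for row in right:
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})
    return joined


denormalised = inner_join(ratings, movies, "movieId")
for row in denormalised:
    print(row)
```

Each surviving rating row now carries the movie title with it, which is why later tests can query the saved result without repeating the join.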

In [18]:
df_data_4.show()

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|      2|     1|   3.5|1112486027|      Jumanji (1995)|Adventure|Childre...|
|     29|     1|   3.5|1112484676|City of Lost Chil...|Adventure|Drama|F...|
|     32|     1|   3.5|1112484819|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
|     47|     1|   3.5|1112484727|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   3.5|1112484580|Usual Suspects, T...|Crime|Mystery|Thr...|
|    112|     1|   3.5|1094785740|Rumble in the Bro...|Action|Adventure|...|
|    151|     1|   4.0|1094785734|      Rob Roy (1995)|Action|Drama|Roma...|
|    223|     1|   4.0|1112485573|       Clerks (1994)|              Comedy|
|    253|     1|   4.0|1112484940|Interview with th...|        Drama|Horror|
|    260|     1|   4.0|1112484826|Star Wars: Episod...|Action|Adventure|...|

Time how long it takes to count the records of the denormalised dataset when it is joined at query time. This took 34.3 seconds.

In [19]:
df_data_4.count()

20000263

Time how long it takes to count the records of the dataset joined at query time, grouped by the rating attribute. This took 35.69 seconds.

In [20]:
df_data_4.groupBy("rating").count().show()

+------+-------+
|rating|  count|
+------+-------+
|   1.0| 680732|
|   4.5|1534824|
|   2.5| 883398|
|   3.5|2200156|
|   5.0|2898660|
|   0.5| 239125|
|   4.0|5561926|
|   1.5| 279252|
|   2.0|1430997|
|   3.0|4291193|
+------+-------+



### Test 2: Converting and Reading the denormalised CSV dataset

In this section we measure how long it takes to write the Spark DataFrame out as CSV and then query it.

Write the denormalised Spark DataFrame to a CSV file with a header.


In [21]:
# header=True writes the column names as the first line of each CSV part file.
# mode('overwrite') allows this cell to be rerun. Without it, a second run fails with:
# AnalysisException: 'path file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/Sample2csv already exists.;'
df_data_4.write.mode('overwrite').csv('Sample2csv', header=True)

Read the CSV output back into a Spark DataFrame named csvFile.

In [None]:
# Defining the CSV File as csvFile.
csvFile = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/Sample2csv")

Time how long it takes to count the records in the converted CSV file. This took 6.5 seconds.

In [None]:
csvFile.count()

Time how long it takes to count the records in the CSV file grouped by the rating attribute. This took 7.12 seconds.

In [None]:
# show() returns None, so keep the grouped DataFrame and display it separately.
countsByRating = csvFile.groupBy("rating").count()
countsByRating.show()

### Test 3: Reading the denormalised Json dataset

In this section we measure how long it takes to convert the CSV data to JSON and then query it.

Convert the CSV data to a JSON file.

In [None]:
csvFile.write.format("json").mode("overwrite")\
  .save("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/Sample.json")

Time how long it takes to count the records in the converted JSON file. This took 23.79 seconds.

In [22]:
spark.read.format("json").option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/Sample.json").count()

20000263

Time how long it takes to count the records in the JSON file grouped by the rating attribute. This took 16.12 seconds.

In [23]:
spark.read.format("json").option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/Sample.json").groupBy("rating").count().show()

+------+-------+
|rating|  count|
+------+-------+
|   3.5|2200156|
|   4.5|1534824|
|   2.5| 883398|
|   1.0| 680732|
|   4.0|5561926|
|   0.5| 239125|
|   3.0|4291193|
|   2.0|1430997|
|   1.5| 279252|
|   5.0|2898660|
+------+-------+
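One likely reason the JSON reads are slower than the CSV reads is file size: Spark writes JSON as one object per line, so the field names are repeated in every record, whereas CSV states them once in the header. The sketch below compares the two encodings for a few rows of the denormalised dataset using only the standard library. It is a rough illustration, not a measurement of the files this notebook wrote.

```python
import csv
import io
import json

# A few rows of the denormalised dataset (title and genres left out for brevity).
rows = [
    {"movieId": 2, "userId": 1, "rating": 3.5, "timestamp": 1112486027},
    {"movieId": 29, "userId": 1, "rating": 3.5, "timestamp": 1112484676},
    {"movieId": 32, "userId": 1, "rating": 3.5, "timestamp": 1112484819},
]

# CSV: the column names appear once, in the header line.
csv_buffer = io.StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)
csv_bytes = len(csv_buffer.getvalue().encode("utf-8"))

# JSON Lines: the column names are repeated in every record.
json_lines = "\n".join(json.dumps(row) for row in rows) + "\n"
json_bytes = len(json_lines.encode("utf-8"))

print(csv_bytes, json_bytes)
```

The relative overhead depends on how long the field names are compared with the values, so the exact ratio for the full dataset will differ from this small sample.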



### Test 4: Reading the denormalised parquet dataset

In this section we measure how long it takes to convert the CSV data to Parquet and then query it.

Convert the CSV data to a Parquet dataset partitioned by rating.

In [None]:
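# Parquet is Spark's default output format; naming it explicitly makes the intent clear.
# partitionBy("rating") writes one sub-directory per distinct rating value.
csvFile.write.format("parquet").mode("overwrite").partitionBy("rating")\
  .save("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/partitioned-files.parquet")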

Time how long it takes to count the records in the converted Parquet dataset. This took 1.81 seconds.

In [24]:
spark.read.format("parquet")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/partitioned-files.parquet").count()

20000263

In [None]:
spark.read.format("parquet")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/partitioned-files.parquet").show()

Time how long it takes to count the records in the Parquet dataset grouped by the rating attribute. This took 3.92 seconds.

In [25]:
spark.read.format("parquet")\
  .load("/gpfs/global_fs01/sym_shared/YPProdSpark/user/s3ba-4a57de59dd0705-90c0cef5c9f0/notebook/work/partitioned-files.parquet").groupBy("rating").count().show()

+------+-------+
|rating|  count|
+------+-------+
|   3.5|2200156|
|   4.5|1534824|
|   2.5| 883398|
|   1.0| 680732|
|   4.0|5561926|
|   0.5| 239125|
|   3.0|4291193|
|   2.0|1430997|
|   1.5| 279252|
|   5.0|2898660|
+------+-------+
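Because the Parquet dataset was written with `partitionBy("rating")`, each rating value gets its own `rating=<value>` sub-directory. A query that filters on rating can then skip every other directory, which is usually called partition pruning. The sketch below models that idea with plain path strings. The rating values come from the output above, while the part file name and the `prune` helper are made up for illustration.

```python
# Rating values come from the groupBy output; the part file name is made up.
ratings = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]

# Hive-style layout produced by partitionBy("rating"): one directory per value.
root = "partitioned-files.parquet"
files = [f"{root}/rating={r}/part-00000.parquet" for r in ratings]


def prune(paths, column, value):
    """Keep only the files whose partition directory matches column=value."""
    wanted = f"{column}={value}"
    return [p for p in paths if wanted in p.split("/")]


selected = prune(files, "rating", 4.0)
print(len(files), len(selected), selected)
```

In this toy layout a filter on a single rating value touches one file out of ten. Partitioning on a column with very many distinct values would instead produce a large number of small files, so the choice of partition column matters.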



### Conclusion 

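In these tests Parquet was the fastest format to query with Spark: counting the denormalised dataset took 1.81 seconds from partitioned Parquet, compared with 6.5 seconds from CSV, 23.79 seconds from JSON and 34.3 seconds when the two CSV datasets were joined at query time. We therefore conclude that Parquet is the best of the tested file formats for working with big data in cloud object storage (S3, IBM COS) in a cost-effective and efficient way, reducing capital expense and increasing operating efficiency.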