# Hadoop
- Open Source
- Distributed storage and computing
- Can scale to petabytes of data
- Consists of
    - Hadoop Distributed File System (HDFS)
    - MapReduce
    
Source: LinkedIn: big-data-analytics-with-hadoop-and-apache-spark

Practical exercises run on:
- Ambari sandbox
- Zeppelin notebook

## Hadoop Distributed File System (HDFS)
- A good, low-cost option for storing large amounts of data
- Provides scaling, security and cost benefits
- Suitable for enterprises with in-house data centers
- Cloud Alternatives - Amazon S3, Oracle OSS, Google Cloud Storage

## MapReduce
- Scales Horizontally
- Very slow, as it uses disk storage instead of memory for intermediate caching
- Faster Alternatives - Apache Spark, Apache Flink
- These alternatives also have a growing list of supporting libraries

## Apache Spark
- Open Source
- Large scale distributed data processing engine
- Uses memory to speed up computations
- Batch, streaming, ML and graph capabilities
- Supports Scala, Java, Python and R
- Among the most popular big data platforms today

## Hadoop and Spark
- Spark is well integrated with Hadoop
- Spark can access and process data in HDFS using
    - parallel nodes
    - read optimization to use less memory and I/O
    - HDFS for intermediate data caching
- YARN provides single cluster management for both Hadoop and Spark workloads

## HDFS Data Modelling for Analytics

#### 1. Hadoop Storage Formats
- Raw Texts (blobs)
- Structured text files (csv, xml, json)
- Sequence Files
- Avro
- ORC
- Parquet

Text Files:  
- Simple to read/write
- Low performance - no parallel operations
- More Storage
- No schema

Avro:  
- Language neutral data serialization
- Row format
- Self-describing schema support
- Compressible
- Splittable
- Ideal for multi-language support

Parquet:  
- Columnar Format
- Reads only the selected columns (saves I/O)
- Schema Support
- Compressible (column level) and Splittable
- Supports nested Data Structures
- Ideal for analytics applications  

Parquet for Analytics:  
- provides overall better performance and flexibility for analytics applications
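
The difference between a row format and a columnar format can be sketched in plain Scala, without Spark or any real file format. The `Sale` case class, its fields and the sample values below are hypothetical and only illustrate why reading a single column costs less in a columnar layout.

```scala
// Hypothetical sales records; field names and values are illustrative only.
case class Sale(product: String, customer: String, quantity: Int)

val rows = Seq(
  Sale("Mouse", "Google", 2),
  Sale("Headset", "Apple", 1),
  Sale("Mouse", "Apple", 5)
)

// Row layout (Avro-like): summing one field still walks every full record,
// so all 3 fields of each record are read.
val fieldsTouchedRowLayout = rows.size * 3
val totalFromRows = rows.map(_.quantity).sum

// Columnar layout (Parquet-like): the values of each column are stored together.
val productColumn  = rows.map(_.product).toVector
val customerColumn = rows.map(_.customer).toVector
val quantityColumn = rows.map(_.quantity).toVector

// The same aggregate only needs the quantity column.
val fieldsTouchedColumnLayout = quantityColumn.size
val totalFromColumns = quantityColumn.sum

println(s"Row layout read $fieldsTouchedRowLayout fields, columnar layout read $fieldsTouchedColumnLayout")
```

Both layouts produce the same total; the columnar one simply reads fewer values to get there.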

#### 2. Hadoop Compression Options
- Snappy
    - Compression codec developed by Google
    - Moderate Compression
    - Excellent read/write performance
    - Compresses entire file
    - Not splittable, so it does not support parallel operations
- LZO
    - Moderate compression
    - Excellent processing performance
    - Splittable - Support parallel processing
    - Requires a separate license
- GZIP
    - Very good compression
    - Moderate processing performance
    - Not Splittable
    - Ideal for container type applications
- bzip2
    - Excellent compression
    - Slower processing performance 
    - Splittable
    - Ideal for archival type applications

#### 3. Partitioning
- HDFS does not have the concept of indexes
- Even to read a single row, the entire file must be read
- Partitioning provides a way to read only a subset of data
- Multiple attributes can be used for hierarchical partitioning
- Split data into directories based on individual values of attributes

<img src="Image/partitioning.JPG" width="600" />

- Choose attributes that have a limited set of values and are frequently used in SELECT filters
- Otherwise, too many subdirectories will be created
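
As a rough illustration of hierarchical partitioning, the sketch below groups records into `key=value` directory paths in plain Scala. It does not touch HDFS; the `Order` case class, the `Country` attribute and the sample data are assumptions made only for this example.

```scala
// Hypothetical order records; Country is an assumed second partition attribute.
case class Order(product: String, country: String, quantity: Int)

val orders = Seq(
  Order("Mouse", "US", 2),
  Order("Mouse", "IN", 1),
  Order("Headset", "US", 4),
  Order("Mouse", "US", 3)
)

// One directory level per partition attribute, written as key=value.
def partitionPath(o: Order): String =
  s"Product=${o.product}/Country=${o.country}"

// Each distinct combination of attribute values becomes one directory.
val directories: Map[String, Seq[Order]] = orders.groupBy(partitionPath)

// A filter on the partition attributes only needs to open one directory.
val mouseUs = directories.getOrElse("Product=Mouse/Country=US", Seq.empty)

directories.keys.toSeq.sorted.foreach(println)
```

The number of directories grows with the number of distinct value combinations, which is why attributes with few distinct values make better partition keys.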

#### 4. Bucketing
- Partitioning is optimal when an attribute has a small set of unique values
- What if we need to partition on a key that has a large number of unique values?
- Bucketing is similar to partitioning, but instead of the raw value it uses a hash function to convert the value into a hash key
- Controls the number of unique directories created
- Even distribution of data
- Choose attributes with large number of unique values and those that are most used in SELECT filters.
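
The idea of hashing a key into a fixed number of buckets can be sketched in plain Scala. This is only an illustration: Spark and Hive use their own hash functions, and `hashCode` is used here as a stand-in. The `bucketFor` helper and the product names are hypothetical.

```scala
// Map a key to one of numBuckets buckets.
// floorMod keeps the result non-negative even when hashCode is negative.
def bucketFor(key: String, numBuckets: Int): Int =
  java.lang.Math.floorMod(key.hashCode, numBuckets)

// Hypothetical product names; many distinct values, but only 3 buckets.
val products = Seq("Mouse", "Headset", "Keyboard", "Webcam", "Monitor", "Laptop")
val numBuckets = 3

val assignment: Map[String, Int] =
  products.map(p => p -> bucketFor(p, numBuckets)).toMap

assignment.toSeq.sortBy(_._1).foreach { case (p, b) => println(s"$p -> bucket $b") }
```

However many distinct keys arrive, the number of buckets stays fixed, and the same key always lands in the same bucket.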

#### 5. HDFS Schema Designing and Storage Best Practices
- Understand the data: whether it is read intensive, write intensive, or both
- Determine what needs optimization and what can be compromised (reduce storage requirements, or trade storage for better read/write performance)
- Choose options carefully, as they cannot be changed easily
- Run tests on actual data to understand performance and storage characteristics
- Choose partitioning and bucketing keys wisely

# 1. Exercises - Data Ingestion

### 01. Reading Files into Spark
Data can be read into Apache Spark data frames from a variety of data sources.

Examples:

- A flat file on a local disk
- A file from HDFS
- A Kafka topic

In this example, we will read a CSV file in an HDFS folder into a Spark Data Frame.

In [4]:
//Read the raw CSV file into a Spark DataFrame
//    Use inferSchema to infer the schema automatically from the CSV file
val rawSalesData = spark
                .read
                .option("inferSchema", "true")
                .option("header", "true")
                .csv("/user/raj_ops/raw_data/sales_orders.csv");

//Print the schema for verification
rawSalesData.printSchema();

//Print the first 5 records for verification
rawSalesData.show(5)

### 02. Writing to HDFS
Write the rawSalesData Data Frame into HDFS as a Parquet file. Use Parquet as the format since it enables splitting and filtering. Use GZIP as the compression codec.

On completion, verify that the files were written correctly through the HDFS command line or Ambari.

In [None]:
//Write the sales data to HDFS for future processing

rawSalesData.write
            .format("parquet")
            .mode("overwrite")
            .option("compression", "gzip")
            .save("/user/raj_ops/raw_parquet");
            

### 03. Writing to HDFS with Partitioning
Write a partitioned Parquet file in HDFS. Partitioning is done by Product, which creates one directory per unique product available in the raw CSV. Verify through the HDFS command line or Ambari.
            

In [None]:
rawSalesData.write
            .format("parquet")
            .mode("overwrite")
            .option("compression", "gzip")
            .partitionBy("Product")
            .save("/user/raj_ops/partitioned_parquet")

### 04. Writing to Hive with Bucketing
Create a Bucketed Hive table for orders. Bucketing will be done by Product. It will create 3 buckets based on the hash generated by Product. Hive tables can be queried through SQL.

In [None]:
//Create a Hive table for sales data with 3 buckets.
rawSalesData.write
            .format("parquet")
            .mode("overwrite")
            .bucketBy(3, "Product")
            .saveAsTable("product_bucket_table")
            
//Data goes in here.
println("Hive Data Stored in : " + sc.getConf.get("spark.sql.warehouse.dir") + "\n")
            
//Read through SQL
sql("SELECT * FROM product_bucket_table where Product='Mouse'").show(5)

### Data Ingestion with Spark Best Practices
- Enable parallelism for maximum write performance
    - use a splittable file format like Parquet
    - use partitions or buckets
- Use APPEND for incremental data ingestion
- External data reads - use resources that provide parallelism
    - e.g: JDBC, Kafka
    - Break down large files into smaller files

# Spark

## How Spark Works - Architecture

- Spark programs run on a driver node, which uses the Spark cluster to execute them
- A Spark cluster has multiple executor nodes, which execute tasks in parallel

<img src="Image/spark_arc.JPG" width="600" />

## Spark Execution Plan
- Lazy Execution - only an action triggers execution
- The Spark optimizer comes up with a physical plan
- Physical plan optimizes for:
    - Reduced I/O
    - Reduced Shuffling
    - Reduced Memory Usage
- Spark executors can read from and write to external sources directly when those sources support parallel I/O, which reduces memory requirements at the driver
    - e.g: HDFS, Kafka, JDBC
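
Lazy execution can be illustrated with a plain Scala `Iterator`, which also defers work until a result is requested. This is an analogy rather than Spark itself: the `map` and `filter` calls stand in for transformations, and `take(2).toList` stands in for an action. The counter `rowsProcessed` is a hypothetical name used only to observe when the work happens.

```scala
// Counts how many input rows have actually been processed.
var rowsProcessed = 0

// "Transformations": these only describe the work; nothing runs yet.
val pipeline = Iterator.range(1, 11)
  .map { x => rowsProcessed += 1; x * 2 }
  .filter(_ > 10)

// No row has been processed so far.
val processedBeforeAction = rowsProcessed

// "Action": asking for results forces evaluation, and only as far as needed.
val firstTwo = pipeline.take(2).toList

println(s"Processed before action: $processedBeforeAction, after action: $rowsProcessed, result: $firstTwo")
```

Because the whole pipeline is known before anything runs, an optimizer has the chance to skip work that the final result does not need.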

# 2. Exercises - Data Extraction

### 01. Read Parquet Files into Spark
Read a non-partitioned Parquet file into Spark. Measure the time taken. Also look at the execution plan.

In [None]:
//Read the file
val salesParquet = spark.read
                        .parquet("/user/raj_ops/raw_parquet")

//Display the results and time the operation
spark.time(salesParquet.show(5))

//Show the execution plan
println("\n-------------------------------EXPLAIN------------------------------------")
salesParquet.explain
println("-------------------------------END EXPLAIN--------------------------------\n")

### 02. Read Partitioned Data into Spark


In [None]:
//Read all the partitions. Use basePath to have the partition key as part of the data
val salesPartitioned = spark.read
                            .option("basePath", "/user/raj_ops/partitioned_parquet/")
                            .parquet("/user/raj_ops/partitioned_parquet/*")

//Display the results and time the operation
spark.time(salesPartitioned.show(5))

//Show the execution plan
println("\n-------------------------------EXPLAIN------------------------------------")
salesPartitioned.explain
println("-------------------------------END EXPLAIN--------------------------------\n")

In [None]:
//Read a specific partition only
val salesHeadset = spark.read
                            .parquet("/user/raj_ops/partitioned_parquet/Product=Headset")

//Display the results and time the operation
spark.time(salesHeadset.show(5))

//Show the execution plan
println("\n-------------------------------EXPLAIN------------------------------------")
salesHeadset.explain
println("-------------------------------END EXPLAIN--------------------------------\n")

### 03. Read Bucketed Data into Spark

In [None]:
//Read data from Hive
val salesBucketed = sql("SELECT * FROM product_bucket_table")

//Display the results and time the operation
spark.time(salesBucketed.show(5))

//Show the execution plan
println("\n-------------------------------EXPLAIN------------------------------------")
salesBucketed.explain
println("-------------------------------END EXPLAIN--------------------------------\n")

### Data Extraction with Spark Best Practices
- reduce data read into memory by:
    - using filters based on partition key
    - reading only required columns
- use data sources and file formats that support parallelism
- keep number of partitions >= (No. of executors * No. of cores per executor)
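
The partition count rule above is simple arithmetic, sketched here in plain Scala. The helper names and the cluster size (4 executors with 2 cores each) are hypothetical.

```scala
// Minimum number of partitions so that every core has at least one to work on.
def minPartitions(numExecutors: Int, coresPerExecutor: Int): Int =
  numExecutors * coresPerExecutor

// True when the data has enough partitions to keep all cores busy.
def keepsAllCoresBusy(numPartitions: Int, numExecutors: Int, coresPerExecutor: Int): Boolean =
  numPartitions >= minPartitions(numExecutors, coresPerExecutor)

// Hypothetical cluster: 4 executors with 2 cores each.
println("Minimum partitions : " + minPartitions(4, 2))
println("3 partitions enough? " + keepsAllCoresBusy(3, 4, 2))
println("8 partitions enough? " + keepsAllCoresBusy(8, 4, 2))
```

With fewer partitions than cores, some cores sit idle while others work.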

# 3. Exercises - Optimizing Spark Processing

### 01. Pushing down Projections
When downstream queries or processing steps need only a subset of columns, the Spark optimizer identifies them and reads only those columns into the in-memory data frame. This saves on I/O and memory. This is called Projection Push down. While building data pipelines, it helps to be aware of how Spark works and take advantage of this for optimization.

In [None]:
//Read sales data from partitioned parquet file
val salesData = spark.read
                .option("basePath", "/user/raj_ops/partitioned_parquet/")
                .parquet("/user/raj_ops/partitioned_parquet/*")

//Projection gets pushed down to the file scan
println("-------------------------------EXPLAIN------------------------------------")
salesData.select("Product","Quantity").explain
println("-------------------------------END EXPLAIN--------------------------------\n")


### 02. Pushing down Filters
When downstream queries or processing steps need only a subset of rows, the Spark optimizer identifies the filter and reads only the matching rows into the in-memory data frame. This saves on I/O and memory. This is called Filter Push down. It works for both partition columns (Product) and non-partition columns (Customer). While building data pipelines, it helps to be aware of how Spark works and take advantage of this for optimization.

In [None]:
val mouse = salesData.where($"Product" === "Mouse")

println("-------------------------------EXPLAIN Filter by Partition------------------------------------")
mouse.explain
println("-------------------------------END EXPLAIN--------------------------------\n")


val google=salesData.where( $"Customer" === "Google")

println("-------------------------------EXPLAIN Filter without Partition------------------------------------")
google.explain
println("-------------------------------END EXPLAIN--------------------------------\n")

println();

### 03. Partitioning and Coalescing
- While performing actions, Spark creates results with the default partition count. In local mode, it is usually equal to the number of cores. On clusters, the default for shuffle output is 200 (spark.sql.shuffle.partitions). This can be too many if the number of cores in the cluster is significantly less than the number of partitions, so repartitioning helps to set the optimal number of partitions.

- Repartition does a full reshuffle and can be used for increasing/decreasing partitions.

- Coalesce simply consolidates existing partitions and avoids a full reshuffle. It can be used to decrease the number of partitions.

- Repartition and Coalesce themselves take significant time and resources. Do them only if multiple steps downstream will benefit from them.

In [None]:
println("Default parallelism : " + sc.defaultParallelism + "\n")
//Optimal number of partitions = # of cores available.

println("Partitions in SalesData from Parquet : " + salesData.rdd.getNumPartitions + "\n")

//Read file without parallelizing
val rawSalesData = spark
                .read
                .option("inferSchema", "true")
                .option("header", "true")
                .csv("/user/raj_ops/raw_data/sales_orders.csv");
                
println("Partitions in raw CSV Read :" + rawSalesData.rdd.getNumPartitions + "\n")

//Repartition to 8 partitions
val partitionedSalesData = rawSalesData.repartition(8)

println("Partitions after repartitioning :" + partitionedSalesData.rdd.getNumPartitions + "\n")

//Coalesce to 3 partitions
val coalasedSalesData = partitionedSalesData.coalesce(3)

println("Partitions after coalesce :" + coalasedSalesData.rdd.getNumPartitions + "\n")

### 04. Managing Shuffling
- Operations that move records between executors, such as grouping, aggregations and joins, trigger shuffling. Shuffling takes time, memory and bandwidth. While building pipelines, focus on the following:

    - Minimize the number of shuffles
    - Run these operations late in the pipeline, after data has been filtered.
    - Use aggregations by partition key as much as possible, as records with the same partition key stay on the same executor node.

In [None]:
val wordsRDD=spark.sparkContext
                .parallelize(Seq("Google", "Apple", "Apple", "Google", 
                    "Google", "Apple", "Apple", "Apple", "Apple", "Apple"))
                    

//Doing groupBy first. Shuffling has more data. Check DAG

print("For using group by key : ")
val groupRDD = spark.time(
                wordsRDD.map( word => (word, 1) )
                    .groupByKey()
                    .map( words => (words._1, words._2.sum ))
                    .collect())

//Doing reduce. Shuffling has less data. Check DAG
print("For using reduce : ")
val reduceRDD = spark.time(
                wordsRDD.map( word => (word, 1) )
                    .reduceByKey(_+_).collect())


//See content generated by groupByKey and reduceByKey
println("\nData shuffled after Group by Key : ")
wordsRDD.map( word => (word, 1) )
                    .groupByKey()
                    .collect()
                    .foreach(println)

println("\nData shuffled after Reduce by Key: ")                    
wordsRDD.map( word => (word, 1) )
                    .reduceByKey(_+_)
                    .collect()
                    .foreach(println)
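
Why the reduce-based version shuffles less data can be sketched without Spark. The simulation below assumes the ten words sit in two partitions of five records each (a hypothetical layout) and counts how many records would have to cross the network in each approach.

```scala
val words = Seq("Google", "Apple", "Apple", "Google",
                "Google", "Apple", "Apple", "Apple", "Apple", "Apple")

// Assumption: the data is split into 2 partitions of 5 records each.
val partitions: Seq[Seq[String]] = words.grouped(5).toSeq

// groupByKey style: every (word, 1) pair is sent across the shuffle.
val shuffledByGroup: Seq[(String, Int)] =
  partitions.flatMap(part => part.map(w => (w, 1)))

// reduceByKey style: each partition first combines its own records per key,
// so at most one record per key and partition is shuffled.
val shuffledByReduce: Seq[(String, Int)] =
  partitions.flatMap { part =>
    part.groupBy(identity).map { case (w, ws) => (w, ws.size) }.toSeq
  }

// The final merge after the shuffle gives the same counts either way.
def finalCounts(records: Seq[(String, Int)]): Map[String, Int] =
  records.groupBy(_._1).map { case (w, rs) => (w, rs.map(_._2).sum) }

println("Records shuffled with group by key : " + shuffledByGroup.size)
println("Records shuffled with reduce by key : " + shuffledByReduce.size)
println("Final counts : " + finalCounts(shuffledByReduce))
```

The gap widens as the data grows, since the grouped version ships one record per input row while the reduced version ships at most one record per key and partition.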

### 05. Optimizing Joins
- By default, joining two data frames requires a lot of shuffling. If one data frame is considerably smaller, a better option is to broadcast that data frame to all the executors and then use those copies to join locally. The Spark optimizer chooses broadcast joins when possible. Data frames smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default) are broadcast automatically.
- It is recommended to use denormalized data and avoid joins where possible.

In [None]:
val products = spark
                .read
                .option("inferSchema", "true")
                .option("header", "true")
                .csv("/user/raj_ops/raw_data/product_vendor.csv");
                
products.show()

import org.apache.spark.sql.functions.broadcast
println("-------------------------------EXPLAIN------------------------------------")
salesData.join(broadcast(products),"Product").explain
println("-------------------------------END EXPLAIN--------------------------------\n")
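
The idea behind a broadcast join can be sketched in plain Scala: the small table is copied to every partition as a lookup map, and each partition joins its own rows locally. The `SaleRow` case class, the vendor names and the two-partition layout are hypothetical, and this is not how Spark implements the join internally.

```scala
// Hypothetical row of the large sales table.
case class SaleRow(product: String, quantity: Int)

// Small table: product -> vendor. Small enough to copy to every executor.
val vendorByProduct: Map[String, String] =
  Map("Mouse" -> "VendorA", "Headset" -> "VendorB")

// Large table, assumed to be spread over two partitions.
val salesPartitions: Seq[Seq[SaleRow]] = Seq(
  Seq(SaleRow("Mouse", 2), SaleRow("Headset", 1)),
  Seq(SaleRow("Mouse", 5), SaleRow("Webcam", 3))
)

// Each partition looks up its rows in the local copy of the small table.
// No sales rows move between partitions; unmatched rows are dropped (inner join).
val joined: Seq[(String, Int, String)] =
  salesPartitions.flatMap { part =>
    part.flatMap { s =>
      vendorByProduct.get(s.product).map(vendor => (s.product, s.quantity, vendor))
    }
  }

joined.foreach(println)
```

Only the small table is copied around, which is why this approach pays off when one side of the join is much smaller than the other.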

### 06. Caching in Spark/Storing Intermediate Results
- By default, every time an action is performed, Spark executes all the previous steps right from the data read. This can end up being very expensive, especially while using Spark in a development or interactive mode. A better option is to cache intermediate results.
- Two types of caching
    - Caching - Spark can cache in memory.
    - Persistence - It can also persist in both memory and disk. While running under YARN, persistence happens in HDFS by default.

In [None]:
//In memory only
wordsRDD.cache()
//Trigger an action for caching
wordsRDD.collect()

println("\nPlan before caching intermediate results :")
val dataBefore = coalasedSalesData.where($"Product" === "Mouse")
dataBefore.explain

//Store on disk
import org.apache.spark.storage.StorageLevel
coalasedSalesData.persist(StorageLevel.DISK_ONLY)

//Trigger an action for persisting.
coalasedSalesData.count()

println("\nPlan after caching :")
val dataAfter = coalasedSalesData.where($"Product" === "Mouse")
dataAfter.explain
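
The cost that caching avoids can be shown with a small plain Scala sketch. The function `readAndTransform` is a hypothetical stand-in for "read from source and apply all previous steps", and `sourceReads` counts how often that work is repeated.

```scala
// Counts how many times the source is read and the pipeline recomputed.
var sourceReads = 0

// Stand-in for reading the data and running all upstream steps.
def readAndTransform(): Seq[Int] = {
  sourceReads += 1
  Seq(1, 2, 3, 4, 5).map(_ * 2)
}

// Without caching: each "action" recomputes everything from the read.
val countUncached = readAndTransform().size
val sumUncached = readAndTransform().sum
val readsWithoutCache = sourceReads

// With caching: compute once, keep the result, reuse it for every action.
sourceReads = 0
val cached = readAndTransform()
val countCached = cached.size
val sumCached = cached.sum
val readsWithCache = sourceReads

println("Reads without cache : " + readsWithoutCache)
println("Reads with cache : " + readsWithCache)
```

The results are identical in both cases; only the number of times the upstream work runs differs.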

### Data Processing Best Practices
- Push down filters and projections to data sources as much as possible
- Choose partition keys based on the columns most used for filters and aggregations
- Repartition or Coalesce only if multiple downstream transforms will benefit
- Avoid joins and use denormalized data
- Time operations using spark.time() on production equivalent data
- Use Caching when appropriate 
- Use Explain to understand the physical plan