In [1]:
sc

In [2]:
spark

In [4]:
hr_employee = spark.read.csv("file:///home/hadoop/Downloads/HR_Employee.csv", inferSchema=True,
                            header=True)

In [6]:
hr_employee.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- JobInvolvement: string (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobSatisfaction: string (nullable = true)
 |-- Hourlyrate: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Salaryhike: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Workex: integer (nullable = true)
 |-- YearsSinceLastPromotion: integer (nullable = true)
 |-- EmpSatisfaction: string (nullable = true)
 |-- TrainingTimesLastYear: integer (nullable = true)
 |-- WorkLifeBalance: string (nullable = true)
 |-- Performance_Rating: string (nullable = true)

#### Big Data File Types
    * Parquet file format: records are stored column-wise. Writing a .csv or other structured dataset as Parquet usually reduces the file size, because values from the same column are stored together and compress well.
    * Other formats work along similar lines: ORC (also columnar) and Avro (row-based; records are stored in a compact binary form and the schema is stored as JSON).
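To make the column-wise idea concrete, here is a minimal plain-Python sketch (not Parquet itself). The rows are made-up values for three of the HR_Employee columns, and `to_columnar` is a hypothetical helper written for this illustration.

```python
# Toy rows standing in for a few HR_Employee columns (values are made up).
rows = [
    {"EmployeeID": 1, "Department": "Sales", "Age": 41},
    {"EmployeeID": 2, "Department": "Research", "Age": 49},
    {"EmployeeID": 3, "Department": "Sales", "Age": 37},
]

def to_columnar(records):
    """Turn a list of row dicts into a dict of column lists."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns

columns = to_columnar(rows)

# A query that needs only Age reads one list instead of walking every full row.
average_age = sum(columns["Age"]) / len(columns["Age"])
print(columns["Department"])
print(average_age)
```

Values of one column sit next to each other in this layout, which is also why repeated values such as department names tend to compress well.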

In [10]:
# Write the DataFrame (loaded from CSV) to a local directory in Parquet format
hr_employee.write.parquet('file:///home/hadoop/Downloads/HR/')

In [13]:
!hdfs dfs -rm -r /home

Deleted /home


In [14]:
# Write the DataFrame in ORC format to /HR_Data on the default filesystem
hr_employee.write.orc('/HR_Data')

In [12]:
spark.read.orc("/HR_Data").show()

+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|             JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|     Sales

#### Optimization Techniques
    * Optimizing Spark jobs can significantly improve query and job performance.

2. Partitioning
    * Partitioning divides data into smaller chunks, which can be processed in parallel.

In [15]:
partitioned_df = hr_employee.repartition(3)

In [16]:
partitioned_df.write.parquet("/HRPartition")
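The idea behind partitioning can be sketched in plain Python. This is an illustration, not Spark's implementation: `split_into_partitions` is a hypothetical helper, the income values are made up, and a thread pool stands in for the executors.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(items, num_partitions):
    """Deal items round-robin into num_partitions lists."""
    partitions = [[] for _ in range(num_partitions)]
    for index, item in enumerate(items):
        partitions[index % num_partitions].append(item)
    return partitions

incomes = [5993, 5130, 2090, 2909, 3468, 3068, 2670]  # made-up values
partitions = split_into_partitions(incomes, 3)

# Each partition is summed by its own worker; the partial sums are then combined.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_sums = list(pool.map(sum, partitions))

total = sum(partial_sums)
print(partitions)
print(total)
```

Each worker only ever sees its own chunk, so the work scales with the number of partitions as long as the chunks are of similar size.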

3. Caching & Persistence
    * cache() keeps a DataFrame available for reuse; persist() does the same with an explicitly chosen storage level (memory, disk, or both).

In [17]:
hr_employee.cache()

DataFrame[EmployeeID: int, Department: string, JobRole: string, Attrition: string, Gender: string, Age: int, MaritalStatus: string, Education: string, EducationField: string, BusinessTravel: string, JobInvolvement: string, JobLevel: int, JobSatisfaction: string, Hourlyrate: int, Income: int, Salaryhike: int, OverTime: string, Workex: int, YearsSinceLastPromotion: int, EmpSatisfaction: string, TrainingTimesLastYear: int, WorkLifeBalance: string, Performance_Rating: string]

In [18]:
# Persist the DataFrame with a specific storage level
from pyspark import StorageLevel
hr_employee.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[EmployeeID: int, Department: string, JobRole: string, Attrition: string, Gender: string, Age: int, MaritalStatus: string, Education: string, EducationField: string, BusinessTravel: string, JobInvolvement: string, JobLevel: int, JobSatisfaction: string, Hourlyrate: int, Income: int, Salaryhike: int, OverTime: string, Workex: int, YearsSinceLastPromotion: int, EmpSatisfaction: string, TrainingTimesLastYear: int, WorkLifeBalance: string, Performance_Rating: string]

#### 4. Serialization
    * Efficient serialization reduces the time needed to read and write data and to transfer it over the network. Kryo serialization is a popular alternative that performs better than the default Java serialization.

a) Java Serialization: the default serialization method. It is easy to use, but it is slow for reads and writes and can produce large serialized output.

In [19]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Java Serialization Example").getOrCreate()

b) Kryo Serialization: faster and more compact than Java serialization.

In [22]:
spark.stop()

In [26]:
'''
spark = SparkSession.builder\
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")\
.config("spark.kryo.registrationRequired","true")\
.config("spark.kryo.classesToRegister","org.apache.spark.example.Person")\
.appName("Kryo Serialization Example").getOrCreate()
'''

'\nspark = SparkSession.builder.config("spark.serializer","org.apache.spark.serializer.KryoSerializer").config("spark.kryo.registrationRequired","true").config("spark.kryo.classesToRegister","org.apache.spark.example.Person").appName("Kryo Serialization Example").getOrCreate()\n'

In [30]:
spark = SparkSession.builder.appName('PySparkSession')\
.config("spark.serializer","org.apache.spark.serializer.JavaSerializer").getOrCreate()

In [31]:
spark

#### 5. Broadcast Joins
    * Broadcasting a small dataset sends a full copy of it to every executor, so the large dataset can be joined against it without being shuffled.

In [39]:
small_df = spark.read.csv("file:///home/hadoop/Downloads/airports.csv", header=True, inferSchema=True)
df = spark.read.csv("file:///home/hadoop/Downloads/raw_flight_data.csv", header=True, inferSchema=True)

In [40]:
df.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



In [41]:
from pyspark.sql.functions import broadcast
broadcast_df = broadcast(small_df)

In [43]:
broadcast_df.cache()
df.cache()

DataFrame[DayofMonth: int, DayOfWeek: int, Carrier: string, OriginAirportID: int, DestAirportID: int, DepDelay: int, ArrDelay: int]

In [44]:
df.join(broadcast_df, df.OriginAirportID == broadcast_df.airport_id)

DataFrame[DayofMonth: int, DayOfWeek: int, Carrier: string, OriginAirportID: int, DestAirportID: int, DepDelay: int, ArrDelay: int, airport_id: int, city: string, state: string, name: string]
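Why a broadcast join avoids moving the large table can be sketched in plain Python. This is only an illustration of the idea, not Spark's implementation: `broadcast_hash_join` is a hypothetical helper, the rows are made up, and it assumes the key is unique on the small side.

```python
# Small lookup table: made-up rows with some of the airports columns.
airports = [
    {"airport_id": 1, "city": "City A", "state": "AA"},
    {"airport_id": 2, "city": "City B", "state": "BB"},
]

# Large table: made-up rows with some of the flight columns.
flights = [
    {"Carrier": "DL", "OriginAirportID": 1, "DepDelay": 4},
    {"Carrier": "WN", "OriginAirportID": 2, "DepDelay": -3},
    {"Carrier": "DL", "OriginAirportID": 1, "DepDelay": 12},
    {"Carrier": "AA", "OriginAirportID": 9, "DepDelay": 0},  # no matching airport
]

def broadcast_hash_join(large_rows, small_rows, large_key, small_key):
    """Inner join that looks each large row up in a hash table built from the small side."""
    # The whole small table fits in one dict; this is the part that gets "broadcast".
    lookup = {row[small_key]: row for row in small_rows}
    joined = []
    for row in large_rows:
        match = lookup.get(row[large_key])
        if match is not None:
            joined.append({**row, **match})
    return joined

joined = broadcast_hash_join(flights, airports, "OriginAirportID", "airport_id")
for row in joined:
    print(row["Carrier"], row["city"], row["DepDelay"])
```

The large rows are streamed once and never regrouped by key, which is the saving the broadcast hint is after.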

#### 6. Level of Parallelism

In [45]:
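# Adjust the level of parallelism based on your cluster size.
# Note: spark.default.parallelism applies to RDD operations and is normally fixed when the
# SparkContext starts, so setting it here may have no effect on an already running session.
spark.conf.set("spark.default.parallelism", 100)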

#### 7. Avoid groupByKey

 * Use reduceByKey() or aggregateByKey() instead of groupByKey() to reduce shuffling; both combine values within each partition before the shuffle.

In [47]:
rdd = spark.sparkContext.parallelize([('dosa',2),('salad',3),('idli',1),('dosa',3),
                                      ('chocolates',4),('idli',2)])
rdd.reduceByKey(lambda x,y:x+y).collect()

[('dosa', 5), ('salad', 3), ('idli', 3), ('chocolates', 4)]
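The difference in shuffle volume can be simulated in plain Python. This sketch reuses the pairs from the cell above; the split into two partitions is an assumption, and `combine_within_partition` and `merge_after_shuffle` are hypothetical helpers, not Spark APIs.

```python
from operator import add

# The same pairs as above, split across two partitions (the split is assumed).
partitions = [
    [("dosa", 2), ("salad", 3), ("idli", 1), ("dosa", 3)],
    [("chocolates", 4), ("idli", 2)],
]

def combine_within_partition(partition, func):
    """Merge values per key inside one partition, before anything is shuffled."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())

def merge_after_shuffle(pairs, func):
    """Merge values per key once all pairs for a key have arrived."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# groupByKey style: every pair is shuffled as-is.
group_by_key_shuffle = [pair for part in partitions for pair in part]

# reduceByKey style: pairs are combined per partition first, then shuffled.
reduce_by_key_shuffle = [
    pair for part in partitions for pair in combine_within_partition(part, add)
]

print(len(group_by_key_shuffle), len(reduce_by_key_shuffle))
print(merge_after_shuffle(reduce_by_key_shuffle, add))
```

Both routes end with the same totals; the pre-combined route simply sends fewer records, and the gap grows with the number of repeated keys per partition.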

#### 8. Reduce Shuffle
    * Reduce the number of shuffles by choosing transformations that move less data between executors.
    * Prefer reduceByKey() over groupByKey().
    * Prefer map() and reduce() over groupBy() where possible.

#### 9. Repartition and Coalesce

#### 10. Accumulators
    * Accumulators are shared variables that tasks running on executors can only add to, while the driver reads the result. Use them for counters and sums collected across executors.

In [48]:
acc = spark.sparkContext.accumulator(0)

In [49]:
type(acc)

pyspark.accumulators.Accumulator

In [52]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

In [54]:
# Plain Python function that adds each element to the accumulator
def add(x):
    acc.add(x)

In [55]:
rdd.foreach(add)

In [57]:
print(acc.value)

36


#### 11. Bucketing
    * Bucketing splits a large dataset into a fixed number of buckets based on a column, so that queries and joins on that column run more efficiently.

In [59]:
df.write.bucketBy(7, "DayOfWeek").saveAsTable("bucket_table")
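What bucketing buys can be sketched in plain Python. This is an illustration only: the rows are made up, `bucket_id` and `write_bucketed` are hypothetical helpers, and a plain modulo stands in for the hash function Spark actually uses.

```python
NUM_BUCKETS = 7  # same bucket count as the bucketBy(7, "DayOfWeek") call

def bucket_id(key, num_buckets=NUM_BUCKETS):
    """Map a key to a bucket; plain modulo stands in for Spark's hash function."""
    return key % num_buckets

def write_bucketed(rows, key_column, num_buckets=NUM_BUCKETS):
    """Group rows into num_buckets lists according to the bucket of key_column."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_id(row[key_column], num_buckets)].append(row)
    return buckets

# Made-up rows with some of the flight columns.
flights = [
    {"DayOfWeek": 1, "Carrier": "DL", "DepDelay": 4},
    {"DayOfWeek": 7, "Carrier": "WN", "DepDelay": -3},
    {"DayOfWeek": 1, "Carrier": "AA", "DepDelay": 12},
    {"DayOfWeek": 3, "Carrier": "DL", "DepDelay": 0},
]
buckets = write_bucketed(flights, "DayOfWeek")

# A lookup for DayOfWeek == 1 only has to scan one bucket, not the whole dataset.
day_one_rows = [row for row in buckets[bucket_id(1)] if row["DayOfWeek"] == 1]
print(len(day_one_rows))
```

Because equal keys always land in the same bucket, two tables bucketed the same way on the join column can be matched bucket by bucket.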

In [None]:
!hdfs dfs -cat /user/hive/warehouse/bucket_table/part-00000-94dc958e-c41b-4f93-b337-5f353727373f_00001.c000.snappy.parquet