![alt text](./img/spark-storage.png)

#### Let's compare primitive RDDs with modern DataFrames

In [1]:
from pyspark import SparkContext, SparkConf

conf = SparkConf()
conf.set('spark.driver.memory', '12g')
conf.set('spark.executor.memory', '1g')
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.1')
sc = SparkContext(conf=conf)

# Add S3 credentials (s3Key and s3Secret are assumed to be defined in an earlier cell)
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", s3Key)
hadoopConf.set("fs.s3a.secret.key", s3Secret)
hadoopConf.setInt("fs.s3a.connection.maximum", 100)

print("Done.")

Done.


In [44]:
rdd_data = sc.textFile("<csv-path>")
# Split each pipe-delimited line and keep only well-formed rows with 6 fields
rdd_data = rdd_data.map(lambda x: x.split("|")).filter(lambda x: len(x) == 6)

### Simple total count operation with RDDs

In [3]:
# time to count
import time

start_time = time.time()

total_count = rdd_data.count()

end_time = time.time()

print("Total time taken: ", end_time-start_time)

Total time taken:  3.006709337234497


#### Performing the same operation on Spark DataFrame

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, FloatType, IntegerType
import pyspark.sql.functions as F

spark = SparkSession(sc)

In [8]:
schema_list = [StructField('name',StringType(),True),
              StructField('city',StringType(),True),
              StructField('state',StringType(),True),
              StructField('category',StringType(),True),
              StructField('score',FloatType(),False),
              StructField('amount',FloatType(),False),]

my_schema = StructType(schema_list)

In [23]:
data_frame = spark.read.option("delimiter","|").csv("<csv-path>",schema=my_schema)

### Simple count operation on DF

In [12]:
# time to count
import time

start_time = time.time()

total_count = data_frame.count()

end_time = time.time()

print("Total time taken: ", end_time-start_time)

Total time taken:  0.7043869495391846
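
The start/stop timing pattern used in the cells above repeats throughout this notebook. A minimal sketch of a reusable helper is shown here. `time_action` is a hypothetical name (not part of Spark), and it assumes the callable triggers a Spark action such as `count()` or `collect()`, because transformations alone are lazy and return almost immediately. A stand-in workload is used so the sketch runs without a cluster.

```python
import time
from statistics import median


def time_action(action, repeats=3):
    """Run `action` (a zero-argument callable) `repeats` times.

    Returns (last_result, list_of_elapsed_seconds).
    """
    timings = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = action()
        timings.append(time.perf_counter() - start)
    return result, timings


# Stand-in workload; in this notebook it would be something like
# `lambda: rdd_data.count()` or `lambda: data_frame.count()`.
result, timings = time_action(lambda: sum(range(100_000)), repeats=3)
print("Result:", result)
print("Median time taken:", median(timings))
```

Reporting the median of a few runs is one way to make single-shot wall-clock numbers less sensitive to noise.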


### Why do DataFrames perform better?

<p>As network latency has decreased, network I/O is less of a bottleneck, so the compact<br>
representation that DataFrames use gives them an edge over RDDs.</p>

![alt text](./img/hw_trend.png)
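
To make the "compact representation" point concrete, here is a rough, Spark-free sketch. It compares a list of boxed Python floats with the same values packed into one contiguous binary buffer, which is loosely analogous to DataFrames keeping rows in a binary format instead of as individual objects. This is an analogy only, not Spark's actual storage code, and the exact byte counts depend on the Python build.

```python
import sys
from array import array

values = [float(i) for i in range(10_000)]

# Object representation: a list of pointers plus one boxed float object per value.
boxed_bytes = sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)

# Packed representation: a fixed number of bytes per value in one contiguous buffer.
packed = array("d", values)
packed_bytes = packed.itemsize * len(packed)

print("boxed objects :", boxed_bytes, "bytes")
print("packed buffer :", packed_bytes, "bytes")
```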

### Let's try some more complex queries and compare

- total sale
- total sale per state
- unique categories

and so on.

In [39]:
# Total Sale
start_time = time.time()
total_sum = rdd_data.map(lambda x: float(x[-1])).sum()
end_time = time.time()

print("Total time taken by RDD: ",end_time-start_time)

start_time = time.time()
total_sum = data_frame.agg(F.sum("amount")).collect()
end_time = time.time()

print("Total time taken by DF: ",end_time-start_time)

Total time taken by RDD:  1.6312265396118164
Total time taken by DF:  1.0419425964355469


In [49]:
# Total Sale per State
start_time = time.time()
total_sum = rdd_data.map(lambda x: (x[2],float(x[-1]))).groupByKey().mapValues(sum).collect()
end_time = time.time()

print("Total time taken by RDD: ",end_time-start_time)

start_time = time.time()
total_sum = data_frame.groupBy('state').agg(F.sum("amount")).collect()
end_time = time.time()

print("Total time taken by DF: ",end_time-start_time)

Total time taken by RDD:  2.0678892135620117
Total time taken by DF:  1.219912052154541


In [50]:
# All unique categories
start_time = time.time()
categories = rdd_data.map(lambda x: x[3]).distinct().collect()
end_time = time.time()

print("Total time taken by RDD: ",end_time-start_time)

start_time = time.time()
categories = data_frame.select('category').distinct().collect()
end_time = time.time()

print("Total time taken by DF: ",end_time-start_time)

Total time taken by RDD:  1.8873486518859863
Total time taken by DF:  1.5895500183105469


![alt text](./img/space-improvement.png)

### Benefits of the Dataset API:
- Compact (less overhead).
- Significantly reduces our memory footprint.
- Spark now knows what data it is handling.
- Spark also knows which operations the user wants to perform on the Dataset.
- The two benefits above pave the way for one more, less obvious advantage:
  - Simple transformations can be performed in place, without the need to deserialise. (Let's see how this happens in detail below.)
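
The last point in the list above can be illustrated with a small, Spark-free sketch. It assumes a hypothetical fixed-width row layout (a 2-byte state code followed by two 32-bit floats for `score` and `amount`). Spark's real binary row format differs, so treat this only as an illustration of reading and updating one field at a known offset without rebuilding the whole record as objects.

```python
import struct

# Hypothetical layout: 2-byte state code, float32 score, float32 amount.
ROW = struct.Struct("<2sff")
AMOUNT = struct.Struct("<f")
AMOUNT_OFFSET = 2 + 4  # skip the state code and the score

# Serialise one row into a mutable binary buffer.
buf = bytearray(ROW.pack(b"OH", 0.5, 78.25))

# Read only the amount field, directly from its offset.
(amount,) = AMOUNT.unpack_from(buf, AMOUNT_OFFSET)

# Update the amount in place; the other fields are never decoded.
AMOUNT.pack_into(buf, AMOUNT_OFFSET, amount * 2)

print(ROW.unpack(bytes(buf)))
```

Because the schema fixes where each field lives, a simple transformation like doubling `amount` only touches those four bytes.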

# Can we further improve our performance?

![alt text](./img/parquet.png)

In [52]:
parquet_data = spark.read.parquet('<parquet_path>')

In [53]:
parquet_data.show(5)

+--------------------+--------------------+-----+--------------------+-----+---------+
|                name|                city|state|            category|score|   amount|
+--------------------+--------------------+-----+--------------------+-----+---------+
|   St William Church|              Warren|   OH|Community and Gov...|  0.9| 78.09576|
|Centre Elementary...|              Centre|   AL|Community and Gov...|  1.0|54.818233|
|         Mobile Edge|           Lehighton|   PA|Automotive,Mainte...|  0.9| 9.512629|
|National Park Com...|Hot Springs Natio...|   AR|Community and Gov...|  1.0|34.694485|
|Mr G Natural Heal...|        Mount Vernon|   NY|Retail,Food and B...|  0.9| 63.44141|
+--------------------+--------------------+-----+--------------------+-----+---------+
only showing top 5 rows



In [54]:
# Total Sale per State
start_time = time.time()
total_sum = rdd_data.map(lambda x: (x[2],float(x[-1]))).groupByKey().mapValues(sum).collect()
end_time = time.time()

print("Total time taken by RDD: ",end_time-start_time)

start_time = time.time()
total_sum = parquet_data.groupBy('state').agg(F.sum("amount")).collect()
end_time = time.time()

print("Total time taken by DF: ",end_time-start_time)

Total time taken by RDD:  2.0981931686401367
Total time taken by DF:  0.7468547821044922
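
One plausible reason for the Parquet speed-up is that Parquet is a columnar format, so a query such as "total sale per state" only needs the `state` and `amount` columns. The sketch below is a Spark-free illustration of that idea using made-up toy rows with the notebook's six fields. It does not reproduce how Spark or Parquet are implemented.

```python
from collections import defaultdict

# Toy rows (made-up values): name, city, state, category, score, amount.
rows = [
    ("n1", "c1", "OH", "k1", 0.9, 10.0),
    ("n2", "c2", "PA", "k2", 1.0, 5.0),
    ("n3", "c3", "OH", "k1", 0.8, 2.5),
]

# Column-oriented layout: one sequence per field, as a columnar file would store it.
names = ["name", "city", "state", "category", "score", "amount"]
columns = {name: list(col) for name, col in zip(names, zip(*rows))}

# "Total sale per state" touches only two of the six columns.
totals = defaultdict(float)
for state, amount in zip(columns["state"], columns["amount"]):
    totals[state] += amount

print(dict(totals))
```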
