In [1]:
from pyspark.sql import DataFrame
from pyspark.sql import *
import numpy as np
import pandas as pd
import scipy as sc
from pyspark.sql import DataFrame
from pyspark.sql import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.tuning import *
from pyspark.ml.feature import Imputer
import time
from pyspark import StorageLevel

In [2]:
#df = spark.read.csv('/test.csv', inferSchema=True, header=True)
dfbig= spark.read.json('/review.json')

# [Persist vs. cache](https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist)
- Persist: can store in other areas (ie. disk) 
    - On-heap: subject to garbage collection
    - Off-heap: serialized data not subject to garbage collection
        Slightly slower than memory-based (have to be deserialized still)
        Still faster than disk-based
        Don’t waste GC’s time
        Java and Scala only!
- Cache: store in memory only
- Storage Levels: 
    - MEMORY_ONLY: 
        - Pro: Definitely the fastest, if you have the resources availabe
        - Con: Based on the eviction policy, could use frames if you start running out of memroy
    - MEMORY_AND_DISK:
        - Pro: Best if you're going to run out of memory
        - Pro: Still faster than re-evaluating the RDD
        - Con: Slower than if you could just cache to memory
    - Appending a number (ie. DISK_ONLY_2) adds that number of replicas to other nodes
        - Pro: Keeps speed up if there's a node failure
        - Con: Sucks up extra memory
    - Appending SER (ie MEMORY_ONLY_SER) will serialize the data --> only in Java/Scala
        - Pro: Relieves some strain on memory
        - Con: Instead increases processing time

In [4]:
#first try without caching --> 17 seconds
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
dur1 = time.time() - start_time
print(dur1)

17.456082105636597


### Persisting vs. caching: 
- persist(MEMORY_ONLY) and cache() should be the same
- That being said, cacheing the dataframe to memory at best takes a minute longer, and usually just causes the program to crash. 

Times: 
- cache: 117
    1. 83.89
- memory_only: 73.37
    1. 42.1
- memory_and_disk: 42.47
    1. 38.50
- memory and disk serialized: n/a
    1. 88.7
- disk only: 44.11
    1. 38.15
- memory serialized: 41.84
    1. 36.90

1. "spark.sql.inMemoryColumnarStorage.compressed" to True in spark-defaults.conf. 
    - This enables Spark to automatically choose the optimal compression codec for the data. 
    - Moral of the story: have this enabled! Easy boost 
2. Tuning "spark.sql.inMemoryColumnarStorage.batchSize"
    - "Larger batch sizes may improve utilization but also result in out of memory errors" [source](https://qubole.zendesk.com/hc/en-us/articles/216920846-How-To-Spark-SQL-Tuning)
    - Probably best to tune once have acccess to the server

In [9]:
#try caching the dataframe, then run the groupBy --> 111 seconds
dfbig.cache()
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
cache_time = time.time() - start_time
print(cache_time)
dfbig.unpersist()

36.8015570640564


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [4]:
# try persisting the dataframe to memory, then run the groupBy --> 111 seconds
dfbig.persist(StorageLevel.MEMORY_ONLY)
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
mem_time = time.time() - start_time
print(mem_time)
dfbig.unpersist()

42.10225009918213


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

### Persisting at different storage levels
Memory only and serialized seems to work best, and doesn't cause any crashes, unlike directly storing it in memory. 

In [5]:
#persist to disk as opposed to memory
dfbig.persist(StorageLevel.MEMORY_AND_DISK)
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
mem_disk_time = time.time()-start_time
print(mem_disk_time)
dfbig.unpersist()

38.492923974990845


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [6]:
#persist to disk as opposed to memory
#be warned! another crash-inducing storage level
dfbig.persist(StorageLevel.MEMORY_AND_DISK_SER)
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
mem_disk_ser_time = time.time()-start_time
print(mem_disk_ser_time)
dfbig.unpersist()

88.67262363433838


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [7]:
dfbig.persist(StorageLevel.DISK_ONLY)
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
disk_time = time.time()-start_time
print(disk_time)
dfbig.unpersist()

38.15035128593445


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [8]:
dfbig.persist(StorageLevel.MEMORY_ONLY_SER)
start_time = time.time()
dfbig.groupBy(col("stars")).count().collect()
mem_ser_time = time.time()-start_time
print(mem_ser_time)
dfbig.unpersist()

36.89974522590637


DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

# [When to cache/persist](https://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd)	
- Nothing happens to data until actually perform an operation on it
    - Only references
    - Both RDDs and Dataframes are computed lazily
- Linear situation: ie load file into rdd, perform a basic transformation, then count
    - Cache not needed
    - Data loaded to executors, transform, and count computed all in memory
- Non-linear: ie. load file, want to create a filtered dataset to work with
    - Cache before begin to branch
- Rule of thumb: branching out
    - Iterating (so very important in any ML)
    - Reusing the same dataframe a lot in the program
    - Cost to generate the dataframe is high (ie applied a complex map function)


# Pyspark Serializers
- Best practice to cache serialized objects
- Pickle Serializer 
    - More universal
- Marshall Serializer
    - Faster
- [TBD whether either of these actually offer an advantage over Kyros](https://stackoverflow.com/questions/36278574/do-you-benefit-from-the-kryo-serializer-when-you-use-pyspark)


In [28]:
rdd = dfbig.rdd
rdd.cache()
start_time = time.time()
rdd.count()
dur5 = time.time()-start_time #38 seconds

Other Resources Used: 
- https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
- https://unraveldata.com/to-cache-or-not-to-cache/
