In [1]:
import time
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

In [2]:
class Power:
    def __init__(self, p):
        self.p = p
        time.sleep(2)
        
    def applyPower(self, x):
        return x**self.p
        
# map
def power_map(num):
    c = Power(5)
    return c.applyPower(num)

In [3]:
sc = SparkContext.getOrCreate()
numbers = sc.textFile("../Data/numbers.txt", 5).map(lambda x : int(x))

In [4]:
sc

## Without Persisting or Caching

In [5]:
start = time.time()
powered_num = numbers.map(power_map)
powered_num.collect()
print("first ", time.time() - start)

start = time.time()
powered_num.collect()
print("second ", time.time() - start)

first  43.134905099868774
second  42.05216670036316


## With Persisting/Caching

In [6]:
powered_num.cache()

PythonRDD[2] at collect at <ipython-input-5-cf0093f888ec>:3

In [7]:
start = time.time()
powered_num.collect()
print("first ", time.time() - start)  # trigger re-evaluation

first  42.05640411376953


In [8]:
start = time.time()
powered_num.collect()
print("second ", time.time() - start)  # doesn't trigger re-evaluation

second  0.04544210433959961


## What happens?

In [9]:
powered_num.persist(StorageLevel.MEMORY_AND_DISK)

Py4JJavaError: An error occurred while calling o29.persist.
: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
	at org.apache.spark.rdd.RDD.persist(RDD.scala:174)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:198)
	at org.apache.spark.api.java.JavaRDD.persist(JavaRDD.scala:50)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


## How to solve it?

In [10]:
powered_num.unpersist()

PythonRDD[2] at collect at <ipython-input-5-cf0093f888ec>:3

## Reading/writing from disk

In [11]:
powered_num.persist(StorageLevel.DISK_ONLY)
start = time.time()
powered_num.collect()
print("first ", time.time() - start)  # trigger reevaluation

start = time.time()
powered_num.collect()
print("second ", time.time() - start)  # doesn't trigger reevaluation

first  42.090165853500366
second  0.03063821792602539


In [12]:
powered_num.unpersist()

PythonRDD[2] at collect at <ipython-input-5-cf0093f888ec>:3

In [13]:
sc.stop()