In [1]:
from pyspark import SparkConf, SparkContext
# Run Spark in local mode with 2 worker threads under the application name "MyApp".
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local[2]"))
from collections import namedtuple



In [2]:
# One row of the price CSV: date string, five price columns, integer volume.
Record = namedtuple("Record", "date open high low close adj_close volume")

In [3]:
def parse_record(s):
    """Parse one CSV line into a ``Record``.

    The line is split on commas. The first field is kept as the date string.
    The next five fields (open, high, low, close, adj_close) are converted to
    float, and the seventh (volume) is converted to int. Fields after the
    seventh are ignored.

    Raises ValueError if a numeric field cannot be converted, and IndexError
    if the line has fewer than seven fields.
    """
    date, *rest = s.split(",")
    prices = [float(token) for token in rest[:5]]
    return Record(date, *prices, int(rest[5]))

In [4]:
# Parse every line of data2.csv into a Record. The RDD is cached because several
# later jobs reuse it (two collects, foreachPartition, and the broadcast example).
# NOTE(review): assumes data2.csv has no header row; parse_record would raise
# ValueError on a non-numeric header field.
parsed_data=sc.textFile("data2.csv").map(parse_record).cache()

In [5]:
import time
import random

In [6]:
def supper_regressor(v, mu=1934242080.342412, sigma=23153252.115):
    """Toy "expensive" regressor: a steep power of the standardized input.

    Sleeps for a random 0-1 ms to simulate per-call model latency, then
    returns ``0.5 * ((v - mu) / sigma) ** 20``.

    Parameters
    ----------
    v : float
        Input value. In this notebook it is a record's ``volume``.
    mu, sigma : float, optional
        Centre and scale used to standardize ``v``. The defaults are the
        constants this function previously hard-coded. They are the same
        values that are broadcast as ``params`` later in the notebook.

    Raises ZeroDivisionError if ``sigma`` is 0.
    """
    time.sleep(random.random() / 1000)  # simulated per-call latency
    return 0.5 * ((v - mu) / sigma) ** 20

In [7]:
# Float sum accumulator. Tasks add the seconds spent inside supper_regressor,
# and the driver reads the running total via .value.
time_spent=sc.accumulator(0.0)

In [8]:
# No job has run yet, so the accumulator still holds its initial value.
time_spent

Accumulator<id=0, value=0.0>

In [9]:
def timed_supper_regressor(v):
    """Call ``supper_regressor(v)`` and add its elapsed time to ``time_spent``.

    Returns whatever ``supper_regressor`` returns. If it raises, the exception
    propagates and no time is recorded.

    Timing uses ``time.perf_counter()``, which is monotonic. ``time.time()``
    can jump with wall-clock adjustments and produce skewed or negative
    durations.

    Note: this is called from inside ``RDD.map`` (a transformation). Spark only
    guarantees exactly-once accumulator updates inside actions, so retried or
    recomputed tasks may add their time more than once.
    """
    start = time.perf_counter()
    result = supper_regressor(v)
    time_spent.add(time.perf_counter() - start)
    return result

In [10]:
# collect() is the action that triggers the job. Each call to
# timed_supper_regressor adds its duration to time_spent.
estimates=parsed_data.map(lambda x:timed_supper_regressor(x.volume)).collect()

In [11]:
# Total seconds summed over all tasks for the first run.
time_spent.value

0.45221829414367676

In [12]:
# Same job again: the accumulator is not reset between jobs, so the new
# durations are added on top of the previous total (it roughly doubles below).
estimates=parsed_data.map(lambda x:timed_supper_regressor(x.volume)).collect()

In [13]:
# Cumulative total over both runs.
time_spent.value

0.9123134613037109

In [14]:
# Custom accumulator: keep the maximum value seen instead of a running sum

from pyspark import AccumulatorParam

class MaxAccumulatorParam(AccumulatorParam):
    """AccumulatorParam that merges values by taking the maximum instead of summing.

    Used below to record the longest single duration reported by any task.
    """

    def zero(self, initial_value):
        # PySpark uses zero() as the starting value of each task's local copy.
        # Returning the initial value is safe for max, because merging a value
        # with itself leaves it unchanged.
        return initial_value

    def addInPlace(self, accumulator, delta):
        # Same result as max(accumulator, delta). The first argument is kept on
        # ties and whenever the comparison is False (e.g. with NaN).
        if delta > accumulator:
            return delta
        return accumulator
    

In [15]:
# Max accumulator starting at 0.0. It ends up holding the longest single
# simulated write observed by any task.
# NOTE(review): "presist" is a typo for "persist"; kept because later cells use this name.
time_presist=sc.accumulator(0.0,MaxAccumulatorParam())

In [16]:
# Nothing has run yet, so this shows the initial value.
time_presist

Accumulator<id=1, value=0.0>

In [17]:
def persist_external_storage(iterable):
    """Simulate writing every record of one partition to external storage.

    Meant for ``RDD.foreachPartition``. For each record it sleeps a random
    0-1 ms as a stand-in for a write (nothing is actually persisted) and
    reports the elapsed time to ``time_presist``. In this notebook that is a
    max accumulator, so only the longest write survives. Returns None.

    Timing uses ``time.perf_counter()``, which is monotonic, rather than
    ``time.time()``, which can jump with wall-clock adjustments.
    """
    for _record in iterable:
        start = time.perf_counter()
        time.sleep(random.random() / 1000)  # placeholder for the real write
        time_presist.add(time.perf_counter() - start)
    

In [18]:
# foreachPartition is an action: persist_external_storage runs once per partition
# and receives an iterator over that partition's records.
parsed_data.foreachPartition(persist_external_storage)

In [19]:
# Longest single simulated write across all partitions, in seconds.
time_presist.value

0.0021767616271972656

In [20]:
# Broadcast variables

In [21]:
# Broadcast the regressor's standardization constants. These are the same values
# that supper_regressor hard-codes. Executors cache a read-only copy instead of
# having the values shipped with every task closure.
params=sc.broadcast({"mu":1934242080.342412,"sigma":23153252.115})

In [22]:
# Broadcast contents are read through .value (here on the driver).
params.value["sigma"]

23153252.115

In [23]:
def supper_regressor_broadcast(v):
    """Same toy regressor as supper_regressor, with constants from a broadcast.

    Sleeps a random 0-1 ms to simulate latency, then returns
    ``0.5 * ((v - mu) / sigma) ** 20`` where ``mu`` and ``sigma`` come from
    ``params.value``.
    """
    time.sleep(random.random() / 1000)
    settings = params.value
    z = (v - settings["mu"]) / settings["sigma"]
    return 0.5 * z ** 20
    

In [24]:
# Lazy transformation: nothing runs until the top() action below.
# NOTE(review): despite its name, this RDD holds regressor estimates (floats), not parsed records.
parsed_data_broadcast=parsed_data.map(lambda x:supper_regressor_broadcast(x.volume))

In [25]:
# Largest estimate; top(n) returns a list of the n largest elements in descending order.
parsed_data_broadcast.top(1)

[5.426043746657545e+45]