## <font color=blue> Check that the installation is working properly</font>

In [1]:
import pyspark

In [2]:
pyspark.__version__

'2.1.1+hadoop2.7'

In [3]:
from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext that is already running when this cell
# is executed again; calling the SparkContext constructor a second time in the
# same kernel raises "Cannot run multiple SparkContexts at once".
# On a fresh kernel this builds the same local "MyApp" context as before.
sc = SparkContext.getOrCreate(conf=SparkConf().setAppName("MyApp").setMaster("local"))


In [None]:
sc

In [None]:
a = sc.parallelize([1,2,3,4,5])
a

In [None]:
a.getNumPartitions()

In [None]:
a.collect()

In [None]:
b = a.map(lambda x:2*x)
b

In [None]:
b.collect()

## <font color=blue> Working with text files</font>

### Let's work with one year of NASDAQ stock data (28.09.2018 - 27.09.2019)

### load the data

In [None]:
raw_data = sc.textFile('nasdaq.csv')

In [None]:
raw_data.take(3)

### Pre-processing: parse the data into a structured, human-readable format

In [5]:
from collections import namedtuple

# One parsed row of the CSV; the field order is the order in which
# parse_record fills the values.
Record = namedtuple("Record", "Date Open High Low Close Adj_Close Volume")

In [6]:
def parse_record(s):
    """Convert one comma-separated line into a ``Record``.

    Field 0 is kept as the raw date string, fields 1-5 are parsed as floats
    (Open, High, Low, Close, Adj_Close) and field 6 is parsed as an int
    (Volume).
    """
    columns = s.split(",")
    date = columns[0]
    prices = [float(raw) for raw in columns[1:6]]
    volume = int(columns[6])
    return Record(date, *prices, Volume=volume)


In [None]:
## parse the data and cache it in memory
parsed_data = raw_data.map(parse_record).cache()

In [None]:
parsed_data.take(3)

In [None]:
parsed_data.map(lambda x:x.Date).min()

In [None]:
parsed_data.map(lambda x:x.Date).max()

In [None]:
parsed_data.map(lambda x:x.Volume).sum()

In [None]:
## total traded volume per month (key = first 7 characters of Date, e.g. "2018-11")

by_month_data = parsed_data.map(lambda x:(x.Date[:7],x.Volume)).reduceByKey(lambda x,y:x+y)

In [None]:
by_month_data.top(1,lambda x:x[1])

In [None]:
# change the tuples to string before saving the data to a file

result_data = by_month_data.map(lambda t:",".join(map(str,t)))

In [None]:
result_data.take(1)

In [None]:
result_data.saveAsTextFile("out")

In [None]:
!ls out/

### <font color=blue> Application of join: daily return of the NASDAQ stock price </font>

In [7]:
parsed_data = sc.textFile('nasdaq.csv').map(parse_record).cache()

In [8]:
# helper to get the next calendar date

from datetime import datetime,timedelta

def get_next_date(s):
    """Return the calendar day after ``s``; both are "YYYY-MM-DD" strings."""
    date_format = "%Y-%m-%d"
    parsed = datetime.strptime(s, date_format)
    following_day = parsed + timedelta(days=1)
    return following_day.strftime(date_format)


In [9]:
get_next_date("2018-11-08")

'2018-11-09'

In [10]:
## closing price of a given date: (Date, Close) pairs keyed by the record's own date

date_and_close_price = parsed_data.map(lambda r:(r.Date,r.Close))

## date_and_close_price.take(3)

## closing price of the previous date: the key is shifted forward by one calendar
## day with get_next_date, so each Close is stored under the day after it was recorded

date_and_prev_close_price = parsed_data.map(lambda r:(get_next_date(r.Date),r.Close))

## join the two prices by the same key -> (Date, (Close, previous Close))
## NOTE: get_next_date adds exactly one calendar day, so a date only finds a match
## when parsed_data also has a record for the calendar day right before it
## (presumably this leaves out the first trading day after a weekend or holiday)

joined = date_and_close_price.join(date_and_prev_close_price)
## outer variants of the same join, built for the comparison in the next cell
joined_left = date_and_close_price.leftOuterJoin(date_and_prev_close_price)
joined_right = date_and_close_price.rightOuterJoin(date_and_prev_close_price)
joined_full = date_and_close_price.fullOuterJoin(date_and_prev_close_price)



In [11]:
# The lookups below show how the join types differ. Whenever one side has no
# value for a key, None appears in its place:
#   - join (inner):   only keys present on both sides, never None
#   - leftOuterJoin:  keeps every key of the left RDD; a missing right value is None
#   - rightOuterJoin: keeps every key of the right RDD; a missing left value is None
#   - fullOuterJoin:  keeps the keys of both RDDs; either value can be None
# Play with different dates to see the difference.

# note that lookup returns the list of values stored under a particular key

key_date = "2019-01-15"

print(joined.lookup(key_date))
print(joined_right.lookup(key_date))
print(joined_left.lookup(key_date))
print(joined_full.lookup(key_date))


[(7023.830078, 6905.919922)]
[(7023.830078, 6905.919922)]
[(7023.830078, 6905.919922)]
[(7023.830078, 6905.919922)]


In [12]:
# calculate the daily return in percent: (close / previous close - 1) * 100

returns = joined.mapValues(lambda p:(p[0]/p[1]-1.0)*100.0)

In [13]:
date = "2019-01-11"

print(date_and_prev_close_price.lookup(date))
print(date_and_close_price.lookup(date))
print(returns.lookup(date))

[6986.069824]
[6971.47998]
[-0.20884194357574382]


### <font color=blue> broadcast and accumulator variable </font>


### Accumulator variable: track the longest time it takes to persist a single record to external storage


In [None]:
parsed_data = sc.textFile('nasdaq.csv').map(parse_record).cache()


In [None]:
from pyspark import AccumulatorParam
import time
import random

class MaxAccumulatorParam(AccumulatorParam):
    """Accumulator behaviour that keeps the largest value added so far."""

    def zero(self, initial_value):
        """Use the supplied initial value itself as the starting point."""
        return initial_value

    def addInPlace(self, accumulator, delta):
        """Combine two values by keeping the larger one."""
        # Same selection rule as max(accumulator, delta): the new value wins
        # only when it is strictly greater.
        return delta if delta > accumulator else accumulator

In [None]:
time_persist = sc.accumulator(0.0, MaxAccumulatorParam())


In [None]:
def persist_to_external_storage(iterable):
    """Time a simulated write for every record of a partition.

    For each record the function sleeps for a random interval below one
    millisecond and reports the measured wall-clock duration to the
    ``time_persist`` accumulator. The records themselves are not used.
    """
    for _ in iterable:
        started = time.time()
        # Placeholder delay; presumably stands in for the real storage call.
        time.sleep(random.random() / 1000.0)
        time_persist.add(time.time() - started)
 

In [None]:
parsed_data.foreachPartition(persist_to_external_storage)   

In [None]:
time_persist.value

### Broadcast variable: pass external input parameters to the workers


In [None]:
parsed_data = sc.textFile("nasdaq.csv").map(parse_record).cache()

In [None]:
params = sc.broadcast({"mu":1910949928.057554, "sigma":284610509.115})

In [None]:
def super_regressor(v):
    """Return half the squared standardised distance of ``v`` from ``mu``.

    ``mu`` and ``sigma`` are read from the broadcast ``params`` dictionary.
    The random sleep (below one millisecond) runs before the computation;
    presumably it simulates the cost of a real model.
    """
    time.sleep(random.random() / 1000.0)
    settings = params.value
    standardised = (v - settings["mu"]) / settings["sigma"]
    return 0.5 * standardised ** 2.0

In [None]:
parsed_data.map(lambda x:super_regressor(x.Volume)).top(1)

## Spark UI 

In [None]:
# use join outcome

In [14]:
sc.uiWebUrl

u'http://172.17.0.19:4040'

In [None]:
## paste this URL into your browser and switch between the tabs to see the system configuration and job details