# CS777 HW1 Using Spark RDDs only

In [1]:
from pyspark import SparkContext

sc = SparkContext("local[4]", "hw1")

In [2]:
import wget

In [3]:
fileName = 'https://people.bu.edu/kalathur/datasets/stocks_2013_2021.txt'
wget.download(fileName)

'stocks_2013_2021 (2).txt'

In [4]:
stocksFile = sc.textFile("stocks_2013_2021.txt")
stocksFile

stocks_2013_2021.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
stocksFile.take(4)

['AAPL,2013-01-02,19.78,19.82,19.34,16.96,560518000',
 'AAPL,2013-01-03,19.57,19.63,19.32,16.75,352965200',
 'AAPL,2013-01-04,19.18,19.24,18.78,16.28,594333600',
 'AAPL,2013-01-07,18.64,18.9,18.4,16.18,484156400']

**File format**:

Stock,Date,Open,High,Low,Close,Volume

## Part 1 (10 points)

**1a)** Map each line of the text file to a list of tokens. Show the first 2 entries.

Sample Output:
<pre>
[['AAPL', '2013-01-02', '19.78', '19.82', '19.34', '16.96', '560518000'],
 ['AAPL', '2013-01-03', '19.57', '19.63', '19.32', '16.75', '352965200']]
</pre>

In [6]:
result_1A = stocksFile.map(lambda x: x.split(','))
result_1A.take(2)

[['AAPL', '2013-01-02', '19.78', '19.82', '19.34', '16.96', '560518000'],
 ['AAPL', '2013-01-03', '19.57', '19.63', '19.32', '16.75', '352965200']]

**1b)** Map each list of tokens from the above step and perform type conversions. The Open, High, Low, and Close values should be converted to *float*, and the Volume should be converted to *int*. Show the first 2 entries.

Sample Output:

<pre>
[['AAPL', '2013-01-02', 19.78, 19.82, 19.34, 16.96, 560518000],
 ['AAPL', '2013-01-03', 19.57, 19.63, 19.32, 16.75, 352965200]]
</pre>


In [7]:
result_1B = result_1A.map(lambda x: [x[0],x[1],float(x[2]),float(x[3]),float(x[4]),float(x[5]),int(x[6])])
result_1B.take(2)

[['AAPL', '2013-01-02', 19.78, 19.82, 19.34, 16.96, 560518000],
 ['AAPL', '2013-01-03', 19.57, 19.63, 19.32, 16.75, 352965200]]
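The two lambdas in 1a) and 1b) can also be folded into one named parser, which is easier to check without a running SparkContext. This is a minimal plain-Python sketch, assuming every line has exactly seven comma-separated fields; `parse_record` is a hypothetical helper name, not part of the assignment.

```python
def parse_record(line):
    """Split one CSV line; Open/High/Low/Close become float, Volume becomes int."""
    # Unpacking into exactly seven names raises ValueError on malformed lines
    stock, date, open_, high, low, close, volume = line.split(',')
    return [stock, date, float(open_), float(high), float(low), float(close), int(volume)]

# Same result as 1a) followed by 1b), applied to the first sample line shown above
print(parse_record('AAPL,2013-01-02,19.78,19.82,19.34,16.96,560518000'))
```

In Spark it would be applied as `stocksFile.map(parse_record)`; unlike the index-based lambda, a line with the wrong number of fields fails loudly instead of producing a short list.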

**1c)** How many total entries are there in the dataset?

In [8]:
result_1B.count()

13596

**1d)** How many entries are there in the dataset for each Stock?

Sample Output:

<pre>
defaultdict(int,
            {'AAPL': 2266,
             'GOOG': 2266,
             'INTC': 2266,
             'MSFT': 2266,
             'NFLX': 2266,
             'NVDA': 2266})
</pre>

In [9]:
result_1B.map(lambda x: (x[0],1)).countByKey()


defaultdict(int,
            {'AAPL': 2266,
             'GOOG': 2266,
             'INTC': 2266,
             'MSFT': 2266,
             'NFLX': 2266,
             'NVDA': 2266})
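`countByKey` is an action: it counts the records per key and returns an ordinary Python `defaultdict` to the driver, so it suits cases like this one where there are only a few distinct keys (six symbols). The plain-Python sketch below mirrors that behaviour on the four AAPL lines returned by `take(4)` above; it illustrates the semantics only and does not use Spark.

```python
from collections import defaultdict

# The four lines returned by stocksFile.take(4) above
lines = [
    'AAPL,2013-01-02,19.78,19.82,19.34,16.96,560518000',
    'AAPL,2013-01-03,19.57,19.63,19.32,16.75,352965200',
    'AAPL,2013-01-04,19.18,19.24,18.78,16.28,594333600',
    'AAPL,2013-01-07,18.64,18.9,18.4,16.18,484156400',
]

# Local analogue of .map(lambda x: (x[0], 1)).countByKey(): count records per key
counts = defaultdict(int)
for line in lines:
    stock = line.split(',')[0]
    counts[stock] += 1

print(dict(counts))
```

In PySpark, `result_1B.map(lambda x: x[0]).countByValue()` should give the same counts without the dummy `1` values.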

## Part 2 (10 points)

**2a)** Map the data from 1b) to a new RDD with only the (Stock,Volume) data. Show the first two entries.

Sample Output:

<pre>
[('AAPL', 560518000), ('AAPL', 352965200)]
</pre>

In [10]:
newRDD = result_1B.map(lambda x: (x[0],x[-1]))
newRDD.take(2)

[('AAPL', 560518000), ('AAPL', 352965200)]

**2b)** Using the RDD from 2a), what is the total volume for each stock symbol in the dataset? Show the results.

Sample Output:
<pre>
[('INTC', 65334389200),
 ('AAPL', 409337906700),
 ('GOOG', 4626409870),
 ('MSFT', 73349882000),
 ('NFLX', 28981935000),
 ('NVDA', 98721855200)]
 
 </pre>

In [11]:
result_2B = newRDD.reduceByKey(lambda x,y: x+y)
result_2B.take(6)

[('INTC', 65334389200),
 ('AAPL', 409337906700),
 ('GOOG', 4626409870),
 ('MSFT', 73349882000),
 ('NFLX', 28981935000),
 ('NVDA', 98721855200)]
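`reduceByKey` merges the values of each key pairwise with the supplied function, first locally within each partition and then across partitions, so the function must be associative and commutative; addition is both. The single-process sketch below shows the same fold over (Stock, Volume) pairs for the four AAPL sample days from 1a); `reduce_by_key` is a hypothetical helper, not a Spark API.

```python
import operator

def reduce_by_key(pairs, func):
    """Fold the values of each key with func, like RDD.reduceByKey on one machine."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())

# (Stock, Volume) pairs for the four AAPL sample days shown in 1a)
pairs = [('AAPL', 560518000), ('AAPL', 352965200),
         ('AAPL', 594333600), ('AAPL', 484156400)]

print(reduce_by_key(pairs, operator.add))
# Reversing the input gives the same total because addition is commutative
print(reduce_by_key(list(reversed(pairs)), operator.add))
```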

**2c)** Show the above results in the sorted order based on the Stock symbol using Python *sorted* function on the previous results.



In [12]:
sorted(result_2B.collect())

[('AAPL', 409337906700),
 ('GOOG', 4626409870),
 ('INTC', 65334389200),
 ('MSFT', 73349882000),
 ('NFLX', 28981935000),
 ('NVDA', 98721855200)]

**2d)** Using the Spark *sortBy* operation, sort the RDD from 2b) in descending order of total trading volume and show the results.


In [13]:
result_2D = result_2B.sortBy(lambda x: x[1], ascending=False)
result_2D.take(6)

[('AAPL', 409337906700),
 ('NVDA', 98721855200),
 ('MSFT', 73349882000),
 ('INTC', 65334389200),
 ('NFLX', 28981935000),
 ('GOOG', 4626409870)]
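`sortBy(keyfunc, ascending=False)` performs a distributed sort. For a result this small, the same ordering can be checked on the driver with Python's `sorted`, a key function, and `reverse=True`. The sketch below applies that to the totals printed in 2b).

```python
# Totals per stock, copied from the 2b) output above
totals = [('INTC', 65334389200), ('AAPL', 409337906700), ('GOOG', 4626409870),
          ('MSFT', 73349882000), ('NFLX', 28981935000), ('NVDA', 98721855200)]

# Driver-side analogue of result_2B.sortBy(lambda x: x[1], ascending=False)
by_volume = sorted(totals, key=lambda kv: kv[1], reverse=True)
print([stock for stock, _ in by_volume])
```

On a large RDD, sorting in Spark (or `takeOrdered(n, key=lambda x: -x[1])` when only the top `n` are needed) avoids pulling every record to the driver first.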

## Part 3 (10 points)

**3a)** Map the data from 1b) to a new RDD with only the (Stock,[Open, Close]) data. Show the first two entries.

Sample Output:
<pre>
[('AAPL', [19.78, 16.96]), ('AAPL', [19.57, 16.75])]
</pre>

In [14]:
result_3A = result_1B.map(lambda x: (x[0],[x[2],x[5]]))
result_3A.take(2)

[('AAPL', [19.78, 16.96]), ('AAPL', [19.57, 16.75])]

**3b)** Using the above RDD, for each stock, on how many days over the entire dataset was that stock's closing price above its opening price? Show the results in decreasing order as shown below. First map the RDD to (Stock, 1/0), i.e., 1 if the condition is true and 0 if it is false. Then use reduceByKey followed by sortBy.

Sample Output:
<pre>
[('GOOG', 1166),
 ('NFLX', 1125),
 ('NVDA', 525),
 ('AAPL', 202),
 ('MSFT', 153),
 ('INTC', 55)]
 </pre>

In [15]:
result_3B = (result_3A.map(lambda x: (x[0], 1 if x[1][1] > x[1][0] else 0))  # 1 if Close > Open
             .reduceByKey(lambda x, y: x + y)                               # up days per stock
             .sortBy(lambda x: x[1], ascending=False))                      # most up days first
result_3B.take(6)

[('GOOG', 1166),
 ('NFLX', 1125),
 ('NVDA', 525),
 ('AAPL', 202),
 ('MSFT', 153),
 ('INTC', 55)]
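Mapping each record to a 1/0 indicator and summing per key is a common way to count matching records. The plain-Python sketch below runs the same three steps on (Stock, [Open, Close]) records that appear in this notebook's sample outputs (AAPL from 3a, GOOG and NFLX from 8a/8b); it checks the logic only and does not use Spark.

```python
from collections import Counter

# (Stock, [Open, Close]) records taken from sample outputs in this notebook
records = [
    ('AAPL', [19.78, 16.96]), ('AAPL', [19.57, 16.75]),
    ('GOOG', [358.37, 360.27]), ('GOOG', [361.11, 360.48]),
    ('GOOG', [363.31, 367.61]), ('GOOG', [366.35, 366.0]),
    ('NFLX', [13.6, 13.14]), ('NFLX', [13.14, 13.8]),
    ('NFLX', [13.79, 13.71]), ('NFLX', [13.77, 14.17]),
]

# map to (Stock, 1/0) and sum per key, as result_3B does
up_days = Counter()
for stock, (open_, close) in records:
    up_days[stock] += int(close > open_)   # True -> 1, False -> 0

# most up days first, like sortBy(lambda x: x[1], ascending=False)
ranked = sorted(up_days.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```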

## Part 4 (10 points)

**4a)** Map the data from 1b) to a new RDD with only the (Stock,[High, Low]) data. Show the first two entries.

Sample Output:
<pre>
[('AAPL', [19.82, 19.34]), ('AAPL', [19.63, 19.32])]
</pre>

In [16]:
result_4A = result_1B.map(lambda x: (x[0],[x[3],x[4]]))
result_4A.take(2)

[('AAPL', [19.82, 19.34]), ('AAPL', [19.63, 19.32])]

**4b)** Using a single reduceByKey on the above RDD, for each stock, what is the largest High and the smallest Low price? Show the results in sorted order using Python *sorted* function.

Sample Output:
<pre>
[('AAPL', (182.13, 13.75)),
 ('GOOG', (3037.0, 346.46)),
 ('INTC', (69.29, 20.1)),
 ('MSFT', (349.67, 26.28)),
 ('NFLX', (700.99, 12.96)),
 ('NVDA', (346.47, 2.98))]
 </pre>

In [17]:
result_4B = result_4A.reduceByKey(lambda x,y: (max(x[0],y[0]),min(x[1],y[1])))
sorted(result_4B.collect())

[('AAPL', (182.13, 13.75)),
 ('GOOG', (3037.0, 346.46)),
 ('INTC', (69.29, 20.1)),
 ('MSFT', (349.67, 26.28)),
 ('NFLX', (700.99, 12.96)),
 ('NVDA', (346.47, 2.98))]
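Because Spark may combine partial results in any order, the combiner must give the same answer regardless of grouping; taking the max of the Highs and the min of the Lows satisfies that. One subtlety: the input values are lists `[High, Low]` while the combiner returns tuples, so a key with a single record would keep its list type. The plain-Python sketch below checks both points with the four AAPL [High, Low] pairs from 1a); `combine_high_low` is a hypothetical name for the 4b) lambda.

```python
from functools import reduce

def combine_high_low(a, b):
    """Same logic as the 4b) lambda: keep the larger High and the smaller Low."""
    return (max(a[0], b[0]), min(a[1], b[1]))

# [High, Low] values for the four AAPL sample days in 1a)
values = [[19.82, 19.34], [19.63, 19.32], [19.24, 18.78], [18.9, 18.4]]

forward = reduce(combine_high_low, values)
backward = reduce(combine_high_low, list(reversed(values)))
print(forward, backward)   # same result in either order

# A single value is never passed through the combiner, so it stays a list
print(reduce(combine_high_low, [[19.82, 19.34]]))
```

Emitting tuples already in 4a), e.g. `(x[3], x[4])`, would make the value type uniform.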

## Part 5 (10 points)

**5a)** Map the data from 1b) to a new RDD with only the (Stock,[(High, Date), (Low, Date)]) data. Show the first two entries.

Sample Output:
<pre>
[('AAPL', [(19.82, '2013-01-02'), (19.34, '2013-01-02')]),
 ('AAPL', [(19.63, '2013-01-03'), (19.32, '2013-01-03')])]
</pre>

In [18]:
result_5A = result_1B.map(lambda x: (x[0],[(x[3],x[1]),(x[4],x[1])]))
result_5A.take(2)

[('AAPL', [(19.82, '2013-01-02'), (19.34, '2013-01-02')]),
 ('AAPL', [(19.63, '2013-01-03'), (19.32, '2013-01-03')])]

**5b)** Using a single reduceByKey on the above RDD, for each stock, what is the largest High and the smallest Low price, each with its respective date? Show the results in sorted order using the Python *sorted* function.

Sample Output:

<pre>
[('AAPL', [(182.13, '2021-12-13'), (13.75, '2013-04-19')]),
 ('GOOG', [(3037.0, '2021-11-19'), (346.46, '2013-01-22')]),
 ('INTC', [(69.29, '2020-01-24'), (20.1, '2013-02-22')]),
 ('MSFT', [(349.67, '2021-11-22'), (26.28, '2013-01-11')]),
 ('NFLX', [(700.99, '2021-11-17'), (12.96, '2013-01-02')]),
 ('NVDA', [(346.47, '2021-11-22'), (2.98, '2013-01-15')])]
 </pre>

In [19]:
# keep (High, Date) with the larger High and (Low, Date) with the smaller Low
result_5B = result_5A.reduceByKey(lambda x, y: [x[0] if x[0][0] > y[0][0] else y[0],
                                                 x[1] if x[1][0] < y[1][0] else y[1]])
sorted(result_5B.collect())

[('AAPL', [(182.13, '2021-12-13'), (13.75, '2013-04-19')]),
 ('GOOG', [(3037.0, '2021-11-19'), (346.46, '2013-01-22')]),
 ('INTC', [(69.29, '2020-01-24'), (20.1, '2013-02-22')]),
 ('MSFT', [(349.67, '2021-11-22'), (26.28, '2013-01-11')]),
 ('NFLX', [(700.99, '2021-11-17'), (12.96, '2013-01-02')]),
 ('NVDA', [(346.47, '2021-11-22'), (2.98, '2013-01-15')])]
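The 5b) combiner keeps whole (price, date) pairs rather than bare prices. A named version is easier to read and test. On an exact tie the pair from the second argument wins, and since Spark does not guarantee the merge order, the date reported for a tied price may vary between runs. `combine_extremes` is a hypothetical helper, checked here on the AAPL sample rows from 1a) plus a made-up tie.

```python
from functools import reduce

def combine_extremes(a, b):
    """a, b: [(High, Date), (Low, Date)]; keep the higher High and lower Low with their dates."""
    high = a[0] if a[0][0] > b[0][0] else b[0]
    low = a[1] if a[1][0] < b[1][0] else b[1]
    return [high, low]

# 5a)-style values for the four AAPL sample days in 1a)
values = [
    [(19.82, '2013-01-02'), (19.34, '2013-01-02')],
    [(19.63, '2013-01-03'), (19.32, '2013-01-03')],
    [(19.24, '2013-01-04'), (18.78, '2013-01-04')],
    [(18.9, '2013-01-07'), (18.4, '2013-01-07')],
]
print(reduce(combine_extremes, values))

# Hypothetical tie: equal prices on two dates; argument order decides the date
d1 = [(10.0, 'd1'), (5.0, 'd1')]
d2 = [(10.0, 'd2'), (5.0, 'd2')]
print(combine_extremes(d1, d2), combine_extremes(d2, d1))
```

In Spark it would be used as `result_5A.reduceByKey(combine_extremes)`.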

## Part 6 (10 points)
 - The following will use (Stock, Year) as the key.

**6a)** Map the data from 1b) to a new RDD with only the ((Stock, Year),Volume) data. Show the first two entries.

Sample Output:
<pre>
[(('AAPL', '2013'), 560518000), (('AAPL', '2013'), 352965200)]
</pre>



In [20]:
result_6A = result_1B.map(lambda x: ( (x[0],x[1][0:4]), x[6] ))
result_6A.take(2)

[(('AAPL', '2013'), 560518000), (('AAPL', '2013'), 352965200)]
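Tuples are hashable, so (Stock, Year) can be used directly as a `reduceByKey` key, and Python's `sorted` orders such keys by stock first, then year. Slicing `x[1][0:4]` assumes every date is an ISO `YYYY-MM-DD` string, as in all sample lines. A plain-Python sketch of 6a) followed by the 6b) sum, on the four AAPL lines from `take(4)`:

```python
from collections import defaultdict

# The four lines returned by stocksFile.take(4) above
lines = [
    'AAPL,2013-01-02,19.78,19.82,19.34,16.96,560518000',
    'AAPL,2013-01-03,19.57,19.63,19.32,16.75,352965200',
    'AAPL,2013-01-04,19.18,19.24,18.78,16.28,594333600',
    'AAPL,2013-01-07,18.64,18.9,18.4,16.18,484156400',
]

totals = defaultdict(int)
for line in lines:
    fields = line.split(',')
    key = (fields[0], fields[1][:4])   # (Stock, Year), as in result_6A
    totals[key] += int(fields[6])      # sum Volume per key, as in result_6B

print(sorted(totals.items()))
```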

**6b)** Using the RDD from 6a), what is the total volume for each stock symbol per year in the dataset? Show the results using the Python *sorted* function.

Sample Output:
<pre>
[(('AAPL', '2013'), 102421569600),
 (('AAPL', '2014'), 63657952400),
 (('AAPL', '2015'), 52264199600),
 (('AAPL', '2016'), 38729911200),
 (('AAPL', '2017'), 27243106000),
 (('AAPL', '2018'), 34156144800),
 (('AAPL', '2019'), 28254942800),
 (('AAPL', '2020'), 39863855600),
 (('AAPL', '2021'), 22746224700),
 (('GOOG', '2013'), 1055993341),
 (('GOOG', '2014'), 626749011),
 (('GOOG', '2015'), 521501318),
 (('GOOG', '2016'), 461731000),
 (('GOOG', '2017'), 370605100),
 (('GOOG', '2018'), 437233300),
 (('GOOG', '2019'), 356294900),
 (('GOOG', '2020'), 478595100),
 (('GOOG', '2021'), 317706800),
 (('INTC', '2013'), 9856167600),
 (('INTC', '2014'), 7965304500),
 (('INTC', '2015'), 7633745600),
 (('INTC', '2016'), 5763208600),
 (('INTC', '2017'), 5999715600),
 (('INTC', '2018'), 7379750000),
 (('INTC', '2019'), 5519974300),
 (('INTC', '2020'), 7989672500),
 (('INTC', '2021'), 7226850500),
 (('MSFT', '2013'), 12251098000),
 (('MSFT', '2014'), 8399600600),
 (('MSFT', '2015'), 9059455900),
 (('MSFT', '2016'), 7819726800),
 (('MSFT', '2017'), 5631478600),
 (('MSFT', '2018'), 7929137400),
 (('MSFT', '2019'), 6194410500),
 (('MSFT', '2020'), 9527876900),
 (('MSFT', '2021'), 6537097300),
 (('NFLX', '2013'), 6915790700),
 (('NFLX', '2014'), 4898415200),
 (('NFLX', '2015'), 4681123800),
 (('NFLX', '2016'), 3234743600),
 (('NFLX', '2017'), 1654091200),
 (('NFLX', '2018'), 2878875800),
 (('NFLX', '2019'), 1980147700),
 (('NFLX', '2020'), 1754276900),
 (('NFLX', '2021'), 984470100),
 (('NVDA', '2013'), 8914737600),
 (('NVDA', '2014'), 7155693600),
 (('NVDA', '2015'), 7818756400),
 (('NVDA', '2016'), 11159860400),
 (('NVDA', '2017'), 17215246800),
 (('NVDA', '2018'), 13756960800),
 (('NVDA', '2019'), 11501271600),
 (('NVDA', '2020'), 12165638000),
 (('NVDA', '2021'), 9033690000)]
 </pre>

In [21]:
result_6B = result_6A.reduceByKey(lambda x,y: x+y)
sorted(result_6B.collect())

[(('AAPL', '2013'), 102421569600),
 (('AAPL', '2014'), 63657952400),
 (('AAPL', '2015'), 52264199600),
 (('AAPL', '2016'), 38729911200),
 (('AAPL', '2017'), 27243106000),
 (('AAPL', '2018'), 34156144800),
 (('AAPL', '2019'), 28254942800),
 (('AAPL', '2020'), 39863855600),
 (('AAPL', '2021'), 22746224700),
 (('GOOG', '2013'), 1055993341),
 (('GOOG', '2014'), 626749011),
 (('GOOG', '2015'), 521501318),
 (('GOOG', '2016'), 461731000),
 (('GOOG', '2017'), 370605100),
 (('GOOG', '2018'), 437233300),
 (('GOOG', '2019'), 356294900),
 (('GOOG', '2020'), 478595100),
 (('GOOG', '2021'), 317706800),
 (('INTC', '2013'), 9856167600),
 (('INTC', '2014'), 7965304500),
 (('INTC', '2015'), 7633745600),
 (('INTC', '2016'), 5763208600),
 (('INTC', '2017'), 5999715600),
 (('INTC', '2018'), 7379750000),
 (('INTC', '2019'), 5519974300),
 (('INTC', '2020'), 7989672500),
 (('INTC', '2021'), 7226850500),
 (('MSFT', '2013'), 12251098000),
 (('MSFT', '2014'), 8399600600),
 (('MSFT', '2015'), 9059455900),
 (('MSF

## Part 7 (20 points)

**7a)** Map the data from 1b) to a new RDD with only the ((Stock, Year), [High, Low]) data. Show the first two entries.

Sample Output:
<pre>
[(('AAPL', '2013'), [19.82, 19.34]), (('AAPL', '2013'), [19.63, 19.32])]
</pre>

In [22]:
result_7A = result_1B.map(lambda x: ( (x[0],x[1][0:4]), [x[3],x[4]] ))
result_7A.take(2)

[(('AAPL', '2013'), [19.82, 19.34]), (('AAPL', '2013'), [19.63, 19.32])]

**7b)** Using a single reduceByKey on the above RDD, for each stock per year, what is the largest High and the smallest Low price? Show the results in sorted order using the Python *sorted* function.

Sample Output:
<pre>
[(('AAPL', '2013'), (20.54, 13.75)),
 (('AAPL', '2014'), (29.94, 17.63)),
 (('AAPL', '2015'), (33.63, 23.0)),
 (('AAPL', '2016'), (29.67, 22.37)),
 (('AAPL', '2017'), (44.3, 28.69)),
 (('AAPL', '2018'), (58.37, 36.65)),
 (('AAPL', '2019'), (73.49, 35.5)),
 (('AAPL', '2020'), (138.79, 53.15)),
 (('AAPL', '2021'), (182.13, 116.21)),
 (('GOOG', '2013'), (558.41, 346.46)),
 (('GOOG', '2014'), (612.15, 487.66)),
 (('GOOG', '2015'), (779.98, 486.23)),
 (('GOOG', '2016'), (816.68, 663.06)),
 (('GOOG', '2017'), (1078.49, 775.8)),
 (('GOOG', '2018'), (1273.89, 970.11)),
 (('GOOG', '2019'), (1365.0, 1014.07)),
 (('GOOG', '2020'), (1847.2, 1013.54)),
 (('GOOG', '2021'), (3037.0, 1699.0)),
 (('INTC', '2013'), (26.04, 20.1)),
 (('INTC', '2014'), (37.9, 23.5)),
 (('INTC', '2015'), (37.49, 24.87)),
 (('INTC', '2016'), (38.36, 27.68)),
 (('INTC', '2017'), (47.64, 33.23)),
 (('INTC', '2018'), (57.6, 42.04)),
 (('INTC', '2019'), (60.48, 42.86)),
 (('INTC', '2020'), (69.29, 43.61)),
 (('INTC', '2021'), (68.49, 47.87)),
 (('MSFT', '2013'), (38.98, 26.28)),
 (('MSFT', '2014'), (50.05, 34.63)),
 (('MSFT', '2015'), (56.85, 39.72)),
 (('MSFT', '2016'), (64.1, 48.04)),
 (('MSFT', '2017'), (87.5, 61.95)),
 (('MSFT', '2018'), (116.18, 83.83)),
 (('MSFT', '2019'), (159.55, 97.2)),
 (('MSFT', '2020'), (232.86, 132.52)),
 (('MSFT', '2021'), (349.67, 211.94)),
 (('NFLX', '2013'), (55.59, 12.96)),
 (('NFLX', '2014'), (69.9, 42.79)),
 (('NFLX', '2015'), (133.27, 45.26)),
 (('NFLX', '2016'), (129.29, 79.95)),
 (('NFLX', '2017'), (204.38, 124.31)),
 (('NFLX', '2018'), (423.21, 195.42)),
 (('NFLX', '2019'), (385.99, 252.28)),
 (('NFLX', '2020'), (575.37, 290.25)),
 (('NFLX', '2021'), (700.99, 478.54)),
 (('NVDA', '2013'), (4.08, 2.98)),
 (('NVDA', '2014'), (5.31, 3.83)),
 (('NVDA', '2015'), (8.48, 4.74)),
 (('NVDA', '2016'), (29.98, 6.19)),
 (('NVDA', '2017'), (54.67, 23.79)),
 (('NVDA', '2018'), (73.19, 31.11)),
 (('NVDA', '2019'), (60.45, 31.92)),
 (('NVDA', '2020'), (147.27, 45.17)),
 (('NVDA', '2021'), (346.47, 115.67))]
 </pre>

In [23]:
result_7B = result_7A.reduceByKey(lambda x,y: (max(x[0],y[0]),min(x[1],y[1])))
sorted(result_7B.collect())

[(('AAPL', '2013'), (20.54, 13.75)),
 (('AAPL', '2014'), (29.94, 17.63)),
 (('AAPL', '2015'), (33.63, 23.0)),
 (('AAPL', '2016'), (29.67, 22.37)),
 (('AAPL', '2017'), (44.3, 28.69)),
 (('AAPL', '2018'), (58.37, 36.65)),
 (('AAPL', '2019'), (73.49, 35.5)),
 (('AAPL', '2020'), (138.79, 53.15)),
 (('AAPL', '2021'), (182.13, 116.21)),
 (('GOOG', '2013'), (558.41, 346.46)),
 (('GOOG', '2014'), (612.15, 487.66)),
 (('GOOG', '2015'), (779.98, 486.23)),
 (('GOOG', '2016'), (816.68, 663.06)),
 (('GOOG', '2017'), (1078.49, 775.8)),
 (('GOOG', '2018'), (1273.89, 970.11)),
 (('GOOG', '2019'), (1365.0, 1014.07)),
 (('GOOG', '2020'), (1847.2, 1013.54)),
 (('GOOG', '2021'), (3037.0, 1699.0)),
 (('INTC', '2013'), (26.04, 20.1)),
 (('INTC', '2014'), (37.9, 23.5)),
 (('INTC', '2015'), (37.49, 24.87)),
 (('INTC', '2016'), (38.36, 27.68)),
 (('INTC', '2017'), (47.64, 33.23)),
 (('INTC', '2018'), (57.6, 42.04)),
 (('INTC', '2019'), (60.48, 42.86)),
 (('INTC', '2020'), (69.29, 43.61)),
 (('INTC', '2021'), (

**7c)** For the above problem, show the dates of the largest High and the smallest Low for each stock per year.

In [24]:
# largest High per (Stock, Year), with its date
result_7C_1 = (result_1B.map(lambda x: ((x[0], x[1][0:4]), [x[3], x[1]]))
               .reduceByKey(lambda x, y: (max(x[0], y[0]), x[1] if x[0] > y[0] else y[1])))
# smallest Low per (Stock, Year), with its date
result_7C_2 = (result_1B.map(lambda x: ((x[0], x[1][0:4]), [x[4], x[1]]))
               .reduceByKey(lambda x, y: (min(x[0], y[0]), x[1] if x[0] < y[0] else y[1])))
result_7C = result_7C_1.join(result_7C_2)  # ((High, Date), (Low, Date)) per key
sorted(result_7C.collect())


[(('AAPL', '2013'), ((20.54, '2013-12-05'), (13.75, '2013-04-19'))),
 (('AAPL', '2014'), ((29.94, '2014-11-25'), (17.63, '2014-01-31'))),
 (('AAPL', '2015'), ((33.63, '2015-04-28'), (23.0, '2015-08-24'))),
 (('AAPL', '2016'), ((29.67, '2016-10-11'), (22.37, '2016-05-12'))),
 (('AAPL', '2017'), ((44.3, '2017-12-18'), (28.69, '2017-01-03'))),
 (('AAPL', '2018'), ((58.37, '2018-10-03'), (36.65, '2018-12-24'))),
 (('AAPL', '2019'), ((73.49, '2019-12-27'), (35.5, '2019-01-03'))),
 (('AAPL', '2020'), ((138.79, '2020-12-29'), (53.15, '2020-03-23'))),
 (('AAPL', '2021'), ((182.13, '2021-12-13'), (116.21, '2021-03-08'))),
 (('GOOG', '2013'), ((558.41, '2013-12-31'), (346.46, '2013-01-22'))),
 (('GOOG', '2014'), ((612.15, '2014-02-26'), (487.66, '2014-12-16'))),
 (('GOOG', '2015'), ((779.98, '2015-12-29'), (486.23, '2015-01-12'))),
 (('GOOG', '2016'), ((816.68, '2016-10-25'), (663.06, '2016-02-08'))),
 (('GOOG', '2017'), ((1078.49, '2017-12-18'), (775.8, '2017-01-03'))),
 (('GOOG', '2018'), ((12
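The 7c) solution runs two aggregations and then a join. The 5b) pattern extends to the (Stock, Year) key, so a single `reduceByKey` over `[(High, Date), (Low, Date)]` values should yield the same information without the join. A plain-Python sketch of that single-pass alternative on the four AAPL lines from `take(4)`; `combine_extremes` and `reduce_by_key` are hypothetical helpers.

```python
def combine_extremes(a, b):
    """a, b: [(High, Date), (Low, Date)]; keep the higher High and lower Low with their dates."""
    high = a[0] if a[0][0] > b[0][0] else b[0]
    low = a[1] if a[1][0] < b[1][0] else b[1]
    return [high, low]

def reduce_by_key(pairs, func):
    """Fold the values of each key with func, like RDD.reduceByKey on one machine."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

lines = [
    'AAPL,2013-01-02,19.78,19.82,19.34,16.96,560518000',
    'AAPL,2013-01-03,19.57,19.63,19.32,16.75,352965200',
    'AAPL,2013-01-04,19.18,19.24,18.78,16.28,594333600',
    'AAPL,2013-01-07,18.64,18.9,18.4,16.18,484156400',
]

# ((Stock, Year), [(High, Date), (Low, Date)])
pairs = []
for line in lines:
    stock, date, _open, high, low, _close, _volume = line.split(',')
    pairs.append(((stock, date[:4]), [(float(high), date), (float(low), date)]))

print(sorted(reduce_by_key(pairs, combine_extremes).items()))
```

In Spark this would read roughly `result_1B.map(lambda x: ((x[0], x[1][:4]), [(x[3], x[1]), (x[4], x[1])])).reduceByKey(combine_extremes)`.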

## Part 8 (20 points)

**8a)** Filter the dataset for Google (GOOG) and map to a new RDD with the following structure:

(Date, [Stock, Open, Close, net_percent_change])

Show the first 4 entries.

Sample Output:
<pre>
[('2013-01-02', ['GOOG', 358.37, 360.27, 0.53]),
 ('2013-01-03', ['GOOG', 361.11, 360.48, -0.17]),
 ('2013-01-04', ['GOOG', 363.31, 367.61, 1.18]),
 ('2013-01-07', ['GOOG', 366.35, 366.0, -0.1])]
 </pre>

In [25]:
# GOOG rows keyed by Date: [Stock, Open, Close, % change from Open to Close]
result_8A = (result_1B.filter(lambda x: x[0] == 'GOOG')
             .map(lambda x: (x[1], [x[0], x[2], x[5], round((x[5] - x[2]) / x[2] * 100, 2)])))
result_8A.take(4)

[('2013-01-02', ['GOOG', 358.37, 360.27, 0.53]),
 ('2013-01-03', ['GOOG', 361.11, 360.48, -0.17]),
 ('2013-01-04', ['GOOG', 363.31, 367.61, 1.18]),
 ('2013-01-07', ['GOOG', 366.35, 366.0, -0.1])]

**8b)** Filter the dataset for Netflix (NFLX) and map to a new RDD with the following structure:

(Date, [Stock, Open, Close, net_percent_change])

Show the first 4 entries.

<pre>
[('2013-01-02', ['NFLX', 13.6, 13.14, -3.38]),
 ('2013-01-03', ['NFLX', 13.14, 13.8, 5.02]),
 ('2013-01-04', ['NFLX', 13.79, 13.71, -0.58]),
 ('2013-01-07', ['NFLX', 13.77, 14.17, 2.9])]
 </pre>

In [26]:
# NFLX rows keyed by Date: [Stock, Open, Close, % change from Open to Close]
result_8B = (result_1B.filter(lambda x: x[0] == 'NFLX')
             .map(lambda x: (x[1], [x[0], x[2], x[5], round((x[5] - x[2]) / x[2] * 100, 2)])))
result_8B.take(4)

[('2013-01-02', ['NFLX', 13.6, 13.14, -3.38]),
 ('2013-01-03', ['NFLX', 13.14, 13.8, 5.02]),
 ('2013-01-04', ['NFLX', 13.79, 13.71, -0.58]),
 ('2013-01-07', ['NFLX', 13.77, 14.17, 2.9])]
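Both 8a) and 8b) compute net_percent_change = (Close - Open) / Open * 100, rounded to two decimals, with the same lambda. Factoring it into one function removes the duplication; the checks below reproduce the GOOG and NFLX values shown above. `net_percent_change` and `to_date_record` are hypothetical helper names.

```python
def net_percent_change(open_, close):
    """Percent change from Open to Close, rounded to 2 decimals."""
    return round((close - open_) / open_ * 100, 2)

def to_date_record(x):
    """[Stock, Date, Open, High, Low, Close, Volume] -> (Date, [Stock, Open, Close, pct])."""
    return (x[1], [x[0], x[2], x[5], net_percent_change(x[2], x[5])])

print(net_percent_change(358.37, 360.27))  # GOOG 2013-01-02
print(net_percent_change(13.6, 13.14))     # NFLX 2013-01-02
print(to_date_record(['AAPL', '2013-01-02', 19.78, 19.82, 19.34, 16.96, 560518000]))
```

With the helper, 8a) would become `result_1B.filter(lambda x: x[0] == 'GOOG').map(to_date_record)`, and 8b) the same with `'NFLX'`.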

**8c)** Create a new RDD by joining the Google RDD with Netflix RDD.
Show the first 4 entries using *takeOrdered* action.

Sample Output:
<pre>
[('2013-01-02',
  (['GOOG', 358.37, 360.27, 0.53], ['NFLX', 13.6, 13.14, -3.38])),
 ('2013-01-03',
  (['GOOG', 361.11, 360.48, -0.17], ['NFLX', 13.14, 13.8, 5.02])),
 ('2013-01-04',
  (['GOOG', 363.31, 367.61, 1.18], ['NFLX', 13.79, 13.71, -0.58])),
 ('2013-01-07', (['GOOG', 366.35, 366.0, -0.1], ['NFLX', 13.77, 14.17, 2.9]))]
</pre>


In [27]:
result_8C = result_8A.join(result_8B)
result_8C.takeOrdered(4)

[('2013-01-02',
  (['GOOG', 358.37, 360.27, 0.53], ['NFLX', 13.6, 13.14, -3.38])),
 ('2013-01-03',
  (['GOOG', 361.11, 360.48, -0.17], ['NFLX', 13.14, 13.8, 5.02])),
 ('2013-01-04',
  (['GOOG', 363.31, 367.61, 1.18], ['NFLX', 13.79, 13.71, -0.58])),
 ('2013-01-07', (['GOOG', 366.35, 366.0, -0.1], ['NFLX', 13.77, 14.17, 2.9]))]
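`join` is an inner join on the key, so only dates present in both RDDs survive, and `takeOrdered(4)` returns the four smallest (Date, ...) tuples, which is chronological order because ISO date strings sort lexicographically. A plain-Python sketch of that inner join on the records from 8a) and 8b); `inner_join` is a hypothetical helper, not a Spark API.

```python
def inner_join(left, right):
    """Join (key, value) pairs on key; only keys present on both sides survive, as with RDD.join."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

# First four records from the 8a) and 8b) outputs
goog = [('2013-01-02', ['GOOG', 358.37, 360.27, 0.53]),
        ('2013-01-03', ['GOOG', 361.11, 360.48, -0.17]),
        ('2013-01-04', ['GOOG', 363.31, 367.61, 1.18]),
        ('2013-01-07', ['GOOG', 366.35, 366.0, -0.1])]
nflx = [('2013-01-02', ['NFLX', 13.6, 13.14, -3.38]),
        ('2013-01-03', ['NFLX', 13.14, 13.8, 5.02]),
        ('2013-01-04', ['NFLX', 13.79, 13.71, -0.58]),
        ('2013-01-07', ['NFLX', 13.77, 14.17, 2.9])]

joined = inner_join(goog, nflx)
first4 = sorted(joined)[:4]   # driver-side analogue of takeOrdered(4)
print(first4[0])
```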

**8d)** Map the above RDD to keep only the entry with the larger net percent change between the two stocks for each date.
Show the first 4 entries using takeOrdered action.

Sample Output:
<pre>
[('2013-01-02', ['GOOG', 358.37, 360.27, 0.53]),
 ('2013-01-03', ['NFLX', 13.14, 13.8, 5.02]),
 ('2013-01-04', ['GOOG', 363.31, 367.61, 1.18]),
 ('2013-01-07', ['NFLX', 13.77, 14.17, 2.9])]
 </pre>

In [28]:
result_8D = result_8C.map(lambda x: (x[0],x[1][0] if x[1][0][3] > x[1][1][3] else x[1][1] ))
result_8D.takeOrdered(4)

[('2013-01-02', ['GOOG', 358.37, 360.27, 0.53]),
 ('2013-01-03', ['NFLX', 13.14, 13.8, 5.02]),
 ('2013-01-04', ['GOOG', 363.31, 367.61, 1.18]),
 ('2013-01-07', ['NFLX', 13.77, 14.17, 2.9])]
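Because the 8d) lambda uses a strict `>`, the NFLX record is kept whenever the two percentages are equal. A plain-Python sketch of the same selection on the joined records from 8c), plus a made-up tie; `pick_larger_change` is a hypothetical name for the lambda.

```python
def pick_larger_change(record):
    """(Date, (goog, nflx)) -> (Date, record with the larger net percent change)."""
    date, (goog, nflx) = record
    return (date, goog if goog[3] > nflx[3] else nflx)   # ties go to the second record

# Joined records from the 8c) output
joined = [
    ('2013-01-02', (['GOOG', 358.37, 360.27, 0.53], ['NFLX', 13.6, 13.14, -3.38])),
    ('2013-01-03', (['GOOG', 361.11, 360.48, -0.17], ['NFLX', 13.14, 13.8, 5.02])),
    ('2013-01-04', (['GOOG', 363.31, 367.61, 1.18], ['NFLX', 13.79, 13.71, -0.58])),
    ('2013-01-07', (['GOOG', 366.35, 366.0, -0.1], ['NFLX', 13.77, 14.17, 2.9])),
]
picked = [pick_larger_change(r) for r in joined]
print(picked)

# Hypothetical tie: equal percentages keep the second (NFLX-position) record
tie = ('d', (['GOOG', 1.0, 1.0, 0.0], ['NFLX', 1.0, 1.0, 0.0]))
print(pick_larger_change(tie))
```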

**8e)** Map the above RDD to the following RDD structure.

((Year, Stock), net_percent_change)

Show the results using takeSample(False, 10, seed=123) action.

Sample Output:
<pre>
[(('2013', 'NFLX'), 4.64),
 (('2016', 'NFLX'), 2.05),
 (('2019', 'NFLX'), -0.46),
 (('2014', 'NFLX'), 0.16),
 (('2019', 'NFLX'), 0.51),
 (('2020', 'NFLX'), 1.73),
 (('2018', 'GOOG'), -0.43),
 (('2018', 'GOOG'), 0.5),
 (('2013', 'GOOG'), 0.04),
 (('2015', 'GOOG'), 0.02)]
 </pre>

In [29]:
result_8e = result_8D.map(lambda x: ((x[0][:4],x[1][0]),x[1][3]))
result_8e.takeSample(False,10,seed=123)

[(('2013', 'NFLX'), 4.64),
 (('2016', 'NFLX'), 2.05),
 (('2019', 'NFLX'), -0.46),
 (('2014', 'NFLX'), 0.16),
 (('2019', 'NFLX'), 0.51),
 (('2020', 'NFLX'), 1.73),
 (('2018', 'GOOG'), -0.43),
 (('2018', 'GOOG'), 0.5),
 (('2013', 'GOOG'), 0.04),
 (('2015', 'GOOG'), 0.02)]

**8f)** For the above RDD, show the sum of the net_percent_change values for each stock per year. Use the Python *sorted* function to show the results.

Sample Output:
<pre>
[(('2013', 'GOOG'), 26.02),
 (('2013', 'NFLX'), 241.04999999999998),
 (('2014', 'GOOG'), -4.030000000000002),
 (('2014', 'NFLX'), 141.55000000000004),
 (('2015', 'GOOG'), 30.110000000000003),
 (('2015', 'NFLX'), 185.94000000000003),
 (('2016', 'GOOG'), 0.7200000000000029),
 (('2016', 'NFLX'), 168.39999999999998),
 (('2017', 'GOOG'), 17.07),
 (('2017', 'NFLX'), 109.73000000000002),
 (('2018', 'GOOG'), -8.899999999999999),
 (('2018', 'NFLX'), 190.29000000000002),
 (('2019', 'GOOG'), 31.82),
 (('2019', 'NFLX'), 139.82999999999998),
 (('2020', 'GOOG'), 70.9),
 (('2020', 'NFLX'), 167.45),
 (('2021', 'GOOG'), 46.09),
 (('2021', 'NFLX'), 99.57)]
 </pre>

In [30]:
result_8F = result_8e.reduceByKey(lambda x,y: x+y)
sorted(result_8F.collect())

[(('2013', 'GOOG'), 26.02),
 (('2013', 'NFLX'), 241.04999999999998),
 (('2014', 'GOOG'), -4.030000000000002),
 (('2014', 'NFLX'), 141.55000000000004),
 (('2015', 'GOOG'), 30.110000000000003),
 (('2015', 'NFLX'), 185.94000000000003),
 (('2016', 'GOOG'), 0.7200000000000029),
 (('2016', 'NFLX'), 168.39999999999998),
 (('2017', 'GOOG'), 17.07),
 (('2017', 'NFLX'), 109.73000000000002),
 (('2018', 'GOOG'), -8.899999999999999),
 (('2018', 'NFLX'), 190.29000000000002),
 (('2019', 'GOOG'), 31.82),
 (('2019', 'NFLX'), 139.82999999999998),
 (('2020', 'GOOG'), 70.9),
 (('2020', 'NFLX'), 167.45),
 (('2021', 'GOOG'), 46.09),
 (('2021', 'NFLX'), 99.57)]
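Sums such as `241.04999999999998` are ordinary binary floating-point artifacts of adding two-decimal values. If two-decimal output is wanted, one option is to round after the reduction, e.g. `result_8F.mapValues(lambda v: round(v, 2))` in PySpark; another is exact `Decimal` arithmetic. The plain-Python sketch below compares both on the four GOOG net_percent_change values from 8a), used here only as illustrative inputs.

```python
from decimal import Decimal

# GOOG net percent changes for the first four days, from the 8a) output
changes = [0.53, -0.17, 1.18, -0.1]

float_total = sum(changes)              # may carry binary rounding noise
rounded_total = round(float_total, 2)   # what rounding via mapValues would report

# Exact decimal arithmetic, built from each value's string form
decimal_total = sum(Decimal(str(c)) for c in changes)

print(float_total, rounded_total, decimal_total)
```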

In [31]:
sc.stop()