In [1]:
# Required libraries
import sys
import datetime
import time 

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as sFuncs
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

## PA5 Question 1

Moving Averages

Calculating moving averages of stock prices is part of many trading strategies ([reference](https://www.investopedia.com/articles/active-trading/052014/how-use-moving-average-buy-stocks.asp)).

We will use the two-moving-average strategy, with a 10-day shorter-term moving average (MA) and a 40-day longer-term MA. When the shorter-term MA crosses above the longer-term MA, it's a buy signal, as it indicates that the trend is shifting up. This is known as a "golden cross."

Meanwhile, when the shorter-term MA crosses below the longer-term MA, it's a sell signal, as it indicates that the trend is shifting down. This is known as a "dead/death cross."
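The crossover rule described above can be sketched in plain Python, outside of Spark, to make the logic concrete. This is a minimal illustration, not the assignment's solution: the function names `moving_average` and `crossover_signals` are hypothetical, and the sketch assumes the prices arrive as an in-memory list ordered by date.

```python
def moving_average(prices, window):
    """Entry i is the mean of the `window` prices ending at i (None until enough data)."""
    out = []
    running = 0.0
    for i, price in enumerate(prices):
        running += price
        if i >= window:
            running -= prices[i - window]  # drop the price that left the window
        out.append(running / window if i >= window - 1 else None)
    return out


def crossover_signals(dates, prices, short=10, long=40):
    """Return [(date, 'buy' | 'sell'), ...] where the short MA crosses the long MA."""
    short_ma = moving_average(prices, short)
    long_ma = moving_average(prices, long)
    signals = []
    prev_diff = None
    for day, s, l in zip(dates, short_ma, long_ma):
        if s is None or l is None:
            continue  # not enough history for both averages yet
        diff = s - l
        if diff == 0:
            continue  # the averages touch but have not crossed
        if prev_diff is not None:
            if prev_diff < 0 < diff:
                signals.append((day, "buy"))   # golden cross
            elif prev_diff > 0 > diff:
                signals.append((day, "sell"))  # death cross
        prev_diff = diff
    return signals
```

Calling `crossover_signals(dates, closes)` with the default windows applies the 10-day and 40-day averages; smaller windows are convenient for checking the logic by hand.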

To simulate a data stream, you are given a Python program, `stream-feeder.py`, which reads the `dj30.csv` file and pipes it out line by line. `dj30.csv` contains a 25-year history of Dow Jones Industrial Average prices. We will only be concerned with the Close price. The command `stream-feeder.py | nc -lk 9999` can be run on the master machine of your Spark cluster to feed the Close data into PySpark.

1. Set up the stream to feed data into a pyspark DStream. Write and submit a summary of the steps you took (in English) and enclose the (cleaned up after editing) output of `history > /tmp/my_session.txt`. This history should include what you typed into the shell outside of the pyspark session. \[2 pts\]
2. Use DStream windowing to separately accumulate the sum and count of prices, thus creating moving average DStreams. Write and submit the (cleaned up after editing) transcript of your session along with your code. \[4 pts\]
3. \[Optional, 4 bonus points\]. Compare the two moving averages to indicate buy and sell signals. Your output should be of the form `[( <date> buy), ( <date> sell), etc]`
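One possible way to approach step 2 is to carry a `(sum, count)` pair per record and reduce it over a sliding window. The sketch below is an assumption-laden outline, not the reference solution: the helper names are hypothetical, and it assumes the last comma-separated field of each streamed line is the Close price (the actual output format of `stream-feeder.py` is not shown here). It uses the DStream methods `map`, `filter`, and `reduceByWindow`; passing `None` as the inverse function avoids the need for checkpointing.

```python
def parse_close(line):
    """Return (close, 1) if the last comma-separated field is numeric, else None."""
    try:
        return (float(line.strip().split(",")[-1]), 1)
    except ValueError:
        return None  # header rows or malformed lines


def add_pairs(a, b):
    """Combine two (sum, count) pairs."""
    return (a[0] + b[0], a[1] + b[1])


def mean_of(pair):
    """Turn an accumulated (sum, count) pair into an average."""
    total, count = pair
    return total / count


def moving_average_stream(lines, window_seconds, slide_seconds):
    """Build a DStream of moving averages from a DStream of text lines.

    Both durations must be multiples of the StreamingContext batch interval.
    """
    pairs = lines.map(parse_close).filter(lambda p: p is not None)
    totals = pairs.reduceByWindow(add_pairs, None, window_seconds, slide_seconds)
    return totals.map(mean_of)
```

DStream windows are measured in time, not in records. If the feeder emitted roughly one price per 10-second batch (an assumption), `moving_average_stream(lines, 100, 10)` and `moving_average_stream(lines, 400, 10)` would approximate the 10-day and 40-day averages.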

#### Load the data
Note that the cell below needs to be run only once.

In [None]:
%%bash
# Download the dataset and helper scripts into the working directory.
# NOTE: this cell needs to be run only once; `%%bash` must be the first line of the cell.
cd /home/saberbf/BigData/PA5
sudo apt-get install -y python3-pip
pip3 install pandas
pip3 install feedparser
gsutil cp gs://datathinks-home/stream-feeder.py .
gsutil cp gs://datathinks-home/dj30.csv .
gsutil cp gs://datathinks-home/headline-extractor.py .
gsutil cp gs://datathinks-home/feed-parser.py .
gsutil cp gs://datathinks-home/2020-headlines.csv .

#### Create Spark Streaming Context

##### Note
If you are using a single-node cluster, executing the cell below is essential. Otherwise, SparkContext sets `sc.master` to 'yarn', and a call to `collect()` never returns.

In [2]:
# sc._conf.getAll()
sc.stop()
# Create a local SparkContext with 4 worker threads
conf = SparkConf().setMaster('local[4]')
sc = SparkContext(conf=conf, appName='NetworkWordCount')

# test the SparkContext to see if it works
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd.collect()

[('a', 7), ('a', 2), ('b', 2)]

In [3]:
# Create a local StreamingContext with batch interval of 10 seconds
ssc = StreamingContext(sc, 10)

In [4]:
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate


-------------------------------------------
Time: 2020-12-03 20:51:07
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:08
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:09
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:10
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:11
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:12
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:13
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:14
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:51:15
----------

KeyboardInterrupt: 

-------------------------------------------
Time: 2020-12-03 20:54:59
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:00
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:01
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:02
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:03
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:04
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:05
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:06
-------------------------------------------

-------------------------------------------
Time: 2020-12-03 20:55:07
----------

## PA4 Question 3

This question builds on the [UCI Online Retail II dataset](https://archive.ics.uci.edu/ml/datasets/Online+Retail+II) analysis you performed in Quiz 3. This time, however, the R, F, M values should be calculated as follows:

- Recency should be the number of days relative to year-end 2011 (Dec 31). 
- Frequency should simply be the number of transactions in the total period.
- Monetary value should be the log10 of the total dollars spent. Why log10? Taking logs flattens the range, so that high spenders don't skew the analysis.
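The three definitions above can be illustrated with a small standard-library sketch before moving the computation into Spark. Everything here is hypothetical scaffolding: the function name `rfm_table` and the row layout `(customer_id, invoice_id, invoice_date, amount)` are assumptions, as is the reading that "number of transactions" means distinct invoices and that recency counts days from a customer's most recent purchase to Dec 31, 2011. Customers whose total spend is not positive are skipped, since log10 is undefined for them.

```python
import math
from collections import defaultdict
from datetime import date


def rfm_table(rows, reference=date(2011, 12, 31)):
    """Map customer_id -> (recency_days, frequency, log10_monetary).

    rows: iterable of (customer_id, invoice_id, invoice_date, amount).
    """
    last_seen = {}
    invoices = defaultdict(set)
    spend = defaultdict(float)
    for customer, invoice, day, amount in rows:
        last_seen[customer] = max(day, last_seen.get(customer, day))
        invoices[customer].add(invoice)  # distinct invoices = transactions (assumption)
        spend[customer] += amount
    table = {}
    for customer, total in spend.items():
        if total <= 0:
            continue  # log10 is undefined for non-positive totals
        table[customer] = (
            (reference - last_seen[customer]).days,  # recency
            len(invoices[customer]),                 # frequency
            math.log10(total),                       # monetary
        )
    return table
```

For example, a customer who spent 1,000 in total would get a monetary value of 3, while one who spent 100,000 would get 5, which shows how the log compresses the gap between ordinary and very high spenders.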

After calculating the RFM values as specified above, run K-means clustering to divide the customers into 6 clusters. How does the number of customers in each of these 6 clusters compare with the clusters you found in Question 2 of Quiz 3?