# Loading and Cleaning the Data

Turn on inline matplotlib plotting and import plotting dependencies.

In [None]:
%matplotlib inline

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns

Import analytic depedencies. Doc code for [spark-timeseries](http://cloudera.github.io/spark-timeseries/) and source code for [tsanalysis](https://github.com/srowen/ds-for-wall-street).

In [None]:
import numpy as np
import pandas as pd
import tsanalysis.loaddata as ld
import tsanalysis.tsutil as tsutil
import sparkts.timeseriesrdd as tsrdd
import sparkts.datetimeindex as dtindex
from sklearn import linear_model

** Load wiki page view and stock price data into Spark Dataframes.**

`wiki_obs` is a Spark dataframe of (timestamp, page, views) of types (Timestamp, String, Double). `ticker_obs` is a Spark dataframe of (timestamp, symbol, price) of types (Timestamp, String, Double).

In [None]:
wiki_obs = ld.load_wiki_df(sqlCtx, '/user/srowen/wiki.tsv')
ticker_obs = ld.load_ticker_df(sqlCtx, '/user/srowen/ticker.tsv')

** Display the first 5 elements of the `wiki_obs` RDD. **

`wiki_obs` contains Row objects with the fields (timestamp, page, views).

In [None]:
wiki_obs.head(5)

**Display the first 5 elements of the `tickers_obs` RDD.**


`ticker_obs` contains Row objects with the fields (timestamp, symbol, views).

In [None]:
ticker_obs.head(5)

**Create datetime index.**

**Create time series RDD from observations and index. Remove time instants with NaNs.**

**Cache the tsrdd.**

**Examine the first element in the RDD.**

Time series have values and a datetime index. We can create a tsrdd for hourly stock prices from an index and a Spark DataFrame of observations. `ticker_tsrdd` is an RDD of tuples where each tuple has the form (ticker symbol, stock prices) where ticker symbol is a `string` and stock prices is a 1D `np.ndarray`. We create a nicely formatted string representation of this pair in `print_ticker_info()`. Notice how we access the two elements of the tuple.

In [None]:
def print_ticker_info(ticker):
    print ('The first ticker symbol is: {} \nThe first 20 elements of the associated ' +\
    'series are:\n {}').format(ticker[0], ticker[1][:20])

In [None]:
dt_index = dtindex.uniform('2015-08-03', end='2015-09-22', freq=dtindex.HourFrequency(1, sc), sc=sc)
ticker_tsrdd = tsrdd.time_series_rdd_from_observations(dt_index, ticker_obs, 'timestamp', 'symbol',
                                                 'price') \
    .remove_instants_with_nans()
ticker_tsrdd.cache()
first_series = ticker_tsrdd.first()
    
print_ticker_info(first_series)

**Create a wiki page view tsrdd and set the index to match the index of `ticker_tsrdd`.**

** Linearly interpolate to impute missing values.**

`wiki_tsrdd` is an RDD of tuples where each tuple has the form `(page title, wiki views)` where page title is a string and wiki views is a 1D `np.ndarray`. We have cached both RDDs because we will be doing many subsequent operations on them.

In [None]:
wiki_tsrdd = tsrdd.time_series_rdd_from_observations(ticker_tsrdd.index(), wiki_obs, 'timestamp', 'page', 'views') \
    .fill('linear')
wiki_tsrdd.cache()

** Filter out symbols with more than the minimum number of NaNs.**

** Then filter out instants with NaNs. **

In [None]:
def count_nans(vec):
    return np.count_nonzero(np.isnan(vec))

In [None]:
ticker_min_nans = ticker_tsrdd.map(lambda x: count_nans(x[1])).min()

ticker_price_tsrdd = ticker_tsrdd.filter(lambda x: count_nans(x[1]) == ticker_min_nans) \
    .remove_instants_with_nans()
ticker_return_tsrdd = ticker_price_tsrdd.return_rates()
print_ticker_info(ticker_return_tsrdd.first())

# Linking symbols and pages

We need to join together the wiki page and ticker data, but the time series RDDs are not directly joinable on their keys. To overcome this, we have create a dict from wikipage title to stock  ticker symbol.

**Create a dict from ticker symbols to page names.**

**Create another from page names to ticker symbols.**

In [None]:
# a dict from wiki page name to ticker symbol
page_symbols = {}
for line in open('../symbolnames.tsv').readlines():
    tokens = line[:-1].split('\t')
    page_symbols[tokens[1]] = tokens[0]

def get_page_symbol(page_series):
    if page_series[0] in page_symbols:
        return [(page_symbols[page_series[0]], page_series[1])]
    else:
        return []
# reverse keys and values. a dict from ticker symbol to wiki page name.
symbol_pages = dict(zip(page_symbols.values(), page_symbols.keys()))
print page_symbols.items()[0]
print symbol_pages.items()[0]

**Join together wiki_tsrdd and ticker_tsrdd.**

First, we use this dict to look up the corresponding stock ticker symbol and rekey the wiki page view time series. We then join the data sets together. The result is an RDD of tuples where each element is of the form `(ticker_symbol, (wiki_series, ticker_series))`. We count the number of elements in the resulting rdd to see how many matches we have.

In [None]:
joined = wiki_tsrdd.flatMap(get_page_symbol).join(ticker_tsrdd)
joined.cache()
joined.count()

# Correlation and Relationships

**Define a function for computing the pearson r correlation of the stock price and wiki page traffic associated with a  company.**

Here we look up a specific stock and corrsponding wiki page, and provide an example of 
computing the pearson correlation locally. We use [scipy.stats.stats.pearsonr](http://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.pearsonr.html) to compute the pearson correlation and corresponding two sided p value. `wiki_vol_corr` and `corr_with_offset` both return this as a tuple of (corr, p_value).

In [None]:
from scipy.stats.stats import pearsonr

def wiki_vol_corr(page_key):
    # lookup individual time series by key.
    ticker = ticker_tsrdd.find_series(page_symbols[page_key]) # numpy array
    wiki = wiki_tsrdd.find_series(page_key) # numpy array
    return pearsonr(ticker, wiki)

def corr_with_offset(page_key, offset):
    """offset is an integer that describes how many time intervals we have slid
    the wiki series ahead of the ticker series."""
    ticker = ticker_tsrdd.find_series(page_symbols[page_key]) # numpy array
    wiki = wiki_tsrdd.find_series(page_key) # numpy array
    return pearsonr(ticker[offset:], wiki[:-offset])

In [None]:
print wiki_vol_corr('Netflix')
print corr_with_offset('Netflix', 1)

**Create a plot of the joint distribution of wiki trafiic and stock prices for a specific company using seaborn's [jointplot function.](http://stanford.edu/~mwaskom/software/seaborn/generated/seaborn.jointplot.html)**

In [None]:
def joint_plot(page_key, ticker, wiki, offset=0):
    with sns.axes_style("white"):
        sns.jointplot(x=ticker, y=wiki, kind="kde", color="b");
        plt.xlabel('Stock Price')
        plt.ylabel('Wikipedia Page Views')
        plt.title('Joint distribution of {} stock price\n and Wikipedia page views.'\
            +'\nWith a {} day offset'.format(page_key, offset), y=1.20)

In [None]:
def ticker_wiki_joint_plot(page_key, offset=0):
    if (offset == 0):
        ticker = ticker_tsrdd.find_series(page_symbols[page_key]) # numpy array
        wiki = wiki_tsrdd.find_series(page_key)
    else:
        ticker = ticker_tsrdd.find_series(page_symbols[page_key])[offset:]
        wiki = wiki_tsrdd.find_series(page_key)[:-offset]
    joint_plot(page_key, ticker, wiki)
        
ticker_wiki_joint_plot('Apple_Inc.', offset=0)

** Find the companies with the highest correlation between stock prices time series and wikipedia page traffic.**

Note that comparing a tuple means you compare the composite object lexicographically.

In [None]:
joined.mapValues(lambda wiki_ticker: pearsonr(wiki_ticker[0], wiki_ticker[1])) \
    .sortBy(keyfunc=lambda x: x[1]) \
    .collect()

** Add in filtering out less than useful correlation results. **

There are a lot of invalid correlations that get computed, so lets filter those out.

In [None]:
top_correlations = joined.mapValues(lambda wiki_ticker: pearsonr(wiki_ticker[0], wiki_ticker[1])) \
    .filter(lambda corr_tuple: not np.isnan(corr_tuple[1][0]))\
    .sortBy(keyfunc=lambda x: np.abs(x[1][0])) \
    .collect()
top_correlations

** Find the top 10 correlations as defined by the ordering on tuples. **

In [None]:
top_10_corr = top_correlations[-10:]
top_10_corr_tickers = dict(top_10_corr).keys()
top_10_corr_tickers

** Create a joint plot of some of the stronger relationships.**

In [None]:
pages = [symbol_pages[sym] for sym in top_10_corr_tickers]
ticker_wiki_joint_plot(pages[0], offset=1)

# Volatility

** Compute per-day volatility for each symbol. **

In [None]:
ticker_daily_vol = tsutil.sample_daily(ticker_tsrdd, 'std') \
    .remove_instants_with_nans()

print ticker_daily_vol.first()

**Make sure we don't have any NaNs.**

In [None]:
ticker_daily_vol.map(lambda x: count_nans(x[1])).top(1)

# Visualize volatility

**Plot daily volatility in stocks over time.**

In [None]:
daily_vol_df = ticker_daily_vol.to_pandas_dataframe()
daily_vol_df.plot()

What does the distribution of volatility for the whole market look like? Add volatility for individual stocks in a datetime bin.

In [None]:
sns.distplot(daily_vol_df.sum(axis=0), kde=False)
plt.xlabel('Volatility')
plt.ylabel('Fraction of observations')

In [None]:
ax = sns.heatmap(daily_vol_df)
plt.xlabel('Company')
plt.ylabel('Date')

** Find stocks with the highest average daily volatility.**

In [None]:
most_volatile_on_ave_stocks = tsutil.sample_daily(ticker_return_tsrdd, 'std') \
    .remove_instants_with_nans() \
    .mapValues(lambda x: (x, x.mean())) \
    .top(10, lambda x: x[1][1])
[(stock_record[0], stock_record[1]) for stock_record in most_volatile_on_ave_stocks]

** Plot stocks with the highest average daily volatility over time. **

In [None]:
most_volatile_on_ave_stocks_set = set([x[0] for x in most_volatile_on_ave_stocks])
ticker_pandas_df = ticker_daily_vol.filter(lambda x: x[0] in most_volatile_on_ave_stocks_set) \
    .to_pandas_dataframe()
    
ticker_pandas_df.plot()

We first map over `ticker_daily_vol` to find the index of the value with the highest volatility. We then relate that back to the index set on the RDD to find the corresponding datetime.

In [None]:
most_volatile_days = ticker_daily_vol.map(lambda x: (np.argmax(x[1]), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .top(10, lambda x: x[1])

def lookup_datetime(rdd, loc):
    return rdd.index().datetime_at_loc(loc)

[(lookup_datetime(ticker_daily_vol, day), count ) for day, count in most_volatile_days]

A large number of stock symbols had their most volatile days on August 24th and August 25th of
this year.

# Regress volatility against page views

**Resample the wiki page view data set so we have total pageviews by day.** 

** Cache the wiki page view RDD.**

Resample the wiki page view data set so we have total pageviews by day. This means reindexing the time series and aggregating data together with daily buckets. We use [np.nansum](http://docs.scipy.org/doc/numpy/reference/generated/numpy.nansum.html) to add up numbers while treating nans like zero.

In [None]:
from numpy import nansum

wiki_tsrdd_full = tsrdd.time_series_rdd_from_observations(dt_index, wiki_obs, 'timestamp',
                                                    'page', 'views')
wiki_daily_views = tsutil.sample_daily(wiki_tsrdd_full, nansum) \
    .with_index(ticker_daily_vol.index())
wiki_daily_views.cache()

**Validate data by checking for nans.**

In [None]:
wiki_daily_views.map(lambda x: count_nans(x[1])).top(1)

In [None]:
most_viewy_days = wiki_daily_views.map(lambda x: (np.argmax(x[1]), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .top(10, lambda x: x[1])

[(lookup_datetime(ticker_daily_vol, day), count ) for day, count in most_viewy_days]

** Fit a linear regression model to every pair in the joined wiki-ticker RDD and extract R^2 scores.**

In [None]:
def regress(X, y):
    model = linear_model.LinearRegression()
    model.fit(X, y)
    score = model.score(X, y)
    return (score, model)

lag = 2
lead = 2

joined = regressions = wiki_daily_views.flatMap(get_page_symbol) \
    .join(ticker_daily_vol)
    
models = joined.mapValues(lambda x: regress(tsutil.lead_and_lag(lead, lag, x[0]), x[1][lag:-lead]))
models.cache()
models.count()

**Print out the symbols with the highest R^2 scores.**

In [None]:
models.map(lambda x: (x[1][0], x[0])).top(5)

**Plot the results of a linear model.**

Plotting a linear model always helps me understand it better. Again, seaborn is super useful with smart defaults built in.

In [None]:
page_views, returns = joined.filter(lambda x: x[0] == 'V').first()[1]
sns.set(color_codes=True)
sns.regplot(x=page_views, y=returns)

# Box plot / Tukey outlier identification

Tukey originally proposed a method for identifying outliers in bow and whisker plots. Eseentially, we find the cut off value for the 75th percentile $P_{75} = percentile(sample, 0.75)$, and add a reasonable buffer (expressed in terms of the interquartile range) $1.5*IQR = 1.5*(P_{75}-P_{25})$ above that cutoff.

**Write a function that returns the high value cutoff for Tukey's boxplot outlier criterion.**

In [None]:
def tukey_high_outlier_cutoff(sample):
    sample = sorted(sample)
    first_quartile = sample[len(sample) // 4]
    third_quartile = sample[len(sample) * 3 // 4]
    iqr = third_quartile - first_quartile
    return third_quartile + iqr * 1.5

**Filter out any values below Tukey's boxplot criterion for outliers.**

In [None]:
cutoff = tukey_high_outlier_cutoff(returns)
filter(lambda x: x > cutoff, returns)

In [None]:
cutoff = tukey_high_outlier_cutoff(page_views)
filter(lambda x: x > cutoff, page_views)

# Black Monday

**Select the date range comprising Black Monday 2015.**

In [None]:
black_monday = ticker_price_tsrdd['2015-08-24 13:00:00':'2015-08-24 20:00:00']

**Which stocks saw the worst return for that day?**

In [None]:
black_monday.mapValues(lambda x: (x[-1] / x[0]) - 1) \
    .takeOrdered(10, lambda x: x[1])

**Plot wiki page views for one of those stocks.**

In [None]:
series = wiki_daily_views \
    .to_pandas_series_rdd() \
    .flatMap(get_page_symbol) \
    .filter(lambda x: x[0] == 'HCA') \
    .first()
series[1].plot()