In [94]:
import logging
import time
import os

model_name = 'FredTimeseries' 
# path of log file
log_path = "./"

# local or cluster
#run_mode = 'cluster'
run_mode = 'local'

# other stuff
sc_setLogLevel = "WARN"  # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
seed = 42


#################################################
# ### START
#################################################

# set logging
logger = logging.getLogger(model_name)
hdlr = logging.handlers.RotatingFileHandler(os.path.join(log_path, model_name + ".log"), maxBytes=1000000,
                                            backupCount=5, mode='w')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr) 
logger.setLevel(logging.INFO)


logger.info("Start Spark")

run_mode="local"
sc_setLogLevel = "INFO"  # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
seed = 42
if run_mode == 'cluster':
    SPARK_SUBMIT_ARGS = "--conf spark.dynamicAllocation.enabled=true " \
                        "--conf spark.shuffle.service.enabled=true " \
                        "--conf spark.dynamicAllocation.maxExecutors=80 " \
                        "--conf spark.dynamicAllocation.minExecutors=10 " \
                        "--conf spark.dynamicAllocation.executorIdleTimeout=120 " \
                        "--queue datascience.normal " \
                        "--driver-memory 16g " \
                        "--executor-memory 8g " \
                        "--conf spark.shuffle.manager=tungsten-sort " \
                        "pyspark-shell "
else:
    SPARK_SUBMIT_ARGS = " --master local[*]"
    SPARK_SUBMIT_ARGS += " --driver-memory 12g --executor-memory 4g --num-executors 8"
    SPARK_SUBMIT_ARGS += "--packages com.databricks:spark-csv_2.11:1.5.0"
    SPARK_SUBMIT_ARGS += "--spark.sql.pivotMaxValues=200000"
    jars_dir = "/Users/guillermobreto/Downloads/jars/"
    SPARK_SUBMIT_ARGS += " --jars "
    SPARK_SUBMIT_ARGS += ("local:" + jars_dir + "/sparkts-0.3.0-jar-with-dependencies.jar")
    SPARK_SUBMIT_ARGS += (",local:" + jars_dir + "sparkts-0.4.0-SNAPSHOT-jar-with-dependencies.jar")
    #SPARK_SUBMIT_ARGS += (",local:" + jars_dir + "/commons-csv-1.1.jar")
    #SPARK_SUBMIT_ARGS += (",local:" + jars_dir + "/univocity-parsers-1.5.1.jar")
    SPARK_SUBMIT_ARGS += " pyspark-shell"

os.environ["PYSPARK_SUBMIT_ARGS"] = SPARK_SUBMIT_ARGS

sc = SparkContext()
sqlContext = HiveContext(sc)
#sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
sc.setLogLevel(sc_setLogLevel)

logger.info("Default Parallelism: {}, Spark Version: {}".format(sc.defaultParallelism, sc.version))
logger.info("------------------------------------------------")

In [1]:
import logging
import time
import os

model_name = 'FredTimeseries' 
# path of log file
log_path = "./"

# local or cluster
#run_mode = 'cluster'
run_mode = 'local'


#################################################
# ### START
#################################################

# set logging
logger = logging.getLogger(model_name)
hdlr = logging.handlers.RotatingFileHandler(os.path.join(log_path, model_name + ".log"), maxBytes=1000000,
                                            backupCount=5, mode='w')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr) 
logger.setLevel(logging.INFO)


logger.info("Start Spark")

sc_setLogLevel = "INFO"  # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
seed = 42

from datetime import datetime

from pyspark import SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType, IntegerType

from sparkts.datetimeindex import uniform, BusinessDayFrequency
from sparkts.timeseriesrdd import time_series_rdd_from_observations

from pyspark.sql.functions import explode
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from datetime import datetime

import numpy as np
import pandas as pd

In [2]:
sc.version

u'1.6.2'

In [3]:
import sparkts.datetimeindex as dt

In [4]:
dt.DayFrequency

sparkts.datetimeindex.DayFrequency

In [5]:
def lineToRow(line):
    (year, month, day, symbol, volume, price) = line.split("\t")
    # Python 2.x compatible timestamp generation
    dt = datetime(int(year), int(month), int(day))
    return (dt, symbol, float(price))

def loadObservations(sparkContext, sqlContext, path):
    textFile = sparkContext.textFile(path)
    rowRdd = textFile.map(lineToRow)
    schema = StructType([
        StructField('timestamp', TimestampType(), nullable=True),
        StructField('symbol', StringType(), nullable=True),
        StructField('price', DoubleType(), nullable=True),
    ])
    return sqlContext.createDataFrame(rowRdd, schema);

# Get the data

In [None]:
%%time
rdd = sc.wholeTextFiles("/Users/guillermobreto/Downloads/fred_timeseries/data/fred_codes/")
print("Number of time series: {}".format(rdd.count()))


In [7]:
from sparkts.datetimeindex import DayFrequency

In [8]:
freq = DayFrequency(1,sc)

In [9]:
%matplotlib inline 
import matplotlib.pyplot as plt

In [10]:
#freq = BusinessDayFrequency(1, 1, sc)
dtIndex = uniform(start='2005-02-01T00:00-00:00', end='2005-06-01T00:00-00:00', freq=freq, sc=sc)

In [11]:
rdd_df = rdd.map(lambda r: (r[0].split("/")[-1].strip(".csv"),filter(None, r[1].split("\n")[1:]))).toDF(["symbol","v"])


In [12]:
%%time
rdd_df.select("symbol").distinct().count()

CPU times: user 22.6 ms, sys: 9.43 ms, total: 32.1 ms
Wall time: 2min 3s


40809

In [13]:
#rdd_df=rdd_df.limit(100)
rdd_df.show(3)

+--------------------+--------------------+
|              symbol|                   v|
+--------------------+--------------------+
|FRED_00XALCCHM086...|[2005-01-01,97.0,...|
|FRED_00XALCFIM086...|[1996-01-01,71.22...|
|FRED_00XALCHRM086...|[2004-12-01,81.32...|
+--------------------+--------------------+
only showing top 3 rows



In [14]:
%%time
rdd_df.select("symbol").distinct().count()

CPU times: user 22.7 ms, sys: 9.47 ms, total: 32.2 ms
Wall time: 2min 3s


40809

In [12]:
rdd_df_exp =  rdd_df.select([rdd_df.symbol,explode(rdd_df.v).alias("DATA-VALUE")])

In [13]:
valueUdf = udf(lambda s: float(s.split(",")[1]), DoubleType())
dateUdf = udf(lambda s: s.split(",")[0], StringType())
new_df =rdd_df_exp.withColumn("Date", (f.to_date(f.lit(dateUdf(rdd_df_exp["DATA-VALUE"]))).cast(TimestampType())))
new_df =new_df.withColumn("price", valueUdf(new_df["DATA-VALUE"]))

In [17]:
new_df.show(3)

+--------------------+---------------+--------------------+-----+
|              symbol|     DATA-VALUE|                Date|price|
+--------------------+---------------+--------------------+-----+
|FRED_00XALCCHM086...|2005-01-01,97.0|2005-01-01 00:00:...| 97.0|
|FRED_00XALCCHM086...|2005-02-01,97.3|2005-02-01 00:00:...| 97.3|
|FRED_00XALCCHM086...|2005-03-01,97.4|2005-03-01 00:00:...| 97.4|
+--------------------+---------------+--------------------+-----+
only showing top 3 rows



In [14]:
freq = DayFrequency(1,sc)
dtIndex = uniform(start='2015-01-01T00:00-05:00', end='2016-10-01T00:00-05:00', freq=freq, sc=sc)

In [15]:
dates = ("2000-11-30",  "2016-10-30")
date_from, date_to = [f.to_date(f.lit(s)).cast(TimestampType()) for s in dates]
df_filtered = new_df.where((new_df.Date > date_from) & (new_df.Date < date_to))

In [20]:
df_filtered.show(3)

+--------------------+---------------+--------------------+-----+
|              symbol|     DATA-VALUE|                Date|price|
+--------------------+---------------+--------------------+-----+
|FRED_00XALCCHM086...|2005-01-01,97.0|2005-01-01 00:00:...| 97.0|
|FRED_00XALCCHM086...|2005-02-01,97.3|2005-02-01 00:00:...| 97.3|
|FRED_00XALCCHM086...|2005-03-01,97.4|2005-03-01 00:00:...| 97.4|
+--------------------+---------------+--------------------+-----+
only showing top 3 rows



In [16]:
df = df_filtered.select(["symbol", "Date", "price"])
df = df.withColumnRenamed("Date", "timestamp")

In [22]:
df.show(2, truncate=False)

+---------------------+---------------------+-----+
|symbol               |timestamp            |price|
+---------------------+---------------------+-----+
|FRED_00XALCCHM086NEST|2005-01-01 00:00:00.0|97.0 |
|FRED_00XALCCHM086NEST|2005-02-01 00:00:00.0|97.3 |
+---------------------+---------------------+-----+
only showing top 2 rows



In [17]:
pd.to_datetime(["2000-11-30"])
print(dates[0])

2000-11-30


## Get the S&P 500

In [19]:
import datetime
import pandas as pd
import pandas.io.data
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
%matplotlib inline
plt.rcParams['figure.figsize'] = (10.0, 8.0)

start = pd.to_datetime(dates[0])
end = pd.to_datetime(dates[1])

sp =  pd.io.data.get_data_yahoo('^GSPC', start, end)


sp.columns.values[-1] = 'AdjClose'
sp.columns = sp.columns + '_SP500'
sp['Return_SP500'] = sp['AdjClose_SP500'].pct_change()
sp.columns

sp = sp.reset_index()[["Date","Close_SP500"]]
sp["Date"]=sp["Date"].map(lambda x: str(x))
sp["symbol"]="Close_SP500"

sp.columns = [ "Date", "price", "symbol"]
sp_df = sqlContext.createDataFrame(sp)
sp_df = sp_df.select(sp_df.symbol, f.to_date(sp_df.Date).cast(TimestampType()).alias("timestamp"), sp_df.price)
sp_df_filtered = sp_df.where((sp_df.timestamp > date_from) & (sp_df.timestamp < date_to))
sp_df_filtered.show(3)

The pandas.io.data module is moved to a separate package (pandas-datareader) and will be removed from pandas in a future version.
After installing the pandas-datareader package (https://github.com/pydata/pandas-datareader), you can change the import ``from pandas.io import data, wb`` to ``from pandas_datareader import data, wb``.


+-----------+--------------------+-----------+
|     symbol|           timestamp|      price|
+-----------+--------------------+-----------+
|Close_SP500|2000-12-01 00:00:...| 1315.22998|
|Close_SP500|2000-12-04 00:00:...|1324.969971|
|Close_SP500|2000-12-05 00:00:...|1376.540039|
+-----------+--------------------+-----------+
only showing top 3 rows



In [20]:
unioned = sp_df_filtered.rdd.union(df.rdd)

In [21]:
unioned_df = unioned.toDF()

In [27]:
unioned_df.show(3)

+-----------+--------------------+-----------+
|     symbol|           timestamp|      price|
+-----------+--------------------+-----------+
|Close_SP500|2000-12-01 00:00:...| 1315.22998|
|Close_SP500|2000-12-04 00:00:...|1324.969971|
|Close_SP500|2000-12-05 00:00:...|1376.540039|
+-----------+--------------------+-----------+
only showing top 3 rows



In [22]:
unioned_df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)



In [23]:
%%time
tickerTsrdd = time_series_rdd_from_observations(dtIndex, unioned_df, "timestamp", "symbol", "price")

CPU times: user 1.68 ms, sys: 1.06 ms, total: 2.75 ms
Wall time: 37.6 ms


In [24]:
%%time
tickerTsrdd.take(2)

CPU times: user 57.3 ms, sys: 26.9 ms, total: 84.2 ms
Wall time: 20min 4s


[(u'FRED_00XALCCHM086NEST',
  array([ 100.29,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,   99.93,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,  100.4 ,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,     nan,
             nan,     nan,     nan,     nan,     nan,     nan,  100.13,
             nan,     nan,     nan, 

In [25]:
filled = tickerTsrdd.fill("linear")

In [26]:
filled.take(2)

[(u'FRED_00XALCCHM086NEST',
  array([ 100.29      ,  100.2783871 ,  100.26677419,  100.25516129,
          100.24354839,  100.23193548,  100.22032258,  100.20870968,
          100.19709677,  100.18548387,  100.17387097,  100.16225806,
          100.15064516,  100.13903226,  100.12741935,  100.11580645,
          100.10419355,  100.09258065,  100.08096774,  100.06935484,
          100.05774194,  100.04612903,  100.03451613,  100.02290323,
          100.01129032,   99.99967742,   99.98806452,   99.97645161,
           99.96483871,   99.95322581,   99.9416129 ,   99.93      ,
           99.94678571,   99.96357143,   99.98035714,   99.99714286,
          100.01392857,  100.03071429,  100.0475    ,  100.06428571,
          100.08107143,  100.09785714,  100.11464286,  100.13142857,
          100.14821429,  100.165     ,  100.18178571,  100.19857143,
          100.21535714,  100.23214286,  100.24892857,  100.26571429,
          100.2825    ,  100.29928571,  100.31607143,  100.33285714,
      

In [27]:
rr = filled.map(lambda ts: (ts[0], np.nan_to_num(ts[1])))

In [28]:
rr.take(1)

[(u'FRED_00XALCCHM086NEST',
  array([ 100.29      ,  100.2783871 ,  100.26677419,  100.25516129,
          100.24354839,  100.23193548,  100.22032258,  100.20870968,
          100.19709677,  100.18548387,  100.17387097,  100.16225806,
          100.15064516,  100.13903226,  100.12741935,  100.11580645,
          100.10419355,  100.09258065,  100.08096774,  100.06935484,
          100.05774194,  100.04612903,  100.03451613,  100.02290323,
          100.01129032,   99.99967742,   99.98806452,   99.97645161,
           99.96483871,   99.95322581,   99.9416129 ,   99.93      ,
           99.94678571,   99.96357143,   99.98035714,   99.99714286,
          100.01392857,  100.03071429,  100.0475    ,  100.06428571,
          100.08107143,  100.09785714,  100.11464286,  100.13142857,
          100.14821429,  100.165     ,  100.18178571,  100.19857143,
          100.21535714,  100.23214286,  100.24892857,  100.26571429,
          100.2825    ,  100.29928571,  100.31607143,  100.33285714,
      

In [29]:
previous = filled.fill("previous")

In [30]:
previous.take(3)

[(u'FRED_00XALCCHM086NEST',
  array([ 100.29      ,  100.2783871 ,  100.26677419,  100.25516129,
          100.24354839,  100.23193548,  100.22032258,  100.20870968,
          100.19709677,  100.18548387,  100.17387097,  100.16225806,
          100.15064516,  100.13903226,  100.12741935,  100.11580645,
          100.10419355,  100.09258065,  100.08096774,  100.06935484,
          100.05774194,  100.04612903,  100.03451613,  100.02290323,
          100.01129032,   99.99967742,   99.98806452,   99.97645161,
           99.96483871,   99.95322581,   99.9416129 ,   99.93      ,
           99.94678571,   99.96357143,   99.98035714,   99.99714286,
          100.01392857,  100.03071429,  100.0475    ,  100.06428571,
          100.08107143,  100.09785714,  100.11464286,  100.13142857,
          100.14821429,  100.165     ,  100.18178571,  100.19857143,
          100.21535714,  100.23214286,  100.24892857,  100.26571429,
          100.2825    ,  100.29928571,  100.31607143,  100.33285714,
      

In [31]:
nearest = previous.fill("nearest")

In [32]:
previous.take(1)

[(u'FRED_00XALCCHM086NEST',
  array([ 100.29      ,  100.2783871 ,  100.26677419,  100.25516129,
          100.24354839,  100.23193548,  100.22032258,  100.20870968,
          100.19709677,  100.18548387,  100.17387097,  100.16225806,
          100.15064516,  100.13903226,  100.12741935,  100.11580645,
          100.10419355,  100.09258065,  100.08096774,  100.06935484,
          100.05774194,  100.04612903,  100.03451613,  100.02290323,
          100.01129032,   99.99967742,   99.98806452,   99.97645161,
           99.96483871,   99.95322581,   99.9416129 ,   99.93      ,
           99.94678571,   99.96357143,   99.98035714,   99.99714286,
          100.01392857,  100.03071429,  100.0475    ,  100.06428571,
          100.08107143,  100.09785714,  100.11464286,  100.13142857,
          100.14821429,  100.165     ,  100.18178571,  100.19857143,
          100.21535714,  100.23214286,  100.24892857,  100.26571429,
          100.2825    ,  100.29928571,  100.31607143,  100.33285714,
      

In [33]:
returns = previous.return_rates()

In [34]:
returns.take(1)

[(u'FRED_00XALCCHM086NEST',
  array([ -1.15793232e-04,  -1.15806642e-04,  -1.15820054e-04,
          -1.15833470e-04,  -1.15846889e-04,  -1.15860311e-04,
          -1.15873736e-04,  -1.15887165e-04,  -1.15900596e-04,
          -1.15914030e-04,  -1.15927468e-04,  -1.15940909e-04,
          -1.15954353e-04,  -1.15967800e-04,  -1.15981250e-04,
          -1.15994703e-04,  -1.16008159e-04,  -1.16021619e-04,
          -1.16035081e-04,  -1.16048547e-04,  -1.16062016e-04,
          -1.16075488e-04,  -1.16088963e-04,  -1.16102441e-04,
          -1.16115922e-04,  -1.16129407e-04,  -1.16142894e-04,
          -1.16156385e-04,  -1.16169879e-04,  -1.16183376e-04,
          -1.16196876e-04,   1.67974725e-04,   1.67946514e-04,
           1.67918313e-04,   1.67890121e-04,   1.67861939e-04,
           1.67833766e-04,   1.67805603e-04,   1.67777449e-04,
           1.67749304e-04,   1.67721169e-04,   1.67693043e-04,
           1.67664927e-04,   1.67636820e-04,   1.67608723e-04,
           1.67580635e-04, 

In [35]:
rr = returns.map(lambda ts: (ts[0], np.nan_to_num(ts[1])))

In [36]:
rr.take(1)

[(u'FRED_00XALCCHM086NEST',
  array([ -1.15793232e-04,  -1.15806642e-04,  -1.15820054e-04,
          -1.15833470e-04,  -1.15846889e-04,  -1.15860311e-04,
          -1.15873736e-04,  -1.15887165e-04,  -1.15900596e-04,
          -1.15914030e-04,  -1.15927468e-04,  -1.15940909e-04,
          -1.15954353e-04,  -1.15967800e-04,  -1.15981250e-04,
          -1.15994703e-04,  -1.16008159e-04,  -1.16021619e-04,
          -1.16035081e-04,  -1.16048547e-04,  -1.16062016e-04,
          -1.16075488e-04,  -1.16088963e-04,  -1.16102441e-04,
          -1.16115922e-04,  -1.16129407e-04,  -1.16142894e-04,
          -1.16156385e-04,  -1.16169879e-04,  -1.16183376e-04,
          -1.16196876e-04,   1.67974725e-04,   1.67946514e-04,
           1.67918313e-04,   1.67890121e-04,   1.67861939e-04,
           1.67833766e-04,   1.67805603e-04,   1.67777449e-04,
           1.67749304e-04,   1.67721169e-04,   1.67693043e-04,
           1.67664927e-04,   1.67636820e-04,   1.67608723e-04,
           1.67580635e-04, 

In [37]:
def moving_average(a, n=3) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n


def shifting(a, delta):
    from scipy.ndimage.interpolation import shift
    return shift(a, delta, cval=np.NaN)

In [38]:
ma = rr.map(lambda row:  (row[0], moving_average(row[1], 10)))

In [39]:
ma.take(2)

[(u'FRED_00XALCCHM086NEST',
  array([ -1.15853612e-04,  -1.15867036e-04,  -1.15880463e-04,
          -1.15893893e-04,  -1.15907326e-04,  -1.15920762e-04,
          -1.15934201e-04,  -1.15947643e-04,  -1.15961089e-04,
          -1.15974537e-04,  -1.15987989e-04,  -1.16001444e-04,
          -1.16014901e-04,  -1.16028362e-04,  -1.16041827e-04,
          -1.16055294e-04,  -1.16068764e-04,  -1.16082238e-04,
          -1.16095714e-04,  -1.16109194e-04,  -1.16122677e-04,
          -1.16136163e-04,  -8.77311419e-05,  -5.93275942e-05,
          -3.09255188e-05,  -2.52491440e-06,   2.58742202e-05,
           5.42718862e-05,   8.26680850e-05,   1.11062818e-04,
           1.39456086e-04,   1.67847890e-04,   1.67819722e-04,
           1.67791563e-04,   1.67763414e-04,   1.67735274e-04,
           1.67707144e-04,   1.67679023e-04,   1.67650911e-04,
           1.67622809e-04,   1.67594716e-04,   1.67566633e-04,
           1.67538559e-04,   1.67510495e-04,   1.67482440e-04,
           1.67454394e-04, 

In [40]:
sh = rr.map(lambda row:  (row[0] + '_shift', np.nan_to_num(shifting(row[1], 1))))
ma = rr.map(lambda row:  (row[0] + "_mov_avg", moving_average(row[1])))

In [41]:
sh.take(1)

[(u'FRED_00XALCCHM086NEST_shift',
  array([  0.00000000e+00,  -1.15793232e-04,  -1.15806642e-04,
          -1.15820054e-04,  -1.15833470e-04,  -1.15846889e-04,
          -1.15860311e-04,  -1.15873736e-04,  -1.15887165e-04,
          -1.15900596e-04,  -1.15914030e-04,  -1.15927468e-04,
          -1.15940909e-04,  -1.15954353e-04,  -1.15967800e-04,
          -1.15981250e-04,  -1.15994703e-04,  -1.16008159e-04,
          -1.16021619e-04,  -1.16035081e-04,  -1.16048547e-04,
          -1.16062016e-04,  -1.16075488e-04,  -1.16088963e-04,
          -1.16102441e-04,  -1.16115922e-04,  -1.16129407e-04,
          -1.16142894e-04,  -1.16156385e-04,  -1.16169879e-04,
          -1.16183376e-04,  -1.16196876e-04,   1.67974725e-04,
           1.67946514e-04,   1.67918313e-04,   1.67890121e-04,
           1.67861939e-04,   1.67833766e-04,   1.67805603e-04,
           1.67777449e-04,   1.67749304e-04,   1.67721169e-04,
           1.67693043e-04,   1.67664927e-04,   1.67636820e-04,
           1.67608723

In [42]:
total = sc.union([rr, ma, sh])

In [43]:
from pyspark.mllib.linalg import Vectors
total_df = total.map(lambda x: Row(symbol=x[0], feat=Vectors.dense(x[1]))).map(lambda x: [x[1], x[0]]).toDF(["symbol","feat"])

In [44]:
total_df.show(3)

+--------------------+--------------------+
|              symbol|                feat|
+--------------------+--------------------+
|FRED_00XALCCHM086...|[-1.1579323188559...|
|FRED_00XHOUEU27M0...|[1.91253346933484...|
|   FRED_4BIGEURORECM|[0.0,0.0,0.0,0.0,...|
+--------------------+--------------------+
only showing top 3 rows



In [45]:
total_df_clean = total_df

In [46]:
limited =  total_df_clean.map(lambda ts: [ts[0], [float(l) for l in ts[1].toArray()]])

In [47]:
limited.take(2)

[[u'FRED_00XALCCHM086NEST',
  [-0.00011579323188559698,
   -0.00011580664151089604,
   -0.00011582005424237707,
   -0.00011583347008126132,
   -0.00011584688902832596,
   -0.00011586031108501427,
   -0.0001158737362521034,
   -0.0001158871645308146,
   -0.0001159005959222581,
   -0.00011591403042743309,
   -0.00011592746804756082,
   -0.00011594090878352947,
   -0.00011595435263656029,
   -0.00011596779960765247,
   -0.00011598124969802726,
   -0.00011599470290857283,
   -0.00011600815924062147,
   -0.00011602161869495031,
   -0.00011603508127289164,
   -0.00011604854697555567,
   -0.00011606201580383058,
   -0.00011607548775893761,
   -0.00011608896284187598,
   -0.00011610244105386691,
   -0.00011611592239590962,
   -0.00011612940686911433,
   -0.00011614289447459125,
   -0.0001161563852133396,
   -0.00011616987908658061,
   -0.00011618337609531348,
   -0.00011619687623953823,
   0.00016797472516483225,
   0.000167946514395112,
   0.00016791831309981298,
   0.00016789012127382819,
  

In [48]:
ts = limited.toDF(["Symbol", "ts"])

In [49]:
ts.show(1)

+--------------------+--------------------+
|              Symbol|                  ts|
+--------------------+--------------------+
|FRED_00XALCCHM086...|[-1.1579323188559...|
+--------------------+--------------------+
only showing top 1 row



In [50]:
ts_exploded = ts.select([ts.Symbol,explode(ts.ts).alias("values")])

In [51]:
ts_exploded.show(10)

+--------------------+--------------------+
|              Symbol|              values|
+--------------------+--------------------+
|FRED_00XALCCHM086...|-1.15793231885596...|
|FRED_00XALCCHM086...|-1.15806641510896...|
|FRED_00XALCCHM086...|-1.15820054242377...|
|FRED_00XALCCHM086...|-1.15833470081261...|
|FRED_00XALCCHM086...|-1.15846889028325...|
|FRED_00XALCCHM086...|-1.15860311085014...|
|FRED_00XALCCHM086...|-1.15873736252103...|
|FRED_00XALCCHM086...|-1.15887164530814...|
|FRED_00XALCCHM086...|-1.15900595922258...|
|FRED_00XALCCHM086...|-1.15914030427433...|
+--------------------+--------------------+
only showing top 10 rows



In [52]:
from pyspark.sql.functions import monotonicallyIncreasingId

# This will return a new DF with all the columns + id
res = ts_exploded.coalesce(1).withColumn("index", monotonicallyIncreasingId())

In [53]:
res.show(10, truncate = False)

+---------------------+----------------------+-----+
|Symbol               |values                |index|
+---------------------+----------------------+-----+
|FRED_00XALCCHM086NEST|-1.1579323188559698E-4|0    |
|FRED_00XALCCHM086NEST|-1.1580664151089604E-4|1    |
|FRED_00XALCCHM086NEST|-1.1582005424237707E-4|2    |
|FRED_00XALCCHM086NEST|-1.1583347008126132E-4|3    |
|FRED_00XALCCHM086NEST|-1.1584688902832596E-4|4    |
|FRED_00XALCCHM086NEST|-1.1586031108501427E-4|5    |
|FRED_00XALCCHM086NEST|-1.158737362521034E-4 |6    |
|FRED_00XALCCHM086NEST|-1.158871645308146E-4 |7    |
|FRED_00XALCCHM086NEST|-1.159005959222581E-4 |8    |
|FRED_00XALCCHM086NEST|-1.1591403042743309E-4|9    |
+---------------------+----------------------+-----+
only showing top 10 rows



In [54]:
%%time
from pyspark.sql.window import Window
import pyspark.sql.functions as f
ranked = res.select("Symbol", "index", "values",
     f.rowNumber()
     .over(Window
           .partitionBy("Symbol")
           .orderBy(f.col("index"))
            )
     .alias("rank")
    )

CPU times: user 5.16 ms, sys: 2.33 ms, total: 7.49 ms
Wall time: 393 ms


In [55]:
%%time
ranked.show(3)

+-----------+-------+--------------------+----+
|     Symbol|  index|              values|rank|
+-----------+-------+--------------------+----+
|Close_SP500|7488441|                 0.0|   1|
|Close_SP500|7488442|-0.00609270169656...|   2|
|Close_SP500|7488443|-0.00613005026421...|   3|
+-----------+-------+--------------------+----+
only showing top 3 rows

CPU times: user 11.1 ms, sys: 26.8 ms, total: 38 ms
Wall time: 2min 45s


In [56]:
%%time
pivoted = ranked.groupBy("rank").pivot("Symbol").sum("values")

CPU times: user 19.9 ms, sys: 20.5 ms, total: 40.4 ms
Wall time: 4min 26s


In [58]:
type(pivoted)

pyspark.sql.dataframe.DataFrame

In [None]:
pivoted.show(1)

In [None]:
pivoted.persist(StorageLevel.MEMORY_AND_DISK)

In [63]:
%%time
from pyspark.sql.functions import col, round
pivoted.select(*(round(col(c), 8).alias(c) for c in pivoted.columns[:5])).show(10, truncate=False)

+----+-----------+-------------------+-----------------+---------------------+
|rank|Close_SP500|Close_SP500_mov_avg|Close_SP500_shift|FRED_00XALCCHM086NEST|
+----+-----------+-------------------+-----------------+---------------------+
|1   |0.0        |-0.00407425        |0.0              |-1.1579E-4           |
|2   |-0.0060927 |-0.0061302         |0.0              |-1.1581E-4           |
|3   |-0.00613005|-0.00706379        |-0.0060927       |-1.1582E-4           |
|4   |-0.00616786|-0.00114383        |-0.00613005      |-1.1583E-4           |
|5   |-0.00889347|0.00687488         |-0.00616786      |-1.1585E-4           |
|6   |0.01162984 |0.0070381          |-0.00889347      |-1.1586E-4           |
|7   |0.01788828 |0.00226219         |0.01162984       |-1.1587E-4           |
|8   |-0.00840381|-0.0046023         |0.01788828       |-1.1589E-4           |
|9   |-0.0026979 |-0.00270521        |-0.00840381      |-1.159E-4            |
|10  |-0.00270519|-0.00266543        |-0.0026979    

In [66]:
pivoted_flag.printSchema()

root
 |-- rank: integer (nullable = true)
 |-- Close_SP500_mov_avg: double (nullable = true)
 |-- Close_SP500_shift: double (nullable = true)
 |-- FRED_00XALCCHM086NEST: double (nullable = true)
 |-- FRED_00XALCCHM086NEST_mov_avg: double (nullable = true)
 |-- FRED_00XALCCHM086NEST_shift: double (nullable = true)
 |-- FRED_00XALCFIM086NEST: double (nullable = true)
 |-- FRED_00XALCFIM086NEST_mov_avg: double (nullable = true)
 |-- FRED_00XALCFIM086NEST_shift: double (nullable = true)
 |-- FRED_00XALCHRM086NEST: double (nullable = true)
 |-- FRED_00XALCHRM086NEST_mov_avg: double (nullable = true)
 |-- FRED_00XALCHRM086NEST_shift: double (nullable = true)
 |-- FRED_00XAPFEEM086NEST: double (nullable = true)
 |-- FRED_00XAPFEEM086NEST_mov_avg: double (nullable = true)
 |-- FRED_00XAPFEEM086NEST_shift: double (nullable = true)
 |-- FRED_00XE00EU27M086NEST: double (nullable = true)
 |-- FRED_00XE00EU27M086NEST_mov_avg: double (nullable = true)
 |-- FRED_00XE00EU27M086NEST_shift: double (null

In [None]:
pivoted_flag.saveAsParquetFile("pivoted_fred_parquet")



## Modeling

In [None]:
%%time
from pyspark.sql.functions import col
udfFlag = udf(lambda value: 1.0 if value > 0 else 0.0, DoubleType())

pivoted_flag = pivoted.withColumn("flag", udfFlag(pivoted.Close_SP500)).drop("Close_SP500")

In [None]:
pivoted_flag.groupBy("flag").count().show()

In [None]:
pivoted_flag.columns[-1]

In [None]:
features = pivoted_flag.columns[1:-1]

In [None]:
len(features)

In [None]:

from __future__ import division, print_function

import logging
import os
from time import time

from pyspark import SparkContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.feature import StringIndexer

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler


assembler = (VectorAssembler()
    .setInputCols(features)
    .setOutputCol("features"))



label_col_name = 'label'
target="flag"

pivoted_flag = pivoted_flag.dropna()

print("Number of rows to model: {}".format(pivoted.count()))
df = pivoted_flag.withColumn(label_col_name, f.col(target).cast('double'))

label_idx_col_name = "indexed_label"
label_indexer = StringIndexer(inputCol=label_col_name, outputCol=label_idx_col_name)
pipeline = Pipeline(stages=[label_indexer, assembler])
df = pipeline.fit(df).transform(df)



In [None]:

model_name = 'FredTimeseries' 
# path of log file
log_path = "./"

# local or cluster
#run_mode = 'cluster'
run_mode = 'local'

# other stuff
sc_setLogLevel = "INFO"  # ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
seed = 42


#################################################
# ### START
#################################################

# set logging
logger = logging.getLogger(model_name)
hdlr = logging.handlers.RotatingFileHandler(os.path.join(log_path, model_name + ".log"), maxBytes=1000000,
                                            backupCount=5, mode='w')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr) 
logger.setLevel(logging.INFO)
start = time()

logger.info("Start Spark")

In [None]:
from time import time

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as f

def test_performance_cross_validation(dataset, logger, classifier, label_col, n_folds, seed=None):
    """
     Evaluate classifier performance using k-fold cross validation
     https://spark.apache.org/docs/1.6.0/mllib-evaluation-metrics.html
     """
    rand_col = "uid_rand"
    h = 1.0 / n_folds
    df = dataset.select("*", f.rand(seed).alias(rand_col))
#   metrics_dict = {"roc_auc": [],}

    metrics_dict = {"roc_auc": [],  # roc: y=tpr x=fpr
                    "true_pos_rate": [],  # recall = true pos rate 
                    "false_pos_rate": [],
                    "precision": [],
                    "n_true_neg": [],
                    "n_false_neg": [],
                    "n_false_pos": [],
                    "n_true_pos": [], }

    model = None
    for i in range(n_folds):
        if i == 4:
            logger.info("Just keeping model for {} fold".format(i))
        validate_lb = i * h  # lower bound
        validate_ub = (i + 1) * h  # upper bound
        condition = (df[rand_col] >= validate_lb) & (df[rand_col] < validate_ub)
        validation = df.filter(condition)
        train = df.filter(~condition)

        #         # train
        model = classifier.fit(train)

        #         # predict
        prediction = model.transform(validation)

        #         # assess performance metrics
        prediction_and_labels = prediction.map(lambda x: (x['prediction'], x[label_col]))
        #         print(prediction_and_labels)
        metrics = MulticlassMetrics(prediction_and_labels)
        metrics_areas = BinaryClassificationMetrics(prediction_and_labels)  # gets roc and precRecall curves
        metrics_dict['roc_auc'].append(metrics_areas.areaUnderROC)
        #         # a bit slow, have to calc outside loop
        cm = metrics.confusionMatrix().toArray()
        n_true_neg = cm[0, 0]
        n_false_neg = cm[1, 0]
        n_true_pos = cm[1, 1]
        n_false_pos = cm[0, 1]
        #         #
        metrics_dict['n_true_neg'].append(n_true_neg) 
        metrics_dict['n_false_neg'].append(n_false_neg)
        metrics_dict['n_true_pos'].append(n_true_pos)
        metrics_dict['n_false_pos'].append(n_false_pos) 
        metrics_dict['true_pos_rate'].append(n_true_pos / (n_true_pos+n_false_neg))
        metrics_dict['false_pos_rate'].append(n_false_pos / (n_false_pos+n_true_neg))
        metrics_dict['precision'].append(n_true_pos / (n_true_pos+n_false_pos))

    return model, metrics_dict

In [None]:
features_col_name="features"

model_dict = {
    "logistic_regression": LogisticRegression(labelCol=label_idx_col_name, featuresCol=features_col_name,
                                              maxIter=20, standardization=True,
                                              regParam=0.0, elasticNetParam=0.0),
    
    "decision_tree": DecisionTreeClassifier(labelCol=label_idx_col_name, featuresCol=features_col_name,
                                            impurity='gini', maxDepth=5,
                                            minInstancesPerNode=1, minInfoGain=0.0),

    "random_forest": RandomForestClassifier(labelCol=label_idx_col_name, featuresCol=features_col_name,
                                            maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                                            impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=seed),
    
    "gradient_boosted_tree": GBTClassifier(labelCol=label_idx_col_name, featuresCol=features_col_name,
                                            maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                                            lossType="logistic", maxIter=20, stepSize=0.1),
}


#####################################################
# ### Fit and Assess Models Perfomance
#####################################################

n_folds = 5
columns = ["fold_" + str(fold) for fold in range(n_folds)]
frames = []
t_time = time()
model_dict_fitted = {}
fold = 0

# cache df to speed up fitting loop!
df.cache()

for name, estimator in model_dict.items():
    logger.info("Estimator: {}".format(name))
    model, results = test_performance_cross_validation(df, logger=logger, classifier=estimator,
                                                         label_col=label_idx_col_name, n_folds=n_folds, seed=seed)
    # save model for later use 
    model_dict_fitted[name] = model 
    
    logger.info("Estimator performace metrics {}".format(results))
    
    tmp_df = pd.DataFrame.from_dict(results, orient="index")
    tmp_df.index.name = "metric"
    tmp_df.columns = columns            
    tmp_df["estimator"] = name
    frames.append(tmp_df)
    fold += 1
    
    logger.info("The modeling for estimator {} took: {}".format(name, time()-t_time))
    logger.info("------------------------------------------------")

modeling_results = pd.concat(frames)
modeling_results["mean"] = modeling_results[columns].mean(axis=1) 
modeling_results["std"] = modeling_results[columns].std(axis=1) 
modeling_results = modeling_results.reset_index().set_index(["estimator", "metric"])

logger.info("The modelling for all estimators took: {}".format(time()-t_time))
logger.info("Modelling performance results")
logger.info(modeling_results)

modeling_results.to_csv(model_name+'_performance.csv', index=True)




In [None]:
modeling_results

In [None]:
sc.version