In [1]:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.
# NOTE(review): the project id and token are literals in this cell; keep the cell hidden/redacted whenever the notebook is shared.
from project_lib import Project
# `spark` is not created anywhere above this line; presumably the kernel injects it at start-up — TODO confirm.
project = Project(spark.sparkContext, '*******-****-****-****-************', 'p-************************************')
# Context object exposed by the project handle.
pc = project.project_context

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200709005250-0001
KERNEL_ID = 0bfaedb9-d14e-4cb1-ae84-b27dabb970ac


In [2]:
%%capture
# Install findspark, which the next cell imports; %%capture keeps the installer output out of the notebook.
# NOTE(review): the package version is not pinned.
!pip install findspark

In [3]:
#Environment: Spark 2.4 & Python 3.6

# findspark.init() has to run before `import pyspark`: it locates the Spark
# installation and makes pyspark importable from this kernel.
import findspark
findspark.init()
findspark.find()  # return value (the located Spark home) is not used here
import pyspark

################# Spark ML #############################
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, Normalizer, VectorAssembler
from pyspark.ml.regression import LinearRegression  # previously imported twice

################# Spark SQL ############################
from pyspark.sql import Window
from pyspark.sql.functions import percent_rank

In [4]:
import ibmos2spark
# @hidden_cell
from pyspark.sql import SparkSession

# Connection settings handed to the Cloud Object Storage helper below.
credentials = dict(
    endpoint='https://s3-api.us-geo.objectstorage.service.networklayer.com',
    service_id='iam-ServiceId-*********-****-****-******-**********',
    iam_service_endpoint='https://iam.cloud.ibm.com/oidc/token',
    api_key='******************************************',
)

configuration_name = 'os_*************************_configs'
# `sc` is not defined in this notebook; presumably the kernel provides it — TODO confirm.
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

spark = SparkSession.builder.getOrCreate()

# Read the time-series CSV; only the 'header' option is set on the reader.
df_data_3 = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option('header', 'true')
    .load(cos.url('ts_data.csv', '************-donotdelete-**-***************'))
)

# Keep the date, the three opening prices, the closing price and the three lag columns.
df_ts = df_data_3.select(
    'Date', 'Itau_open', 'BVSP_open', 'USDBRL_open',
    'Itau_Close', 'lag_1', 'lag_2', 'lag_3',
)

In [5]:
# Read the news-feature CSV with the same reader settings as the time series
# (header row, nothing else configured).
df_data_4 = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option('header', 'true')
    .load(cos.url('feature_news.csv', '************-donotdelete-**-***************'))
)

# Only the date and its news class are needed downstream.
df_news = df_data_4.select('Date', 'Class')

In [6]:
# Full outer join on 'Date': a date present in either frame yields a row, and
# the columns coming from the side that lacks that date are null.
# NOTE(review): if df_news can hold several rows per Date, the price rows are duplicated — confirm.
df = df_ts.alias('a').join(df_news.alias('b'), on = ['Date'], how = 'outer')

In [7]:
from pyspark.sql.functions import col, unix_timestamp, to_date

# Parse the 'yyyy-MM-dd' strings into dates and sort the frame chronologically.
parsed_date = to_date(unix_timestamp(col('Date'), 'yyyy-MM-dd').cast("timestamp"))
df = df.withColumn('Date', parsed_date).orderBy('Date')

# The CSV readers only set the 'header' option, so the price and lag columns
# are cast to double explicitly here.
for numeric_name in ('Itau_open', 'BVSP_open', 'USDBRL_open', 'Itau_Close',
                     'lag_1', 'lag_2', 'lag_3'):
    df = df.withColumn(numeric_name, col(numeric_name).cast('double'))

In [9]:
from pyspark.sql import Window
import pyspark.sql.functions as func
import sys  # no longer needed by this cell; kept in case other cells rely on it

# Forward-fill the news 'Class': for every row, take the most recent non-null
# 'Class' at or before that row when rows are ordered by Date.
# The frame runs from the first row up to the current row, expressed with the
# documented Window.unboundedPreceding / Window.currentRow boundaries instead
# of the hand-written -sys.maxsize / 0 sentinels.
# NOTE: there is no partitionBy, so Spark evaluates this window over a single partition.
window_ff = Window.orderBy('Date')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)

read_last = func.last(df['Class'], ignorenulls=True).over(window_ff)

# Overwrite 'Class' with the filled values. Rows that precede the first
# non-null class have nothing to carry forward and receive the placeholder 'NN'.
df = df.withColumn('Class', read_last)
df = df.na.fill({'Class': 'NN'})

In [10]:
## Creating Dummies News Classes
# One 0/1 integer column per distinct value of 'Class', named 'class_<value>'.
# udf and IntegerType are no longer used by this cell; the imports are kept in
# case other cells rely on them.
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import IntegerType

# Distinct class labels, collected to the driver and sorted so the columns are
# always added in the same order.
categories = sorted(
    row['Class'] for row in df.select('Class').distinct().collect()
)

for category in categories:
    new_column_name = 'class' + '_' + category
    # Native column expression instead of a Python UDF built from a lambda
    # closing over the loop variable: no late-binding hazard and no per-row
    # Python round trip. A null 'Class' falls through to otherwise(0), as before.
    # The column is referenced by its exact name 'Class' rather than relying on
    # case-insensitive resolution of 'class'.
    df = df.withColumn(new_column_name, when(col('Class') == category, 1).otherwise(0))

In [11]:
# Narrow the frame to the modelling columns: date, prices, target,
# the three class dummies and the three lags.
df = df.select(
    'Date', 'Itau_open', 'BVSP_open', 'USDBRL_open', 'Itau_Close',
    'class_N', 'class_NN', 'class_P', 'lag_1', 'lag_2', 'lag_3',
)

# Numeric inputs are packed into a single vector column named 'features'.
FEATURES_COL1 = ['Itau_open', 'BVSP_open', 'USDBRL_open', 'lag_1', 'lag_2', 'lag_3']
vectorAssembler = VectorAssembler(outputCol="features", inputCols=FEATURES_COL1)

# Rows containing any null are dropped before assembling; afterwards only the
# date, the target, the assembled vector and the dummies are kept.
vdf = (
    vectorAssembler
    .transform(df.na.drop())
    .select('Date', 'Itau_Close', 'features', 'class_N', 'class_NN', 'class_P')
)

In [12]:
# Min-max scale the assembled 'features' vector into 'scaled_features'.
#
# The scaler is fitted on the training window only — the earliest rows by Date
# up to the same percent_rank threshold that the later train/test split uses
# ("rank <= .95") — so that min/max values from the held-out period do not leak
# into preprocessing. The fitted model is then applied to every row, which
# means test-period values may fall slightly outside [0, 1].
TRAIN_FRACTION = 0.95  # keep in sync with the "rank <= .95" split threshold

scale_features = MinMaxScaler(inputCol='features', outputCol='scaled_features')

# Same ranking expression as the split: percent_rank over all rows ordered by Date.
ranked = vdf.withColumn(
    '_train_rank', percent_rank().over(Window.partitionBy().orderBy('Date'))
)
train_rows = ranked.where(ranked['_train_rank'] <= TRAIN_FRACTION).drop('_train_rank')

model_scale = scale_features.fit(train_rows)
df_scaled = model_scale.transform(vdf)

In [15]:
# Final model input: the scaled numeric vector followed by the three class
# dummies, packed into 'Col_features'. FEATURES_COL1 and vectorAssembler are
# deliberately re-bound here, as in the earlier assembling cell.
FEATURES_COL1 = ['scaled_features', 'class_N', 'class_NN', 'class_P']
vectorAssembler = VectorAssembler(outputCol="Col_features", inputCols=FEATURES_COL1)

# Only the date, the target and the assembled vector are carried forward.
df_completed = (
    vectorAssembler
    .transform(df_scaled)
    .select('Date', 'Itau_Close', 'Col_features')
)

In [16]:
# Add 'rank': each row's percent_rank when all rows are ordered by Date in one
# unpartitioned window (partitionBy() is called with no columns).
# The following cell thresholds this column to split train and test chronologically.
df_completed = df_completed.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("Date")))

In [17]:
# Chronological hold-out: rows whose Date percent-rank is at most .95 form the
# training set, the remaining (latest) rows form the test set. The helper
# 'rank' column is removed from both.
train_df = df_completed.filter("rank <= .95").drop("rank")
test_df = df_completed.filter("rank > .95").drop("rank")

In [18]:
# Linear regression reading the 'Col_features' vector and predicting
# 'Itau_Close'; every other estimator parameter is left at its default.
lr = LinearRegression(featuresCol = 'Col_features', labelCol='Itau_Close')
# Fit on the training rows only.
lr_model = lr.fit(train_df)

In [19]:
# Score the held-out rows with the fitted model; the result is what the
# evaluator in the next cell reads (it looks for a 'prediction' column).
lr_predictions = lr_model.transform(test_df)

In [20]:
# R-squared of the predictions against the actual 'Itau_Close' on the test rows.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Itau_Close",
    metricName="r2",
)
r2_test = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % r2_test)

R Squared (R2) on test data = 0.989227


In [21]:
# Evaluate the fitted model on the held-out rows and report the
# root-mean-squared error taken from the returned summary object.
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.605064


In [22]:
# Score the test rows again (the same call that produced lr_predictions earlier)
# under the name `predictions`, which the save cell below uses.
predictions = lr_model.transform(test_df)
# Print a preview of predicted vs. actual close alongside the feature vector;
# show() is called without arguments, so its default row limit and truncation apply.
predictions.select("prediction","Itau_Close","Col_features").show()

+------------------+------------------+--------------------+
|        prediction|        Itau_Close|        Col_features|
+------------------+------------------+--------------------+
| 34.59906994251565|  34.2400016784668|[0.87112550780915...|
| 34.24067705490668| 34.79999923706055|[0.85966997581227...|
|34.750114217700606|35.880001068115234|[0.87589868660350...|
|35.564205125912665| 36.38999938964844|[0.90199178886883...|
|  36.4278997689138|36.560001373291016|[0.92967591026995...|
|36.555032855445546| 36.79999923706055|[0.93253979326917...|
| 36.65360207856043| 37.29999923706055|[0.93572195292773...|
| 37.09134911556927| 36.63999938964844|[0.94972309126448...|
| 36.47460822948598|36.869998931884766|[0.92999418692930...|
| 36.30601535738676| 36.22999954223633|[0.92458457620323...|
| 36.66381350208888|36.349998474121094|[0.93604010820010...|
| 36.45502852253872|36.470001220703125|[0.92935775499758...|
|  37.6014961375709|37.099998474121094|[0.96722454453215...|
|  37.0639706513024|36.5

In [23]:
# Collect the test-set predictions to the driver as a pandas DataFrame.
df_pred = predictions.toPandas()
# Serialise to CSV text and store it as 'predictions.csv' through the project
# handle; overwrite=True presumably replaces an existing file of that name — confirm.
# NOTE(review): to_csv() is called with defaults, so the pandas index is written
# as an extra first column — confirm consumers expect it.
project.save_data(data=df_pred.to_csv(), file_name='predictions.csv',overwrite=True)

{'file_name': 'predictions.csv',
 'message': 'File saved to project storage.',
 'bucket_name': 'ibmcapstoneitub4tsforecastingusin-donotdelete-pr-mnxk95wtdc2z0q',
 'asset_id': '98e36754-787e-423a-b719-024c459b64f6'}