# I. Data processing part by Frank


In [40]:
## work in pyspark
import json
import os
import pyspark
from datetime import date , datetime
from pyspark.sql.types import StringType, TimestampType, IntegerType
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [41]:
spark = SparkSession.builder.getOrCreate()

In [42]:
day = '20150217'
num_of_domain = 100
region = "us"


In [43]:
# set path
file_path =  "s3n://log.sharethis.com/amankesarwani/" + region + "/" + str(day) + "/part-000000000001*"
stock_return_path = "s3n://log.sharethis.com/Stock_Proceesed_return_Frank_lag.csv"


## DON'T TOUCH

In [44]:
"""
IT MAGIC

let's clean the stock data
"""
stock_return_raw = spark.read.csv(stock_return_path, sep=",", header=True)
## clean the time
stock_return_cleaned = stock_return_raw.withColumn("TimeStamp", from_utc_timestamp(stock_return_raw.Lagged_Time, "UTC")) \
                        .drop("Lagged_Time").withColumnRenamed("variable", "Ticker")

stock_return_cleaned.cache()

SP500_tickers = [x.Ticker for x in stock_return_cleaned.select("Ticker").distinct().collect()]


In [45]:
len(SP500_tickers)

503

## THIS IS ALSO MAGIC

In [46]:

"""
Then we process the sentiment data
"""
## personalized UDF
ricToTicker = udf(lambda x: x.split('.')[0])

# load data
sharethis_json = spark.read.json(file_path) ## spark 2.0

# data process
temp_json_raw = sharethis_json.select('*',explode(col('companies')).alias('tempCol')).drop('companies')
json_cleaned = (temp_json_raw.select( '*' ,temp_json_raw.tempCol.getItem("count").alias('company_count'),
    temp_json_raw.tempCol.getItem("sentiment_score").alias('company_sentiment_score'),
    temp_json_raw.tempCol.getItem("ric").alias('company_ric'))
    .drop('tempCol').drop('stid').drop('url').drop('userAgent')
    .filter(col('company_ric').isNotNull())
    .filter(col('company_sentiment_score').isNotNull())
    .withColumn("Ticker", ricToTicker(col("company_ric"))).drop("company_ric")
    )

## cache the data
json_cleaned.cache()

## we only need SP500 tickers
json_cleaned = json_cleaned.filter(col("Ticker").isin(SP500_tickers))


device_categories = [u'Personal computer', u'Tablet', u'Smartphone']

refDomain_categories = (json_cleaned.groupBy("refDomain")
                        .count().orderBy(desc("count")).select("refDomain")
                        .rdd.flatMap(lambda x: x).top(num_of_domain))

refDomain_categories_filter = []
for x in refDomain_categories:
    try:
        temp = x.split('.')[-2]
        if temp not in refDomain_categories_filter:
            refDomain_categories_filter.append(temp)
    except:
        pass

exprs_device = [F.when(F.col("deviceType") == category, 1).otherwise(0).alias("is_device_"+category)
        for category in device_categories]
exprs_domain = [F.when(F.col("refDomain") == category, 1).otherwise(0).alias("is_domain_"+category)
        for category in refDomain_categories_filter]

labeled_json_cleaned = json_cleaned.select("*", *exprs_device) \
        .select("*", *exprs_domain).drop("deviceType").drop("refDomain")

## finally we parse the time
parse_sharethis_time = udf(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%fZ"), TimestampType())
getHours = udf(lambda x: x.hour , IntegerType())
labeled_json_final = labeled_json_cleaned.withColumn("TimeStamp", parse_sharethis_time(col("standardTimestamp"))).drop("standardTimestamp")

#labeled_json_final = json_cleaned.withColumn("TimeStamp", parse_sharethis_time(col("standardTimestamp"))).drop("standardTimestamp")


In [51]:
"""
THIS IS TRUE MAGIC
"""
joined_dataframe = labeled_json_final.join(stock_return_cleaned, ["TimeStamp", "Ticker"], how = "left_outer")

joined_dataframe_filtered = joined_dataframe.filter(col('Return').isNotNull())

In [52]:
joined_dataframe_filtered.cache()

DataFrame[TimeStamp: timestamp, Ticker: string, browserFamily: string, channel: string, mappedEvent: string, os: string, shortIp: string, company_count: string, company_sentiment_score: string, is_device_Personal computer: int, is_device_Tablet: int, is_device_Smartphone: int, is_domain_zoomfreegames: int, is_domain_zooatlanta: int, is_domain_zonable: int, is_domain_zeeklytv: int, is_domain_zamm: int, is_domain_zackhunt: int, is_domain_youtump3: int, is_domain_yonanas: int, is_domain_yandex: int, is_domain_yahoo: int, is_domain_xhamster-proxy: int, is_domain_wavy: int, is_domain_teen: int, is_domain_smartbrief: int, is_domain_alcatel-lucent: int, is_domain_zyrtecprofessional: int, is_domain_zoho: int, is_domain_ziggityzoom: int, is_domain_zetronix: int, is_domain_zergnet: int, is_domain_zapmeta: int, is_domain_zabasearch: int, is_domain_yummymummyclub: int, is_domain_yugatech: int, is_domain_ytvids: int, is_domain_yr: int, is_domain_youtubeproxy: int, is_domain_yoursouthernforddealers:

In [322]:
joined_dataframe_filtered.select("company_count").show(50)

+-------------+
|company_count|
+-------------+
|            3|
|            2|
|            3|
|            1|
|            1|
|            2|
|            1|
|            3|
|            5|
|            1|
|           15|
|            2|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|           15|
|            1|
|            2|
|            2|
|            3|
|            1|
|            4|
|            1|
|            1|
|            5|
|            7|
|            2|
|            1|
|            9|
|            2|
|            2|
|            1|
|            5|
|            1|
|            4|
|            1|
|            1|
|            3|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            4|
|            1|
+-------------+
only showing top 50 rows



In [292]:
#First we creat T/F labels for return 
joined_dataframe_filtered = joined_dataframe_filtered.withColumn("Label",joined_dataframe_filtered["Return"]>0)
#NOTE THAT WE NEED TO USE A STRING INSTANCE WHEN USING CAST
joined_dataframe_filtered = joined_dataframe_filtered.withColumn("Label",joined_dataframe_filtered["Label"].cast("Double"))
#joined_dataframe_filtered.select("Label").show(50)     #become 0/1 based
#now look at if votes are balanced
joined_dataframe_filtered.filter(joined_dataframe_filtered.Label==1).count()#clearly not balanced...
#437T:17F

31

# II. Machine Learning ðŸ˜¯

## GBT

In [333]:
from pyspark.sql.types import *
from pyspark.sql.types import BooleanType,DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [334]:
#let's look at what cols we have here...
col=joined_dataframe_filtered.columns
#dropping redundant columns, need a better way...
col.remove("Return")
col.remove("TimeStamp")
col.remove("Ticker")
col.remove("channel")
col.remove("browserFamily")
col.remove("mappedEvent")
col.remove("os")
col.remove("shortIp")
#col.remove("company_count")
#col.remove("company_sentiment_score")
#convert sentiment score into double

joined_dataframe_filtered = joined_dataframe_filtered.withColumn("company_sentiment_score",
                                                                 joined_dataframe_filtered
                                                                 ["company_sentiment_score"].cast("Double"))


#cast company_count into int
joined_dataframe_filtered = joined_dataframe_filtered.withColumn("company_count",
                                                                 joined_dataframe_filtered
                                                                 ["company_count"].cast("Integer"))

#joined_dataframe_filtered.printSchema()
#joined_dataframe_filtered.select("company_sentiment_score").distinct().show()


In [335]:
vectorizer = VectorAssembler()
vectorizer.setInputCols(col)
vectorizer.setOutputCol("Features")

VectorAssembler_470b8209af64067ea556

In [336]:
seed = 1014
(split20DF, split80DF) = joined_dataframe_filtered.randomSplit([0.2,0.8],seed)

# Let's cache these datasets for performance
testSet = split20DF
trainingSet = split80DF

In [337]:
gbt=GBTClassifier()
#print(gbt.explainParams())
"""
lossType: Loss function which GBT tries to minimize (case-insensitive). Supported options: logistic (default: logistic)
maxBins: Max number of bins for discretizing continuous features.  
Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) 
E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5)
maxIter: max number of iterations (>= 0). (default: 20)
minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)
minInstancesPerNode: Minimum number of instances each child must have after split.
If a split causes the left or right child to have fewer than minInstancesPerNode, 
the split will be discarded as invalid. Should be >= 1. (default: 1)
stepSize: Step size to be used for each iteration of optimization (>= 0). (default: 0.1)
subsamplingRate: Fraction of the training data used for learning each decision tree, in range (0, 1]. (undefined)
"""

'\nlossType: Loss function which GBT tries to minimize (case-insensitive). Supported options: logistic (default: logistic)\nmaxBins: Max number of bins for discretizing continuous features.  \nMust be >=2 and >= number of categories for any categorical feature. (default: 32)\nmaxDepth: Maximum depth of the tree. (>= 0) \nE.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5)\nmaxIter: max number of iterations (>= 0). (default: 20)\nminInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)\nminInstancesPerNode: Minimum number of instances each child must have after split.\nIf a split causes the left or right child to have fewer than minInstancesPerNode, \nthe split will be discarded as invalid. Should be >= 1. (default: 1)\nstepSize: Step size to be used for each iteration of optimization (>= 0). (default: 0.1)\nsubsamplingRate: Fraction of the training data used for learning each decision tree, in range (0, 1]

In [338]:
gbt.setLabelCol("Label") \
    .setPredictionCol("Predicted_Label") \
    .setFeaturesCol("Features")

GBTClassifier_49c497ea12b0f11262b4

In [339]:
#create a pipeline
gbtPipeline=Pipeline()

In [340]:
# Set the stages of the Pipeline
gbtPipeline.setStages([vectorizer,gbt])

Pipeline_477f9ebec914a91db854

In [341]:
#first glimps
#NOTE THAT PIPELINE/VECTORIZER FIT CURRENTLY ONLY SUPPORT NUMERIC FEATURES??????
gbt_glimps1=gbtPipeline.fit(trainingSet)

In [342]:
#make prediction
test_predictions = gbt_glimps1.transform(testSet)

In [343]:
predicted_label_test=test_predictions.select("Predicted_Label", "Label")

In [344]:
# Select (prediction, true label) and compute test error
evaluator = BinaryClassificationEvaluator(labelCol= "Label", rawPredictionCol="Predicted_Label",metricName="areaUnderROC")
accuracy = evaluator.evaluate(predicted_label_test)
accuracy
#ðŸ˜¯, this is wierd

1.0

In [345]:
predicted_label_test.show(10)

+---------------+-----+
|Predicted_Label|Label|
+---------------+-----+
|            0.0|  0.0|
|            0.0|  0.0|
|            0.0|  0.0|
|            1.0|  1.0|
|            0.0|  0.0|
|            0.0|  0.0|
|            0.0|  0.0|
|            0.0|  0.0|
|            0.0|  0.0|
|            0.0|  0.0|
+---------------+-----+
only showing top 10 rows



In [346]:
#explore feature importanace
#...and there is no feature importance for GBT as for now
#available for random forest

In [305]:
#explore cross validation
# Let's create our CrossValidator with 3 fold cross validation
crossval = CrossValidator(estimator=gbtPipeline, evaluator=evaluator, numFolds=3)

# Let's tune over our regularization parameter subsamplingRate from 0 to 1
subsamplingRate = [x / 10.0 for x in range(1, 3)]

In [306]:
# We'll create a paramter grid using the ParamGridBuilder, and add the grid to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.subsamplingRate, subsamplingRate)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

CrossValidator_45da9dc9281a9ab0a9ef

In [307]:
# Now let's find and return the best model
cvModel = crossval.fit(trainingSet).bestModel

#predict on test set
predicted_label_test_cv=cvModel.transform(testSet).select("Predicted_Label", "Label")

#evaluate the accuracy by AUC
evaluator = BinaryClassificationEvaluator(labelCol= "Label", rawPredictionCol="Predicted_Label",metricName="areaUnderROC")
accuracy = evaluator.evaluate(predicted_label_test_cv)
accuracy

## Random Forest

In [388]:
from pyspark.ml.classification import RandomForestClassifier

In [389]:
dt = RandomForestClassifier()

dt.setLabelCol("Label")\
  .setPredictionCol("Predicted_Label")\
  .setFeaturesCol("Features")\

RandomForestClassifier_4ee6992e918f85de6087

In [390]:
dtPipeline=Pipeline()

In [391]:
dtPipeline.setStages([vectorizer,dt])

Pipeline_4308ab36ea4a8a907f94

In [392]:
print(dt.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2 (0.0-1.0], [1-n]. (default: auto)
featuresCol: features column name. (default: features, current: Features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: Label)
maxBins: Max number of bins for discretizing continuous features.  Must b

In [393]:
#first glimps
#NOTE THAT PIPELINE/VECTORIZER FIT CURRENTLY ONLY SUPPORT NUMERIC FEATURES??????
dt_glimps1=dtPipeline.fit(trainingSet)

In [394]:
#make prediction
test_predictions = dt_glimps1.transform(testSet)

In [395]:
predicted_label_test=test_predictions.select("Predicted_Label", "Label")

In [396]:
# Select (prediction, true label) and compute test error
evaluator = BinaryClassificationEvaluator(labelCol= "Label", rawPredictionCol="Predicted_Label",metricName="areaUnderROC")
accuracy = evaluator.evaluate(predicted_label_test)
accuracy
#ðŸ˜¯, this is wierd

0.5

In [419]:
#look at feature importance
rf_model1=dt_glimps1.stages[1]

rf_model1.featureImportances

In [420]:
#try CV

In [436]:
#explore cross validation
# Let's create our CrossValidator with 3 fold cross validation
crossval = CrossValidator(estimator=dtPipeline, evaluator=evaluator, numFolds=3)

# Let's tune over our regularization parameter featureSubsetStrategy
featureSubsetStrategy = [x for x in ["sqrt","auto"]]

In [437]:
# We'll create a paramter grid using the ParamGridBuilder, and add the grid to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.featureSubsetStrategy,featureSubsetStrategy )
             .build())
crossval.setEstimatorParamMaps(paramGrid)

CrossValidator_41db9d871e14b3f3ef4b

In [438]:
# Now let's find and return the best model
cvModel_rf = crossval.fit(trainingSet).bestModel

#predict on test set
predicted_label_test_cv_rf=cvModel.transform(testSet).select("Predicted_Label", "Label")

#evaluate the accuracy by AUC
evaluator = BinaryClassificationEvaluator(labelCol= "Label", rawPredictionCol="Predicted_Label",metricName="areaUnderROC")
accuracy = evaluator.evaluate(predicted_label_test_cv_rf)
accuracy

1.0