In [2]:
# `sc` is not defined anywhere in this notebook; presumably it is the SparkContext
# pre-created by the PySpark kernel -- displaying it is a quick check that Spark is up.
sc

## Take a look at data (5.2)

In [3]:
from pymongo import MongoClient
import pprint

In [4]:
client=MongoClient('localhost')

In [5]:
db=client.packt

In [6]:
redditCollection=db.reddit

In [7]:
pprint.pprint(redditCollection.find_one())

{u'_id': ObjectId('5a355c33293d03c5e1bf3311'),
 u'archived': True,
 u'author': u'David_ungerer',
 u'author_flair_css_class': None,
 u'author_flair_text': None,
 u'body': u'Good rant, stop looking for a mass movement, if one appears fine, but first change yourself.\nWhile at 54 still a work in process, I am a vegetarian (see corporate tainted food ect.) I shop at farmers markets, small shops and co-ops (member owned) I live in a urban space for two of 900 sq ft. I ride a bike and take MTA. I bank at a Federal Credit Union (small, local, member owned. Most importantly, I choose to live simply.\nAs example . . . Go to a corporate drug store (the checkout girl has no health or retirement  and has to stock shelves and sweep floors for minimum wage or less if the corporation could, and is concerned abouting chatting about the community becouse of the boss.) or ride a mile to an owner run drug store (she welcome me by name, asks about my family and work or the community. She provides health a

## Specify schema

In [8]:
import pyspark.sql.types

In [9]:
from pyspark.sql.types import StructField,StructType, FloatType,StringType,IntegerType

In [10]:
schema=StructType([
                StructField('ups',IntegerType()),
                   StructField('created_utc',StringType()),
                  StructField('postLength',IntegerType())])

## Read Mongo collection into Dataframe

In [11]:
pipeline = "[\
{$project:{'ups':1,'created_utc':1,'_id':0,'postLength':{$size:{$split:['$body',' ']}}}}]"

In [12]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",
"mongodb://127.0.0.1/packt.reddit").option("pipeline",pipeline).schema(schema).load()

In [13]:
df.printSchema()

root
 |-- ups: integer (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- postLength: integer (nullable = true)



## Convert UTC timestamp to hour

In [14]:
from pyspark.sql.functions import udf
import datetime

In [15]:
getHour=udf(lambda x:datetime.datetime.fromtimestamp(int(x)).hour,IntegerType())

In [16]:
dfClean=df.withColumn('hour',getHour('created_utc'))

In [17]:
dfClean.head()

Row(ups=5, created_utc=u'1262304000', postLength=278, hour=0)

## Preparing data for spark.ml (5.3)

In [18]:
from pyspark.ml.feature import VectorAssembler
# We need to convert DataFrame columns into Vectors

In [19]:
assembler=VectorAssembler(
  inputCols=["postLength","hour"], outputCol="features")

In [20]:
vectorDf=assembler.transform(dfClean)

In [21]:
vectorDf.head()

Row(ups=5, created_utc=u'1262304000', postLength=278, hour=0, features=DenseVector([278.0, 0.0]))

## Predicting up votes (5.4)

In [23]:
from pyspark.ml.regression import LinearRegression
# This is our regressor
from pyspark.ml.evaluation import RegressionEvaluator
# Module to evaluate fit

In [24]:
rf=LinearRegression(labelCol="ups", featuresCol="features")
# Create a linear regressor

In [25]:
(trainingData, testData) = vectorDf.randomSplit([0.7, 0.3])

In [None]:
model = rf.fit(trainingData)

In [28]:
predictions = model.transform(testData)

In [None]:
evaluator = RegressionEvaluator(labelCol="ups", \
        predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)


In [31]:
print 'Mean absolute error in number of up votes is %.2f' % rmse
# Add in post length

Mean absolute error in number of up votes is 4.75


In [None]:
stringIndexer = StringIndexer(inputCol="subreddit", outputCol="subredditIndex")
model = stringIndexer.fit(dfClean)
indexed = model.transform(dfClean)

#encoder = OneHotEncoder(inputCol="subredditIndex", outputCol="subredditVec")
#encoded = encoder.transform(indexed)

In [None]:
indexed.head()

In [None]:
indexed.select(['subredditIndex','subreddit']).head(10)

## Convert subredditIndex into vector

In [None]:
assembler=VectorAssembler(
  inputCols=["subredditIndex"], outputCol="subredditIndexV")

In [None]:
indexed=assembler.transform(indexed)

In [None]:
indexed.head(5)

In [None]:
indexer = StringIndexer(inputCol="subreddit", outputCol="subredditI")
indexedFinal = indexer.fit(df).transform(indexed)

In [None]:
indexedFinal.head()

In [None]:
indexedFinal.select('subredditI','subreddit').head(15)

## Cap the subreddit index: map indices above 20 to a single bucket (21)

In [None]:
setIndex=udf(lambda x:int(x) if x<20 else 21,IntegerType())

In [None]:
indexedFinalCut=indexedFinal.withColumn('indexCut',setIndex('subredditI'))

In [None]:
indexedFinalCut.head(5)

## Alternative: let VectorIndexer decide whether the subreddit index is categorical

In [None]:
indexer = VectorIndexer(inputCol="subredditIndexV", outputCol="subredditIndexLim", maxCategories=20)

indexerModel=indexer.fit(indexed)

indexedFinal=indexerModel.transform(indexed)

In [None]:
indexerModel.extractParamMap()

In [None]:
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

In [None]:
indexedFinal.head()

In [None]:
dd=indexedFinal.select(['subredditIndexLim'])

In [None]:
indexedFinalCut.head()

## One-hot encode `indexCut`

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [None]:
# Expand the capped subreddit index into a sparse one-hot vector column.
encoder = OneHotEncoder(
    outputCol="indexCutHot",
    inputCol="indexCut",
)
encoded = encoder.transform(indexedFinalCut)
encoded.show()

## Assemble into vector

In [None]:
inputCols=["indexCutHot","postLength","hour"]
inputCols=["postLength",'hour']

In [None]:
assembler=VectorAssembler(
  inputCols=inputCols, outputCol="featuresFinal")

In [None]:
ff=assembler.transform(encoded)

In [None]:
ff.head()

In [None]:
rf=RandomForestRegressor(labelCol="ups", featuresCol="featuresFinal", numTrees=10)
# Create a random forest regressor

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,\
                      labelCol='ups',featuresCol='featuresFinal')

In [None]:
(trainingData, testData) = ff.randomSplit([0.7, 0.3])

In [None]:
model = lr.fit(trainingData)

In [None]:
predictions = model.transform(testData)

In [None]:
evaluator = RegressionEvaluator(\
            labelCol="ups", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)

In [None]:
print 'Mean absolute error in number of up votes is %.2f' % rmse
# Add in post length

In [None]:
ff.head()

In [None]:
from pyspark.sql.functions import udf
stringToVector=udf(lambda x: DenseVector([x]), VectorUDT())

In [None]:
indexed.head()

In [None]:
final=indexed.withColumn('subredditV',stringToVector(indexed['subreddit']))

In [None]:
final.head()

In [None]:
indexer = VectorIndexer(inputCol="subredditV", outputCol="vSubreddit", maxCategories=10)
indexerModel = indexer.fit(final)

In [None]:
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

In [None]:
indexedData = indexerModel.transform(final)

In [None]:
dfClean.head()

In [None]:
assembler=VectorAssembler(
  inputCols=["hour","postLength","subreddit"], outputCol="features")

In [None]:
vectorDf=assembler.transform(dfClean)

In [None]:
vectorDf.head()

In [None]:
rf=RandomForestRegressor(labelCol="ups", featuresCol="features", numTrees=1)

In [None]:
(trainingData, testData) = vectorDf.randomSplit([0.7, 0.3])

In [None]:
pipeline = Pipeline(stages=[rf])

In [None]:
model = pipeline.fit(trainingData)

In [None]:
predictions = model.transform(testData)

In [None]:
evaluator = RegressionEvaluator(\
            labelCol="ups", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)


In [None]:
print 'Mean absolute error in number of up votes is %.2f' % rmse
# Add in post length

Mean absolute error to beat: 4.78 upvotes.