# Data Frames With Spark & PySpark

**Summary**

**Motivation** In this example I will go over a small use case of the data science pipeline using Apache Spark. The goal is to create predictions at scale for whether users will click on an ad or not.

**GLOSSARY:**

**Show** Shows the content of the data frame; accepts an argument for how many rows to show. Show is an *action*.
* **dataframe.show**(*int*)

**dataframe.printSchema**( ) prints the schema of the data frame in tree format

**dataframe.select**(*column_name*) selects the column based on the column name 

**dataframe.filter**(dataframe['*column_name*'] *logical condition*) keeps only the rows for which the logical condition on the given column is true - transformation

**dataframe.groupBy**(*column_name*) groups the rows of the dataframe by the values in the named column, usually followed by an aggregation such as count()

**dataframe.drop**(*column_name*) drops the named column from the dataframe - transformation

**dataframe.dropDuplicates**([*column1_name*,*column2_name*, ...]) drops duplicate rows, optionally comparing only the listed columns - transformation

**dataframe.dropna**(subset=[*column1_name*,*column2_name*, ...]) drops rows that contain null values, optionally checking only the listed columns - transformation

**dataframe.dtypes** returns a list of (column name, data type) pairs. It is an attribute, so it takes no arguments

**dataframe.fillna**(*value*, [*column1_name*,*column2_name*, ...]) replaces null values with *value*, optionally only in the listed columns - transformation
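To make the glossary entries concrete without starting Spark, here is a minimal plain-Python sketch of what several of them do, using a list of dicts as a stand-in for a data frame. The helper names and the three sample rows are made up for illustration and are not part of the PySpark API. Unlike this sketch, Spark does not promise which of several duplicate rows is kept.

```python
from collections import Counter

# Three made-up rows standing in for a data frame
rows = [
    {"gender": "0", "impression": 3},
    {"gender": "0", "impression": 3},
    {"gender": "1", "impression": None},
]

def select(rows, *cols):
    """Keep only the named columns (like dataframe.select)."""
    return [{c: r[c] for c in cols} for r in rows]

def filter_rows(rows, predicate):
    """Keep only rows for which predicate is true (like dataframe.filter)."""
    return [r for r in rows if predicate(r)]

def group_count(rows, col):
    """Count rows per distinct value of col (like groupBy(col).count())."""
    return Counter(r[col] for r in rows)

def drop_duplicates(rows, subset=None):
    """Drop rows whose values in `subset` (default: all columns) were already seen."""
    seen, out = set(), []
    for r in rows:
        cols = subset if subset is not None else sorted(r)
        key = tuple(r[c] for c in cols)
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out

def dropna(rows, subset=None):
    """Drop rows with a None in any of the checked columns (like dataframe.dropna)."""
    return [r for r in rows if all(r[c] is not None for c in (subset or r))]

def fillna(rows, value):
    """Replace None with value in every column (like dataframe.fillna)."""
    return [{c: (value if v is None else v) for c, v in r.items()} for r in rows]

print(group_count(rows, "gender"))
print(fillna(rows, 0))
```

Each helper returns a new list and leaves `rows` untouched, which mirrors the way data frame transformations return a new data frame instead of modifying the old one.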


In [1]:
import getspark
from IPython.display import Image
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

Create a function that parses each comma-separated line into floats and turns it into a LabeledPoint, with the first value as the label and the remaining values as the features. Next, read in the text file, parse it, and apply the function.

In [3]:
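# Load and parse the data
from pyspark.mllib.regression import LabeledPoint
def parsePoint(line):
    # Convert every comma-separated field to a float
    values = [float(x) for x in line.split(',')]
    # The first value is the label, the rest are the features
    return LabeledPoint(values[0], values[1:])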
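To see what the parsing step does to a single line without needing Spark, here is a small sketch that returns a plain `(label, features)` tuple in place of a LabeledPoint. The function name `parse_line` is hypothetical. The sample line is one of the data rows shown later in the notebook.

```python
def parse_line(line):
    """Split a comma-separated line into (label, features), all as floats."""
    values = [float(x) for x in line.split(",")]
    return values[0], values[1:]

label, features = parse_line("1,0,3,1")
print(label)     # 1.0
print(features)  # [0.0, 3.0, 1.0]
```

A header line such as `user_id,clicks,impression,signed_in` cannot be converted to floats and raises a `ValueError`, which is why the header has to be removed before parsing.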

1. Read in the data (Transformation)
2. Take the first five rows of the data (Action)
3. Map the data by splitting each row on commas, since it's a CSV (Transformation)
4. Sample 10000 rows from the data
5. Write the sampled RDD to a CSV to use in R

In [14]:
data = sc.textFile(r"C:\Spark\clickinfo.csv") # Read in the data -transformation

In [15]:
data.take(5) #Take the first five rows to check it out - action

[u'user_id,clicks,impression,signed_in',
 u'1,0,3,1',
 u'2,0,3,1',
 u'3,0,3,1',
 u'4,0,3,1']

In [16]:
rdd = data.map(lambda line: line.split(",")) #split it up by comma -transformation

In [18]:
rdd.take(5) #Check out the first five rows -action

[[u'user_id', u'clicks', u'impression', u'signed_in'],
 [u'1', u'0', u'3', u'1'],
 [u'2', u'0', u'3', u'1'],
 [u'3', u'0', u'3', u'1'],
 [u'4', u'0', u'3', u'1']]

In [None]:
#rdd2 = rdd.sample(False, 0.6) #Sample roughly 60% of the rows without replacement (sample takes a fraction, not a row count) -transformation

In [None]:
#rdd2.take(5)
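`rdd.sample(False, 0.6)` takes a fraction, not a row count: without replacement, each row is kept independently with probability 0.6, so the size of the result is only approximately 60% of the input. The plain-Python sketch below illustrates that behaviour. The function name `bernoulli_sample` is made up, and Spark's own random number generation differs in detail. For an exact number of rows, `rdd.takeSample(False, 10000)` is one option, although it returns a local list to the driver, not an RDD.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(1000))
sampled = bernoulli_sample(rows, 0.6, seed=42)
print(len(sampled))  # close to 600, but not exactly 600 in general
```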

In [19]:
header = rdd.first() #extract header
data = rdd.filter(lambda x:x !=header)
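The split-then-drop-header steps can be checked in plain Python on the first few lines printed above. This is only a sketch: the function name `split_and_drop_header` is hypothetical, and it uses ordinary lists where the notebook uses RDDs.

```python
lines = [
    "user_id,clicks,impression,signed_in",
    "1,0,3,1",
    "2,0,3,1",
]

def split_and_drop_header(lines):
    """Split each line on commas, then remove every row equal to the first row."""
    rows = [line.split(",") for line in lines]
    header = rows[0]
    body = [row for row in rows if row != header]
    return header, body

header, body = split_and_drop_header(lines)
print(header)  # ['user_id', 'clicks', 'impression', 'signed_in']
print(body)    # [['1', '0', '3', '1'], ['2', '0', '3', '1']]
```

Filtering by equality with the header removes any row identical to it, which in a distributed setting is simpler than trying to skip "the first row" of each partition.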

In [20]:
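# NOTE: the CSV header is user_id,clicks,impression,signed_in, so with these
# field names 'clicks' holds user_id and 'gender' holds the click indicator.
df = data.map(lambda line: Row(clicks=line[0], gender=line[1],
                               impression=line[2], signedin=line[3])).toDF()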

In [21]:
df.show(5)

+------+------+----------+--------+
|clicks|gender|impression|signedin|
+------+------+----------+--------+
|     1|     0|         3|       1|
|     2|     0|         3|       1|
|     3|     0|         3|       1|
|     4|     0|         3|       1|
|     5|     0|        11|       1|
+------+------+----------+--------+
only showing top 5 rows



In [22]:
df.printSchema()

root
 |-- clicks: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- impression: string (nullable = true)
 |-- signedin: string (nullable = true)
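Every column is reported as `string` because the fields produced by `line.split(",")` are never converted before they go into the rows. One way to get numeric columns is to convert each field first. The sketch below does this in plain Python, with a dict standing in for a `Row`. The function name `to_typed_row` is hypothetical, and the field names are taken from the CSV header shown earlier.

```python
def to_typed_row(fields):
    """Convert one split CSV row of strings into a dict of ints."""
    user_id, clicks, impression, signed_in = fields
    return {
        "user_id": int(user_id),
        "clicks": int(clicks),
        "impression": int(impression),
        "signed_in": int(signed_in),
    }

print(to_typed_row(["5", "0", "11", "1"]))
```

With integer fields, comparisons such as `impression >= 10` operate on numbers and do not rely on an implicit cast from strings.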



Filter

In [23]:
df[df.impression >= 10].show(5)

+------+------+----------+--------+
|clicks|gender|impression|signedin|
+------+------+----------+--------+
|     5|     0|        11|       1|
|     6|     1|        11|       1|
|    72|     0|        10|       1|
|   141|     0|        11|       0|
|   186|     0|        10|       1|
+------+------+----------+--------+
only showing top 5 rows



Select

In [24]:
df.select('gender','impression').show(5)

+------+----------+
|gender|impression|
+------+----------+
|     0|         3|
|     0|         3|
|     0|         3|
|     0|         3|
|     0|        11|
+------+----------+
only showing top 5 rows



Group By and count

In [25]:
df.groupBy("gender").count().show(5)

+------+-----+
|gender|count|
+------+-----+
|     0|56607|
|     1| 5499|
+------+-----+



In [None]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel, LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils
from pyspark.ml.param import Param, Params
from pyspark.mllib.linalg import Vectors

In [None]:
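# Label = first field, features = remaining fields, all cast to float.
# SVMWithSGD expects a binary 0/1 label, so the first field must hold the click indicator.
temp = df.rdd.map(lambda line: LabeledPoint(float(line[0]), [float(x) for x in line[1:]]))
temp.take(5)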

In [None]:
trainingData, testingData = temp.randomSplit([.8,.2],seed=1234)
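`randomSplit([.8, .2], seed=1234)` assigns each record to one of the splits at random, with probabilities proportional to the weights, so the 80/20 proportions hold only approximately. The plain-Python sketch below shows the idea. The function name `random_split` is made up, and Spark's actual implementation differs in detail.

```python
import random

def random_split(items, weights, seed=None):
    """Assign each item to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if u < upper or i == len(bounds) - 1:
                splits[i].append(item)
                break
    return splits

train, test = random_split(list(range(100)), [0.8, 0.2], seed=1234)
print(len(train) + len(test))  # 100: every item lands in exactly one split
```

Note that the cells that follow train and evaluate on the full `temp` RDD. Training on `trainingData` and evaluating on `testingData` would give a held-out estimate instead of a training error.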

In [None]:
# Build the model
svmmodel = SVMWithSGD.train(temp, iterations=100)

In [None]:
temp.map(lambda p: (svmmodel.predict(p.features))).take(5)

In [None]:
predictionAndLabels = temp.map(lambda lp: (float(svmmodel.predict(lp.features)), lp.label))

In [None]:
predictionAndLabels.take(5)

In [None]:
# Fraction of (prediction, label) pairs that disagree
trainErr = predictionAndLabels.filter(lambda vp: vp[0] != vp[1]).count() / float(temp.count())
print("Training Error = " + str(trainErr))
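The training error is simply the share of (prediction, label) pairs that disagree. A minimal plain-Python version of the same calculation, with a made-up function name and four made-up pairs, looks like this:

```python
def error_rate(prediction_and_labels):
    """Return the fraction of (prediction, label) pairs whose two values differ."""
    pairs = list(prediction_and_labels)
    if not pairs:
        raise ValueError("need at least one (prediction, label) pair")
    wrong = sum(1 for prediction, label in pairs if prediction != label)
    return wrong / float(len(pairs))

pairs = [(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)]
print(error_rate(pairs))  # 0.5
```

For click data, where one class dominates, a low error rate can be reached by always predicting the majority class, so it is worth reading this number next to a metric such as area under ROC.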

In [None]:
svmmodel.weights

In [None]:
svmmodel.predict([1.0, 11.0, 1.0])
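A linear SVM prediction comes down to a dot product: the margin is `weights . features + intercept`, and the predicted class is 1 when the margin exceeds a threshold (0.0 by default in MLlib, as far as I know) and 0 otherwise. The sketch below reproduces that rule in plain Python. The weights and intercept are invented for illustration and are not the values learned in this notebook.

```python
def linear_predict(weights, intercept, features, threshold=0.0):
    """Return 1 if the linear margin exceeds the threshold, else 0."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1 if margin > threshold else 0

# Hypothetical weights, NOT the output of svmmodel.weights
weights = [0.5, -0.1, 0.2]
intercept = 0.0

print(linear_predict(weights, intercept, [1.0, 11.0, 1.0]))  # 0 (margin is about -0.4)
print(linear_predict(weights, intercept, [1.0, 1.0, 1.0]))   # 1 (margin is about 0.6)
```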

In [None]:
temp.take(5)

In [None]:
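# data is header-free and already split, but parsePoint expects a comma-separated string
parsedData = data.map(lambda fields: parsePoint(",".join(fields)))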

In [None]:
logmodel = LogisticRegressionWithLBFGS.train(temp)

In [None]:
labelsAndPreds = temp.map(lambda p: (p.label, svmmodel.predict(p.features)))


In [None]:
labelsAndPreds = temp.map(lambda p: (p.label, logmodel.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(temp.count())
print("Training Error = " + str(trainErr))

In [None]:
labelsAndPreds.take(5)

In [None]:
predictionAndLabelslog = parsedData.map(lambda lp: (float(logmodel.predict(lp.features)), lp.label))
predictionAndLabelssvm = parsedData.map(lambda lp: (float(svmmodel.predict(lp.features)), lp.label))

In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabelslog)
print("Logistic - Area under ROC = %s" % metrics.areaUnderROC)

In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabelssvm)
print("SVM - Area under ROC = %s" % metrics.areaUnderROC)
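Area under ROC can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counted as half. The plain-Python sketch below computes it that way. The function name `auc` and the sample pairs are made up, and the pairwise loop is quadratic, so it is only suitable for small samples. As far as I know, `predict` on these MLlib models returns hard 0/1 labels unless the threshold has been cleared with `clearThreshold()`, which makes the resulting ROC curve very coarse. The second call below shows that effect.

```python
def auc(score_and_labels):
    """Pairwise (Mann-Whitney) estimate of area under the ROC curve."""
    pairs = list(score_and_labels)
    pos = [s for s, y in pairs if y == 1.0]
    neg = [s for s, y in pairs if y == 0.0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # tie counts as half
    return wins / (len(pos) * len(neg))

# Raw scores that separate the classes perfectly
print(auc([(0.9, 1.0), (0.8, 1.0), (0.3, 0.0), (0.1, 0.0)]))  # 1.0
# Hard 0/1 predictions with one missed positive
print(auc([(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0)]))  # 0.75
```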


In [None]:
# Save and load model
svmmodel.save(sc, "myModelPath")
sameModel = SVMModel.load(sc, "myModelPath")

In [None]:
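# predictionAndLabelsrf must hold (prediction, label) pairs from a random forest model; none is trained in the cells above
metrics = BinaryClassificationMetrics(predictionAndLabelsrf)
print("RF - Area under ROC = %s" % metrics.areaUnderROC)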

In [None]:
data.take(2)

In [None]:
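# data is header-free and already split, but parsePoint expects a comma-separated string
parsedData = data.map(lambda fields: parsePoint(",".join(fields)))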

In [None]:
parsedData