MLlib is a Spark subproject providing machine learning algorithms.

In [37]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

In [38]:
from pyspark.sql import SQLContext
# Wrap the SparkContext in a SQLContext; it is used below to read and create DataFrames.
# NOTE(review): `sc` is not defined anywhere in this notebook -- presumably the kernel pre-creates it; confirm.
sqlContext = SQLContext(sc)

In [34]:
 # Read the hourly CSV from S3 through the spark-csv reader, with the header option enabled.
 df = (
     sqlContext.read
     .format('com.databricks.spark.csv')
     .option("header", 'true')
     .load("s3://aws-logs-161653025481-us-east-1/hour.csv")
 )

In [35]:
# Count the loaded rows as a sanity check on the read.
df.count()

17379

In [36]:
# Remove the four columns that are not used as model inputs.
df = (
    df.drop("instant")
    .drop("dteday")
    .drop("casual")
    .drop("registered")
)

In [37]:
from pyspark.sql.functions import col  # builds a Column expression from a column name

# Cast every column to double while keeping its original name, then verify the schema.
double_columns = [col(name).cast("double").alias(name) for name in df.columns]
df = df.select(double_columns)
df.printSchema()

root
 |-- season: double (nullable = true)
 |-- yr: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)



In [44]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# Every column except the label "cnt" is used as a predictor.
featuresCols = df.columns
# remove() raises ValueError when "cnt" is absent, so a missing label column fails loudly here.
featuresCols.remove('cnt')
# Concatenate all predictor columns into a single vector in a new column named "features".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="features")


In [45]:
# Preview the assembler output (adds the "features" vector column); the result is not stored.
vectorAssembler.transform(df)

DataFrame[season: double, yr: double, mnth: double, hr: double, holiday: double, weekday: double, workingday: double, weathersit: double, temp: double, atemp: double, hum: double, windspeed: double, cnt: double, features: vector]

In [46]:
# Training set: the original columns plus the assembled "features" vector column.
train = vectorAssembler.transform(df)

In [47]:
# Echo the training DataFrame to check its columns.
train

DataFrame[season: double, yr: double, mnth: double, hr: double, holiday: double, weekday: double, workingday: double, weathersit: double, temp: double, atemp: double, hum: double, windspeed: double, cnt: double, features: vector]

In [57]:
from pyspark.ml.regression import LinearRegression

# Linear regression on the "cnt" label, with the regularisation strength set to zero.
lr = LinearRegression(
    labelCol="cnt",
    maxIter=10,
    regParam=0,
    elasticNetParam=0,
)

In [58]:
# Fit the estimator on the assembled training DataFrame.
lrModel = lr.fit(train)

In [59]:
# Display the fitted coefficient vector.
lrModel.coefficients

[19.8993375636,81.087155699,-0.00864823316966,7.67059662665,-21.8792162012,1.8783541328,3.9392253799,-3.43209756197,78.1497797094,233.157087423,-198.184680754,41.5652146588]

In [65]:
# Keep a handle on the fitted model's training summary for the diagnostics printed below.
trainingSummary = lrModel.summary

In [66]:
# Solver diagnostics: iteration count and the objective value per iteration.
print("numIterations: %d" % (trainingSummary.totalIterations,))
print("objectiveHistory: %s" % (trainingSummary.objectiveHistory,))
# Residuals on the training data (show() prints only the first rows).
trainingSummary.residuals.show()
# Goodness of fit on the training data.
print("RMSE: %f" % (trainingSummary.rootMeanSquaredError,))
print("r2: %f" % (trainingSummary.r2,))

numIterations: 1
objectiveHistory: [0.0]
+--------------------+
|           residuals|
+--------------------+
|   88.67629408320164|
|  108.13083397203091|
|    92.4602373453779|
|  50.773423358032304|
|  31.102826731379295|
|  30.204744182181336|
|    31.7778508387659|
|  42.082002671598175|
|    7.42044022476729|
| -12.649611468935362|
| -24.139621606093257|
|   12.54853097865248|
| -0.9324189074546325|
| -15.890124645524807|
| -10.941399573761672|
|-0.23841624688053287|
|   -9.89279551284092|
|  -48.05105376409513|
|  -66.04868874225578|
|  -71.71928536890881|
+--------------------+
only showing top 20 rows

RMSE: 141.796667
r2: 0.388858


In [34]:
# imports
import pandas as pd
# Load the flights CSV from local disk; index_col=0 uses the first CSV column as the row index.
data = pd.read_csv('/mnt/tmp/flights/2008.csv', index_col=0)

In [35]:
# Keep only the columns of interest and discard rows with any missing value.
data2 = data[['Distance', 'DepDelay', 'ArrDelay', 'Month', 'DayOfWeek']].dropna()
# Regression target y: departure delay minus arrival delay.
data2['y'] = data2['DepDelay'] - data2['ArrDelay']

In [None]:
# Convert the pandas frame into a Spark DataFrame (rebinds `df`).
df = sqlContext.createDataFrame(data2)

In [None]:
from pyspark.sql.functions import col  # builds a Column expression from a column name

# Cast every column to double while keeping its original name, then verify the schema.
double_columns = [col(name).cast("double").alias(name) for name in df.columns]
df = df.select(double_columns)
df.printSchema()

In [None]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

featuresCols = df.columns
# The label built for the flights data is "y". This frame has no "cnt" column,
# so removing 'cnt' here would raise ValueError.
featuresCols.remove('y')
# NOTE(review): DepDelay and ArrDelay remain features although y = DepDelay - ArrDelay -- confirm this is intended.
# Concatenate all remaining columns into a single vector in a new column named "features".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="features")