In [7]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.regression import LassoWithSGD,RidgeRegressionWithSGD
sc = SparkContext(appName="Regression")

In [8]:
dataFile = sc.textFile("C:\Users\Bellamkonda\Desktop\Spark\wine_regression.csv")


In [9]:
dataFile.take(15)

[u'6,7 0.27 0.36 20.7 0.045 45 170 1.001 3 0.45 8.8',
 u'6,6.3 0.3 0.34 1.6 0.049 14 132 0.994 3.3 0.49 9.5',
 u'6,8.1 0.28 0.4 6.9 0.05 30 97 0.9951 3.26 0.44 10.1',
 u'6,7.2 0.23 0.32 8.5 0.058 47 186 0.9956 3.19 0.4 9.9',
 u'6,7.2 0.23 0.32 8.5 0.058 47 186 0.9956 3.19 0.4 9.9',
 u'6,8.1 0.28 0.4 6.9 0.05 30 97 0.9951 3.26 0.44 10.1',
 u'6,6.2 0.32 0.16 7 0.045 30 136 0.9949 3.18 0.47 9.6',
 u'6,7 0.27 0.36 20.7 0.045 45 170 1.001 3 0.45 8.8',
 u'6,6.3 0.3 0.34 1.6 0.049 14 132 0.994 3.3 0.49 9.5',
 u'6,8.1 0.22 0.43 1.5 0.044 28 129 0.9938 3.22 0.45 11',
 u'5,8.1 0.27 0.41 1.45 0.033 11 63 0.9908 2.99 0.56 12',
 u'5,8.6 0.23 0.4 4.2 0.035 17 109 0.9947 3.14 0.53 9.7',
 u'5,7.9 0.18 0.37 1.2 0.04 16 75 0.992 3.18 0.63 10.8',
 u'7,6.6 0.16 0.4 1.5 0.044 48 143 0.9912 3.54 0.52 12.4',
 u'5,8.3 0.42 0.62 19.25 0.04 41 172 1.0002 2.98 0.67 9.7']

In [10]:
#Parsing function and creating a labeled point for dependent and independent variables
def line_to_array(line):
    string_array = line.split(',')
    return LabeledPoint(string_array[0], string_array[1].split(' '))


In [11]:
#Using map function to parse the data
parsedData = dataFile.map(line_to_array)

In [10]:
parsedData.take(5)

[LabeledPoint(6.0, [7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8]),
 LabeledPoint(6.0, [6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5]),
 LabeledPoint(6.0, [8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1]),
 LabeledPoint(6.0, [7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9]),
 LabeledPoint(6.0, [7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9])]

In [12]:
#using randomSplit function to split data into trainingRDD and testRDD 
(trainingData, testData) = parsedData.randomSplit([0.7, 0.3])

In [12]:
#printing count of trainingRDD
print trainingData.count()

3455


In [13]:
#printing count of testRDD
print testData.count()

1443


In [21]:
# Build the model LinearRegression with SGD and no regularization
model_linear_none = LinearRegressionWithSGD.train(trainingData, iterations=500, step=0.001)

In [22]:
# Evaluate the model on test data
valuesAndPreds =  testData.map(lambda p: (p.label, model_linear_none.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
                                      

Mean Squared Error = 2.45898381738e+52


In [43]:
# Build the model RidgeRegression with SGD and L2 regularization
model_Ridge = RidgeRegressionWithSGD.train(trainingData, iterations=500, step=0.0001)

In [44]:
# Evaluate the model on test data
valuesAndPreds =  testData.map(lambda p: (p.label, model_Ridge.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 3.48632960958


In [47]:
# Build the model LassoWithSGD and L1 regularization
model_Lasso = LinearRegressionWithSGD.train(trainingData, iterations=500, step=0.0001)

In [48]:
# Evaluate the model on test data
valuesAndPreds =  testData.map(lambda p: (p.label, model_Lasso.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))                               

Mean Squared Error = 3.48631947737
