## Spark Application - execute with spark-submit

## Import the packages
import csv
import sys
from collections import namedtuple
from StringIO import StringIO

from pyspark import SparkConf, SparkContext
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.regression import LabeledPoint, LassoWithSGD
from pyspark.mllib.util import MLUtils


## Module Constants
APP_NAME = "Wine Quality Analysis"
# Column names of the input CSV, in file order. This is assumed -- any
# header row in the file is not checked against it. parse() reads column
# index 11 ("quality") as the label.
fields = (
    'fixed_acidity',
    'volatile_acidity',
    'citric_acid',
    'residual_sugar',
    'chlorides',
    'free_sulfur_dioxide',
    'total_sulfur_dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
    'quality',
)
# Name the application via SparkConf so the job is reported as APP_NAME;
# the master URL is left to spark-submit.
sc = SparkContext(conf=SparkConf().setAppName(APP_NAME))

# Row-level helpers passed to RDD.map()
def split(line):
    """
    Split one line of CSV text into a list of string fields.

    The csv module is used so quoted fields that contain commas are kept
    intact. A blank line yields an empty list.
    """
    # csv.reader accepts any iterable of lines, so no StringIO wrapper is
    # needed. next() replaces the Python 2-only reader.next(). The old form
    # raised StopIteration on a blank line; inside RDD.map() that can end
    # the partition's iteration early instead of raising an error.
    return next(csv.reader([line]))
def parse(row, feature_cols=(0, 1, 2, 3, 4, 5, 6, 8, 9), label_col=11):
    """
    Convert one split CSV row into an MLlib LabeledPoint.

    Parameters:
        row: sequence of string fields for one CSV line (e.g. the output of
            split()). It must be long enough to index every selected column.
            It is not modified.
        feature_cols: indices of the columns used as features, in order;
            each value is converted with float().
        label_col: index of the column used as the label; converted with
            float().

    Returns:
        LabeledPoint(label, features).

    Raises:
        IndexError: if row is too short for the selected columns.
        ValueError: if a selected field is not numeric.
    """
    # NOTE(review): the default feature set reproduces the previous
    # behaviour exactly. Assuming the CSV follows the module's `fields`
    # order, that set skips column 7 (density) and column 10 (alcohol). The
    # alcohol skip looks like an off-by-one in the former row[0:9] slice,
    # because that column was converted but never used. To use every
    # feature, pass feature_cols=tuple(range(11)) and retune the SGD step.
    features = [float(row[i]) for i in feature_cols]
    return LabeledPoint(float(row[label_col]), features)

# Input CSV path: the first command-line argument (pass it after the script
# name to spark-submit); without one, fall back to the original location.
DATA_PATH = (sys.argv[1] if len(sys.argv) > 1
             else "C:\\Users\\anirudhbedre\\Desktop\\Wine\\wine_quality_pyspark_regression.csv")
# cache(): both splits derive from this RDD, so without it every action on
# either split would re-read and re-parse the whole file.
wines = sc.textFile(DATA_PATH).map(split).map(parse).cache()
(trainingData, testData) = wines.randomSplit([0.7, 0.3])

# Train the model: an L1-regularised linear regression (Lasso) fitted with
# stochastic gradient descent -- 100 iterations, a small fixed step size,
# and an intercept term learned alongside the weights.
model = LassoWithSGD.train(
    trainingData,
    iterations=100,
    step=0.000469,
    intercept=True,
)


def _mean_squared_error(values_and_preds):
    """
    Return the mean squared error of an RDD of (label, prediction) pairs.

    Computed in a single pass with RDD.stats(). Returns NaN for an empty
    RDD; a reduce()-based sum would raise on an empty RDD.
    """
    # Index the pair instead of using tuple-parameter unpacking
    # (lambda (v, p): ...), which is Python 2-only syntax.
    stats = values_and_preds.map(lambda vp: (vp[0] - vp[1]) ** 2).stats()
    if stats.count() == 0:
        return float("nan")
    return stats.mean()


# Evaluate the model on training data
train_valuesAndPreds = trainingData.map(lambda p: (p.label, model.predict(p.features)))
MSE = _mean_squared_error(train_valuesAndPreds)
print("Training Mean Squared Error = " + str(MSE))

# Evaluate the model on test data
test_valuesAndPreds = testData.map(lambda p: (p.label, model.predict(p.features)))
MSE = _mean_squared_error(test_valuesAndPreds)
print("Test Mean Squared Error = " + str(MSE))

# Shut down the SparkContext created at the top of the script.
sc.stop()