In [2]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
cRDD = sc.textFile("data\carros.csv")

In [4]:
cRDD.count()

399

In [None]:
# Is possible to let this RDD in cache. When we have a large amount of data to process, it will improve the speed of processing.
#cRDD.cache()

In [10]:
# Filtering RDD  to remove the fisrt line, and create a new one. We need to create a new RDD because RDDs are immutable.
newRDD = cRDD.filter(lambda x: "DISPLACEMENT" not in x)

In [11]:
newRDD.take(3)

['18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite']

# Clean Data

In [6]:
mediaHP = sc.broadcast(75.0)

In [34]:
def cleandata(inputStr):
    global mediaHP
    attList = inputStr.split(',')
    
    hp = attList[3]
    if hp == '?':
        hp = mediaHP.value
        
    line = Row(MPG = float(attList[0]), CYLINDERS = float(attList[1]), DISPLACEMENT = float(attList[2]), HP = float(hp), 
               WEIGHT = float(attList[4]), ACCELERATION = float(attList[5]), MODELYEAR = float(attList[6]), NAME = attList[7])
    
    return line        

In [35]:
#Aplly cleandata function and create a new RDD

cleanRDD = newRDD.map(cleandata)
cleanRDD.take(5)

[Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HP=130.0, MODELYEAR=70.0, MPG=18.0, NAME='chevrolet chevelle malibu', WEIGHT=3504.0),
 Row(ACCELERATION=11.5, CYLINDERS=8.0, DISPLACEMENT=350.0, HP=165.0, MODELYEAR=70.0, MPG=15.0, NAME='buick skylark 320', WEIGHT=3693.0),
 Row(ACCELERATION=11.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HP=150.0, MODELYEAR=70.0, MPG=18.0, NAME='plymouth satellite', WEIGHT=3436.0),
 Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HP=150.0, MODELYEAR=70.0, MPG=16.0, NAME='amc rebel sst', WEIGHT=3433.0),
 Row(ACCELERATION=10.5, CYLINDERS=8.0, DISPLACEMENT=302.0, HP=140.0, MODELYEAR=70.0, MPG=17.0, NAME='ford torino', WEIGHT=3449.0)]

# Exploratory Analisys

In [13]:
#create a spark session to use DataFrames
sprsession = SparkSession.builder.master("local").appName("LinearRegression").config("spark.some.config.option", "session").getOrCreate()

In [36]:
dataFrame = sprsession.createDataFrame(cleanRDD)

In [37]:
dataFrame.select("HP").describe().show()

+-------+------------------+
|summary|                HP|
+-------+------------------+
|  count|               398|
|   mean| 104.0251256281407|
| stddev|38.368022630356165|
|    min|              46.0|
|    max|             230.0|
+-------+------------------+



In [29]:
type(dataFrame)

pyspark.sql.dataframe.DataFrame

In [42]:
for c in dataFrame.drop('NAME').columns:
    print(c, dataFrame.stat.corr("MPG", c))

ACCELERATION 0.4202889121016501
CYLINDERS -0.7753962854205548
DISPLACEMENT -0.8042028248058979
HP -0.7747041523498721
MODELYEAR 0.5792671330833091
MPG 1.0
WEIGHT -0.8317409332443347


# Create a LabeledPoint (target, Vector(features))

In [43]:
# We will take just the columns with a relevant correlation with the target variable (CYLINDERS, DISPLACEMENT, HP, WEIGHT)
def createLabeledPoint(line):
    labeledPoint = (line["MPG"], Vectors.dense([line['CYLINDERS'], line['DISPLACEMENT'], line['HP'], line['WEIGHT']]))
    return labeledPoint

In [44]:
#Apply createLabeledPoint function to cleanRDD and create a new one
finalRDD = cleanRDD.map(createLabeledPoint)
df = sprsession.createDataFrame(finalRDD, ['label', 'features'])
df.select('label', 'features').show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 18.0|[8.0,307.0,130.0,...|
| 15.0|[8.0,350.0,165.0,...|
| 18.0|[8.0,318.0,150.0,...|
| 16.0|[8.0,304.0,150.0,...|
| 17.0|[8.0,302.0,140.0,...|
| 15.0|[8.0,429.0,198.0,...|
| 14.0|[8.0,454.0,220.0,...|
| 14.0|[8.0,440.0,215.0,...|
| 14.0|[8.0,455.0,225.0,...|
| 15.0|[8.0,390.0,190.0,...|
+-----+--------------------+
only showing top 10 rows



In [45]:
finalRDD.take(2)

[(18.0, DenseVector([8.0, 307.0, 130.0, 3504.0])),
 (15.0, DenseVector([8.0, 350.0, 165.0, 3693.0]))]

# Apply Linear Regression

In [46]:
#Split data
trainData, testData = df.randomSplit([0.7, 0.3])

In [47]:
#Build model
linearReg = LinearRegression()
model = linearReg.fit(trainData)

In [49]:
#Coefficients and Intercept
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [-0.12798906238146662,-0.004204549739968073,-0.03309084254915513,-0.005446737456384402]
Intercept: 44.54593365018349


In [50]:
#Test model
predictions = model.transform(testData)
predictions.select("prediction", "features").show()

+------------------+--------------------+
|        prediction|            features|
+------------------+--------------------+
|11.778132761992389|[8.0,307.0,200.0,...|
| 16.13723104310256|[8.0,350.0,180.0,...|
|12.485231828629217|[8.0,350.0,160.0,...|
| 9.592336588413751|[8.0,400.0,167.0,...|
|20.722817441538222|[8.0,302.0,129.0,...|
|15.608684753308115|[8.0,307.0,130.0,...|
|12.400136118381312|[8.0,350.0,155.0,...|
|13.311083832945403|[8.0,350.0,165.0,...|
|13.927907724864738|[8.0,350.0,175.0,...|
|15.405501977796305|[8.0,360.0,175.0,...|
|12.562338867471293|[8.0,400.0,150.0,...|
|10.364542053787787|[8.0,400.0,170.0,...|
|15.703088901721394|[8.0,302.0,137.0,...|
|15.014999341769435|[8.0,318.0,150.0,...|
|14.143521348747932|[8.0,318.0,150.0,...|
| 13.66512176761039|[8.0,350.0,165.0,...|
|10.061698201385404|[8.0,455.0,225.0,...|
|16.081509467560103|[8.0,304.0,150.0,...|
|14.699088569299143|[8.0,318.0,150.0,...|
| 15.01867427555431|[8.0,350.0,145.0,...|
+------------------+--------------

In [51]:
#Evaluate model
RegressionEvaluator(predictionCol = "prediction", labelCol="label", metricName="r2").evaluate(predictions)

0.7174335640194515