In this second set of exercises you will develop two Machine Learning procedures. In both cases it is mandatory to use Apache Spark 2.x, and any library you need to manage data or generate features must come from Apache MLlib (DataFrame-based API).

**Problem 1:**

Using the dataset, build a Machine Learning procedure that predicts house prices from neighbourhood variables. The Boston House Price Dataset involves predicting a house price in thousands of dollars given details of the house and its neighbourhood. You can download the data from here: https://raw.githubusercontent.com/jbrownlee/Datasets/master/housing.data. More information is available here: https://www.cs.toronto.edu/~delve/data/boston/bostonDetail.html. Check the following aspects:

* Make sure that your dataset is technically correct
* Check the consistency of your dataset
* In this exercise it is not mandatory to use Pipelines
* Split your data into two sets: 80% of the data for training and 20% of the data for testing
* Provide appropriate metrics to evaluate how the model performs
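A hedged sketch of the kind of metrics meant here: root-mean-squared error (RMSE), mean absolute error (MAE) and the coefficient of determination (R²). In Spark these are available through `pyspark.ml.evaluation.RegressionEvaluator` with `metricName` set to `"rmse"`, `"mae"` or `"r2"`. The plain-Python function below (the name `regression_metrics` is hypothetical) only shows what those numbers compute, so they can be checked by hand on a few rows.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return RMSE, MAE and R^2 for paired sequences of labels and predictions."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    sse = sum(r * r for r in residuals)            # sum of squared errors
    mean_y = sum(y_true) / n
    sst = sum((t - mean_y) ** 2 for t in y_true)   # total sum of squares
    return {
        "rmse": math.sqrt(sse / n),
        "mae": sum(abs(r) for r in residuals) / n,
        # R^2 is undefined when every label is identical
        "r2": 1.0 - sse / sst if sst > 0 else float("nan"),
    }


# Toy labels (MEDV in $1000s) and made-up predictions, for illustration only
print(regression_metrics([24.0, 21.6, 34.7], [25.0, 20.6, 33.7]))
```

RMSE penalises large errors more heavily than MAE, while R² compares the model against always predicting the mean price, so reporting all three gives a fuller picture than any single number.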

**Problem 2:**

Using the dataset, build a Machine Learning procedure that classifies whether the return of a SONAR signal comes from a rock or a mine. All the data is available at: https://archive.ics.uci.edu/ml/datasets/Connectionist+Bench+%28Sonar%2C+Mines+vs.+Rocks%29.
Make sure that you use the `sonar.all-data` file. Check the following aspects:

* Make sure that your dataset is technically correct
* Check the consistency of your dataset
* In this exercise it is mandatory to use Pipelines
* Split your data into two sets: 80% of the data for training and 20% of the data for testing
* Check that the labels have the same distribution in both sets (hint: this is called stratified sampling)
* Provide appropriate metrics to evaluate how the model performs
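The stratified split asked for above can be sketched in plain Python: group the rows by label, shuffle each group with a fixed seed, and send the first 80% of every group to the training set. The helper names are hypothetical, and the toy rows only assume that, as in `sonar.all-data`, the label (`R` or `M`) is the last field. In Spark itself, `DataFrame.sampleBy(col, fractions, seed)` is one option for drawing the training rows per label, although its per-label sizes are approximate rather than exact.

```python
import random
from collections import Counter, defaultdict
from operator import itemgetter


def stratified_split(rows, label_of, train_frac=0.8, seed=12345):
    """Split rows so that every label keeps the same train/test proportion."""
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)
    rng = random.Random(seed)
    train, test = [], []
    for label in sorted(by_label):          # fixed order keeps the split reproducible
        group = list(by_label[label])
        rng.shuffle(group)
        cut = round(train_frac * len(group))
        train.extend(group[:cut])
        test.extend(group[cut:])
    return train, test


def label_proportions(rows, label_of):
    """Fraction of rows carrying each label."""
    counts = Counter(label_of(row) for row in rows)
    total = sum(counts.values())
    return {label: n / total for label, n in counts.items()}


# Toy data: 100 rocks and 50 mines, label in the last field
rows = [[i, "R"] for i in range(100)] + [[i, "M"] for i in range(50)]
last = itemgetter(-1)
train, test = stratified_split(rows, last)
print(len(train), len(test))
print(label_proportions(train, last), label_proportions(test, last))
```

Comparing the two `label_proportions` results is exactly the check the exercise asks for: with stratification they match, whereas a plain random split only matches them on average.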

Each record of the Boston housing dataset used in Problem 1 has 14 attributes:

    CRIM - per capita crime rate by town
    
    ZN - proportion of residential land zoned for lots over 25,000 sq.ft.
    
    INDUS - proportion of non-retail business acres per town.
    
    CHAS - Charles River dummy variable (1 if tract bounds river; 0 otherwise)
    
    NOX - nitric oxides concentration (parts per 10 million)
    
    RM - average number of rooms per dwelling
    
    AGE - proportion of owner-occupied units built prior to 1940
    
    DIS - weighted distances to five Boston employment centres
    
    RAD - index of accessibility to radial highways
    
    TAX - full-value property-tax rate per $10,000
    
    PTRATIO - pupil-teacher ratio by town
    
    B - 1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town
    
    LSTAT - % lower status of the population
    
    MEDV - Median value of owner-occupied homes in $1000's
    

In [1]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark 

sc = pyspark.SparkContext('local[*]')

In [2]:
sc.textFile("housing.data").take(5)

[' 0.00632  18.00   2.310  0  0.5380  6.5750  65.20  4.0900   1  296.0  15.30 396.90   4.98  24.00',
 ' 0.02731   0.00   7.070  0  0.4690  6.4210  78.90  4.9671   2  242.0  17.80 396.90   9.14  21.60',
 ' 0.02729   0.00   7.070  0  0.4690  7.1850  61.10  4.9671   2  242.0  17.80 392.83   4.03  34.70',
 ' 0.03237   0.00   2.180  0  0.4580  6.9980  45.80  6.0622   3  222.0  18.70 394.63   2.94  33.40',
 ' 0.06905   0.00   2.180  0  0.4580  7.1470  54.20  6.0622   3  222.0  18.70 396.90   5.33  36.20']
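Before converting types it is worth checking that the file is technically correct, i.e. that every line splits into exactly 14 numeric fields. The sketch below (hypothetical helper names, plain Python) returns the indices of lines that fail; with Spark the same predicate could be applied through an RDD `filter` on `sc.textFile("housing.data")`, and the resulting count should be zero.

```python
def is_number(token):
    """True if the token parses as a float."""
    try:
        float(token)
    except ValueError:
        return False
    return True


def find_bad_lines(lines, n_fields=14):
    """Indices of lines that do not split into n_fields numeric values."""
    bad = []
    for i, line in enumerate(lines):
        fields = line.split()  # housing.data is whitespace-separated
        if len(fields) != n_fields or not all(is_number(f) for f in fields):
            bad.append(i)
    return bad


sample = [
    " 0.00632  18.00   2.310  0  0.5380  6.5750  65.20  4.0900   1  296.0  15.30 396.90   4.98  24.00",
    " 0.02731   0.00   7.070  0  0.4690  6.4210  78.90  4.9671   2  242.0  17.80 396.90   9.14  21.60",
    "1 2 3",  # hypothetical truncated line
]
print(find_bad_lines(sample))
```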

In [8]:
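def MakeDataConsistent(line):
    """Cast the 14 fields of one split housing.data line to numbers and rescale TAX and MEDV."""
    for index in range(len(line)):
        # CHAS (index 3) and RAD (index 8) are integer codes; all other fields are real-valued
        if index in (3, 8):
            line[index] = int(line[index])
        else:
            line[index] = float(line[index])
        if index == 9:
            line[index] = 10000 * line[index]  # TAX (a rate per $10,000) is multiplied by 10,000
        elif index == 13:
            line[index] = 1000 * line[index]   # MEDV: thousands of dollars -> dollars
    return line


housing = sc.textFile("housing.data").map(lambda x: x.split()).map(MakeDataConsistent)
print(type(housing))
housing.first()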

<class 'pyspark.rdd.PipelinedRDD'>


[0.00632,
 18.0,
 2.31,
 0,
 0.538,
 6.575,
 65.2,
 4.09,
 1,
 2960000.0,
 15.3,
 396.9,
 4.98,
 24000.0]
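Technical correctness does not guarantee consistency. A minimal sketch of rule-based checks on a parsed row follows. The rules are assumptions drawn from the attribute descriptions: no value should be negative or NaN, CHAS must be 0 or 1, and ZN, INDUS, AGE and LSTAT appear to be stored on a 0-100 scale (for example AGE = 65.2). In the DataFrame version the same rules map onto filters such as `df.filter(~df.CHAS.isin(0, 1)).count()`, and `df.describe().show()` gives a quick view of each column's minimum and maximum.

```python
import math

# Column positions follow the attribute order of housing.data (CRIM = 0 ... MEDV = 13).
CHAS = 3
PERCENT_COLUMNS = {1: "ZN", 2: "INDUS", 6: "AGE", 12: "LSTAT"}  # assumed 0-100 scale


def consistency_problems(row):
    """List the (assumed) consistency rules that a parsed housing row violates."""
    problems = []
    if any(math.isnan(v) for v in row):
        problems.append("NaN value")
    if any(v < 0 for v in row):
        problems.append("negative value")
    if row[CHAS] not in (0, 1):
        problems.append("CHAS is not 0/1")
    for index, name in PERCENT_COLUMNS.items():
        if not 0 <= row[index] <= 100:
            problems.append(name + " outside [0, 100]")
    return problems


# First row as produced by MakeDataConsistent
row = [0.00632, 18.0, 2.31, 0, 0.538, 6.575, 65.2, 4.09, 1, 2960000.0, 15.3, 396.9, 4.98, 24000.0]
print(consistency_problems(row))
```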

In [44]:
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField('CRIM', FloatType()),
    StructField('ZN', FloatType()),
    StructField('INDUS', FloatType()),
    StructField('CHAS', IntegerType()),
    StructField('NOX', FloatType()),
    StructField('RM', FloatType()),
    StructField('AGE', FloatType()),
    StructField('DIS', FloatType()),
    StructField('RAD', IntegerType()),
    StructField('TAX', FloatType()),
    StructField('PTRATIO', FloatType()),
    StructField('B', FloatType()),
    StructField('LSTAT', FloatType()),
    StructField('MEDV', FloatType())
    ])

df = spark.createDataFrame(housing, schema)

df.show()

+-------+----+-----+----+-----+-----+-----+------+---+---------+-------+------+-----+-------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|      TAX|PTRATIO|     B|LSTAT|   MEDV|
+-------+----+-----+----+-----+-----+-----+------+---+---------+-------+------+-----+-------+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|2960000.0|   15.3| 396.9| 4.98|24000.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|2420000.0|   17.8| 396.9| 9.14|21600.0|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|2420000.0|   17.8|392.83| 4.03|34700.0|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|2220000.0|   18.7|394.63| 2.94|33400.0|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|2220000.0|   18.7| 396.9| 5.33|36200.0|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|2220000.0|   18.7|394.12| 5.21|28700.0|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|3110000.0|   15.2| 395.6|12.43|22900.0|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311000
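The schema declares the real-valued columns as `FloatType`, which Spark stores as 32-bit floats, while Python parsed them as 64-bit doubles. That rounding is why the assembled feature vectors later in this notebook print CRIM as `0.00632000016048...` rather than `0.00632`. The standard-library sketch below (hypothetical helper name) reproduces the effect; declaring those fields as `DoubleType` would likely avoid it.

```python
import struct


def as_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float and widen it back."""
    return struct.unpack("f", struct.pack("f", x))[0]


crim = 0.00632                 # value as parsed by Python's float()
stored = as_float32(crim)      # value after a round trip through 32-bit storage
print(crim, stored, stored - crim)
```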

In [45]:
to_train, to_test = df.randomSplit([0.8, 0.2], seed=12345)
pred_vars = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']

In [50]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(
    inputCols = pred_vars, 
    outputCol = 'neighbourhood_variables')
train_df = vectorAssembler.transform(to_train)

train_df = train_df.withColumn("price", train_df["MEDV"])

train_df = train_df.select(['neighbourhood_variables', 'price'])
train_df.show(3)

train_df.printSchema()
train_df.first()

+-----------------------+-------+
|neighbourhood_variables|  price|
+-----------------------+-------+
|   [0.00632000016048...|24000.0|
|   [0.01310999970883...|35400.0|
|   [0.01360000018030...|18900.0|
+-----------------------+-------+
only showing top 3 rows

root
 |-- neighbourhood_variables: vector (nullable = true)
 |-- price: float (nullable = true)



Row(neighbourhood_variables=DenseVector([0.0063, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 2960000.0, 15.3, 396.9, 4.98]), price=24000.0)
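The assembled vector mixes very different scales: TAX is in the millions after the rescaling in `MakeDataConsistent`, while CHAS is 0 or 1. Spark's `LinearRegression` has a `standardization` parameter (default `True`) that scales the features internally before fitting and still reports coefficients on the original scale; with the `regParam`/`elasticNetParam` penalty used here, the fitted model depends on that setting. To standardize explicitly, `pyspark.ml.feature.StandardScaler` can be used with `withMean=True, withStd=True`. The plain-Python sketch below (hypothetical function name) shows the z-score transformation itself, using the sample standard deviation.

```python
import statistics


def standardize_columns(rows):
    """Z-score each column: subtract its mean, divide by its sample standard deviation."""
    columns = list(zip(*rows))
    means = [statistics.mean(col) for col in columns]
    stds = [statistics.stdev(col) for col in columns]   # n - 1 in the denominator
    return [
        # a constant column carries no information, so map it to 0.0
        [(value - m) / s if s > 0 else 0.0 for value, m, s in zip(row, means, stds)]
        for row in rows
    ]


# Hypothetical rows with two features on very different scales
rows = [[0.0, 2960000.0], [1.0, 2420000.0], [0.0, 2220000.0]]
for scaled in standardize_columns(rows):
    print(scaled)
```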

In [47]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'neighbourhood_variables', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Py4JJavaError: An error occurred while calling o1185.fit.
: java.lang.IllegalArgumentException
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
	at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
	at org.apache.spark.ml.regression.LinearRegressionSummary.numInstances$lzycompute(LinearRegression.scala:921)
	at org.apache.spark.ml.regression.LinearRegressionSummary.numInstances(LinearRegression.scala:921)
	at org.apache.spark.ml.regression.LinearRegressionSummary.<init>(LinearRegression.scala:909)
	at org.apache.spark.ml.regression.LinearRegressionTrainingSummary.<init>(LinearRegression.scala:798)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:346)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:174)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
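The `java.base/...` frames at the bottom of the trace show that the kernel is running on Java 9 or newer. Spark 2.x does not support those JVMs (Java 11 support arrived with Spark 3.0), and an `IllegalArgumentException` thrown from `org.apache.xbean.asm5.ClassReader` is the usual symptom, so the error is most likely environmental rather than a problem with the `LinearRegression` call. A likely fix is to point `JAVA_HOME` at a Java 8 installation and restart the kernel, so that the `SparkContext` in the first cell starts a new JVM. The hypothetical helper below extracts the major version from `java -version` output, covering both the legacy `1.8.0_292` scheme and the newer `11.0.2` one.

```python
import re


def java_major_version(version_output):
    """Major Java version from `java -version` text, e.g. 8 for 1.8.0_292, 11 for 11.0.2."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("unrecognised version string: %r" % version_output)
    first = int(match.group(1))
    # Java 8 and earlier use the legacy "1.x" scheme; Java 9+ put the major version first
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    return first


print(java_major_version('java version "1.8.0_292"'))
print(java_major_version('openjdk version "11.0.2" 2019-01-15'))
```

For example, `java_major_version(subprocess.run(["java", "-version"], capture_output=True, text=True).stderr)` should return 8 on a setup suitable for Spark 2.x (`java -version` writes to standard error).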


In [24]:
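from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out 20% with the fitted model lr_model and report the test RMSE
test_df = vectorAssembler.transform(to_test).withColumnRenamed("MEDV", "price")
predictions = lr_model.transform(test_df.select(['neighbourhood_variables', 'price']))
predictions.show(5)
print("Test RMSE: " + str(RegressionEvaluator(labelCol='price', metricName='rmse').evaluate(predictions)))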