<a href="https://colab.research.google.com/github/hamza3e/machine-learning-examples/blob/master/Pyspark_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz

In [0]:
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
  !java -version  

openjdk version "11.0.3" 2019-04-16
OpenJDK Runtime Environment (build 11.0.3+7-Ubuntu-1ubuntu218.04.1)
OpenJDK 64-Bit Server VM (build 11.0.3+7-Ubuntu-1ubuntu218.04.1, mixed mode, sharing)


In [0]:
import os

In [0]:
# Point this process (and anything it launches) at the Java 8 JDK installed
# above and at the unpacked Spark distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-2.4.1-bin-hadoop2.7",
    }
)

In [0]:
# Initialise findspark before pyspark is imported in the next cell.
# Presumably this uses the SPARK_HOME set above to make the unpacked Spark's
# Python bindings importable -- confirm against the findspark documentation.
import findspark
findspark.init()

In [0]:
from pyspark.sql import SparkSession

# Create the Spark session (or reuse an existing one) with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Read the train and test CSVs with identical options: the first line is the
# header and column types are inferred from the data.
train_dataset, test_dataset = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    for csv_path in (
        "./sample_data/california_housing_train.csv",
        "./sample_data/california_housing_test.csv",
    )
)

In [0]:
from pyspark.ml.regression import LinearRegression

In [10]:
# Sanity check: both loads should yield Spark DataFrames.
print(f"{type(train_dataset)} {type(test_dataset)}")

<class 'pyspark.sql.dataframe.DataFrame'> <class 'pyspark.sql.dataframe.DataFrame'>


In [11]:
# Row counts, train first and then test, printed on one line.
print(*(frame.count() for frame in (train_dataset, test_dataset)))

17000 3000


In [12]:
# Print each column's name, inferred type and nullability for the training frame.
train_dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [13]:
# Print a tabular preview of the training rows.
train_dataset.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [0]:
# Combine the six numeric predictors into a single vector column named
# 'Attributes'. longitude, latitude and the label (median_house_value) are
# deliberately not part of the input list.
assembler = VectorAssembler(
    inputCols=[
        "housing_median_age",
        "total_rooms",
        "total_bedrooms",
        "population",
        "households",
        "median_income",
    ],
    outputCol="Attributes",
)

In [0]:
# Append the assembled 'Attributes' vector to the training rows, then keep only
# the feature vector and the label column for fitting.
output = assembler.transform(train_dataset)
train_data = output.select(["Attributes", "median_house_value"])

In [0]:
# Same transformation for the test rows. `output` now refers to the assembled
# test frame (the next cell inspects its columns).
output = assembler.transform(test_dataset)
test_data = output.select(["Attributes", "median_house_value"])

In [18]:
# `output` was last assigned from the test-set transform, so this lists the
# test frame's columns, including the appended 'Attributes' vector column.
output.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'Attributes']

In [0]:
# The unfitted estimator and the fitted model get distinct names so this cell
# can be re-run safely: if the estimator's name were rebound to the fitted
# model, a second `.fit()` call would fail because the model has no `.fit`.
lr_estimator = LinearRegression(featuresCol="Attributes", labelCol="median_house_value")

# `regressor` is the fitted model that the following cells inspect and evaluate.
regressor = lr_estimator.fit(train_data)


In [21]:
# Fitted weights, one per assembled feature (six here). Presumably they follow
# the order of the assembler's inputCols -- confirm before interpreting them.
regressor.coefficients

DenseVector([1880.0141, -19.9559, 99.5248, -35.0182, 127.2253, 48033.4593])

In [22]:
# Fitted intercept term of the linear model.
regressor.intercept

-47624.111719436914

In [0]:
# Evaluate the fitted model on the test frame. The returned object exposes a
# `.predictions` DataFrame, which the following cells display and score.
pred_val = regressor.evaluate(test_data)

In [24]:
# Show test rows with the feature vector, the true median_house_value and the
# model's `prediction` column side by side.
pred_val.predictions.show()

+--------------------+------------------+------------------+
|          Attributes|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[27.0,3885.0,661....|          344700.0| 332098.0230497004|
|[43.0,1510.0,310....|          176500.0| 213719.8143106189|
|[27.0,3589.0,507....|          270500.0|271260.04407687474|
|[28.0,67.0,15.0,4...|          330000.0|  299584.195271514|
|[19.0,1241.0,244....|           81700.0|129100.09036393452|
|[37.0,1018.0,213....|           67000.0|105460.59347859942|
|[43.0,1009.0,225....|           67000.0|121990.62891574067|
|[19.0,2310.0,471....|          166900.0|152928.96574902072|
|[15.0,3080.0,617....|          194400.0|182353.82894708295|
|[31.0,2402.0,632....|          164200.0|115313.59293780709|
|[45.0,972.0,249.0...|          125000.0|136396.34838745513|
|[37.0,736.0,166.0...|           58300.0|137659.23453121996|
|[36.0,1089.0,182....|          252600.0| 244608.3787818988|
|[16.0,3936.0,694....|  

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Evaluator that compares the `prediction` column against the true label.
# NOTE(review): the name `eval` shadows Python's built-in eval(); it is kept
# because the metric cells below refer to the evaluator by this name.
eval = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
)

In [27]:
# Root-mean-squared error on the test predictions; the param map selects the
# metric for this evaluate() call.
rmse = eval.evaluate(
    pred_val.predictions,
    params={eval.metricName: "rmse"},
)
print("%.2f" % rmse)

76216.57


In [28]:
# Mean squared error on the test predictions; the param map selects the metric
# for this evaluate() call.
mse = eval.evaluate(
    pred_val.predictions,
    params={eval.metricName: "mse"},
)
print("%.2f" % mse)

5808966246.71


In [29]:
# Mean absolute error on the test predictions; the param map selects the metric
# for this evaluate() call.
mae = eval.evaluate(
    pred_val.predictions,
    params={eval.metricName: "mae"},
)
print("%.2f" % mae)

55396.29


In [30]:
# Coefficient of determination (R^2) on the test predictions; the param map
# selects the metric for this evaluate() call.
r2 = eval.evaluate(
    pred_val.predictions,
    params={eval.metricName: "r2"},
)
print("%.2f" % r2)

0.55
