In [1]:
import os, sys
from pyspark.sql import SparkSession

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
spark = SparkSession.builder.master('local[*]').appName("ml_example").getOrCreate()

In [4]:
df = spark.read.csv('headbrain.csv', header=True)

In [5]:
df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Age Range: string (nullable = true)
 |-- Head Size(cm^3): string (nullable = true)
 |-- Brain Weight(grams): string (nullable = true)



In [6]:
df.show(5)

+------+---------+---------------+-------------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|
+------+---------+---------------+-------------------+
|     1|        1|           4512|               1530|
|     1|        1|           3738|               1297|
|     1|        1|           4261|               1335|
|     1|        1|           3777|               1282|
|     1|        1|           4177|               1590|
+------+---------+---------------+-------------------+
only showing top 5 rows



In [7]:
new_df = df.drop(*['Gender', 'Age Range'])

In [8]:
new_df.show(5)

+---------------+-------------------+
|Head Size(cm^3)|Brain Weight(grams)|
+---------------+-------------------+
|           4512|               1530|
|           3738|               1297|
|           4261|               1335|
|           3777|               1282|
|           4177|               1590|
+---------------+-------------------+
only showing top 5 rows



In [9]:
new_df.count(), len(new_df.columns)

(237, 2)

In [10]:
new_df.select('Head Size(cm^3)', 'Brain Weight(grams)').describe().show()

+-------+------------------+-------------------+
|summary|   Head Size(cm^3)|Brain Weight(grams)|
+-------+------------------+-------------------+
|  count|               237|                237|
|   mean|3633.9915611814345|  1282.873417721519|
| stddev| 365.2614224198132| 120.34044578645734|
|    min|              2720|               1012|
|    max|              4747|                955|
+-------+------------------+-------------------+



In [11]:
# Count Missing Values from DataFrame

In [12]:
import pyspark.sql.functions as fun

In [13]:
df.agg(*[fun.count(fun.when(fun.isnull(col), col)).alias(col) for col in df.columns]).show()

+------+---------+---------------+-------------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|
+------+---------+---------------+-------------------+
|     0|        0|              0|                  0|
+------+---------+---------------+-------------------+



In [14]:
df.groupBy('Gender').count().show()

+------+-----+
|Gender|count|
+------+-----+
|     1|  134|
|     2|  103|
+------+-----+



In [15]:
df.groupBy('Age Range').count().show()

+---------+-----+
|Age Range|count|
+---------+-----+
|        1|  110|
|        2|  127|
+---------+-----+



In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [17]:
genderLabel = StringIndexer(inputCol='Gender',  outputCol = 'GenderLabelEncode')

In [18]:
df = genderLabel.fit(df).transform(df)

In [19]:
df.show(5)

+------+---------+---------------+-------------------+-----------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode|
+------+---------+---------------+-------------------+-----------------+
|     1|        1|           4512|               1530|              0.0|
|     1|        1|           3738|               1297|              0.0|
|     1|        1|           4261|               1335|              0.0|
|     1|        1|           3777|               1282|              0.0|
|     1|        1|           4177|               1590|              0.0|
+------+---------+---------------+-------------------+-----------------+
only showing top 5 rows



In [20]:
genderOneHot = OneHotEncoder(inputCol='GenderLabelEncode', outputCol='GenderOneHot')

In [21]:
df = genderOneHot.fit(df).transform(df)

In [22]:
df.show(5)

+------+---------+---------------+-------------------+-----------------+-------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode| GenderOneHot|
+------+---------+---------------+-------------------+-----------------+-------------+
|     1|        1|           4512|               1530|              0.0|(1,[0],[1.0])|
|     1|        1|           3738|               1297|              0.0|(1,[0],[1.0])|
|     1|        1|           4261|               1335|              0.0|(1,[0],[1.0])|
|     1|        1|           3777|               1282|              0.0|(1,[0],[1.0])|
|     1|        1|           4177|               1590|              0.0|(1,[0],[1.0])|
+------+---------+---------------+-------------------+-----------------+-------------+
only showing top 5 rows



In [23]:
from pyspark.sql.functions import col

df = df.withColumn('Head Size(cm^3)', col('Head Size(cm^3)').cast("Integer"))

In [24]:
df = df.withColumn('Brain Weight(grams)', col('Brain Weight(grams)').cast("Integer"))

In [25]:
from pyspark.ml.feature import VectorAssembler

In [26]:
assemble = VectorAssembler(inputCols=['Head Size(cm^3)','Brain Weight(grams)'],
                          outputCol='vector')

In [27]:
df_vec = assemble.transform(df)

In [28]:
df_vec.show(5)

+------+---------+---------------+-------------------+-----------------+-------------+---------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode| GenderOneHot|         vector|
+------+---------+---------------+-------------------+-----------------+-------------+---------------+
|     1|        1|           4512|               1530|              0.0|(1,[0],[1.0])|[4512.0,1530.0]|
|     1|        1|           3738|               1297|              0.0|(1,[0],[1.0])|[3738.0,1297.0]|
|     1|        1|           4261|               1335|              0.0|(1,[0],[1.0])|[4261.0,1335.0]|
|     1|        1|           3777|               1282|              0.0|(1,[0],[1.0])|[3777.0,1282.0]|
|     1|        1|           4177|               1590|              0.0|(1,[0],[1.0])|[4177.0,1590.0]|
+------+---------+---------------+-------------------+-----------------+-------------+---------------+
only showing top 5 rows



In [29]:
df = spark.read.csv('headbrain.csv', header=True)

In [30]:
# df = df.withColumn('Head Size(cm^3)', col('Head Size(cm^3)').cast("Integer"))
# df = df.withColumn('Brain Weight(grams)', col('Brain Weight(grams)').cast("Integer"))

In [31]:
# converting all columns into integer
df = df.select(*(col(c).cast('Integer').alias(c) for c in df.columns))

In [32]:
df.show(5)

+------+---------+---------------+-------------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|
+------+---------+---------------+-------------------+
|     1|        1|           4512|               1530|
|     1|        1|           3738|               1297|
|     1|        1|           4261|               1335|
|     1|        1|           3777|               1282|
|     1|        1|           4177|               1590|
+------+---------+---------------+-------------------+
only showing top 5 rows



In [33]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [34]:
# stage_1 = StringIndexer(inputCol='Gender',  outputCol = 'GenderLabelEncode')
# stage_2 = OneHotEncoder(inputCol='GenderLabelEncode', outputCol='GenderOneHot')
# stage_3 = VectorAssembler(inputCols=['Head Size(cm^3)','Brain Weight(grams)'],
#                           outputCol='vector')
# stage_4 = LinearRegression(featuresCol='Head Size(cm^3)', labelCol='Brain Weight(grams)')
# pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])
# pipelineModel = pipeline.fit(df)
# new_pred_df = pipelineModel.transform(df)

In [35]:
features = df.drop(*['Gender', 'Age Range'])

In [36]:
features.show(5)

+---------------+-------------------+
|Head Size(cm^3)|Brain Weight(grams)|
+---------------+-------------------+
|           4512|               1530|
|           3738|               1297|
|           4261|               1335|
|           3777|               1282|
|           4177|               1590|
+---------------+-------------------+
only showing top 5 rows



In [37]:
X = features.drop('Brain Weight(grams)')

In [38]:
assembler = VectorAssembler(inputCols=X.columns, outputCol='features')

In [39]:
output = assembler.transform(features).select('features', 'Brain Weight(grams)')

In [40]:
output.show(5)

+--------+-------------------+
|features|Brain Weight(grams)|
+--------+-------------------+
|[4512.0]|               1530|
|[3738.0]|               1297|
|[4261.0]|               1335|
|[3777.0]|               1282|
|[4177.0]|               1590|
+--------+-------------------+
only showing top 5 rows



In [41]:
regression = LinearRegression(featuresCol='features', labelCol='Brain Weight(grams)')

In [42]:
regression_model = regression.fit(output)

In [43]:
regression_model.coefficients

DenseVector([0.2634])

In [44]:
regression_model.intercept

325.5734210494322

In [45]:
regression_model.summary.rootMeanSquaredError

72.12062137837093

In [46]:
regression_model.summary.r2

0.6393117199570006

In [58]:
regression_model.save('reg_model.model')

In [59]:
from pyspark.ml.regression import LinearRegressionModel

In [60]:
reg_model_loaded = LinearRegressionModel.load('reg_model.model')