In [1]:
import os, sys
from pyspark.sql import SparkSession

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
spark = SparkSession.builder.master('local[*]').appName('app_1').getOrCreate()

In [8]:
my_df = spark.read.csv('headbrain.csv', header=True)

In [9]:
my_df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Age Range: string (nullable = true)
 |-- Head Size(cm^3): string (nullable = true)
 |-- Brain Weight(grams): string (nullable = true)



In [10]:
my_df.show(5)

+------+---------+---------------+-------------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|
+------+---------+---------------+-------------------+
|     1|        1|           4512|               1530|
|     1|        1|           3738|               1297|
|     1|        1|           4261|               1335|
|     1|        1|           3777|               1282|
|     1|        1|           4177|               1590|
+------+---------+---------------+-------------------+
only showing top 5 rows



In [13]:
my_df.select('Gender').show(5)

+------+
|Gender|
+------+
|     1|
|     1|
|     1|
|     1|
|     1|
+------+
only showing top 5 rows



In [14]:
my_df.select(['Gender','Age Range']).show(5)

+------+---------+
|Gender|Age Range|
+------+---------+
|     1|        1|
|     1|        1|
|     1|        1|
|     1|        1|
|     1|        1|
+------+---------+
only showing top 5 rows



In [15]:
new_df = my_df.drop(*['Gender' ,'Age Range'])

In [16]:
new_df.show(5)

+---------------+-------------------+
|Head Size(cm^3)|Brain Weight(grams)|
+---------------+-------------------+
|           4512|               1530|
|           3738|               1297|
|           4261|               1335|
|           3777|               1282|
|           4177|               1590|
+---------------+-------------------+
only showing top 5 rows



In [17]:
new_df.count()  # return number of rows

237

In [18]:
len(new_df.columns)  # return number of cols

2

In [19]:
new_df.select('Head Size(cm^3)', 'Brain Weight(grams)').describe()

DataFrame[summary: string, Head Size(cm^3): string, Brain Weight(grams): string]

In [20]:
new_df.select('Head Size(cm^3)', 'Brain Weight(grams)').describe().show()

+-------+------------------+-------------------+
|summary|   Head Size(cm^3)|Brain Weight(grams)|
+-------+------------------+-------------------+
|  count|               237|                237|
|   mean|3633.9915611814345|  1282.873417721519|
| stddev| 365.2614224198132| 120.34044578645734|
|    min|              2720|               1012|
|    max|              4747|                955|
+-------+------------------+-------------------+



## Get count of missing values

In [21]:
import pyspark.sql.functions as fun

In [22]:
# Aggregate Functions : count, avg, sum, min, max
my_df.agg(*[fun.count(fun.when(fun.isnull(col), col)).alias(col) for col in my_df.columns])

DataFrame[Gender: bigint, Age Range: bigint, Head Size(cm^3): bigint, Brain Weight(grams): bigint]

In [23]:
my_df.agg(*[fun.count(fun.when(fun.isnull(col), col)).alias(col) for col in my_df.columns]).show()

+------+---------+---------------+-------------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|
+------+---------+---------------+-------------------+
|     0|        0|              0|                  0|
+------+---------+---------------+-------------------+



In [24]:
my_df.groupBy('Gender')

<pyspark.sql.group.GroupedData at 0x269e14ae520>

In [25]:
my_df.groupBy('Gender').count().show()

+------+-----+
|Gender|count|
+------+-----+
|     1|  134|
|     2|  103|
+------+-----+



In [26]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [28]:
genderLabel = StringIndexer(inputCol='Gender', outputCol='GenderLabelEncode')

In [29]:
genderLabel

StringIndexer_8d08e4c41322

In [30]:
my_df = genderLabel.fit(my_df).transform(my_df)

In [31]:
my_df.show(5)

+------+---------+---------------+-------------------+-----------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode|
+------+---------+---------------+-------------------+-----------------+
|     1|        1|           4512|               1530|              0.0|
|     1|        1|           3738|               1297|              0.0|
|     1|        1|           4261|               1335|              0.0|
|     1|        1|           3777|               1282|              0.0|
|     1|        1|           4177|               1590|              0.0|
+------+---------+---------------+-------------------+-----------------+
only showing top 5 rows



In [32]:
my_df.groupBy('GenderLabelEncode').count().show()

+-----------------+-----+
|GenderLabelEncode|count|
+-----------------+-----+
|              0.0|  134|
|              1.0|  103|
+-----------------+-----+



In [33]:
genderOneHot = OneHotEncoder(inputCol='GenderLabelEncode', outputCol='GenderOneHot')

In [34]:
my_df = genderOneHot.fit(my_df).transform(my_df)

In [37]:
my_df.show(5)

+------+---------+---------------+-------------------+-----------------+-------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode| GenderOneHot|
+------+---------+---------------+-------------------+-----------------+-------------+
|     1|        1|           4512|               1530|              0.0|(1,[0],[1.0])|
|     1|        1|           3738|               1297|              0.0|(1,[0],[1.0])|
|     1|        1|           4261|               1335|              0.0|(1,[0],[1.0])|
|     1|        1|           3777|               1282|              0.0|(1,[0],[1.0])|
|     1|        1|           4177|               1590|              0.0|(1,[0],[1.0])|
+------+---------+---------------+-------------------+-----------------+-------------+
only showing top 5 rows



In [41]:
my_df.show(2)

+------+---------+---------------+-------------------+-----------------+-------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|GenderLabelEncode| GenderOneHot|
+------+---------+---------------+-------------------+-----------------+-------------+
|     1|        1|           4512|               1530|              0.0|(1,[0],[1.0])|
|     1|        1|           3738|               1297|              0.0|(1,[0],[1.0])|
+------+---------+---------------+-------------------+-----------------+-------------+
only showing top 2 rows



In [47]:
import pyspark.sql.types as tp

In [48]:
schema = tp.StructType([
    tp.StructField(name='Gender', dataType=tp.IntegerType()),
    tp.StructField(name='Age Range', dataType=tp.IntegerType()),
    tp.StructField(name='Head Size(cm^3)', dataType=tp.IntegerType()),
    tp.StructField(name='Brain Weight(grams)', dataType=tp.IntegerType())
])

In [49]:
my_df = spark.read.csv('headbrain.csv', schema=schema, header=True)

In [50]:
my_df.printSchema()

root
 |-- Gender: integer (nullable = true)
 |-- Age Range: integer (nullable = true)
 |-- Head Size(cm^3): integer (nullable = true)
 |-- Brain Weight(grams): integer (nullable = true)



In [42]:
# Vector Assembler

In [43]:
from pyspark.ml.feature import VectorAssembler

In [51]:
assemble = VectorAssembler(inputCols=['Head Size(cm^3)', 
                                      'Brain Weight(grams)'],
                           outputCol='vector')

In [52]:
new_df_vec = assemble.transform(my_df)

In [53]:
new_df_vec

DataFrame[Gender: int, Age Range: int, Head Size(cm^3): int, Brain Weight(grams): int, vector: vector]

In [55]:
new_df_vec.show(5)

+------+---------+---------------+-------------------+---------------+
|Gender|Age Range|Head Size(cm^3)|Brain Weight(grams)|         vector|
+------+---------+---------------+-------------------+---------------+
|     1|        1|           4512|               1530|[4512.0,1530.0]|
|     1|        1|           3738|               1297|[3738.0,1297.0]|
|     1|        1|           4261|               1335|[4261.0,1335.0]|
|     1|        1|           3777|               1282|[3777.0,1282.0]|
|     1|        1|           4177|               1590|[4177.0,1590.0]|
+------+---------+---------------+-------------------+---------------+
only showing top 5 rows



In [59]:
new_df_vec.select('vector').show(5)

+---------------+
|         vector|
+---------------+
|[4512.0,1530.0]|
|[3738.0,1297.0]|
|[4261.0,1335.0]|
|[3777.0,1282.0]|
|[4177.0,1590.0]|
+---------------+
only showing top 5 rows



In [60]:
sample_df = spark.createDataFrame([
    (101, 'John', 'Male'),
    (102, 'Mac', 'Male'),
    (103, 'Sam', 'Male'),
    (104, 'Nick', 'Male'),
    (105, 'Tom', 'Male'),
    (106, 'Jenny', 'Female'),
    (107, 'Julie', 'Female'),
    (108, 'Linda', 'Female'),
],['id', 'name', 'gender'])

In [61]:
sample_df.show()

+---+-----+------+
| id| name|gender|
+---+-----+------+
|101| John|  Male|
|102|  Mac|  Male|
|103|  Sam|  Male|
|104| Nick|  Male|
|105|  Tom|  Male|
|106|Jenny|Female|
|107|Julie|Female|
|108|Linda|Female|
+---+-----+------+



In [62]:
from pyspark.ml import Pipeline

In [64]:
stage_1 = StringIndexer(inputCol='gender', outputCol='gender_label')
stage_2 = OneHotEncoder(inputCol='gender_label', outputCol='gender_onehot')

pipeline = Pipeline(stages=[stage_1, stage_2])
pipelineModel = pipeline.fit(sample_df)
new_sample_df = pipelineModel.transform(sample_df)

In [65]:
new_sample_df.show()

+---+-----+------+------------+-------------+
| id| name|gender|gender_label|gender_onehot|
+---+-----+------+------------+-------------+
|101| John|  Male|         0.0|(1,[0],[1.0])|
|102|  Mac|  Male|         0.0|(1,[0],[1.0])|
|103|  Sam|  Male|         0.0|(1,[0],[1.0])|
|104| Nick|  Male|         0.0|(1,[0],[1.0])|
|105|  Tom|  Male|         0.0|(1,[0],[1.0])|
|106|Jenny|Female|         1.0|    (1,[],[])|
|107|Julie|Female|         1.0|    (1,[],[])|
|108|Linda|Female|         1.0|    (1,[],[])|
+---+-----+------+------------+-------------+



In [66]:
from pyspark.ml.classification import LogisticRegression

In [67]:
df = spark.createDataFrame([
    (5.0, 55, 'f'),
    (5.8, 75, 'm'),
    (4.9, 48, 'f'),
    (5.9, 85, 'm'),
    (6.0, 95, 'm'),
    (5.5, 65, 'm'),
    (5.2, 50, 'f'),
    (5.3, 60, 'f'),
    (5.2, 65, 'f'),
    (5.7, 75, 'm'),
], ['feature_1', 'feature_2', 'label'])

In [68]:
stage_1 = StringIndexer(inputCol='label', outputCol='gender_label')
stage_2 = VectorAssembler(inputCols=['feature_1', 'feature_2'],
                         outputCol='features')
stage_3 = LogisticRegression(featuresCol='features', labelCol='gender_label')

pipeline = Pipeline(stages=[stage_1, stage_2, stage_3])
pipelineModel = pipeline.fit(df)
new_sample_df = pipelineModel.transform(df)

In [69]:
new_sample_df.show()

+---------+---------+-----+------------+----------+--------------------+--------------------+----------+
|feature_1|feature_2|label|gender_label|  features|       rawPrediction|         probability|prediction|
+---------+---------+-----+------------+----------+--------------------+--------------------+----------+
|      5.0|       55|    f|         0.0|[5.0,55.0]|[60.5499013118219...|           [1.0,0.0]|       0.0|
|      5.8|       75|    m|         1.0|[5.8,75.0]|[-65.043764146401...|[5.64744680681714...|       1.0|
|      4.9|       48|    f|         0.0|[4.9,48.0]|[80.2533863258387...|           [1.0,0.0]|       0.0|
|      5.9|       85|    m|         1.0|[5.9,85.0]|[-87.416767048244...|[1.08487685790069...|       1.0|
|      6.0|       95|    m|         1.0|[6.0,95.0]|[-109.78976995008...|[2.08405291288070...|       1.0|
|      5.5|       65|    m|         1.0|[5.5,65.0]|[-15.721541359713...|[1.48669292918052...|       1.0|
|      5.2|       50|    f|         0.0|[5.2,50.0]|[38.