In [1]:
from pyspark.sql import SparkSession

# Build the session in steps; getOrCreate() hands back an existing session if one is running.
spark = (
    SparkSession.builder
    .appName('Base Functions')
    .getOrCreate()
)
spark  # last expression of the cell: display the session object

In [2]:
# Load the CSV: column names come from the first line, column types are inferred by Spark.
df = spark.read.csv(
    path='../Datasets/dataset.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Print the first 5 rows as a text table.
df.show(5)

+---+---------+----+-----+-------+---+----+-----+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|
+---+---------+----+-----+-------+---+----+-----+
|  1|     2615|   8|  1.4|  shine|  F|   0|2.615|
|  2|     3323|  12|  1.8|  shine|  S|   0|3.323|
|  3|     2721|  13|  1.4|  shine|  U|   0|2.721|
|  4|     2454|  12|  1.3|  shine|  M|   0|2.454|
|  5|     5528| 152|  3.1|   cold|  T|   1|5.528|
+---+---------+----+-----+-------+---+----+-----+
only showing top 5 rows



Counting number of rows

In [4]:
# Number of rows in the DataFrame.
df.count()

223

Show the first few rows of the table

In [5]:
# Row limit kept in a variable so it is easy to change.
n = 5
df.show(n=n)

+---+---------+----+-----+-------+---+----+-----+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|
+---+---------+----+-----+-------+---+----+-----+
|  1|     2615|   8|  1.4|  shine|  F|   0|2.615|
|  2|     3323|  12|  1.8|  shine|  S|   0|3.323|
|  3|     2721|  13|  1.4|  shine|  U|   0|2.721|
|  4|     2454|  12|  1.3|  shine|  M|   0|2.454|
|  5|     5528| 152|  3.1|   cold|  T|   1|5.528|
+---+---------+----+-----+-------+---+----+-----+
only showing top 5 rows



Select specific columns of the table

In [6]:
# Project a single column and print its first 3 rows.
df.select('Kcal').show(3)

+----+
|Kcal|
+----+
|   8|
|  12|
|  13|
+----+
only showing top 3 rows



In [7]:
# Project two columns (names passed as separate arguments) and print the first 3 rows.
df.select('Kcal', 'Miles').show(n=3)

+----+-----+
|Kcal|Miles|
+----+-----+
|   8|  1.4|
|  12|  1.8|
|  13|  1.4|
+----+-----+
only showing top 3 rows



PySpark to Pandas dataframe

In [8]:
# Convert the Spark DataFrame into a pandas DataFrame, kept in `pd_df`.
# NOTE: toPandas() materializes the whole DataFrame locally, so it only suits small data.
pd_df = df.toPandas()

Column data types

In [9]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- StepCount: integer (nullable = true)
 |-- Kcal: integer (nullable = true)
 |-- Miles: double (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Walk: integer (nullable = true)
 |-- Steps: double (nullable = true)



In [10]:
# Column types as a list of (column name, type string) tuples.
df.dtypes

[('_c0', 'int'),
 ('StepCount', 'int'),
 ('Kcal', 'int'),
 ('Miles', 'double'),
 ('Weather', 'string'),
 ('Day', 'string'),
 ('Walk', 'int'),
 ('Steps', 'double')]

Change column data type

In [11]:
from pyspark.sql.types import FloatType, StringType

# Replace 'Kcal' with a float-typed copy of itself, then confirm the new schema.
df = df.withColumn(
    'Kcal',
    df.Kcal.cast(FloatType()),
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- StepCount: integer (nullable = true)
 |-- Kcal: float (nullable = true)
 |-- Miles: double (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Walk: integer (nullable = true)
 |-- Steps: double (nullable = true)



Rename column

In [12]:
# Shows a renamed copy only: the result is not assigned, so `df` itself still has 'Kcal'.
df.withColumnRenamed('Kcal','Calories')

DataFrame[_c0: int, StepCount: int, Calories: float, Miles: double, Weather: string, Day: string, Walk: int, Steps: double]

In [13]:
# (current name, new name) pairs, applied one after another.
# NOTE: when the cells run in order, `df` has no 'Calories' column at this point (the
# preceding rename was never assigned), and the schema printed below still shows 'Kcal';
# only the 'Miles' rename is visible in the result.
name_pairs = [
    ('Calories', 'Kcal'),
    ('Miles', 'Distance (Miles)'),
]

for old_name, new_name in name_pairs:
    df = df.withColumnRenamed(existing=old_name, new=new_name)

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- StepCount: integer (nullable = true)
 |-- Kcal: float (nullable = true)
 |-- Distance (Miles): double (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Walk: integer (nullable = true)
 |-- Steps: double (nullable = true)



In [14]:
# Undo the demo rename so the following cells can keep referring to 'Miles'.
df = df.withColumnRenamed('Distance (Miles)', 'Miles')

Summary statistics

In [15]:
# count / mean / stddev / min / max for the three numeric columns of interest.
df.select('Kcal', 'Miles', 'Steps').describe().show()

+-------+------------------+------------------+------------------+
|summary|              Kcal|             Miles|             Steps|
+-------+------------------+------------------+------------------+
|  count|               223|               223|               223|
|   mean| 136.1390134529148|3.2152466367713015|  5.83180269058296|
| stddev|119.91995623551091|1.6392540621690515|2.8840272981352184|
|    min|               0.0|               0.0|             0.674|
|    max|             791.0|              10.2|            18.194|
+-------+------------------+------------------+------------------+



Null Values

In [16]:
# Drop rows containing nulls (no arguments given, so the library defaults apply).
# `df` is reassigned, so the null-handling cells that follow operate on this result.
df = df.na.drop()

In [17]:
df = df.na.drop(how='all') # drop only the rows in which every value is null

In [18]:
df = df.na.drop(thresh=2) # keep only rows that have at least 2 non-null values

In [19]:
# Only the listed columns are inspected: a row is dropped if any of them is null.
df = df.na.drop(how='any', subset=['Kcal','Miles','Steps']) # drop rows where any of these columns are null

In [20]:
df = df.na.fill(value='?', subset=['Day']) # replace nulls in 'Day' with the placeholder string '?'

In [21]:
from pyspark.ml.feature import Imputer

# Mean-impute missing values in place: output column names equal the input names,
# so the three columns are overwritten rather than duplicated.
imputer = Imputer(
    strategy='mean',
    inputCols=['Kcal','Miles','Steps'],
    outputCols=['Kcal','Miles','Steps'],
)

df = imputer.fit(df).transform(df)
df.show(3)

+---+---------+----+-----+-------+---+----+-----+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|
+---+---------+----+-----+-------+---+----+-----+
|  1|     2615| 8.0|  1.4|  shine|  F|   0|2.615|
|  2|     3323|12.0|  1.8|  shine|  S|   0|3.323|
|  3|     2721|13.0|  1.4|  shine|  U|   0|2.721|
+---+---------+----+-----+-------+---+----+-----+
only showing top 3 rows



Filtering

In [22]:
# Filter rows using a SQL-style condition string.
df.filter('Kcal > 300').show(3)

+---+---------+-----+-----+-------+---+----+------+
|_c0|StepCount| Kcal|Miles|Weather|Day|Walk| Steps|
+---+---------+-----+-----+-------+---+----+------+
| 12|    11251|353.0|  6.3|   cold|  T|   0|11.251|
| 21|    10204|337.0|  5.7|   cold|  R|   0|10.204|
| 39|    11980|378.0|  6.7|   cold|  M|   0| 11.98|
+---+---------+-----+-----+-------+---+----+------+
only showing top 3 rows



In [23]:
# where() with the same condition string; the printed rows match the filter() cell.
df.where('Kcal > 300').show(3)

+---+---------+-----+-----+-------+---+----+------+
|_c0|StepCount| Kcal|Miles|Weather|Day|Walk| Steps|
+---+---------+-----+-----+-------+---+----+------+
| 12|    11251|353.0|  6.3|   cold|  T|   0|11.251|
| 21|    10204|337.0|  5.7|   cold|  R|   0|10.204|
| 39|    11980|378.0|  6.7|   cold|  M|   0| 11.98|
+---+---------+-----+-----+-------+---+----+------+
only showing top 3 rows



In [24]:
# The same condition written as a Column expression instead of a string.
df.where(df['Kcal']>300).show(3)

+---+---------+-----+-----+-------+---+----+------+
|_c0|StepCount| Kcal|Miles|Weather|Day|Walk| Steps|
+---+---------+-----+-----+-------+---+----+------+
| 12|    11251|353.0|  6.3|   cold|  T|   0|11.251|
| 21|    10204|337.0|  5.7|   cold|  R|   0|10.204|
| 39|    11980|378.0|  6.7|   cold|  M|   0| 11.98|
+---+---------+-----+-----+-------+---+----+------+
only showing top 3 rows



In [25]:
# Both conditions must hold; each comparison is parenthesised before combining with `&`.
df.where(
    (df['Kcal'] > 300) & (df['Miles'] > 6)
).show(3)

+---+---------+-----+-----+-------+---+----+------+
|_c0|StepCount| Kcal|Miles|Weather|Day|Walk| Steps|
+---+---------+-----+-----+-------+---+----+------+
| 12|    11251|353.0|  6.3|   cold|  T|   0|11.251|
| 39|    11980|378.0|  6.7|   cold|  M|   0| 11.98|
| 41|    12787|393.0|  7.2|   cold|  W|   0|12.787|
+---+---------+-----+-----+-------+---+----+------+
only showing top 3 rows



In [26]:
# Either condition is enough; each comparison is parenthesised before combining with `|`.
df.where(
    (df['Kcal'] > 500) | (df['Day'] == 'T')
).show(3)

+---+---------+-----+-----+-------+---+----+------+
|_c0|StepCount| Kcal|Miles|Weather|Day|Walk| Steps|
+---+---------+-----+-----+-------+---+----+------+
|  5|     5528|152.0|  3.1|   cold|  T|   1| 5.528|
| 12|    11251|353.0|  6.3|   cold|  T|   0|11.251|
| 19|     8444|272.0|  4.7|   cold|  T|   0| 8.444|
+---+---------+-----+-----+-------+---+----+------+
only showing top 3 rows



In [27]:
# `~` negates the condition: keep the rows for which Kcal > 300 is false.
df.filter(~(df['Kcal']>300)).show(3)

+---+---------+----+-----+-------+---+----+-----+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|
+---+---------+----+-----+-------+---+----+-----+
|  1|     2615| 8.0|  1.4|  shine|  F|   0|2.615|
|  2|     3323|12.0|  1.8|  shine|  S|   0|3.323|
|  3|     2721|13.0|  1.4|  shine|  U|   0|2.721|
+---+---------+----+-----+-------+---+----+-----+
only showing top 3 rows



Evaluating a string expression

In [28]:
from pyspark.sql.functions import expr

# SQL expression kept as plain text; expr() below turns it into a column expression.
expression = 'Kcal + Miles + Steps*0.5'

# The computed column is added only to the displayed result; `df` is not reassigned.
df.withColumn('Calories', expr(expression)).show(3)

+---+---------+----+-----+-------+---+----+-----+------------------+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|          Calories|
+---+---------+----+-----+-------+---+----+-----+------------------+
|  1|     2615| 8.0|  1.4|  shine|  F|   0|2.615|           10.7075|
|  2|     3323|12.0|  1.8|  shine|  S|   0|3.323|15.461500000000001|
|  3|     2721|13.0|  1.4|  shine|  U|   0|2.721|           15.7605|
+---+---------+----+-----+-------+---+----+-----+------------------+
only showing top 3 rows



Group By

In [29]:
# Per-day averages of the three columns of interest. Passing the column names to
# mean() avoids averaging every numeric column and then discarding most of them;
# the result columns are still Day, avg(Kcal), avg(Miles), avg(Steps).
df.groupby('Day').mean('Kcal', 'Miles', 'Steps').show()

+---+------------------+------------------+------------------+
|Day|         avg(Kcal)|        avg(Miles)|        avg(Steps)|
+---+------------------+------------------+------------------+
|  F|154.63636363636363|3.4484848484848487| 6.248121212121212|
|  T|         165.53125| 3.646875000000001|6.5882187499999985|
|  M|157.72413793103448| 3.527586206896552| 6.390896551724138|
|  U| 69.60714285714286| 2.153571428571429| 3.957107142857143|
|  W|         151.96875| 3.312499999999999| 6.034031249999998|
|  S| 86.62857142857143| 2.811428571428571| 5.089285714285713|
|  R|162.97058823529412| 3.514705882352942| 6.356823529411763|
+---+------------------+------------------+------------------+



In [30]:
# Row count for every (Weather, Day) combination; print 3 of the groups.
df.groupby('Weather', 'Day').count().show(3)

+-------+---+-----+
|Weather|Day|count|
+-------+---+-----+
|   cold|  S|   11|
|  shine|  W|   19|
|   rain|  M|    3|
+-------+---+-----+
only showing top 3 rows



In [31]:
from pyspark.sql.functions import desc, asc

# Same grouping, largest groups first.
(
    df.groupby('Weather', 'Day')
    .count()
    .orderBy(desc('count'))
    .show(5)
)

+-------+---+-----+
|Weather|Day|count|
+-------+---+-----+
|  shine|  F|   20|
|  shine|  S|   20|
|  shine|  W|   19|
|  shine|  T|   17|
|  shine|  M|   16|
+-------+---+-----+
only showing top 5 rows



In [32]:
from pyspark.sql import functions as F

# Several aggregates per day in a single pass: lowest and highest Kcal, and the row count.
df.groupby('Day').agg(
    F.min('Kcal'),
    F.max('Kcal'),
    F.count('Day'),
).show(3)

+---+---------+---------+----------+
|Day|min(Kcal)|max(Kcal)|count(Day)|
+---+---------+---------+----------+
|  F|      0.0|    447.0|        33|
|  T|     10.0|    791.0|        32|
|  M|      0.0|    378.0|        29|
+---+---------+---------+----------+
only showing top 3 rows



SQL Syntax

In [33]:
# Register the DataFrame under the view name 'Calories' so it can be queried with SQL.
df.createOrReplaceTempView('Calories')
# Query the view by that name and print the first 3 rows.
spark.sql('SELECT * FROM Calories').show(3)

Pivoting

In [34]:
# Reminder of the current table layout before pivoting.
df.show(3)

+---+---------+----+-----+-------+---+----+-----+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|
+---+---------+----+-----+-------+---+----+-----+
|  1|     2615| 8.0|  1.4|  shine|  F|   0|2.615|
|  2|     3323|12.0|  1.8|  shine|  S|   0|3.323|
|  3|     2721|13.0|  1.4|  shine|  U|   0|2.721|
+---+---------+----+-----+-------+---+----+-----+
only showing top 3 rows



In [35]:
# Cross-tabulation: one row per Weather value, one column per distinct Day, cells hold row counts.
df.groupby('Weather').pivot('Day').count().show()

+-------+---+---+---+---+---+---+---+
|Weather|  F|  M|  R|  S|  T|  U|  W|
+-------+---+---+---+---+---+---+---+
|   rain|  6|  3| 11|  4|  4|  3|  4|
|  shine| 20| 16| 15| 20| 17| 12| 19|
|   cold|  7| 10|  8| 11| 11| 13|  9|
+-------+---+---+---+---+---+---+---+



In [36]:
 df.groupby('Weather').pivot('Day',('M','T','W','U','F','S','R')).count().show()

+-------+---+---+---+---+---+---+---+
|Weather|  M|  T|  W|  U|  F|  S|  R|
+-------+---+---+---+---+---+---+---+
|   rain|  3|  4|  4|  3|  6|  4| 11|
|  shine| 16| 17| 19| 12| 20| 20| 15|
|   cold| 10| 11|  9| 13|  7| 11|  8|
+-------+---+---+---+---+---+---+---+



Combining Commands

In [37]:
# Derive a boolean 'Active' column from a SQL expression and keep two existing columns beside it.
df.selectExpr('Kcal > 300 as Active', 'Miles', 'Weather').show(3)

+------+-----+-------+
|Active|Miles|Weather|
+------+-----+-------+
| false|  1.4|  shine|
| false|  1.8|  shine|
| false|  1.4|  shine|
+------+-----+-------+
only showing top 3 rows



In [38]:
# Count active vs. inactive rows per weather type.
# The pivot values are passed as a list (the type the pivot() API documents) and the chain
# is wrapped in parentheses instead of relying on a backslash continuation.
(
    df.selectExpr(
        'Kcal > 300 as Active',
        'Miles',
        'Weather',
    )
    .groupby('Weather')
    .pivot('Active', ['true', 'false'])
    .count()
    .show()
)

+-------+----+-----+
|Weather|true|false|
+-------+----+-----+
|   rain|   6|   29|
|  shine|   9|  110|
|   cold|   8|   61|
+-------+----+-----+



Split Dataset

In [39]:
# Roughly 70/30 random split. The fixed seed makes the split, and therefore the fitted
# model and predictions further down, repeatable across kernel restarts.
train, test = df.randomSplit([0.7, 0.3], seed=42)

Feature Vectors

In [40]:
# Input columns that the VectorAssembler cell combines into the feature vector.
col_names = ['Steps','Miles','Walk']

In [41]:
from pyspark.ml.feature import VectorAssembler

# Pack the selected numeric columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=col_names, outputCol='features')

# Drop any 'features' column left over from an earlier run of this cell before assembling,
# so the cell can be re-executed; drop() leaves the frame unchanged when the column is absent.
train = assembler.transform(train.drop('features'))
test = assembler.transform(test.drop('features'))

train.show(3)

+---+---------+----+-----+-------+---+----+-----+---------------+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|       features|
+---+---------+----+-----+-------+---+----+-----+---------------+
|  1|     2615| 8.0|  1.4|  shine|  F|   0|2.615|[2.615,1.4,0.0]|
|  3|     2721|13.0|  1.4|  shine|  U|   0|2.721|[2.721,1.4,0.0]|
|  4|     2454|12.0|  1.3|  shine|  M|   0|2.454|[2.454,1.3,0.0]|
+---+---------+----+-----+-------+---+----+-----+---------------+
only showing top 3 rows



Using a Machine Learning Model

In [42]:
from pyspark.ml.regression import LinearRegression

# Unfitted estimator: names the feature-vector column and the target column.
lr = LinearRegression(
    featuresCol='features',  # features
    labelCol='Kcal',  # target
)

# `model` now only ever refers to the fitted model (it previously named the unfitted
# estimator first and was then overwritten with the fit result).
model = lr.fit(train)

In [43]:
# Fitted weights, one per entry of the feature vector.
model.coefficients

DenseVector([120.6011, -147.8038, 30.8218])

In [44]:
# Fitted intercept term of the linear model.
model.intercept

-101.31479297222

In [45]:
# Score the held-out split; `.predictions` is the test DataFrame plus a 'prediction' column.
y_preds = model.evaluate(test).predictions
y_preds.show(3)

+---+---------+----+-----+-------+---+----+-----+---------------+------------------+
|_c0|StepCount|Kcal|Miles|Weather|Day|Walk|Steps|       features|        prediction|
+---+---------+----+-----+-------+---+----+-----+---------------+------------------+
|  2|     3323|12.0|  1.8|  shine|  S|   0|3.323|[3.323,1.8,0.0]|33.396032746374004|
|  7|     4988|65.0|  2.7|  shine|  R|   0|4.988|[4.988,2.7,0.0]| 101.1735495830016|
|  9|     4567|35.0|  2.6|  shine|  S|   0|4.567|[4.567,2.6,0.0]|  65.1808461932155|
+---+---------+----+-----+-------+---+----+-----+---------------+------------------+
only showing top 3 rows

