# 02 Basic Operations

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

# initiate spark session
spark = SparkSession.builder.appName("Basic Operations").getOrCreate()

# load data
df_sales = spark.read.csv("../course_materials/Spark_Dataframes/sales_info.csv", header = True, inferSchema = True)

In [2]:
df_sales.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



---

## 2.1 Viewing Dataframes

### The `head()` Method

The `head()` method will return the top $n$ Row objects of a Spark dataframe, where $n$ can be specified by passing an argument. The default behavior will return only the 1st row.

In [10]:
df_sales.head()

Row(Company='GOOG', Person='Sam', Sales=200.0)

### The `show()` Method

The `show()` method will print the top $n$ rows, but this time as a dataframe. Default behavior is to print the top 20 rows.

In [17]:
# print the top 5 rows
df_sales.show(5)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|  200|
|   GOOG|Charlie|  120|
|   GOOG|  Frank|  340|
|   MSFT|   Tina|  600|
|   MSFT|    Amy|  124|
+-------+-------+-----+
only showing top 5 rows



### The `orderBy()` Method

The `orderBy()` method sorts the rows of the dataframe according to the values of a given column

In [12]:
df_sales.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



<br>

---

<br>

## 2.2 Subsetting Dataframes

### The `select()` Method

The `select()` method allows us to select and return a subset of columns as a new dataframe. The columns can be passed as a list of strings (SQL syntax) or using dataframe syntax.

In [25]:
# select() using SQL Syntax
df_sales.select("Person", "Sales").show()

+-------+-----+
| Person|Sales|
+-------+-----+
|    Sam|  200|
|Charlie|  120|
|  Frank|  340|
|   Tina|  600|
|    Amy|  124|
|Vanessa|  243|
|   Carl|  870|
|  Sarah|  350|
|   John|  250|
|  Linda|  130|
|   Mike|  750|
|  Chris|  350|
+-------+-----+



In [27]:
# select() using dataframe syntax
df_sales.select(df_sales["Person"], df_sales["Sales"]).show()

+-------+-----+
| Person|Sales|
+-------+-----+
|    Sam|  200|
|Charlie|  120|
|  Frank|  340|
|   Tina|  600|
|    Amy|  124|
|Vanessa|  243|
|   Carl|  870|
|  Sarah|  350|
|   John|  250|
|  Linda|  130|
|   Mike|  750|
|  Chris|  350|
+-------+-----+



Dataframe syntax is useful since we can manipulate the columns before they are returned:

In [29]:
df_sales.select(
    df_sales["Person"],
    (df_sales["Sales"] / 100).alias("Sales (in Hundreds)")
).show()

+-------+-------------------+
| Person|Sales (in Hundreds)|
+-------+-------------------+
|    Sam|                2.0|
|Charlie|                1.2|
|  Frank|                3.4|
|   Tina|                6.0|
|    Amy|               1.24|
|Vanessa|               2.43|
|   Carl|                8.7|
|  Sarah|                3.5|
|   John|                2.5|
|  Linda|                1.3|
|   Mike|                7.5|
|  Chris|                3.5|
+-------+-------------------+



### The `filter()` Method

The `filter()` method allows us to subset the dataframe by rows, returning only the rows which satisfy some specified condition. This method accepts both SQL Syntax and dataframe syntax.

In [30]:
# subset to rows with sales >= 200; 
# SQL syntax
df_sales.filter("Sales >= 200").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|  200|
|   GOOG|  Frank|  340|
|   MSFT|   Tina|  600|
|   MSFT|Vanessa|  243|
|     FB|   Carl|  870|
|     FB|  Sarah|  350|
|   APPL|   John|  250|
|   APPL|   Mike|  750|
|   APPL|  Chris|  350|
+-------+-------+-----+



In [32]:
# subset to rows with sales >= 200;
# dataframe syntax
df_sales.filter( df_sales["Sales"] >= 200 ).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|  200|
|   GOOG|  Frank|  340|
|   MSFT|   Tina|  600|
|   MSFT|Vanessa|  243|
|     FB|   Carl|  870|
|     FB|  Sarah|  350|
|   APPL|   John|  250|
|   APPL|   Mike|  750|
|   APPL|  Chris|  350|
+-------+-------+-----+



<br>

---

<br>

## 2.3 Manipulating Columns

### The `withColumns()` Method

Often, we will want take an existing column and manipulate it into a new column. This can be done with the `withColumns()` method by passing a dictionary of the form `{ new column : transformation }`

In [37]:
from pyspark.sql.functions import concat, lit

df_sales.withColumns(
    {
        "Full Name" : concat(df_sales["Person"], lit(" Smith")),
        "Sales (in Hundreds)" : df_sales["Sales"] / 100
    }
).show()

+-------+-------+-----+-------------+-------------------+
|Company| Person|Sales|    Full Name|Sales (in Hundreds)|
+-------+-------+-----+-------------+-------------------+
|   GOOG|    Sam|  200|    Sam Smith|                2.0|
|   GOOG|Charlie|  120|Charlie Smith|                1.2|
|   GOOG|  Frank|  340|  Frank Smith|                3.4|
|   MSFT|   Tina|  600|   Tina Smith|                6.0|
|   MSFT|    Amy|  124|    Amy Smith|               1.24|
|   MSFT|Vanessa|  243|Vanessa Smith|               2.43|
|     FB|   Carl|  870|   Carl Smith|                8.7|
|     FB|  Sarah|  350|  Sarah Smith|                3.5|
|   APPL|   John|  250|   John Smith|                2.5|
|   APPL|  Linda|  130|  Linda Smith|                1.3|
|   APPL|   Mike|  750|   Mike Smith|                7.5|
|   APPL|  Chris|  350|  Chris Smith|                3.5|
+-------+-------+-----+-------------+-------------------+



### The `withColumnsRenamed()` Method

The `withColumnsRenamed()` method is a simple method that renames columns. The method can be used by passing a dictionary of the form `{old column name : new column name}`

In [43]:
df_sales.withColumnsRenamed( {"Person" : "Name", "Company" : "Firm"} ).show()

+----+-------+-----+
|Firm|   Name|Sales|
+----+-------+-----+
|GOOG|    Sam|  200|
|GOOG|Charlie|  120|
|GOOG|  Frank|  340|
|MSFT|   Tina|  600|
|MSFT|    Amy|  124|
|MSFT|Vanessa|  243|
|  FB|   Carl|  870|
|  FB|  Sarah|  350|
|APPL|   John|  250|
|APPL|  Linda|  130|
|APPL|   Mike|  750|
|APPL|  Chris|  350|
+----+-------+-----+



<br>

---

<br>

## 2.4 Aggregations and Group By's

### The `agg()` Method

The `agg()` method applies specified aggregation functions to specified columns. There are multiple syntatical ways to use the `agg()` method, but all of them more or less requires specifying a specific aggregation function to a specified column. The two most common approaches is:

1) Passing a dictionary of the form `{ column : aggregation function }`. Note that this syntax uses column names as keys, which means two different aggregations *cannot* be applied to the same column.

2) Passing expressions of the form `aggregation_function(column)`

In [4]:
# using dictionary syntax
df_sales.agg({"Sales" : "sum"}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [15]:
# using experssions
from pyspark.sql import functions as psf

df_sales.agg(
    psf.count("Sales").alias("Num. Rows"),
    psf.sum("Sales").alias("Total Sales"), 
    psf.min("Sales").alias("Min Sales"),
    psf.max("Sales").alias("Max Sales"),
    psf.mean("Sales").alias("Avg Sales")
).show()

+---------+-----------+---------+---------+-----------------+
|Num. Rows|Total Sales|Min Sales|Max Sales|        Avg Sales|
+---------+-----------+---------+---------+-----------------+
|       12|     4327.0|    120.0|    870.0|360.5833333333333|
+---------+-----------+---------+---------+-----------------+



### The `groupBy()` Method

The `groupBy()` method allows us to restrict aggregation functions on specific groupings of rows.

In [16]:
df_sales.groupBy("Company").agg(
    psf.count("Sales").alias("Num. Rows"),
    psf.sum("Sales").alias("Total Sales"),
    psf.min("Sales").alias("Min Sales"),
    psf.max("Sales").alias("Max Sales"),
    psf.mean("Sales").alias("Avg Sales")
).show()

+-------+---------+-----------+---------+---------+-----------------+
|Company|Num. Rows|Total Sales|Min Sales|Max Sales|        Avg Sales|
+-------+---------+-----------+---------+---------+-----------------+
|   APPL|        4|     1480.0|    130.0|    750.0|            370.0|
|   GOOG|        3|      660.0|    120.0|    340.0|            220.0|
|     FB|        2|     1220.0|    350.0|    870.0|            610.0|
|   MSFT|        3|      967.0|    124.0|    600.0|322.3333333333333|
+-------+---------+-----------+---------+---------+-----------------+



### Wrapping PySpark Aggregation Functions in `select()`

The aggregation functions imported from `pyspark.sql.functions` can be called on columns of a dataframe and will return Colum objects. Column objects can be gathered by a `select()` statement and returned as a dataframe.

In [19]:
# using SQL syntax
df_sales.select( psf.sum("Sales").alias("Total Sales")).show()

+-----------+
|Total Sales|
+-----------+
|     4327.0|
+-----------+



In [20]:
# using dataframe syntax
df_sales.select( psf.sum(df_sales["Sales"]).alias("Total Sales") ).show()

+-----------+
|Total Sales|
+-----------+
|     4327.0|
+-----------+



<br>

---

<br>

## 2.5 Dealing With Missing Values

In [6]:
df_nulls = spark.read.csv("../course_materials/Spark_Dataframes/ContainsNull.csv", header = True, inferSchema = True)

df_nulls.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Dropping NAs with `na.drop()`

The `na.drop()` method will drop rows with missing values. The default behavior will drop any rows with *any* missing values in any columns

In [7]:
# drop rows with any NULL values in any columns
df_nulls.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



We can control the exact behavior of `na.drop()` using 3 parameters:

The how parameter changes whether to drop rows with any missing values or all missing values: how = "any" will drop rows with a missing value in any colum, while how = "all" will only drop rows that have missing values in all columns.
The thresh parameter specifies how many non-null values must exist for a row to be kept; this will override the how parameter.
The subset parameter specifies which columns to check for missing values.

In [9]:
# drop only the rows that missing Name AND Sales
df_nulls.na.drop(how = "all", subset = ["Name", "Sales"]).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
# keep only rows that have at least 2 non-NULL value in any column
df_nulls.na.drop(thresh = 2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Imputing NAs with `na.fill()`

The `na.fill()` method will fill NULL values with a specified value. The default behavior will fill *all columns with a compatible type* as the fill value.

In [14]:
# fill all numeric columns with 0
df_nulls.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [15]:
# fill all string columns with "unknown"
df_nulls.na.fill("unknown").show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| NULL|
|emp2|unknown| NULL|
|emp3|unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



The exact behavior of `na.fill()` can be controlled by specifying the fill values to use to each specific column. This is done by passing a dictionary of the form `{ column : fill value}`

In [18]:
df_nulls.na.fill(
    {
        "Name" : "unknown", 
        "Sales" : 0 
    }
).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|  0.0|
|emp2|unknown|  0.0|
|emp3|unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



An alternative syntax is to pass a `value` argument and a `subset` argument:

In [26]:
df_nulls.na.fill(value = "unknown", subset = ["Name"]).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| NULL|
|emp2|unknown| NULL|
|emp3|unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



### Example: Imputing NAs With The Mean

A common imputation method is **mean imputation** which fills NA values in a column with the mean of that column. The intuition is that the mean is the "constant of best fit", in the sense that the mean is the numerical value that minimizes the residual sum of squares. To perform mean imputation, we first have to compute the mean of column, extract that value, and use it as the argument in `na.fill()`

In [29]:
from pyspark.sql import functions as psf

# compute the mean value of the Sales column;
# this will be returned as a 1x1 spark dataframe
mean_sales = df_nulls.agg(psf.mean(df_nulls["Sales"]).alias("mean_sales"))

# extract the value by accessing the Row object and
# then the value in the Row object
mean_sales = mean_sales.collect()[0]["mean_sales"]

# mean_value should now be a python primitive
mean_sales

400.5

In [30]:
# impute the Sales column with the mean
df_nulls.na.fill({"Sales": mean_sales}).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



<br>

---

<br>