# Spark Intro

A very basic introduction to Spark and its features.

## Creating a Spark session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sandbox").getOrCreate()

## Reading Data

Spark can read data from sources such as `csv`, `json` and `jdbc` into a Spark dataframe.
Passing `inferSchema=True` tells PySpark to infer each column's data type from the data; otherwise PySpark reads every column as a string.

In [2]:
df = spark.read.option('header', 'true').csv('../data/test1.csv')
df2 = spark.read.csv('../data/test2.csv', header=True, inferSchema=True)

## Display Data

In [3]:
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## Get Schema

In [4]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



## Describe

In [5]:
df.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|                8|             30000|
+-------+------+------------------+-----------------+------------------+



## Accessing Data

### Get a Column

There are two ways to extract a column from a dataframe: dot notation and bracket notation.

In [6]:
experience = df.Experience
experience = df["Experience"]

### Get All Column Names

In [7]:
df.columns

['Name', 'age', 'Experience', 'Salary']

### The Select Statement

This is PySpark's equivalent of SQL's `SELECT` statement. It selects a subset of columns from the dataframe.

In [8]:
df.select(["Name", "Salary"]).show()

+---------+------+
|     Name|Salary|
+---------+------+
|    Krish| 30000|
|Sudhanshu| 25000|
|    Sunny| 20000|
|     Paul| 20000|
|   Harsha| 15000|
|  Shubham| 18000|
+---------+------+



Columns can also be selected by passing their names directly, without wrapping them in a list.

In [9]:
df.select("Name", "Salary").show()

+---------+------+
|     Name|Salary|
+---------+------+
|    Krish| 30000|
|Sudhanshu| 25000|
|    Sunny| 20000|
|     Paul| 20000|
|   Harsha| 15000|
|  Shubham| 18000|
+---------+------+



## Manipulating Data

### Adding a Column

In [10]:
df = df.withColumn('Experience after 2 years', df.Experience + 2)
df.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                    12.0|
|Sudhanshu| 30|         8| 25000|                    10.0|
|    Sunny| 29|         4| 20000|                     6.0|
|     Paul| 24|         3| 20000|                     5.0|
|   Harsha| 21|         1| 15000|                     3.0|
|  Shubham| 23|         2| 18000|                     4.0|
+---------+---+----------+------+------------------------+



### Dropping a Column

In [11]:
df = df.drop('Experience after 2 years')
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Renaming a Column

In [12]:
df.withColumnRenamed('age', 'Age').show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## Handling Null Values

### Dropping Rows

Rows with null values can be dropped with the `.na.drop()` method.

In [13]:
# Before dropping
df2.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



Calling `.na.drop()` is the same as calling `.na.drop(how="any")`, where `any` means that a row is dropped if any of its values is null. The other value for the `how` parameter is `all`, which tells PySpark to drop a row only if every value in it is null.

In [14]:
# After dropping
df2.na.drop().show() 

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Threshold

The `thresh` parameter tells PySpark to keep a row only if it has at least that many **non-null** values. When `thresh` is given, it overrides the `how` parameter.

This is useful for retaining meaningful records that have only a few null values, which can then be imputed with appropriate values.

In [15]:
df2.na.drop(thresh=3).show() 

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



### Subset

The `subset` parameter restricts the null check to the specified columns: a row is dropped only if it has a null value in one of those columns.

In [16]:
df2.na.drop(subset=["Experience"]).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



### Filling Null Values

Replace the null values in the given columns with a chosen value.

In [17]:
df2.na.fill(0, ['Experience','age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|         0| 40000|
|     null| 34|        10| 38000|
|     null| 36|         0|  null|
+---------+---+----------+------+



### Impute Values

Fill the null values in a column with a statistic computed from that column's non-null values: the *median*, *mean* or *mode*.

In [18]:
from pyspark.ml.feature import Imputer

imputer = (
    Imputer(
        inputCols=['age', 'Experience', 'Salary'], 
        outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    )
    .setStrategy("median")
)

In [19]:
imputer.fit(df2).transform(df2).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         29|                 4|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 4|         20000|
+---------+----+----------+------+-----------+------------------+--------------+

## Filtering

Select the rows that satisfy a given condition. The `.filter` method is the same as the `.where` method.

In [20]:
df.filter("Salary <= 20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [21]:
df.filter(df['Salary'] <= 20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [22]:
df.filter((df['Salary'] <= 20000) | 
          (df['Salary'] >= 15000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [23]:
df.filter(~(df['Salary'] <= 20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+

