In [1]:
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [25]:
df = spark.read.csv('Age.csv', header=True, inferSchema=True)
df.show()

+--------+---+
|    Name|Age|
+--------+---+
|Harrison| 24|
|  Edward| 27|
| Charlie| 25|
|    Luka| 25|
| Kirstin| 51|
|   Harry| 55|
+--------+---+



In [26]:
type(df)

pyspark.sql.dataframe.DataFrame

In [27]:
df.head(3)

[Row(Name='Harrison', Age=24),
 Row(Name='Edward', Age=27),
 Row(Name='Charlie', Age=25)]

In [28]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



## Selecting and filtering data

In [29]:
df = spark.read.option('header', 'true').csv('Experience.csv')
df.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|Harrison| 24|         2|
|  Edward| 27|         2|
| Charlie| 25|         4|
|    Luka| 25|         4|
| Kirstin| 51|        30|
|   Harry| 55|        34|
+--------+---+----------+



In [30]:
df.select(['Name', 'Experience']).show()

+--------+----------+
|    Name|Experience|
+--------+----------+
|Harrison|         2|
|  Edward|         2|
| Charlie|         4|
|    Luka|         4|
| Kirstin|        30|
|   Harry|        34|
+--------+----------+



In [31]:
df['Name']

Column<'Name'>

In [32]:
df.dtypes

[('Name', 'string'), ('Age', 'string'), ('Experience', 'string')]

In [33]:
df.describe().show()

+-------+-------+------------------+------------------+
|summary|   Name|               Age|        Experience|
+-------+-------+------------------+------------------+
|  count|      6|                 6|                 6|
|   mean|   null|              34.5|12.666666666666666|
| stddev|   null|14.418737808837498|15.055453054181621|
|    min|Charlie|                24|                 2|
|    max|   Luka|                55|                 4|
+-------+-------+------------------+------------------+



In [34]:
df = df.withColumn('Experience After 2 years', df['Experience']+2)
df.show()

+--------+---+----------+------------------------+
|    Name|Age|Experience|Experience After 2 years|
+--------+---+----------+------------------------+
|Harrison| 24|         2|                     4.0|
|  Edward| 27|         2|                     4.0|
| Charlie| 25|         4|                     6.0|
|    Luka| 25|         4|                     6.0|
| Kirstin| 51|        30|                    32.0|
|   Harry| 55|        34|                    36.0|
+--------+---+----------+------------------------+



In [35]:
# Drop the columns
df = df.drop('Experience after 2 years')
df.show

<bound method DataFrame.show of DataFrame[Name: string, Age: string, Experience: string]>

In [36]:
# Rename the columns
df = df.withColumnRenamed('Name', 'New Name')
df.show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|Harrison| 24|         2|
|  Edward| 27|         2|
| Charlie| 25|         4|
|    Luka| 25|         4|
| Kirstin| 51|        30|
|   Harry| 55|        34|
+--------+---+----------+



## Handling missing values

In [37]:
df = spark.read.csv('Salary.csv', header=True, inferSchema=True)
df.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|Harrison|  24|         2| 30000|
|  Edward|  27|         2| 30000|
| Charlie|  25|         4| 25000|
|    Luka|  25|         4| 40000|
| Kirstin|  51|        30| 60000|
|   Harry|  55|        34| 34000|
|   Sunny|  29|         4| 20000|
|  Sophia|null|      null| 76000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [38]:
# Drop the columns
df.na.drop().show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
|  Edward| 27|         2| 30000|
| Charlie| 25|         4| 25000|
|    Luka| 25|         4| 40000|
| Kirstin| 51|        30| 60000|
|   Harry| 55|        34| 34000|
|   Sunny| 29|         4| 20000|
+--------+---+----------+------+



In [39]:
# how='any' will drop a row where there are any null values
df.na.drop(how='any').show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
|  Edward| 27|         2| 30000|
| Charlie| 25|         4| 25000|
|    Luka| 25|         4| 40000|
| Kirstin| 51|        30| 60000|
|   Harry| 55|        34| 34000|
|   Sunny| 29|         4| 20000|
+--------+---+----------+------+



In [40]:
df.na.drop(how='any', thresh=3).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
|  Edward| 27|         2| 30000|
| Charlie| 25|         4| 25000|
|    Luka| 25|         4| 40000|
| Kirstin| 51|        30| 60000|
|   Harry| 55|        34| 34000|
|   Sunny| 29|         4| 20000|
|    null| 34|        10| 38000|
+--------+---+----------+------+



In [41]:
# Subset to only drop null values where they appear in one column
df.na.drop(how='any', subset=['Experience']).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
|  Edward| 27|         2| 30000|
| Charlie| 25|         4| 25000|
|    Luka| 25|         4| 40000|
| Kirstin| 51|        30| 60000|
|   Harry| 55|        34| 34000|
|   Sunny| 29|         4| 20000|
|    null| 34|        10| 38000|
+--------+---+----------+------+



In [42]:
df.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|Harrison|  24|         2| 30000|
|  Edward|  27|         2| 30000|
| Charlie|  25|         4| 25000|
|    Luka|  25|         4| 40000|
| Kirstin|  51|        30| 60000|
|   Harry|  55|        34| 34000|
|   Sunny|  29|         4| 20000|
|  Sophia|null|      null| 76000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [43]:
# Filling missing values
df.na.fill(0, 'Experience').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|Harrison|  24|         2| 30000|
|  Edward|  27|         2| 30000|
| Charlie|  25|         4| 25000|
|    Luka|  25|         4| 40000|
| Kirstin|  51|        30| 60000|
|   Harry|  55|        34| 34000|
|   Sunny|  29|         4| 20000|
|  Sophia|null|         0| 76000|
|    null|  34|        10| 38000|
|    null|  36|         0|  null|
+--------+----+----------+------+



In [44]:
df.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|Harrison|  24|         2| 30000|
|  Edward|  27|         2| 30000|
| Charlie|  25|         4| 25000|
|    Luka|  25|         4| 40000|
| Kirstin|  51|        30| 60000|
|   Harry|  55|        34| 34000|
|   Sunny|  29|         4| 20000|
|  Sophia|null|      null| 76000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [45]:
# Fill null values with the mean of the column
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=['{}_imputed'.format(c) for c in ['Age', 'Experience', 'Salary']]
    ).setStrategy('mean')

In [46]:
# Add imputation cols to df
imputer.fit(df).transform(df).show()

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|Harrison|  24|         2| 30000|         24|                 2|         30000|
|  Edward|  27|         2| 30000|         27|                 2|         30000|
| Charlie|  25|         4| 25000|         25|                 4|         25000|
|    Luka|  25|         4| 40000|         25|                 4|         40000|
| Kirstin|  51|        30| 60000|         51|                30|         60000|
|   Harry|  55|        34| 34000|         55|                34|         34000|
|   Sunny|  29|         4| 20000|         29|                 4|         20000|
|  Sophia|null|      null| 76000|         34|                11|         76000|
|    null|  34|        10| 38000|         34|                10|         38000|
|    null|  36|      null|  null|       

## Conditional Statements

In [5]:
df = spark.read.csv('Salary.csv', header=True, inferSchema=True)
df.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|Harrison|  24|         2| 30000|
|  Edward|  27|         2| 30000|
| Charlie|  25|         4| 25000|
|    Luka|  25|         4| 40000|
| Kirstin|  51|        30| 60000|
|   Harry|  55|        34| 34000|
|   Sunny|  29|         4| 20000|
|  Sophia|null|      null| 76000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [7]:
# Salary of people less than or equal to 20000
df.filter('Salary<=30000').select(['Name', 'Age']).show()

+--------+---+
|    Name|Age|
+--------+---+
|Harrison| 24|
|  Edward| 27|
| Charlie| 25|
|   Sunny| 29|
+--------+---+



In [10]:
# Multiple conditions with &
df.filter((df['Salary']<=30000) & (df['Age'] <= 25)).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
| Charlie| 25|         4| 25000|
+--------+---+----------+------+



In [11]:
# Multiple conditions with |
df.filter((df['Salary']<=30000) | (df['Age'] <= 25)).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Harrison| 24|         2| 30000|
|  Edward| 27|         2| 30000|
| Charlie| 25|         4| 25000|
|    Luka| 25|         4| 40000|
|   Sunny| 29|         4| 20000|
+--------+---+----------+------+



In [15]:
# Not condition using ~
df.filter(~(df['Name'] == 'Harrison')).show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
| Edward|  27|         2| 30000|
|Charlie|  25|         4| 25000|
|   Luka|  25|         4| 40000|
|Kirstin|  51|        30| 60000|
|  Harry|  55|        34| 34000|
|  Sunny|  29|         4| 20000|
| Sophia|null|      null| 76000|
+-------+----+----------+------+



## GroupBy and Aggregate functions

In [16]:
df = spark.read.csv('Departments.csv', header=True, inferSchema=True)
df.show()

+--------+------------+------+
|    Name| Departments|Salary|
+--------+------------+------+
|Harrison|Data Science| 30000|
|  Edward|Data Science| 30000|
| Charlie|         IOT| 25000|
|    Luka|     Finance| 40000|
| Kirstin|     Product| 60000|
|   Harry|         IOT| 34000|
|   Sunny|    Big Data| 20000|
|  Sophia|Data Science| 76000|
|  Hester|     Product| 38000|
+--------+------------+------+



In [17]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [24]:
# Groupby operation
df.groupBy('Departments').max().show()

+------------+-----------+
| Departments|max(Salary)|
+------------+-----------+
|         IOT|      34000|
|     Finance|      40000|
|    Big Data|      20000|
|     Product|      60000|
|Data Science|      76000|
+------------+-----------+

