### Dependencies

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     -------------------------------------- 0.0/317.0 MB 262.6 kB/s eta 0:20:08
     -------------------------------------- 0.1/317.0 MB 655.4 kB/s eta 0:08:04
     ---------------------------------------- 0.3/317.0 MB 1.3 MB/s eta 0:04:01
     ---------------------------------------- 0.6/317.0 MB 2.5 MB/s eta 0:02:10
     ---------------------------------------- 1.2/317.0 MB 4.2 MB/s eta 0:01:17
     ---------------------------------------- 2.3/317.0 MB 6.9 MB/s eta 0:00:46
      -------------------------------------- 4.5/317.0 MB 12.0 MB/s eta 0:00:27
      -------------------------------------- 5.7/317.0 MB 13.6 MB/s eta 0:00:23
      -------------------------------------- 6.8/317.0 MB 14.6 MB/s eta 0:00:22
     - ------------------------------------- 9.1/317.0 MB 17.6 MB/s e

In [11]:
import pyspark
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

### Dataset

#### Initialize Spark Session

In [18]:
# Reuse an already-running session if one exists, otherwise start a new one
# registered under the application name 'Practice'.
session_builder = SparkSession.builder.appName('Practice')
spark = session_builder.getOrCreate()

# Bare expression so the notebook renders the session summary.
spark

#### Read .csv with headers

In [39]:
# Load the CSV into a Spark DataFrame.
df_pyspark = spark.read.csv(
    'test1.csv',
    header=True,       # take column names from the first row
    inferSchema=True,  # ask Spark to infer column types instead of reading all as strings
)

df_pyspark.show()

+----+---+----------+
|Name|Age|Experience|
+----+---+----------+
|   A| 20|         3|
|   B| 30|         2|
|   C| 40|         5|
+----+---+----------+



In [40]:
# Confirm what kind of object the CSV reader returned.
frame_type = type(df_pyspark)
frame_type

pyspark.sql.dataframe.DataFrame

### Operations

#### 1. Basic Operations

In [42]:
# Check schema: print each column's name, its data type and whether it is nullable

df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [51]:
# Describe dataset: count, mean, stddev, min and max for every column
summary_stats = df_pyspark.describe()
summary_stats.show()

+-------+----+----+------------------+
|summary|Name| Age|        Experience|
+-------+----+----+------------------+
|  count|   3|   3|                 3|
|   mean|NULL|30.0|3.3333333333333335|
| stddev|NULL|10.0|1.5275252316519465|
|    min|   A|  20|                 2|
|    max|   C|  40|                 5|
+-------+----+----+------------------+



In [44]:
# List the column names as a plain Python list
column_names = df_pyspark.columns
column_names

['Name', 'Age', 'Experience']

In [47]:
# Head: fetch the first three rows to the driver as a list of Row objects
first_rows = df_pyspark.head(n=3)
first_rows

[Row(Name='A', Age=20, Experience=3),
 Row(Name='B', Age=30, Experience=2),
 Row(Name='C', Age=40, Experience=5)]

In [48]:
# Select a single column; the result is still a DataFrame
age_only = df_pyspark.select('Age')
age_only.show()

+---+
|Age|
+---+
| 20|
| 30|
| 40|
+---+



In [49]:
# Select several columns at once by passing a list of names
wanted_columns = ['Name', 'Age']
name_and_age = df_pyspark.select(wanted_columns)
name_and_age.show()

+----+---+
|Name|Age|
+----+---+
|   A| 20|
|   B| 30|
|   C| 40|
+----+---+



In [54]:
# Add column(s): derive a new column from an existing one.
# The column expression is built first, then attached under its new name.
experience_plus_two = df_pyspark['Experience'] + 2
df_pyspark = df_pyspark.withColumn('Experience in 2 Years', experience_plus_two)

df_pyspark.show()

+----+---+----------+---------------------+
|Name|Age|Experience|Experience in 2 Years|
+----+---+----------+---------------------+
|   A| 20|         3|                    5|
|   B| 30|         2|                    4|
|   C| 40|         5|                    7|
+----+---+----------+---------------------+



In [56]:
# Drop column(s): remove the derived column added in the previous step
column_to_drop = 'Experience in 2 Years'
df_pyspark = df_pyspark.drop(column_to_drop)

df_pyspark.show()

+----+---+----------+
|Name|Age|Experience|
+----+---+----------+
|   A| 20|         3|
|   B| 30|         2|
|   C| 40|         5|
+----+---+----------+



In [58]:
# Rename column(s): give an existing column a new label, leaving the data untouched
old_name, new_name = 'Experience', 'Job Experience'
df_pyspark = df_pyspark.withColumnRenamed(old_name, new_name)

df_pyspark.show()

+----+---+--------------+
|Name|Age|Job Experience|
+----+---+--------------+
|   A| 20|             3|
|   B| 30|             2|
|   C| 40|             5|
+----+---+--------------+



#### 2. Missing Values

In [63]:
# Reload the CSV so this section starts from a fresh DataFrame.
# NOTE(review): the recorded output below has a `Wage` column and six rows,
# unlike the first read of 'test1.csv' earlier in the notebook. The file was
# presumably edited (or a different file was intended) between runs; confirm
# the path so that "Restart & Run All" reproduces both outputs.
csv_reader = spark.read
df_pyspark = csv_reader.csv('test1.csv', header=True, inferSchema=True)

df_pyspark.show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|NULL| 23|      NULL|  28|
|   E| 43|         6|  75|
|   F| 27|         2|NULL|
+----+---+----------+----+



##### Drop Nulls

In [65]:
# Drop NA: with no arguments, every row containing at least one null is removed
rows_without_nulls = df_pyspark.na.drop()
rows_without_nulls.show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|   E| 43|         6|  75|
+----+---+----------+----+



In [69]:
# Drop NA with the `how` parameter:
#   'any' drops a row if any of its values is null,
#   'all' drops a row only if all of its values are null.
na_functions = df_pyspark.na
na_functions.drop(how='any').show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|   E| 43|         6|  75|
+----+---+----------+----+



In [92]:
# Drop NA with the `thresh` parameter.
# thresh: drop rows that have fewer than `thresh` non-null values.
# With thresh=3, a row with a single null (three non-null values) is kept,
# while a row with two nulls is dropped.
min_non_null = 3
df_pyspark.na.drop(thresh=min_non_null).show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|   E| 43|         6|  75|
|   F| 27|         2|NULL|
+----+---+----------+----+



In [93]:
# Drop NA with the `subset` parameter: only nulls in the listed columns count
columns_to_check = ['Experience']
df_pyspark.na.drop(subset=columns_to_check).show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|   E| 43|         6|  75|
|   F| 27|         2|NULL|
+----+---+----------+----+



##### Fill Nulls

In [97]:
# Fill strings: replace nulls in the `Name` column with a placeholder label
names_filled = df_pyspark.na.fill('NOT AVAILABLE', ['Name'])
names_filled.show()

+-------------+---+----------+----+
|         Name|Age|Experience|Wage|
+-------------+---+----------+----+
|            A| 20|         3|  30|
|            B| 30|         2|  27|
|            C| 40|         5|  45|
|NOT AVAILABLE| 23|      NULL|  28|
|            E| 43|         6|  75|
|            F| 27|         2|NULL|
+-------------+---+----------+----+



In [99]:
# Fill integers: replace nulls in the `Experience` column with 0
experience_filled = df_pyspark.na.fill(0, ['Experience'])
experience_filled.show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   A| 20|         3|  30|
|   B| 30|         2|  27|
|   C| 40|         5|  45|
|NULL| 23|         0|  28|
|   E| 43|         6|  75|
|   F| 27|         2|NULL|
+----+---+----------+----+



##### Impute Nulls

In [106]:
# Configure an Imputer that fills missing numeric values with the column mean.
# `Imputer` comes from the notebook's import cell (pyspark.ml.feature).

# Numeric columns to impute. Kept in one place so the input and output
# column lists cannot drift apart.
IMPUTE_COLS = ['Age', 'Experience', 'Wage']

imputer = Imputer(
    strategy='mean',
    inputCols=IMPUTE_COLS,
    # Results go to new "<column>_imputed" columns, so the originals stay intact.
    outputCols=[f'{c}_imputed' for c in IMPUTE_COLS],
)

In [107]:
# Fit the imputer on the DataFrame, then apply the fitted model to that same
# DataFrame to append the *_imputed columns.
imputer_model = imputer.fit(df_pyspark)
df_imputed = imputer_model.transform(df_pyspark)
df_imputed.show()

+----+---+----------+----+-----------+------------------+------------+
|Name|Age|Experience|Wage|Age_imputed|Experience_imputed|Wage_imputed|
+----+---+----------+----+-----------+------------------+------------+
|   A| 20|         3|  30|         20|                 3|          30|
|   B| 30|         2|  27|         30|                 2|          27|
|   C| 40|         5|  45|         40|                 5|          45|
|NULL| 23|      NULL|  28|         23|                 3|          28|
|   E| 43|         6|  75|         43|                 6|          75|
|   F| 27|         2|NULL|         27|                 2|          41|
+----+---+----------+----+-----------+------------------+------------+



### Filter Operations

In [110]:
# Method 1: express the condition as a SQL-style string
high_wage_rows = df_pyspark.filter('Wage > 30')
high_wage_rows.show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   C| 40|         5|  45|
|   E| 43|         6|  75|
+----+---+----------+----+



In [111]:
# Method 2: express the same condition as a Column expression
wage_above_30 = df_pyspark['Wage'] > 30
df_pyspark.filter(wage_above_30).show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   C| 40|         5|  45|
|   E| 43|         6|  75|
+----+---+----------+----+



In [113]:
# Multiple filter operations: combine Column conditions with `&`.
# Each comparison is built on its own before being combined.
wage = df_pyspark['Wage']
wage_above_20 = wage > 20
wage_below_30 = wage < 30
df_pyspark.filter(wage_above_20 & wage_below_30).show()

+----+---+----------+----+
|Name|Age|Experience|Wage|
+----+---+----------+----+
|   B| 30|         2|  27|
|NULL| 23|      NULL|  28|
+----+---+----------+----+



### Groupby

In [119]:
# Average of every numeric column within each `Experience` group
by_experience = df_pyspark.groupBy('Experience')
by_experience.mean().show()

+----------+--------+---------------+---------+
|Experience|avg(Age)|avg(Experience)|avg(Wage)|
+----------+--------+---------------+---------+
|      NULL|    23.0|           NULL|     28.0|
|         6|    43.0|            6.0|     75.0|
|         3|    20.0|            3.0|     30.0|
|         5|    40.0|            5.0|     45.0|
|         2|    28.5|            2.0|     27.0|
+----------+--------+---------------+---------+



In [120]:
# Number of rows in each `Experience` group (null forms its own group)
experience_groups = df_pyspark.groupBy('Experience')
experience_groups.count().show()

+----------+-----+
|Experience|count|
+----------+-----+
|      NULL|    1|
|         6|    1|
|         3|    1|
|         5|    1|
|         2|    2|
+----------+-----+



### Aggregate

In [123]:
# Whole-DataFrame aggregation given as a {column: function} mapping.
# The result is 5 rather than 6 because the row whose Name is null is not counted.
aggregations = {'Name': 'count'}
df_pyspark.agg(aggregations).show()

+-----------+
|count(Name)|
+-----------+
|          5|
+-----------+



### Stop Session

In [124]:
# Shut down the SparkSession now that the walkthrough is complete.
spark.stop()