# **Introduction to PySpark**

# Index

1. PySpark installation
2. How to create a SparkSession
3. Read a CSV file
4. General info about a data frame
5. Column operations (select, add, drop, rename)
6. Working with null values

## 1. PySpark installation

First of all, install the *pyspark* library with the following line of code: 

In [1]:
#pip install pyspark

Now, we will import the required libraries:

In [2]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt

import pyspark

## 2. How to create a SparkSession

In [3]:
# First of all, we need to import the SparkSession class from the pyspark.sql module.
from pyspark.sql import SparkSession 

In [4]:
# Now we create an instance of the SparkSession class; getOrCreate() reuses the
# session that is already running, if there is one.
spark = SparkSession.builder.appName('Practice').getOrCreate()
spark

## 3. Read a CSV file

In [5]:
# Now, we will create a DataFrame from a CSV file. We will use the read.csv() method for 
# this purpose.
df = spark.read.csv("C:/Users/eespitia/Documents/VSCode Notebooks/sample_data.csv", 
                    header=True, inferSchema=True)

# The parameters header and inferSchema are set to True to ensure that the first row of 
# the CSV file is treated as the header and the schema is inferred automatically.

df.show()

+------+----+----------+------+----------+
|  Name| Age|Experience|Salary|Department|
+------+----+----------+------+----------+
|Andres|  20|        12|  3000|        IT|
|Camilo|  50|      NULL|  5000|        IT|
|  Juan|NULL|         2|  7000| Analitics|
|Miguel|  24|         6|  NULL| Analitics|
| Pedro|  11|         9| 11000|        IT|
|  NULL|  54|         1|  8000| Analitics|
+------+----+----------+------+----------+



In [6]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Department: string (nullable = true)



## 4. General info about a data frame

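To get general information about a DataFrame, we can use the following commands:

In [7]:
# Note: in a notebook only the value of a cell's last line is echoed, so type(df),
# df.columns, df.dtypes and df.head(3) display nothing here; wrap them in print() to see them.
type(df)               # the Python class of the object
df.show()              # print the first rows (20 by default)
df.printSchema()       # print column names, types and nullability
df.columns             # list of column names
df.dtypes              # list of (column name, type) tuples
df.head(3)             # first 3 rows as a list of Row objects
df.describe().show()   # count, mean, stddev, min and max per column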

+------+----+----------+------+----------+
|  Name| Age|Experience|Salary|Department|
+------+----+----------+------+----------+
|Andres|  20|        12|  3000|        IT|
|Camilo|  50|      NULL|  5000|        IT|
|  Juan|NULL|         2|  7000| Analitics|
|Miguel|  24|         6|  NULL| Analitics|
| Pedro|  11|         9| 11000|        IT|
|  NULL|  54|         1|  8000| Analitics|
+------+----+----------+------+----------+

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Department: string (nullable = true)

+-------+------+----------------+-----------------+----------------+----------+
|summary|  Name|             Age|       Experience|          Salary|Department|
+-------+------+----------------+-----------------+----------------+----------+
|  count|     5|               5|                5|               5|         6|
|   mean|  NULL|            31.8|              

## 5. Column operations (select, add, drop, rename)

In [8]:
# To select a set of columns:
df.select(["Name", "Experience"]).show()

+------+----------+
|  Name|Experience|
+------+----------+
|Andres|        12|
|Camilo|      NULL|
|  Juan|         2|
|Miguel|         6|
| Pedro|         9|
|  NULL|         1|
+------+----------+



In [9]:
# To add a column (withColumn returns a new DataFrame; df itself is not modified)
df.withColumn("Experience after 2 years", df["Experience"]+2).show()

+------+----+----------+------+----------+------------------------+
|  Name| Age|Experience|Salary|Department|Experience after 2 years|
+------+----+----------+------+----------+------------------------+
|Andres|  20|        12|  3000|        IT|                      14|
|Camilo|  50|      NULL|  5000|        IT|                    NULL|
|  Juan|NULL|         2|  7000| Analitics|                       4|
|Miguel|  24|         6|  NULL| Analitics|                       8|
| Pedro|  11|         9| 11000|        IT|                      11|
|  NULL|  54|         1|  8000| Analitics|                       3|
+------+----+----------+------+----------+------------------------+



In [10]:
# To drop a column (this also returns a new DataFrame)
df.drop("Age").show()

+------+----------+------+----------+
|  Name|Experience|Salary|Department|
+------+----------+------+----------+
|Andres|        12|  3000|        IT|
|Camilo|      NULL|  5000|        IT|
|  Juan|         2|  7000| Analitics|
|Miguel|         6|  NULL| Analitics|
| Pedro|         9| 11000|        IT|
|  NULL|         1|  8000| Analitics|
+------+----------+------+----------+



In [11]:
# To rename a column
df.withColumnRenamed("Name", "Full Name").show()

+---------+----+----------+------+----------+
|Full Name| Age|Experience|Salary|Department|
+---------+----+----------+------+----------+
|   Andres|  20|        12|  3000|        IT|
|   Camilo|  50|      NULL|  5000|        IT|
|     Juan|NULL|         2|  7000| Analitics|
|   Miguel|  24|         6|  NULL| Analitics|
|    Pedro|  11|         9| 11000|        IT|
|     NULL|  54|         1|  8000| Analitics|
+---------+----+----------+------+----------+



## 6. Working with null values

### Dropping missing values

In [12]:
# To drop every row that contains at least one missing value:
df.na.drop().show()

# Parameters of this function:
#     - how: either "any" (the default) or "all". With "any", a row is dropped if any of
#            its values is null; with "all", it is dropped only if all of its values are null.
#     - thresh: an integer. A row is kept only if it has at least `thresh` non-null values;
#               rows with fewer are dropped. When set, it overrides `how`.
#     - subset: a list of column names. Only these columns are checked for nulls when
#               deciding whether to drop a row.

# For instance, to drop only the rows where the value in the Age column is missing,
# we can use the subset parameter as follows:
df.na.drop(subset=["Age"]).show()

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Andres| 20|        12|  3000|        IT|
| Pedro| 11|         9| 11000|        IT|
+------+---+----------+------+----------+

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Andres| 20|        12|  3000|        IT|
|Camilo| 50|      NULL|  5000|        IT|
|Miguel| 24|         6|  NULL| Analitics|
| Pedro| 11|         9| 11000|        IT|
|  NULL| 54|         1|  8000| Analitics|
+------+---+----------+------+----------+



### Filling missing values

In [13]:
# To replace the missing values with a specific value, we can use the fill() function.
# Its main parameters are:
#     - value: the replacement value (int, float, string, or bool). It can also be a dict
#              mapping column names to replacement values, in which case subset is ignored.
#     - subset: a list of column names. Only the missing values in these columns are replaced.
# Note: columns whose type does not match the type of value are skipped; for example,
# a numeric value fills only numeric columns.
df.na.fill(value=100, subset=["Age"]).show()

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Andres| 20|        12|  3000|        IT|
|Camilo| 50|      NULL|  5000|        IT|
|  Juan|100|         2|  7000| Analitics|
|Miguel| 24|         6|  NULL| Analitics|
| Pedro| 11|         9| 11000|        IT|
|  NULL| 54|         1|  8000| Analitics|
+------+---+----------+------+----------+



In [14]:
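# Fill the remaining nulls column by column and overwrite df with the result.
# ("Sin Nombre" is Spanish for "No Name".)
df = (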
    df
    .na.fill(value="Sin Nombre", subset=["Name"])
    .na.fill(value=100, subset=["Age"])
    .na.fill(value=0, subset=["Experience"])
    .na.fill(value=0, subset=["Salary"])
)

### Filter Operations

**Select**

In [15]:
# Example: Select the columns "Name", "Age" and "Salary" from the dataframe df
df.select(["Name", "Age", "Salary"]).show()

+----------+---+------+
|      Name|Age|Salary|
+----------+---+------+
|    Andres| 20|  3000|
|    Camilo| 50|  5000|
|      Juan|100|  7000|
|    Miguel| 24|     0|
|     Pedro| 11| 11000|
|Sin Nombre| 54|  8000|
+----------+---+------+



**Where**

In [16]:
# Example: find people whose age is greater than 30
df.filter(df["Age"] > 30).show()

# Alternatively (using a SQL expression string)
df.filter("Age>30").show()

+----------+---+----------+------+----------+
|      Name|Age|Experience|Salary|Department|
+----------+---+----------+------+----------+
|    Camilo| 50|         0|  5000|        IT|
|      Juan|100|         2|  7000| Analitics|
|Sin Nombre| 54|         1|  8000| Analitics|
+----------+---+----------+------+----------+

+----------+---+----------+------+----------+
|      Name|Age|Experience|Salary|Department|
+----------+---+----------+------+----------+
|    Camilo| 50|         0|  5000|        IT|
|      Juan|100|         2|  7000| Analitics|
|Sin Nombre| 54|         1|  8000| Analitics|
+----------+---+----------+------+----------+



In [17]:
# Example: find people whose Name is "Juan" or "Camilo"
df.filter((df["Name"] == "Juan") | (df["Name"] == "Camilo")).show()

# Alternatively
df.filter("Name in ('Juan', 'Camilo')").show()

# Alternatively
df.filter("Name = 'Juan' or Name = 'Camilo'").show()

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Camilo| 50|         0|  5000|        IT|
|  Juan|100|         2|  7000| Analitics|
+------+---+----------+------+----------+

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Camilo| 50|         0|  5000|        IT|
|  Juan|100|         2|  7000| Analitics|
+------+---+----------+------+----------+

+------+---+----------+------+----------+
|  Name|Age|Experience|Salary|Department|
+------+---+----------+------+----------+
|Camilo| 50|         0|  5000|        IT|
|  Juan|100|         2|  7000| Analitics|
+------+---+----------+------+----------+



**SELECT WHERE**

In [18]:
# Example: Select the Name and Salary columns of the people whose age is greater than 30
df.filter(df["Age"] > 30).select(["Name", "Salary"]).show()

# Alternatively
df.filter("Age>30").select(["Name", "Salary"]).show()

+----------+------+
|      Name|Salary|
+----------+------+
|    Camilo|  5000|
|      Juan|  7000|
|Sin Nombre|  8000|
+----------+------+

+----------+------+
|      Name|Salary|
+----------+------+
|    Camilo|  5000|
|      Juan|  7000|
|Sin Nombre|  8000|
+----------+------+



**Group by**

In [19]:
# Example: 

In [20]:
# Now, in order to obtain the mean of the column Age, we proceed as follows:
mean_age = df.select("Age").agg({"Age": "mean"}).collect()
print(mean_age[0][0])

# In this code, the agg function calculates the mean of the column Age. agg accepts a
# dictionary whose keys are column names and whose values are aggregate function names
# ("mean" here). Other aggregate functions include min, max, sum, count, etc. collect()
# returns the result to the driver as a list of Row objects, so mean_age[0][0] is the
# first field of the first (and only) row.

43.166666666666664
