# Practical on Big Data Tools

***Authored by: Prof. Stephen Fashoto, Brian Msane, and Bandile Malaza***

## PySpark DataFrame

A DataFrame is a distributed dataset comprising data arranged in rows and columns with named attributes. It shares similarities with relational database tables or Python data frame but incorporates sophistacated organizations.

If you have a Python background, I would assume you already know what Pandas DataFrames are. A PySpark dataFrame is similar with Pandas DataFrame, with the exception taht DataFrames are distributed in the cluster (meaning the data in the dataFrame is stored in different machines in a cluster), and any operations in PySpark execute in parallel on all machines, whereas Pandas DataFrames store and operates in a single machine.

## Is PySpark faster than Pandas?
PySpark is a distributed computing framework well-suited for processing large-scale datasets that exceed the memory capacity of a single machine. It can leverage paralled processing across a cluster of machines, enabling faster computations on massive datasets.

On the other hand, Pandas, being a single-machine library, is optimized for smaller to medium-sized datasets taht can fit into memory. It typically performs well for data manipulation and analysis tasks on small to medium datasets.


## Starting a PySpark Session

- Common Keywords
  - `appName()` is used to setup your application name
  - `getOrCreate()` returns a sparksession object if it already exists, else creates a new one.
- Uncommon Keywords
  - `master()`. If you are running it on the cluster, you need to use your master name as an argument to the `master()` method. Usually, it would be either **YARN** (yet another resource negotiator) or **Mesos** depending on your cluster setup.
  - `local["*"]`. When operating in a standalone mode, specify `local[x]`, where `x` is an integer greater than 0, to determine the number of partitions for RDD. Ideally, set `x` to match the number of CPU cores available on your system for optimal performance.


from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("simpleapp")\
    .master("local[*]")\
    .getOrCreate()


## Create a DataFrame

Using a `list` is one of the simplest ways to create a DataFrame. If you already have an RDD, you can easily convert it to DataFrame. Use the `createDataFrame()` method from the SparkSession object to create the DataFrame.

Below are the three differnt ways to create a DataFrame with content in PySpark


### Example 1: List of Lists

In [7]:

data = [
    ["Metfula", 25, "Eswatini"],
    ["Elliot", 28, "Zimbabwe"],
    ["Aina", 30, "Namibia"]
]
df = spark.createDataFrame(data=data, schema=["Name", "Age", "Country"])
df.show() # display the content
df.printSchema()

                                                                                

+-------+---+--------+
|   Name|Age| Country|
+-------+---+--------+
|Metfula| 25|Eswatini|
| Elliot| 28|Zimbabwe|
|   Aina| 30| Namibia|
+-------+---+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Country: string (nullable = true)



### Example 2: List of Tuples

In [8]:
data = [
    ("Fashoto", 'Man United', 'Eswatini is a very good country'),
    ("Malaza", 'Man City', 'PySpark is very nice'),
    ("Msane", 'Man United', 'Ubuntu is better than Windows, frankly')
]
df1 = spark.createDataFrame(data=data, schema=['name', 'club', 'statement'])
df1.show(n=3, truncate=True) # truncate strings more than 20 chars
df1.printSchema()

+-------+----------+--------------------+
|   name|      club|           statement|
+-------+----------+--------------------+
|Fashoto|Man United|Eswatini is a ver...|
| Malaza|  Man City|PySpark is very nice|
|  Msane|Man United|Ubuntu is better ...|
+-------+----------+--------------------+

root
 |-- name: string (nullable = true)
 |-- club: string (nullable = true)
 |-- statement: string (nullable = true)



### Example 3: List of pyspark.sql.Row

In [9]:
from pyspark.sql import Row

data = [
    Row(a=1, b="string1", c=5.5),
    Row(a=2, b='string2', c=7.0),
    Row(a=3, b='string3', c=0.0),
    Row(a=4, b='string4', c=9.9)
]
df2 = spark.createDataFrame(data)
df2.show(truncate=3) # you can also specify the number of chars when you truncate
df2.printSchema()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|str|5.5|
|  2|str|7.0|
|  3|str|0.0|
|  4|str|9.9|
+---+---+---+

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: double (nullable = true)



## Practice Examples

In [10]:
from pyspark.sql.functions import lit, col, when, avg, count

Data = [
    ("Simphiwe", 25, "Siteki", 2000),
    ("Bandile", 26, "Manzini", 3000),
    ("Nombuso", 35, "Matsapha", 5000),
    ("Faith", 17, "Kwaluseni", 1000),
    ("Bunmi", 37, "Tubungu", 3500),
    ("Godwin", None, "Mbabane", 5000)
]
columns = ["Firstname", "Age", "City", "Salary"]
df = spark.createDataFrame(Data, columns)

print("Content of the original DataFrame")
df.show()

Content of the original DataFrame
+---------+----+---------+------+
|Firstname| Age|     City|Salary|
+---------+----+---------+------+
| Simphiwe|  25|   Siteki|  2000|
|  Bandile|  26|  Manzini|  3000|
|  Nombuso|  35| Matsapha|  5000|
|    Faith|  17|Kwaluseni|  1000|
|    Bunmi|  37|  Tubungu|  3500|
|   Godwin|NULL|  Mbabane|  5000|
+---------+----+---------+------+



In [11]:
# Select specific columns from the dataframe using select() method

print("Selecting all rows on only these 3 columns")
df.select("Firstname", "Age", "Salary").show()

print("Alternative method of selecting: selecting all rows which meet this condition (all columns)")
df.where(df.Age < 30).show()

selecting evevery row on these 3 columns
+---------+----+------+
|Firstname| Age|Salary|
+---------+----+------+
| Simphiwe|  25|  2000|
|  Bandile|  26|  3000|
|  Nombuso|  35|  5000|
|    Faith|  17|  1000|
|    Bunmi|  37|  3500|
|   Godwin|NULL|  5000|
+---------+----+------+

Alternative method of selecting: selecting all rows which meet this condition (all columns)
+---------+---+---------+------+
|Firstname|Age|     City|Salary|
+---------+---+---------+------+
| Simphiwe| 25|   Siteki|  2000|
|  Bandile| 26|  Manzini|  3000|
|    Faith| 17|Kwaluseni|  1000|
+---------+---+---------+------+



Note:\
In the example above we see two different methods that we can use for selecting rows and columns in a DataFrame.
- First, we can use the `select()` method of a DataFrame to list the columns we want to select. This gives us a ll the rows but only the listed attributes/features are retrieved.
- Second, we can ue the `where()` method to give a condition we want the rows to be retrieved to meet.
Of course you can combine these two to form an `SQL`-like syntax where in you want to retrieve only a certain colums for rows which meet the condition specified on the `where`. You can think on the `select()` method as the `SELECT` clause and the `where()` as the `WHERE` clause.

Let's try it:\
Select the **Age** and **Firstname** for all people whose salary is _greater than_ 3000.

In [16]:
df.select('Age', 'Firstname').where(col('Salary') > 3000).show()

                                                                                

+----+---------+
| Age|Firstname|
+----+---------+
|  35|  Nombuso|
|  37|    Bunmi|
|NULL|   Godwin|
+----+---------+



In [12]:
# filter rows based on a condition
print("Filter age that is greater than 20")
df.filter(col("Age") > 20).show()

Filter age that is greater than 20
+---------+---+--------+------+
|Firstname|Age|    City|Salary|
+---------+---+--------+------+
| Simphiwe| 25|  Siteki|  2000|
|  Bandile| 26| Manzini|  3000|
|  Nombuso| 35|Matsapha|  5000|
|    Bunmi| 37| Tubungu|  3500|
+---------+---+--------+------+



                                                                                

In [21]:
df.filter(
    (df.Age > 30) & 
    (df.City=="Tubungu") |
    (df.Salary < 1000)
).show()
          

+---------+---+-------+------+
|Firstname|Age|   City|Salary|
+---------+---+-------+------+
|    Bunmi| 37|Tubungu|  3500|
+---------+---+-------+------+



With our comparison to **Structured Query Language** forming up, we can now start adding more and more conditions using 
- AND = &
- OR =  |
- NOT = !

Note that these conditions are specified inside the `filter()` method and are connected using the connectors. For readability, each condition is written in a new line in the example above. That is, we have three conditions, one regarding **Age**, the other regarding **City**, and the last one regarding **Salary**.

## Adding, renaming, and removing columns from the DataFrame

You can remove/drop columns from the DataFrame using the `drop()` method. Likewise, you can add new columns to the DF using the `withColumn()` method. Also, you can rename columns using the `withColumnRenamed()` method.

One important thing to note is that these methods are not _"inplace methods"_. By that we mean they do not directly affect the DF so we need to assign the result of the operation into a variable of type `DataFrame`.

In [31]:
df = df.withColumn("Bonus", col("Salary") * 0.2)
df.show()

+---------+----+---------+------+------+
|Firstname| Age|     City|Salary| Bonus|
+---------+----+---------+------+------+
| Simphiwe|  25|   Siteki|  2000| 400.0|
|  Bandile|  26|  Manzini|  3000| 600.0|
|  Nombuso|  35| Matsapha|  5000|1000.0|
|    Faith|  17|Kwaluseni|  1000| 200.0|
|    Bunmi|  37|  Tubungu|  3500| 700.0|
|   Godwin|NULL|  Mbabane|  5000|1000.0|
+---------+----+---------+------+------+



In [32]:
df = df.withColumn("Region", lit("Manzini"))
df.show()

+---------+----+---------+------+------+-------+
|Firstname| Age|     City|Salary| Bonus| Region|
+---------+----+---------+------+------+-------+
| Simphiwe|  25|   Siteki|  2000| 400.0|Manzini|
|  Bandile|  26|  Manzini|  3000| 600.0|Manzini|
|  Nombuso|  35| Matsapha|  5000|1000.0|Manzini|
|    Faith|  17|Kwaluseni|  1000| 200.0|Manzini|
|    Bunmi|  37|  Tubungu|  3500| 700.0|Manzini|
|   Godwin|NULL|  Mbabane|  5000|1000.0|Manzini|
+---------+----+---------+------+------+-------+



In [33]:
df = df.withColumnRenamed("Region", "Sigodzi")
df.show()
df.printSchema()

+---------+----+---------+------+------+-------+
|Firstname| Age|     City|Salary| Bonus|Sigodzi|
+---------+----+---------+------+------+-------+
| Simphiwe|  25|   Siteki|  2000| 400.0|Manzini|
|  Bandile|  26|  Manzini|  3000| 600.0|Manzini|
|  Nombuso|  35| Matsapha|  5000|1000.0|Manzini|
|    Faith|  17|Kwaluseni|  1000| 200.0|Manzini|
|    Bunmi|  37|  Tubungu|  3500| 700.0|Manzini|
|   Godwin|NULL|  Mbabane|  5000|1000.0|Manzini|
+---------+----+---------+------+------+-------+

root
 |-- Firstname: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: long (nullable = true)
 |-- Bonus: double (nullable = true)
 |-- Sigodzi: string (nullable = false)



In [34]:
df=df.drop("Sigodzi")
df.show()

+---------+----+---------+------+------+
|Firstname| Age|     City|Salary| Bonus|
+---------+----+---------+------+------+
| Simphiwe|  25|   Siteki|  2000| 400.0|
|  Bandile|  26|  Manzini|  3000| 600.0|
|  Nombuso|  35| Matsapha|  5000|1000.0|
|    Faith|  17|Kwaluseni|  1000| 200.0|
|    Bunmi|  37|  Tubungu|  3500| 700.0|
|   Godwin|NULL|  Mbabane|  5000|1000.0|
+---------+----+---------+------+------+



## Exercise 1
Create a new column called **New Salary** based on the addition of `Salary` and `Bonus`
Hint: 
 - New Salary = Salary + Bonus
 - `col()`

In [36]:
## Your solution here

## Exercise 2

Rename the `Firstname` column to `Name`

In [None]:
## Your solution here

## Update Columns based on Conditions

- Change the city name for **Nombuso** to Ludzeludze

In [41]:
df = df\
    .withColumn("City", when(col("Firstname") == "Nombuso", lit("Ludzeludze"))\
    .otherwise(col("City")))


df.show()

+---------+----+----------+------+------+
|Firstname| Age|      City|Salary| Bonus|
+---------+----+----------+------+------+
| Simphiwe|  25|    Siteki|  2000| 400.0|
|  Bandile|  26|   Manzini|  3000| 600.0|
|  Nombuso|  35|Ludzeludze|  5000|1000.0|
|    Faith|  17| Kwaluseni|  1000| 200.0|
|    Bunmi|  37|   Tubungu|  3500| 700.0|
|   Godwin|NULL|   Mbabane|  5000|1000.0|
+---------+----+----------+------+------+



                                                                                

## Exercise 3

Change the Manzini City to Mbabane for all people who reside in Manzini.\
Hint
- First get/select the people
- Change only their City

In [None]:
## Your solution here

## Groupings and Aggregations

In [43]:
print("average salary by city")

df.groupBy("City").agg(
    avg("Salary").alias("Avg_Salary"), 
    count('*').alias("Num_employees")
).show()



+----------+----------+-------------+
|      City|Avg_Salary|Num_employees|
+----------+----------+-------------+
|    Siteki|    2000.0|            1|
|Ludzeludze|    5000.0|            1|
|   Manzini|    3000.0|            1|
| Kwaluseni|    1000.0|            1|
|   Mbabane|    5000.0|            1|
|   Tubungu|    3500.0|            1|
+----------+----------+-------------+



                                                                                

In [45]:
print("Sort age in descending order")

df.orderBy(col("Age").desc()).show()

Sort age in descending order
+---------+----+----------+------+------+
|Firstname| Age|      City|Salary| Bonus|
+---------+----+----------+------+------+
|    Bunmi|  37|   Tubungu|  3500| 700.0|
|  Nombuso|  35|Ludzeludze|  5000|1000.0|
|  Bandile|  26|   Manzini|  3000| 600.0|
| Simphiwe|  25|    Siteki|  2000| 400.0|
|    Faith|  17| Kwaluseni|  1000| 200.0|
|   Godwin|NULL|   Mbabane|  5000|1000.0|
+---------+----+----------+------+------+



Note:\
This same operation can be done by using the `desc()` function instead of `col().desc()` but the results are pretty much the same thing.

In [47]:
from pyspark.sql.functions import desc
df.orderBy(desc("Age")).show()

+---------+----+----------+------+------+
|Firstname| Age|      City|Salary| Bonus|
+---------+----+----------+------+------+
|    Bunmi|  37|   Tubungu|  3500| 700.0|
|  Nombuso|  35|Ludzeludze|  5000|1000.0|
|  Bandile|  26|   Manzini|  3000| 600.0|
| Simphiwe|  25|    Siteki|  2000| 400.0|
|    Faith|  17| Kwaluseni|  1000| 200.0|
|   Godwin|NULL|   Mbabane|  5000|1000.0|
+---------+----+----------+------+------+



## Handling Missing Values

- Give a specific value for a column
- Use the same value across the dataset
- Dropping missing values
- Use `Imputer` and measures of central tendency

In [48]:
df.fillna({"Age":0}).show() # for age, fill with 0

+---------+---+----------+------+------+
|Firstname|Age|      City|Salary| Bonus|
+---------+---+----------+------+------+
| Simphiwe| 25|    Siteki|  2000| 400.0|
|  Bandile| 26|   Manzini|  3000| 600.0|
|  Nombuso| 35|Ludzeludze|  5000|1000.0|
|    Faith| 17| Kwaluseni|  1000| 200.0|
|    Bunmi| 37|   Tubungu|  3500| 700.0|
|   Godwin|  0|   Mbabane|  5000|1000.0|
+---------+---+----------+------+------+



In [49]:
df.fillna(0).show() # fill using 0 across the dataset

+---------+---+----------+------+------+
|Firstname|Age|      City|Salary| Bonus|
+---------+---+----------+------+------+
| Simphiwe| 25|    Siteki|  2000| 400.0|
|  Bandile| 26|   Manzini|  3000| 600.0|
|  Nombuso| 35|Ludzeludze|  5000|1000.0|
|    Faith| 17| Kwaluseni|  1000| 200.0|
|    Bunmi| 37|   Tubungu|  3500| 700.0|
|   Godwin|  0|   Mbabane|  5000|1000.0|
+---------+---+----------+------+------+



In [50]:
df.na.drop().show() # drop missing values

+---------+---+----------+------+------+
|Firstname|Age|      City|Salary| Bonus|
+---------+---+----------+------+------+
| Simphiwe| 25|    Siteki|  2000| 400.0|
|  Bandile| 26|   Manzini|  3000| 600.0|
|  Nombuso| 35|Ludzeludze|  5000|1000.0|
|    Faith| 17| Kwaluseni|  1000| 200.0|
|    Bunmi| 37|   Tubungu|  3500| 700.0|
+---------+---+----------+------+------+



In [51]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age'],
    outputCols=['Age1'],
    strategy='mean'
)
model = imputer.fit(df)
df1 = model.transform(df)
df1.show()

+---------+----+----------+------+------+----+
|Firstname| Age|      City|Salary| Bonus|Age1|
+---------+----+----------+------+------+----+
| Simphiwe|  25|    Siteki|  2000| 400.0|  25|
|  Bandile|  26|   Manzini|  3000| 600.0|  26|
|  Nombuso|  35|Ludzeludze|  5000|1000.0|  35|
|    Faith|  17| Kwaluseni|  1000| 200.0|  17|
|    Bunmi|  37|   Tubungu|  3500| 700.0|  37|
|   Godwin|NULL|   Mbabane|  5000|1000.0|  28|
+---------+----+----------+------+------+----+



## Handle Duplicates

In [53]:
df.select("Firstname").distinct().show()



+---------+
|Firstname|
+---------+
| Simphiwe|
|  Bandile|
|  Nombuso|
|    Faith|
|   Godwin|
|    Bunmi|
+---------+



                                                                                

In [54]:
df.dropDuplicates(['Salary']).show()

                                                                                

+---------+---+----------+------+------+
|Firstname|Age|      City|Salary| Bonus|
+---------+---+----------+------+------+
|    Faith| 17| Kwaluseni|  1000| 200.0|
| Simphiwe| 25|    Siteki|  2000| 400.0|
|  Bandile| 26|   Manzini|  3000| 600.0|
|    Bunmi| 37|   Tubungu|  3500| 700.0|
|  Nombuso| 35|Ludzeludze|  5000|1000.0|
+---------+---+----------+------+------+

