# SPARK API 



## Documentation: 

- RDD API
    - https://spark.apache.org/docs/latest/rdd-programming-guide.html
    - https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
    - https://intellipaat.com/blog/tutorial/spark-tutorial/programming-with-rdds/
    
- Spark SQL, Dataframe API: https://spark.apache.org/docs/latest/sql-programming-guide.html


In [1]:
import findspark
findspark.init()
import pyspark
import random

from pyspark.sql import SparkSession

sc = pyspark.SparkContext(appName="SPARK_API")
spark = SparkSession(sc)



## RDD API

RDD (Resilient Distributed Dataset) is the core data structure of Spark and lets data be processed in a distributed way on a Spark cluster. An RDD is an immutable collection of elements, split into partitions that can be processed in parallel.

The RDD supports the following basic operations:

- Transformation: an operation that produces a new RDD from an existing one. Because RDDs cannot be modified, every transformation returns a new RDD. Transformations are lazy: they are computed only when an action needs their result.
- Action: an operation that triggers the computation and returns a result to the driver program (or writes data to external storage).

RDDs can be created as follows:

- RDDs can be created from data sources such as text files, databases, or Hadoop HDFS.
- RDDs can be created by transformations of existing RDDs.
- RDDs can be created from a collection in the driver program, for example with sc.parallelize().

The RDD API is part of PySpark and provides the following functionality for working with RDDs:

- Transformations: derive a new RDD from an existing one. For example, map(), filter(), flatMap(), reduceByKey(), etc.

- Actions: return results to the driver or write the data of an RDD to external storage. For example, collect(), take(), first(), count(), etc.

- Persistence and saving: cache RDDs for reuse or save their data. For example, cache(), persist(), saveAsTextFile(), etc.

- Partitioning: control how the data of an RDD is distributed across the cluster. For example, repartition(), coalesce(), etc.

- Set operations: combine or compare RDDs. For example, union(), intersection(), subtract(), etc.

In [2]:
titanic_rdd = spark.read.format("parquet").load("titanic_parquet").rdd

In [4]:
#titanic_rdd.show()

# RDDs have no show() method; it belongs to the DataFrame API

In [5]:
#titanic_rdd.printSchema()

# RDDs have no printSchema() method either; it is also a DataFrame method

In [3]:
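# foreach() runs on the executors, so print() writes to the executor console
# (in local mode usually the terminal that started Jupyter), not to this cell;
# use .collect() or .take(n) to see the rows here
titanic_rdd.foreach(print)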

### RDD API functions

| Function | Description |
|---|---|
| map(func) | Returns a new RDD by applying func to every element |
| flatMap(func) | Like map(), but func returns an iterable and the results are flattened into one RDD |
| filter(func) | Returns a new RDD containing only the elements for which func returns True |
| reduceByKey(func) | Merges the values of each key using func (on key/value RDDs) |
| groupByKey() | Groups the values of each key into one iterable: (key, value) pairs become (key, values) |
| union(otherDataset) | Returns an RDD with the elements of the current RDD followed by those of otherDataset |
| intersection(otherDataset) | Returns the elements present in both RDDs |
| distinct() | Returns a new RDD without duplicate elements |
| coalesce(numPartitions) | Reduces the number of partitions to numPartitions without a full shuffle |
| repartition(numPartitions) | Changes the number of partitions to numPartitions with a full shuffle |
| reduce(func) | Aggregates the elements of the RDD with func and returns the result (action) |
| collect() | Returns all elements of the RDD to the driver as a Python list (action) |
| count() | Returns the number of elements in the RDD (action) |
| first() | Returns the first element (action) |
| take(n) | Returns the first n elements as a list (action) |
| foreach(func) | Runs func on every element of the RDD (action) |

### map function

The map() function is a basic transformation of PySpark RDDs. It applies a function to every element of an RDD and returns a new RDD containing exactly one result per input element.

The new RDD returned by map() is the transformed version of the original RDD. The original RDD is not modified.

The map() function takes a single parameter, which can be a lambda expression, a function or a method. A lambda is usually the simplest choice for a short transformation; a named function or method is clearer when the transformation is more complex or needs additional parameters.

map() is very useful for preparing data, for example for extracting fields or converting dates and numbers into the desired format. Keep in mind that transformations are lazy: they are executed only when an action (e.g. collect(), count(), saveAsTextFile()) is called on the RDD. map() therefore computes nothing immediately; the work happens when an action evaluates the RDD.

### filter function
The filter() function is a transformation that keeps only the elements of the RDD that satisfy a given condition, expressed as a lambda expression or a function that returns a boolean. Elements for which the condition is True are kept in the new RDD and the rest are discarded; the original RDD is not changed.



### count function
The count() function is one of the basic actions of PySpark RDDs; it returns the number of elements in the RDD.

count() takes no parameters and returns an int. It is useful for checking the size of an RDD, which is often important when analysing and processing data.

Because count() is an action, it triggers the evaluation of all pending transformations and has to go through every partition to count the elements. On large RDDs this can take a while, but only the per-partition counts are sent to the driver, not the elements themselves.

If the RDD is empty, the result will be 0.



In [7]:
# Extract only the names from the full data, which is the fourth column

names = titanic_rdd.map(lambda x: x[3])
print(names.collect())



['Braund, Mr. Owen Harris', 'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', 'Heikkinen, Miss. Laina', 'Futrelle, Mrs. Jacques Heath (Lily May Peel)', 'Allen, Mr. William Henry', 'Moran, Mr. James', 'McCarthy, Mr. Timothy J', 'Palsson, Master. Gosta Leonard', 'Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)', 'Nasser, Mrs. Nicholas (Adele Achem)', 'Sandstrom, Miss. Marguerite Rut', 'Bonnell, Miss. Elizabeth', 'Saundercock, Mr. William Henry', 'Andersson, Mr. Anders Johan', 'Vestrom, Miss. Hulda Amanda Adolfina', 'Hewlett, Mrs. (Mary D Kingcome) ', 'Rice, Master. Eugene', 'Williams, Mr. Charles Eugene', 'Vander Planke, Mrs. Julius (Emelia Maria Vandemoortele)', 'Masselmani, Mrs. Fatima', 'Fynney, Mr. Joseph J', 'Beesley, Mr. Lawrence', '"McGowan, Miss. Anna ""Annie"""', 'Sloper, Mr. William Thompson', 'Palsson, Miss. Torborg Danira', 'Asplund, Mrs. Carl Oscar (Selma Augusta Emilia Johansson)', 'Emir, Mr. Farred Chehab', 'Fortune, Mr. Charles Alexander', '"O\'Dwyer, Miss. Ellen "

In [4]:
# Total number of passengers in the dataset (.count())

passenger_count = titanic_rdd.count()
print(passenger_count)

891


In [5]:
# How many people were in first class? (Pclass, third column) (.count() function)

titanic_first_class_count = titanic_rdd.filter(lambda row: row[2] == 1) \
                    .count()
print(titanic_first_class_count)

216


In [6]:
# How many people did not survive the Titanic tragedy? (using the .count() function)

titanic_rdd.map(lambda row: int(row[1])) \
           .filter(lambda survived: survived == 0) \
           .count()

549

In [7]:
# What share of all survivors came from first, second and third class? (.count())

first_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 1 and row[1] == 1) \
                            .count()
second_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 2 and row[1] == 1) \
                            .count()
third_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 3 and row[1] == 1) \
                            .count()
total_survived_count = titanic_rdd \
                      .map(lambda row: int(row[1])) \
                      .filter(lambda survived: survived == 1) \
                      .count()
print(f"Share of survivors from first class: {first_class_survived_count / total_survived_count}")
print(f"Share of survivors from second class: {second_class_survived_count / total_survived_count}")
print(f"Share of survivors from third class: {third_class_survived_count / total_survived_count}")

Share of survivors from first class: 0.39766081871345027
Share of survivors from second class: 0.2543859649122807
Share of survivors from third class: 0.347953216374269


In [8]:
# What was the proportion of men and women in first class? (.count() function)

first_class_male_count = titanic_rdd \
                         .filter(lambda row: row[2] == 1 and row[4] == "male") \
                         .count()
first_class_female_count = titanic_rdd \
                           .filter(lambda row: row[2] == 1 and row[4] == "female") \
                           .count()
total_first_class_count = titanic_rdd \
                           .filter(lambda row: row[2] == 1) \
                           .count()
print(f"male: {first_class_male_count/total_first_class_count}")
print(f"female: {first_class_female_count/total_first_class_count}")

male: 0.5648148148148148
female: 0.4351851851851852


## Practice



In [13]:
# Extract only the names from the full data, which is the fourth column

names = titanic_rdd.map(lambda x: x[3])
print(names.collect())

['Braund, Mr. Owen Harris', 'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', 'Heikkinen, Miss. Laina', 'Futrelle, Mrs. Jacques Heath (Lily May Peel)', 'Allen, Mr. William Henry', 'Moran, Mr. James', 'McCarthy, Mr. Timothy J', 'Palsson, Master. Gosta Leonard', 'Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)', 'Nasser, Mrs. Nicholas (Adele Achem)', 'Sandstrom, Miss. Marguerite Rut', 'Bonnell, Miss. Elizabeth', 'Saundercock, Mr. William Henry', 'Andersson, Mr. Anders Johan', 'Vestrom, Miss. Hulda Amanda Adolfina', 'Hewlett, Mrs. (Mary D Kingcome) ', 'Rice, Master. Eugene', 'Williams, Mr. Charles Eugene', 'Vander Planke, Mrs. Julius (Emelia Maria Vandemoortele)', 'Masselmani, Mrs. Fatima', 'Fynney, Mr. Joseph J', 'Beesley, Mr. Lawrence', '"McGowan, Miss. Anna ""Annie"""', 'Sloper, Mr. William Thompson', 'Palsson, Miss. Torborg Danira', 'Asplund, Mrs. Carl Oscar (Selma Augusta Emilia Johansson)', 'Emir, Mr. Farred Chehab', 'Fortune, Mr. Charles Alexander', '"O\'Dwyer, Miss. Ellen "

In [14]:
# Total number of passengers in the dataset? 

passenger_count = titanic_rdd.count()
print(passenger_count)

891


In [15]:
# How many people were in first class? (Pclass, third column) 

titanic_first_class_count = titanic_rdd.filter(lambda row: row[2] == 1) \
                    .count()
print(titanic_first_class_count)

216


In [16]:
# How many people did not survive the Titanic tragedy? (.count() function) 

titanic_rdd.map(lambda row: int(row[1])) \
           .filter(lambda survived: survived == 0) \
           .count()

549

In [18]:
# What share of all survivors came from first, second and third class? (.count())

first_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 1 and row[1] == 1) \
                            .count()
second_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 2 and row[1] == 1) \
                            .count()
third_class_survived_count = titanic_rdd \
                            .filter(lambda row: row[2] == 3 and row[1] == 1) \
                            .count()
total_survived_count = titanic_rdd \
                      .map(lambda row: int(row[1])) \
                      .filter(lambda survived: survived == 1) \
                      .count()
print(f"Share of survivors from first class: {first_class_survived_count / total_survived_count}")
print(f"Share of survivors from second class: {second_class_survived_count / total_survived_count}")
print(f"Share of survivors from third class: {third_class_survived_count / total_survived_count}")

Share of survivors from first class: 0.39766081871345027
Share of survivors from second class: 0.2543859649122807
Share of survivors from third class: 0.347953216374269


In [19]:
# What was the proportion of men and women in first class? (.count() function)

first_class_male_count = titanic_rdd \
                         .filter(lambda row: row[2] == 1 and row[4] == "male") \
                         .count()
first_class_female_count = titanic_rdd \
                           .filter(lambda row: row[2] == 1 and row[4] == "female") \
                           .count()
total_first_class_count = titanic_rdd \
                           .filter(lambda row: row[2] == 1) \
                           .count()
print(f"male: {first_class_male_count/total_first_class_count}")
print(f"female: {first_class_female_count/total_first_class_count}")

male: 0.5648148148148148
female: 0.4351851851851852


### map and reduce function together

PySpark RDDs can express the __MapReduce__ pattern known from Hadoop to transform and aggregate data. The transformation is the Map step and the aggregation is the Reduce step, applied one after the other.

The steps in the MapReduce pattern are as follows:

- Map: each element of the RDD is transformed, producing a new RDD.
- Reduce: the elements of the new RDD are aggregated into a single result.

The general syntax of the Map and Reduce operations is as follows:

```python
rdd.map(lambda x: f(x)).reduce(lambda x, y: g(x, y))
```

Here rdd is an RDD, f(x) is a transformation function, g(x, y) is an aggregation function, and the lambda expressions wrap f and g.

map() applies f to each element of the RDD and creates a new RDD from the results. reduce() is an action: it combines the elements of that new RDD into a single value using g and returns it to the driver. Because Spark reduces each partition separately and then combines the partial results, g should be associative and commutative (for example addition, min() or max()).

For example, to sum the elements of an RDD, the pattern looks like this:


```python
rdd.map(lambda x: x).reduce(lambda x, y: x + y)
```

This example sums all the elements of the RDD. The map step uses the identity function, so the elements are passed unchanged to the reduce step, where they are summed.

The example is simple, but the same pattern scales to large data sets, because both steps run in parallel on the partitions of the RDD.


In [20]:
# What was the age of the youngest passenger on board? (using map and reduce functions)

youngest_age = titanic_rdd \
               .map(lambda row: row[5]) \
               .filter(lambda age: age is not None) \
               .reduce(lambda age1, age2: min(age1, age2))
print(f"Youngest passenger age: {youngest_age}")

Youngest passenger age: 0.42


In [21]:
# Total number of passengers (using map and reduce functions)

traveller_count = titanic_rdd \
                    .map(lambda row: 1) \
                    .reduce(lambda count1, count2: count1 + count2)
print(f"Traveller count: {traveller_count}")

Traveller count: 891


In [22]:
# How many passengers on board have an unknown (missing) age?

unknown_age_count = titanic_rdd \
                    .map(lambda row: row[5]) \
                    .filter(lambda age: age is None) \
                    .map(lambda age: 1) \
                    .reduce(lambda count1, count2: count1 + count2)
print(f"Unknown age count: {unknown_age_count}")

Unknown age count: 177


In [23]:
# List the names of all passengers in the Titanic dataset,
# then convert them to upper case using the map function.

passenger_names = titanic_rdd \
                 .map(lambda row: row[3]) \
                 .map(lambda name: name.upper()) \
                 .collect()
print(passenger_names)

['BRAUND, MR. OWEN HARRIS', 'CUMINGS, MRS. JOHN BRADLEY (FLORENCE BRIGGS THAYER)', 'HEIKKINEN, MISS. LAINA', 'FUTRELLE, MRS. JACQUES HEATH (LILY MAY PEEL)', 'ALLEN, MR. WILLIAM HENRY', 'MORAN, MR. JAMES', 'MCCARTHY, MR. TIMOTHY J', 'PALSSON, MASTER. GOSTA LEONARD', 'JOHNSON, MRS. OSCAR W (ELISABETH VILHELMINA BERG)', 'NASSER, MRS. NICHOLAS (ADELE ACHEM)', 'SANDSTROM, MISS. MARGUERITE RUT', 'BONNELL, MISS. ELIZABETH', 'SAUNDERCOCK, MR. WILLIAM HENRY', 'ANDERSSON, MR. ANDERS JOHAN', 'VESTROM, MISS. HULDA AMANDA ADOLFINA', 'HEWLETT, MRS. (MARY D KINGCOME) ', 'RICE, MASTER. EUGENE', 'WILLIAMS, MR. CHARLES EUGENE', 'VANDER PLANKE, MRS. JULIUS (EMELIA MARIA VANDEMOORTELE)', 'MASSELMANI, MRS. FATIMA', 'FYNNEY, MR. JOSEPH J', 'BEESLEY, MR. LAWRENCE', '"MCGOWAN, MISS. ANNA ""ANNIE"""', 'SLOPER, MR. WILLIAM THOMPSON', 'PALSSON, MISS. TORBORG DANIRA', 'ASPLUND, MRS. CARL OSCAR (SELMA AUGUSTA EMILIA JOHANSSON)', 'EMIR, MR. FARRED CHEHAB', 'FORTUNE, MR. CHARLES ALEXANDER', '"O\'DWYER, MISS. ELLEN "

## Practice



In [24]:
# What was the age of the youngest passenger on board? (using map and reduce functions)
youngest_age = titanic_rdd \
               .map(lambda row: row[5]) \
               .filter(lambda age: age is not None) \
               .reduce(lambda age1, age2: min(age1, age2))
print(f"Youngest passenger age: {youngest_age}")

Youngest passenger age: 0.42


In [10]:
# What was the age of the oldest passenger on board? (using map and reduce functions)
oldest_age = titanic_rdd \
               .map(lambda row: row[5]) \
               .filter(lambda age: age is not None) \
               .reduce(lambda age1, age2: max(age1, age2))
print(f"Oldest passenger age: {oldest_age}")

Oldest passenger age: 80.0


In [11]:
# Total number of passengers (using map and reduce functions)

traveller_count = titanic_rdd \
                    .map(lambda row: 1) \
                    .reduce(lambda count1, count2: count1 + count2)

print(f"Traveller count: {traveller_count}")

Traveller count: 891


In [13]:
# How many passengers on board have an unknown (missing) age?

unknown_age_count = titanic_rdd \
                    .map(lambda row: row[5]) \
                    .filter(lambda age: age is None) \
                    .map(lambda age: 1) \
                    .reduce(lambda count1, count2: count1 + count2)

print(f"Unknown age count: {unknown_age_count}")

Unknown age count: 177


In [16]:
# List the names of all passengers in the Titanic dataset,
# then convert them to lowercase using the map function.

passenger_names = titanic_rdd \
                 .map(lambda row: row[3]) \
                 .map(lambda name: name.lower()) \
                 .collect()

print(passenger_names)

['braund, mr. owen harris', 'cumings, mrs. john bradley (florence briggs thayer)', 'heikkinen, miss. laina', 'futrelle, mrs. jacques heath (lily may peel)', 'allen, mr. william henry', 'moran, mr. james', 'mccarthy, mr. timothy j', 'palsson, master. gosta leonard', 'johnson, mrs. oscar w (elisabeth vilhelmina berg)', 'nasser, mrs. nicholas (adele achem)', 'sandstrom, miss. marguerite rut', 'bonnell, miss. elizabeth', 'saundercock, mr. william henry', 'andersson, mr. anders johan', 'vestrom, miss. hulda amanda adolfina', 'hewlett, mrs. (mary d kingcome) ', 'rice, master. eugene', 'williams, mr. charles eugene', 'vander planke, mrs. julius (emelia maria vandemoortele)', 'masselmani, mrs. fatima', 'fynney, mr. joseph j', 'beesley, mr. lawrence', '"mcgowan, miss. anna ""annie"""', 'sloper, mr. william thompson', 'palsson, miss. torborg danira', 'asplund, mrs. carl oscar (selma augusta emilia johansson)', 'emir, mr. farred chehab', 'fortune, mr. charles alexander', '"o\'dwyer, miss. ellen "

### sortBy function

The sortBy function is a transformation of the PySpark RDD class that sorts the elements of an RDD by a key computed from each element. The first parameter, keyfunc, computes the sorting key. The second parameter, ascending, defaults to True; set it to False for descending order.

```python
# Create an RDD containing strings
rdd = sc.parallelize(["apple", "banana", "cherry", "date", "elderberry", "fig"])

# Arrange the RDD in ascending order by the length of the strings
sorted_rdd = rdd.sortBy(lambda x: len(x))

# Print the result
print(sorted_rdd.collect())
```
Result:
```
['fig', 'date', 'apple', 'banana', 'cherry', 'elderberry']
```
In the above example, we sort the elements of the RDD in ascending order by the length of the strings and print the result with collect(). Elements of the same length ('banana' and 'cherry') have no guaranteed relative order: here they happen to keep their input order, but sortBy does not sort them alphabetically. To break ties deterministically, use a tuple as the key, such as lambda x: (len(x), x).

### take function

The take function is an action that returns the first n elements of the RDD as a list. It is useful for inspecting an RDD, because Spark scans only as many partitions as it needs to collect n elements instead of returning all of them. Here is an example of using the take function:

```python
# Create an RDD with numbers
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Take the first 3 elements from the RDD
result = rdd.take(3)

# Print the result
print(result)
```
Result:
```python
[1, 2, 3]
```



In [17]:
# List the 10 lowest ages in the Titanic dataset!
# Use the map function to retrieve the ages, filter out the missing (None) values,
# then sort them in ascending order and take the first 10.

youngest_10_ages = titanic_rdd \
                .map(lambda row: row[5]) \
                .filter(lambda age: age is not None) \
                .sortBy(lambda age: age) \
                .take(10)
print(youngest_10_ages)

[0.42, 0.67, 0.75, 0.75, 0.83, 0.83, 0.92, 1.0, 1.0, 1.0]


## Practice

In [18]:
# List the 5 highest ages in the Titanic dataset!
# Use the map function to retrieve the ages, filter out the missing (None) values,
# then sort them in descending order and take the first 5.

oldest_5_ages = titanic_rdd \
                .map(lambda row: row[5]) \
                .filter(lambda age: age is not None) \
                .sortBy(lambda age: age, ascending=False) \
                .take(5)
print(oldest_5_ages)

[80.0, 74.0, 71.0, 71.0, 70.5]


### partitioning

PySpark RDDs are divided into parts called partitions, each holding a subset of the elements of the RDD. The number of partitions can be set when the RDD is created; otherwise a default from the Spark configuration is used.

The role of partitioning is to divide the RDD into independent parts that can be processed in parallel in the Spark cluster. This allows Spark to make efficient use of the available resources and process the elements of the RDD faster.

Choosing a suitable number of partitions matters most for large RDDs and for operations that can run in parallel: too few partitions leave cores idle, while too many add scheduling overhead.

The number of partitions can be changed with the repartition or coalesce functions. repartition performs a full shuffle and can increase or decrease the number of partitions, while coalesce merges existing partitions to reduce their number without a full shuffle.

```python
# Create an RDD with numbers
rdd = sc.parallelize(range(100), numSlices=4)

# Set the partitioning number to 2
rdd = rdd.repartition(2)

# Examine the number of partitions
print(rdd.getNumPartitions())

```
Result:

```python
2

```

In the above example, we first create an RDD with numbers, and then set the number of partitions to 4 using the numSlices parameter of the parallelize function. Then, using the repartition function, we transform the RDD to contain only 2 partitions. Finally, the getNumPartitions function is used to check the number of partitions and make sure that it is set to 2.

In [19]:
# Retrieve the number of partitions

titanic_rdd.getNumPartitions()

1

In [20]:
titanic_rdd.repartition(15).getNumPartitions()

15

In [21]:
# Show how many records each partition contains

partition_sizes = titanic_rdd.repartition(15).repartition(3).mapPartitions(lambda partition: [len(list(partition))]).collect()
print(partition_sizes)

[300, 300, 291]


### distinct function

The distinct function is an RDD transformation that removes duplicate elements and keeps only the unique ones. To find duplicates, Spark has to shuffle the data between partitions, so the order of the elements in the result is not guaranteed.

Using distinct is simple: call it on the RDD and it returns a new RDD with the unique elements. In the following example we keep only the unique elements of an RDD:

```python
# Create RDD
rdd = sc.parallelize([1, 2, 3, 2, 1, 4, 5, 3, 6, 7, 6, 8, 9, 9])

# Keep only the unique elements
unique_rdd = rdd.distinct()

# Print the unique elements
print(unique_rdd.collect())

```
Result (the order may vary, because distinct() shuffles the data):

```python
[1, 2, 3, 4, 5, 6, 7, 8, 9]

```



In [23]:
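# List the distinct passenger classes
# Note: foreach(print) runs on the executors, so the output appears in the
# executor console (in local mode usually the terminal that started Jupyter),
# not in this cell; use .collect() to see the result here.

titanic_rdd.map(lambda x: x[2]) \
           .distinct() \
           .foreach(print)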

### union function

The union function combines two RDDs. The result contains the elements of the first RDD followed by the elements of the second RDD, and duplicates are not removed (like UNION ALL in SQL).

Using union is simple: call it on the first RDD and pass the second RDD as the argument. In the following example, two RDDs are merged:  
```python
# Create the first RDD
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# Create the second RDD
rdd2 = sc.parallelize([6, 7, 8, 9, 10])

# Merge the RDDs
union_rdd = rdd1.union(rdd2)

# Print the result
print(union_rdd.collect())


```
Result:

```python
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

```


In [24]:
# Filter men and women into separate RDDs, then combine the two RDDs with the union function.

male_rdd = titanic_rdd.filter(lambda row: row[4] == "male")
female_rdd = titanic_rdd.filter(lambda row: row[4] == "female")

male_and_female = male_rdd.union(female_rdd)

print(male_and_female.collect())

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'), Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S'), Row(PassengerId=6, Survived=0, Pclass=3, Name='Moran, Mr. James', Sex='male', Age=None, SibSp=0, Parch=0, Ticket='330877', Fare=8.4583, Cabin=None, Embarked='Q'), Row(PassengerId=7, Survived=0, Pclass=1, Name='McCarthy, Mr. Timothy J', Sex='male', Age=54.0, SibSp=0, Parch=0, Ticket='17463', Fare=51.8625, Cabin='E46', Embarked='S'), Row(PassengerId=8, Survived=0, Pclass=3, Name='Palsson, Master. Gosta Leonard', Sex='male', Age=2.0, SibSp=3, Parch=1, Ticket='349909', Fare=21.075, Cabin=None, Embarked='S'), Row(PassengerId=13, Survived=0, Pclass=3, Name='Saundercock, Mr. William Henry', Sex='male', Age=20.0, SibSp=0, Parch=0, Ticket='A/5. 2151', Fare=8.0

### reduceByKey function

The reduceByKey() function in PySpark is a transformation for RDDs of key-value pairs. It returns a new RDD in which the values belonging to each key are merged into a single value.

To use it, first create an RDD of key-value pairs (tuples), then call reduceByKey() on it with the reducing function as the first argument. The reducing function takes two values and returns one value of the same type; it should be associative and commutative, because Spark applies it within each partition and then across partitions.

In the example below, the number of occurrences of each letter is counted:  
```python
# Create an RDD of letters
rdd = sc.parallelize(['a', 'b', 'c', 'a', 'b', 'a', 'd', 'b', 'c'])

# Create key-value pairs
key_value_rdd = rdd.map(lambda x: (x, 1))

# Sum the values for each key
sums_rdd = key_value_rdd.reduceByKey(lambda a, b: a + b)

# Print the result
print(sums_rdd.collect())


```
In the result, the occurrences are summed per key (the order of the pairs may vary):


```python
[('a', 3), ('b', 3), ('c', 2), ('d', 1)]

```

### reduceByKey comparison with groupByKey

The reduceByKey() and groupByKey() functions are both used for aggregation in Spark. However, there are differences between the two functions:

groupByKey(): groups all the values of each key into a single iterable and returns (key, values) pairs. Every individual value is sent across the network during the shuffle, and all values of a key must fit in memory on one executor, so on large data sets groupByKey() can cause memory problems.

reduceByKey(): merges the values of each key with the given reducing function and returns an RDD of keys and aggregated values. Spark first combines the values within each partition and shuffles only these partial results, which reduces network traffic and memory use.

So the main difference is that reduceByKey() pre-aggregates the data within each partition before the shuffle, whereas groupByKey() moves every value and keeps all values of a key together. When the goal is an aggregate such as a sum or a count, reduceByKey() is usually the more efficient choice.

In [25]:
# Use map and reduceByKey to count how many people travelled in each class

class_travelers = titanic_rdd.filter(lambda row: row[2] is not None) \
                             .map(lambda x: (x[2], 1)) \
                             .reduceByKey(lambda a, b: a + b) \
                             .collect()

print(class_travelers)

[(3, 491), (1, 216), (2, 184)]


In [26]:
# Use map and groupByKey to count how many people travelled in each class

class_travelers = titanic_rdd.filter(lambda row: row[2] is not None) \
                             .map(lambda x: (x[2], 1)) \
                             .groupByKey().mapValues(sum) \
                             .collect()

print(class_travelers)

[(3, 491), (1, 216), (2, 184)]


## Practice

In [29]:
# Repartition the titanic dataset into 15 partitions, then back into 1 partition.
# (coalesce(1) would reach 1 partition without a second full shuffle.)

# At the end, check how many partitions it consists of.

single_part_rdd = titanic_rdd.repartition(15).repartition(1)

single_part_rdd.getNumPartitions()



1

In [31]:
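# List the distinct passenger classes
# Note: foreach(print) runs on the executors, so the output appears in the
# executor console (in local mode usually the terminal that started Jupyter),
# not in this cell; use .collect() to see the result here.

titanic_rdd.map(lambda x: x[2]) \
           .distinct() \
           .foreach(print)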

In [32]:
# Filter men and women into separate RDDs, then combine the two RDDs with the union function.

male_rdd = titanic_rdd.filter(lambda row: row[4] == "male")
female_rdd = titanic_rdd.filter(lambda row: row[4] == "female")

male_and_female = female_rdd.union(male_rdd)

print(male_and_female.collect())

[Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'), Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'), Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'), Row(PassengerId=9, Survived=1, Pclass=3, Name='Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)', Sex='female', Age=27.0, SibSp=0, Parch=2, Ticket='347742', Fare=11.1333, Cabin=None, Embarked='S'), Row(PassengerId=10, Survived=1, Pclass=2, Name='Nasser, Mrs. Nicholas (Adele Achem)', Sex='female', Age=14.0, SibSp=1, Parch=0, Ticket='237736', Fare=30.0708, Cabin=None, Embarked='C'), Row(PassengerId=11, Survived=1, Pclas

In [33]:
# Use map and reduceByKey to count how many people travelled in each class

class_travelers = titanic_rdd.filter(lambda row: row[2] is not None) \
                             .map(lambda x: (x[2], 1)) \
                             .reduceByKey(lambda a, b: a + b) \
                             .collect()

print(class_travelers)

[(3, 491), (1, 216), (2, 184)]


## PySpark Dataframe API

The PySpark DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a Pandas DataFrame. Compared with RDDs, DataFrames have a schema, their queries are optimized by Spark's Catalyst optimizer (which usually makes them faster than equivalent hand-written RDD code), and they offer a high-level API that resembles Pandas and SQL.

The PySpark DataFrame API provides various functions for manipulating data, including:

- Loading and saving data: the DataFrame API provides functions for loading and saving data in various formats such as CSV, JSON, Parquet and Avro. For example: spark.read.csv(), df.write.parquet().

- Data transformations: the DataFrame API allows you to transform data with operations that mirror SQL clauses such as SELECT, WHERE, GROUP BY, JOIN and UNION. For example: df.select(), df.filter(), df.groupBy(), df.join().

- Grouping and aggregation: the DataFrame API allows you to group data and compute aggregates, mainly with groupBy() and agg(). For example: df.groupBy().agg()

- Data handling and processing: the DataFrame API supports data cleaning and preparation, such as handling missing data, renaming columns and converting column types. For example: df.na.fill(), df.withColumnRenamed(), df.withColumn()

- Window functions: the DataFrame API allows you to compute values over groups of related rows (for example rankings or running totals) without collapsing them into one row, by defining a window specification with pyspark.sql.Window and applying functions such as row_number(), rank() or avg() with .over().

The DataFrame API provides additional functions, such as PySpark SQL functions. The PySpark DataFrame API can be explored in detail in the official PySpark documentation.

In [35]:
#Read our source file

titanic_df = spark.read.format("parquet").load("titanic_parquet")

### select function

The select() method is one of the data transformations that can be used in the PySpark DataFrame API to select or rename the columns of a given DataFrame.

The select() method can be used to select multiple columns at once:

```python
titanic_df.select("name", "age").show()
```

The select() method can also be used to rename columns:
```python
titanic_df.select(titanic_df["Name"], titanic_df["Age"].alias("years")).show()
```


In [36]:
# Please select the Survived and Name columns

selected = titanic_df.select("Survived", "Name")
selected.show()

+--------+--------------------+
|Survived|                Name|
+--------+--------------------+
|       0|Braund, Mr. Owen ...|
|       1|Cumings, Mrs. Joh...|
|       1|Heikkinen, Miss. ...|
|       1|Futrelle, Mrs. Ja...|
|       0|Allen, Mr. Willia...|
|       0|    Moran, Mr. James|
|       0|McCarthy, Mr. Tim...|
|       0|Palsson, Master. ...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Sandstrom, Miss. ...|
|       1|Bonnell, Miss. El...|
|       0|Saundercock, Mr. ...|
|       0|Andersson, Mr. An...|
|       0|Vestrom, Miss. Hu...|
|       1|Hewlett, Mrs. (Ma...|
|       0|Rice, Master. Eugene|
|       1|Williams, Mr. Cha...|
|       0|Vander Planke, Mr...|
|       1|Masselmani, Mrs. ...|
+--------+--------------------+
only showing top 20 rows



In [37]:
# Please show the survivors and their names where the name contains "Mrs"
selected = titanic_df.select("Survived", "Name") \
                     .filter(titanic_df.Survived == 1) \
                     .filter(titanic_df.Name.contains("Mrs"))
selected.show()

+--------+--------------------+
|Survived|                Name|
+--------+--------------------+
|       1|Cumings, Mrs. Joh...|
|       1|Futrelle, Mrs. Ja...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Hewlett, Mrs. (Ma...|
|       1|Masselmani, Mrs. ...|
|       1|Asplund, Mrs. Car...|
|       1|Spencer, Mrs. Wil...|
|       1|Harper, Mrs. Henr...|
|       1|Faunthorpe, Mrs. ...|
|       1|Nye, Mrs. (Elizab...|
|       1|Backstrom, Mrs. K...|
|       1|Doling, Mrs. John...|
|       1|Weisz, Mrs. Leopo...|
|       1|Hakkarainen, Mrs....|
|       1|Pears, Mrs. Thoma...|
|       1|"Watt, Mrs. James...|
|       1|Chibnall, Mrs. (E...|
|       1|"O'Brien, Mrs. Th...|
|       1| Pinsky, Mrs. (Rosa)|
+--------+--------------------+
only showing top 20 rows



In [38]:
# Select and rename multiple columns with alias():
selected = titanic_df.select(titanic_df["Survived"].alias("is_alive"), titanic_df["Name"].alias("full_name"))
selected.show()

+--------+--------------------+
|is_alive|           full_name|
+--------+--------------------+
|       0|Braund, Mr. Owen ...|
|       1|Cumings, Mrs. Joh...|
|       1|Heikkinen, Miss. ...|
|       1|Futrelle, Mrs. Ja...|
|       0|Allen, Mr. Willia...|
|       0|    Moran, Mr. James|
|       0|McCarthy, Mr. Tim...|
|       0|Palsson, Master. ...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Sandstrom, Miss. ...|
|       1|Bonnell, Miss. El...|
|       0|Saundercock, Mr. ...|
|       0|Andersson, Mr. An...|
|       0|Vestrom, Miss. Hu...|
|       1|Hewlett, Mrs. (Ma...|
|       0|Rice, Master. Eugene|
|       1|Williams, Mr. Cha...|
|       0|Vander Planke, Mr...|
|       1|Masselmani, Mrs. ...|
+--------+--------------------+
only showing top 20 rows



Columns can also be referenced with the col() function, which first has to be imported from pyspark.sql.functions:


In [39]:
from pyspark.sql.functions import col

In [40]:
# Let's select the names and ages of all passengers:

titanic_df.select(col("Name"), col("Age")).show()

+--------------------+----+
|                Name| Age|
+--------------------+----+
|Braund, Mr. Owen ...|22.0|
|Cumings, Mrs. Joh...|38.0|
|Heikkinen, Miss. ...|26.0|
|Futrelle, Mrs. Ja...|35.0|
|Allen, Mr. Willia...|35.0|
|    Moran, Mr. James|null|
|McCarthy, Mr. Tim...|54.0|
|Palsson, Master. ...| 2.0|
|Johnson, Mrs. Osc...|27.0|
|Nasser, Mrs. Nich...|14.0|
|Sandstrom, Miss. ...| 4.0|
|Bonnell, Miss. El...|58.0|
|Saundercock, Mr. ...|20.0|
|Andersson, Mr. An...|39.0|
|Vestrom, Miss. Hu...|14.0|
|Hewlett, Mrs. (Ma...|55.0|
|Rice, Master. Eugene| 2.0|
|Williams, Mr. Cha...|null|
|Vander Planke, Mr...|31.0|
|Masselmani, Mrs. ...|null|
+--------------------+----+
only showing top 20 rows



In [41]:
# Let's select the names, ages and ports of embarkation of all passengers:

titanic_df.select(col("Name"), col("Age"), col("Embarked")).show()

+--------------------+----+--------+
|                Name| Age|Embarked|
+--------------------+----+--------+
|Braund, Mr. Owen ...|22.0|       S|
|Cumings, Mrs. Joh...|38.0|       C|
|Heikkinen, Miss. ...|26.0|       S|
|Futrelle, Mrs. Ja...|35.0|       S|
|Allen, Mr. Willia...|35.0|       S|
|    Moran, Mr. James|null|       Q|
|McCarthy, Mr. Tim...|54.0|       S|
|Palsson, Master. ...| 2.0|       S|
|Johnson, Mrs. Osc...|27.0|       S|
|Nasser, Mrs. Nich...|14.0|       C|
|Sandstrom, Miss. ...| 4.0|       S|
|Bonnell, Miss. El...|58.0|       S|
|Saundercock, Mr. ...|20.0|       S|
|Andersson, Mr. An...|39.0|       S|
|Vestrom, Miss. Hu...|14.0|       S|
|Hewlett, Mrs. (Ma...|55.0|       S|
|Rice, Master. Eugene| 2.0|       Q|
|Williams, Mr. Cha...|null|       S|
|Vander Planke, Mr...|31.0|       S|
|Masselmani, Mrs. ...|null|       C|
+--------------------+----+--------+
only showing top 20 rows



In [42]:
# Let's select the names of all passengers, their age and whether they survived the shipwreck:
titanic_df.select(col("Name"), col("Age"), col("Survived")).show()

+--------------------+----+--------+
|                Name| Age|Survived|
+--------------------+----+--------+
|Braund, Mr. Owen ...|22.0|       0|
|Cumings, Mrs. Joh...|38.0|       1|
|Heikkinen, Miss. ...|26.0|       1|
|Futrelle, Mrs. Ja...|35.0|       1|
|Allen, Mr. Willia...|35.0|       0|
|    Moran, Mr. James|null|       0|
|McCarthy, Mr. Tim...|54.0|       0|
|Palsson, Master. ...| 2.0|       0|
|Johnson, Mrs. Osc...|27.0|       1|
|Nasser, Mrs. Nich...|14.0|       1|
|Sandstrom, Miss. ...| 4.0|       1|
|Bonnell, Miss. El...|58.0|       1|
|Saundercock, Mr. ...|20.0|       0|
|Andersson, Mr. An...|39.0|       0|
|Vestrom, Miss. Hu...|14.0|       0|
|Hewlett, Mrs. (Ma...|55.0|       1|
|Rice, Master. Eugene| 2.0|       0|
|Williams, Mr. Cha...|null|       1|
|Vander Planke, Mr...|31.0|       0|
|Masselmani, Mrs. ...|null|       1|
+--------------------+----+--------+
only showing top 20 rows



The selectExpr() method allows you to use SQL-like expressions to select, transform and rename columns in a DataFrame.

Example of using selectExpr() on Titanic data:

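```python
from pyspark.sql.functions import expr

# Create DataFrame; inferSchema reads numeric columns as numbers instead of strings
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("titanic.csv")

# Use selectExpr: each argument is one SQL expression
df.selectExpr("Survived as label", "Age", "Fare", "Sex", "Embarked", "Pclass", "SibSp + Parch as FamilySize") \
  .show()

# The same SQL expressions can be used inside select() through expr()
df.select(expr("Survived as label"), expr("SibSp + Parch as FamilySize")).show()
```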

In this example, the Age, Fare, Sex, Embarked, Pclass columns are simply taken from the DataFrame, the SibSp and Parch columns are added together and renamed to the FamilySize column, and the Survived column is renamed to label.

selectExpr() takes one or more strings, each of which is a SQL expression; a column can be renamed inside the expression with AS. The expr() function parses the same kind of SQL expression into a Column object, so it can be used inside select(), withColumn() or filter().

In [43]:
# Name and ID of passengers with more than 5 siblings or spouses on board:

titanic_df.select("Name", "PassengerId", "SibSp").filter("SibSp > 5").show()


+--------------------+-----------+-----+
|                Name|PassengerId|SibSp|
+--------------------+-----------+-----+
|Sage, Master. Tho...|        160|    8|
|Sage, Miss. Const...|        181|    8|
| Sage, Mr. Frederick|        202|    8|
|Sage, Mr. George ...|        325|    8|
|Sage, Miss. Stell...|        793|    8|
|Sage, Mr. Douglas...|        847|    8|
|"Sage, Miss. Doro...|        864|    8|
+--------------------+-----------+-----+



In [44]:
#Name and age of passenger younger than 50 who bought the most expensive ticket:

from pyspark.sql.functions import desc

titanic_df.select("Name", "Age", "Fare").filter("Age < 50").orderBy(desc("Fare")).limit(1).show()



+----------------+----+--------+
|            Name| Age|    Fare|
+----------------+----+--------+
|Ward, Miss. Anna|35.0|512.3292|
+----------------+----+--------+



## Practice



In [45]:
# Please select the Survived and Name columns

selected = titanic_df.select("Survived", "Name")
selected.show()

+--------+--------------------+
|Survived|                Name|
+--------+--------------------+
|       0|Braund, Mr. Owen ...|
|       1|Cumings, Mrs. Joh...|
|       1|Heikkinen, Miss. ...|
|       1|Futrelle, Mrs. Ja...|
|       0|Allen, Mr. Willia...|
|       0|    Moran, Mr. James|
|       0|McCarthy, Mr. Tim...|
|       0|Palsson, Master. ...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Sandstrom, Miss. ...|
|       1|Bonnell, Miss. El...|
|       0|Saundercock, Mr. ...|
|       0|Andersson, Mr. An...|
|       0|Vestrom, Miss. Hu...|
|       1|Hewlett, Mrs. (Ma...|
|       0|Rice, Master. Eugene|
|       1|Williams, Mr. Cha...|
|       0|Vander Planke, Mr...|
|       1|Masselmani, Mrs. ...|
+--------+--------------------+
only showing top 20 rows



In [49]:
# Please show the survivors and their names where the name contains "Mrs"

selected = titanic_df.select("Survived", "Name") \
                     .filter(titanic_df.Survived == 1) \
                     .filter(titanic_df.Name.contains("Mrs"))
selected.show()

+--------+--------------------+
|Survived|                Name|
+--------+--------------------+
|       1|Cumings, Mrs. Joh...|
|       1|Futrelle, Mrs. Ja...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Hewlett, Mrs. (Ma...|
|       1|Masselmani, Mrs. ...|
|       1|Asplund, Mrs. Car...|
|       1|Spencer, Mrs. Wil...|
|       1|Harper, Mrs. Henr...|
|       1|Faunthorpe, Mrs. ...|
|       1|Nye, Mrs. (Elizab...|
|       1|Backstrom, Mrs. K...|
|       1|Doling, Mrs. John...|
|       1|Weisz, Mrs. Leopo...|
|       1|Hakkarainen, Mrs....|
|       1|Pears, Mrs. Thoma...|
|       1|"Watt, Mrs. James...|
|       1|Chibnall, Mrs. (E...|
|       1|"O'Brien, Mrs. Th...|
|       1| Pinsky, Mrs. (Rosa)|
+--------+--------------------+
only showing top 20 rows



In [51]:
# Select multiple columns and rename them with alias():

selected = titanic_df.select(titanic_df["Survived"].alias("is_alive"), titanic_df["Name"].alias("full_name"))
selected.show()

+--------+--------------------+
|is_alive|           full_name|
+--------+--------------------+
|       0|Braund, Mr. Owen ...|
|       1|Cumings, Mrs. Joh...|
|       1|Heikkinen, Miss. ...|
|       1|Futrelle, Mrs. Ja...|
|       0|Allen, Mr. Willia...|
|       0|    Moran, Mr. James|
|       0|McCarthy, Mr. Tim...|
|       0|Palsson, Master. ...|
|       1|Johnson, Mrs. Osc...|
|       1|Nasser, Mrs. Nich...|
|       1|Sandstrom, Miss. ...|
|       1|Bonnell, Miss. El...|
|       0|Saundercock, Mr. ...|
|       0|Andersson, Mr. An...|
|       0|Vestrom, Miss. Hu...|
|       1|Hewlett, Mrs. (Ma...|
|       0|Rice, Master. Eugene|
|       1|Williams, Mr. Cha...|
|       0|Vander Planke, Mr...|
|       1|Masselmani, Mrs. ...|
+--------+--------------------+
only showing top 20 rows



### withColumn function

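Apache Spark's withColumn() method returns a new DataFrame in which a column has been added or, if a column with the same name already exists, replaced. (DataFrames are immutable, so the original DataFrame is left unchanged.)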

Example of adding a new column to a DataFrame:

```python

from pyspark.sql.functions import col

# Create DataFrame
data = [("John", 25), ("Jane", 22), ("Bob", 30)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Add a new column to the DataFrame
df = df.withColumn("AgePlusTen", col("Age") + 10)

# Show result
df.show()

```

In this example, a new column named "AgePlusTen" is added to the DataFrame, holding the value of the "Age" column plus 10. The first argument of withColumn() is the name of the new column ("AgePlusTen"), and the second is the column expression that computes its values (col("Age") + 10).

Example of modifying an existing column:

```python
from pyspark.sql.functions import col

# Create DataFrame
data = [("John", 25), ("Jane", 22), ("Bob", 30)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Change column in DataFrame
df = df.withColumn("Age", col("Age") + 10)

# Show result
df.show()

```

In this example, the value of column "Age" is incremented by 10. Because the first argument, "Age", is the name of an existing column, withColumn() replaces that column instead of adding a new one; the second argument, col("Age") + 10, is the expression that computes the new values.

In [53]:
# Query Name, Age and Embarked columns using the col function

titanic_df.select(col("Name"), col("Age"), col("Embarked")).show()

+--------------------+----+--------+
|                Name| Age|Embarked|
+--------------------+----+--------+
|Braund, Mr. Owen ...|22.0|       S|
|Cumings, Mrs. Joh...|38.0|       C|
|Heikkinen, Miss. ...|26.0|       S|
|Futrelle, Mrs. Ja...|35.0|       S|
|Allen, Mr. Willia...|35.0|       S|
|    Moran, Mr. James|null|       Q|
|McCarthy, Mr. Tim...|54.0|       S|
|Palsson, Master. ...| 2.0|       S|
|Johnson, Mrs. Osc...|27.0|       S|
|Nasser, Mrs. Nich...|14.0|       C|
|Sandstrom, Miss. ...| 4.0|       S|
|Bonnell, Miss. El...|58.0|       S|
|Saundercock, Mr. ...|20.0|       S|
|Andersson, Mr. An...|39.0|       S|
|Vestrom, Miss. Hu...|14.0|       S|
|Hewlett, Mrs. (Ma...|55.0|       S|
|Rice, Master. Eugene| 2.0|       Q|
|Williams, Mr. Cha...|null|       S|
|Vander Planke, Mr...|31.0|       S|
|Masselmani, Mrs. ...|null|       C|
+--------------------+----+--------+
only showing top 20 rows



In [54]:
# Example that adds an age category column to our data using the when function

from pyspark.sql.functions import when


df_with_age_category = titanic_df.withColumn("age_category",when(col("age") < 18, "child").otherwise("adult"))
df_with_age_category.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|age_category|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|       adult|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|       adult|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|       adult|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|       adult|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null

In [55]:
# Example that multiplies the age by -1

df_with_minus_age = titanic_df.withColumn("age",col("age") *-1)
df_with_minus_age.show()


+-----------+--------+------+--------------------+------+-----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|  age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+-----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|-22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|-38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|-26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|-35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|-35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male| null|    0|   

### withColumnRenamed

The withColumnRenamed function can be used to rename an existing column in the given DataFrame. The function takes two arguments: the first is the name of the current column, and the second is the new name to rename the column to.

An example of how to rename a column in the Titanic DataFrame:
```python
# df is the Titanic DataFrame
df = df.withColumnRenamed("Pclass", "PassengerClass")

# Show the result with the renamed PassengerClass column
df.show()

```

In [56]:
from pyspark.sql.functions import col

# data loading
titanic_df = spark.read.format("csv") \
    .option("header", "true") \
    .load("titanic.csv")

# Rename 'Name' column to 'PassengerName'
titanic_df = titanic_df.withColumnRenamed("Name", "PassengerName")

# print the new column
titanic_df.select(col("PassengerName")).show(5)


+--------------------+
|       PassengerName|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows



In [57]:
# Rename the "Embarked" column to "Port_of_Embarkation":

titanic_df = titanic_df.withColumnRenamed("Embarked", "Port_of_Embarkation")
titanic_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Port_of_Embarkation: string (nullable = true)



In [58]:
# Rename the column "SibSp" to "Siblings_or_Spouses_on_Board":

titanic_df = titanic_df.withColumnRenamed("SibSp", "Siblings_or_Spouses_on_Board")
titanic_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Port_of_Embarkation: string (nullable = true)



In [59]:
# Rename the column "Parch" to "Parents_or_Children_on_Board":

titanic_df = titanic_df.withColumnRenamed("Parch", "Parents_or_Children_on_Board")
titanic_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Parents_or_Children_on_Board: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Port_of_Embarkation: string (nullable = true)



In [60]:
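# Rename multiple columns
# Note: "Name" was already renamed to "PassengerName" above, so the second call is a no-op
# (withColumnRenamed() silently ignores columns that do not exist).
renamed_df = titanic_df.withColumnRenamed("Age", "NewAge").withColumnRenamed("Name", "FullName")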
renamed_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- NewAge: string (nullable = true)
 |-- Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Parents_or_Children_on_Board: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Port_of_Embarkation: string (nullable = true)



In [61]:
# Rename all columns
from functools import reduce
new_column_names = ["Col_" + str(i) for i in titanic_df.columns]
renamed_df = reduce(lambda df, idx: df.withColumnRenamed(df.columns[idx], new_column_names[idx]), range(len(titanic_df.columns)), titanic_df)
renamed_df.printSchema()

root
 |-- Col_PassengerId: string (nullable = true)
 |-- Col_Survived: string (nullable = true)
 |-- Col_Pclass: string (nullable = true)
 |-- Col_PassengerName: string (nullable = true)
 |-- Col_Sex: string (nullable = true)
 |-- Col_Age: string (nullable = true)
 |-- Col_Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Col_Parents_or_Children_on_Board: string (nullable = true)
 |-- Col_Ticket: string (nullable = true)
 |-- Col_Fare: string (nullable = true)
 |-- Col_Cabin: string (nullable = true)
 |-- Col_Port_of_Embarkation: string (nullable = true)



## Practice

In [62]:
# Example, multiplying the age by -1

df_with_minus_age = titanic_df.withColumn("age", titanic_df.Age * -1) # can use col('age') as well
df_with_minus_age.show()


+-----------+--------+------+--------------------+------+-----+----------------------------+----------------------------+----------------+-------+-----+-------------------+
|PassengerId|Survived|Pclass|       PassengerName|   Sex|  age|Siblings_or_Spouses_on_Board|Parents_or_Children_on_Board|          Ticket|   Fare|Cabin|Port_of_Embarkation|
+-----------+--------+------+--------------------+------+-----+----------------------------+----------------------------+----------------+-------+-----+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|-22.0|                           1|                           0|       A/5 21171|   7.25| null|                  S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|-38.0|                           1|                           0|        PC 17599|71.2833|  C85|                  C|
|          3|       1|     3|Heikkinen, Miss. ...|female|-26.0|                           0|                           0|STON/O2. 31012

In [63]:
# Rename the column "Port_of_Embarkation" to "Embarked":

titanic_df = titanic_df.withColumnRenamed("Port_of_Embarkation", "Embarked")
titanic_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Parents_or_Children_on_Board: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [64]:
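# Rename multiple columns NewAge -> Age, FullName -> Name
# Note: the NewAge rename was applied to renamed_df, not to titanic_df, and titanic_df has no
# FullName column, so both calls are no-ops and the schema below is unchanged.
renamed_df = titanic_df.withColumnRenamed("NewAge", "Age").withColumnRenamed("FullName", "Name")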
renamed_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- PassengerName: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Siblings_or_Spouses_on_Board: string (nullable = true)
 |-- Parents_or_Children_on_Board: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [65]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+----------------------------+----------------------------+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|       PassengerName|   Sex| Age|Siblings_or_Spouses_on_Board|Parents_or_Children_on_Board|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+----------------------------+----------------------------+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|                           1|                           0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|                           1|                           0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|                           0|                           0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futre

### The groupBy and agg functions

The groupBy and agg operations of the PySpark DataFrame API group rows by the values of one or more columns and then apply aggregation functions (such as count, avg, min or max) to each group. They are the standard way to compute per-group statistics.

Here is an example of using groupBy and agg with the PySpark DataFrame API:

```python
# Create a small DataFrame
from pyspark.sql.functions import avg

data = [("Alice", "female", 25),
        ("Bob", "male", 30),
        ("Charlie", "male", 35),
        ("Diana", "female", 40),
        ("Emily", "female", 45)]

df = spark.createDataFrame(data, ["name", "gender", "age"])

# Group by gender and calculate average age
df.groupBy("gender").agg(avg("age").alias("average_age")).show()

```

In this example, we create a small DataFrame containing people's names, genders and ages. We then use the groupBy and agg functions to group by gender and calculate the average age. The result looks like this (the row order may vary):

```
+------+------------------+
|gender|       average_age|
+------+------------------+
|female|36.666666666666664|
|  male|              32.5|
+------+------------------+
```
This indicates that the women are on average about 36.7 years old, while the men are on average 32.5 years old.

Of course, groupBy and agg operations can be combined with other data manipulation functions such as filter, select or orderBy. By combining the individual operations, complex data manipulation chains can be created.


In [67]:
# Reload our dataframe

titanic_df = spark.read.format("parquet").load("titanic_parquet")

In [68]:
# Calculate the maximum age by sex and class

from pyspark.sql import functions as F  # avoids shadowing Python's built-in max()

max_age_df = titanic_df.groupBy(['Sex', 'Pclass']).agg(F.max('Age').alias('MaxAge'))

max_age_df.show()


+------+------+------+
|   Sex|Pclass|MaxAge|
+------+------+------+
|  male|     3|  74.0|
|female|     3|  63.0|
|female|     1|  63.0|
|female|     2|  57.0|
|  male|     2|  70.0|
|  male|     1|  80.0|
+------+------+------+



In [69]:
# Aggregations can also be passed as a dictionary of {column: aggregate function name}:

titanic_df.groupBy("Sex").agg({"Age": "avg", "Fare": "max"}).show()


+------+------------------+---------+
|   Sex|          avg(Age)|max(Fare)|
+------+------------------+---------+
|female|27.915708812260537| 512.3292|
|  male| 30.72664459161148| 512.3292|
+------+------------------+---------+



In [70]:
# Calculate the average price paid by all passengers per class:

from pyspark.sql.functions import avg

titanic_df.groupBy("Pclass").agg(avg("Fare")).show()


+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



### join function

The join operation combines two DataFrames based on a common key. The PySpark DataFrame API supports several join types, selected with the how argument of join(), including inner, full outer, left outer, right outer, left semi and left anti joins.

Below is an example of how to join data in one table with data in another table.

Suppose you have two PySpark DataFrames containing the following data:
    
```python
# DataFrame 1
df1 = spark.createDataFrame([
  ("John", "Doe", 35),
  ("Jane", "Doe", 30),
  ("Max", "Smith", 25),
], ["first_name", "last_name", "age"])

# DataFrame 2
df2 = spark.createDataFrame([
  ("Doe", "Marketing"),
  ("Smith", "Sales"),
], ["last_name", "department"])

```
In this example, we join df1 with df2 on the last_name key. The join type is an inner join, i.e. only rows whose last_name appears in both DataFrames are kept.
```python
# Inner join on the shared last_name column
df3 = df1.join(df2, "last_name", "inner")

df3.show()
# Output (row order may vary):
# +---------+----------+---+----------+
# |last_name|first_name|age|department|
# +---------+----------+---+----------+
# |      Doe|      John| 35| Marketing|
# |      Doe|      Jane| 30| Marketing|
# |    Smith|       Max| 25|     Sales|
# +---------+----------+---+----------+

```
The join operation returns a new DataFrame in which the two DataFrames have been joined on the specified key, here the last_name column. Because the key was passed as a column name, it appears only once in the result, so the joined DataFrame contains the columns last_name, first_name, age and department.

In [71]:
# Example of joining Titanic data between two DataFrames based on the "PassengerId" column:

# Let's read the data
titanic_df = spark.read.format("parquet").load("titanic_parquet")
titanic_df.show()

# Create another DataFrame containing the PassengerId and age
age_df = titanic_df.select("PassengerId", "Age")
age_df.show()

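# Join the two DataFrames on PassengerId
# (the result contains two Age columns, one from each side, so referring to "Age" afterwards is ambiguous)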
joined_df = titanic_df.join(age_df, "PassengerId","inner")
joined_df.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

## Practice

In [73]:
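# Calculate the minimum age by sex and class
# Note: this cell groups by Age and Sex rather than by Sex and Pclass, so every group
# holds a single age value and MinAge simply equals Age, as the output below shows.

from pyspark.sql import functions as F  # avoids shadowing Python's built-in min()

min_age_df = titanic_df.groupBy(["Age","Sex"]).agg(F.min('Age').alias('MinAge'))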

min_age_df.show()

+----+------+------+
| Age|   Sex|MinAge|
+----+------+------+
|49.0|female|  49.0|
|55.0|female|  55.0|
|47.0|  male|  47.0|
|0.83|  male|  0.83|
|12.0|  male|  12.0|
|66.0|  male|  66.0|
|64.0|  male|  64.0|
|18.0|female|  18.0|
|36.0|  male|  36.0|
|44.0|  male|  44.0|
|48.0|female|  48.0|
|37.0|female|  37.0|
|34.0|  male|  34.0|
|60.0|female|  60.0|
|55.5|  male|  55.5|
|43.0|  male|  43.0|
|35.0|  male|  35.0|
|21.0|female|  21.0|
|41.0|female|  41.0|
|16.0|  male|  16.0|
+----+------+------+
only showing top 20 rows



In [74]:
# Calculate the average price paid by all passengers per class:

from pyspark.sql.functions import avg

titanic_df.groupBy("Pclass").agg(avg("Fare")).show()


+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+

