# PySpark DataFrame Operations in Google Colab

This notebook demonstrates several ways to convert a column of a PySpark DataFrame into a Python list.

# Step 1: Install PySpark

In [3]:
# Run the following line in Google Colab to install PySpark.
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c5034a9eddb111889c795dd6e34e615730f8c3d87f235f32b6366351d08424c8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Step 2: Import the necessary libraries


In [4]:
from pyspark.sql import SparkSession

# Step 3: Initialize the Spark session

`SparkSession.builder` is an attribute that returns a builder object for configuring and initializing a Spark session, which is the entry point to programming with Spark. Let's break down the methods chained onto this builder:

1. **`master("local[1]")`**:
   - **Purpose**: Specifies the master URL for the Spark application.
   - **Meaning**:
     - `"local"`: Indicates that the Spark application will run locally on a single machine.
     - `"[1]"`: Specifies that Spark should use only one thread for execution. This is useful for testing and development purposes. You can change the number of threads to `"[n]"` where `n` is the number of threads you want to use. For example, `"local[4]"` would use four threads.

2. **`appName('SparkByExamples.com')`**:
   - **Purpose**: Sets the name of the Spark application.
   - **Meaning**:
     - `'SparkByExamples.com'`: This is an arbitrary name for the application. It helps identify your application in the Spark UI or logs. You can name it anything that helps you recognize your application.

3. **`getOrCreate()`**:
   - **Purpose**: This method either retrieves an existing Spark session or, if none exists, creates a new one.
   - **Meaning**:
     - Ensures that you don't accidentally create multiple Spark sessions in your application. If a session already exists, it will return that session instead of creating a new one.


In [5]:
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

# Step 4: Create a DataFrame

In [6]:
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL")
]
columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data=data, schema=columns)

# Step 5: Show the DataFrame

In [7]:
df.show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [9]:
df.collect()

[Row(firstname='James', lastname='Smith', country='USA', state='CA'),
 Row(firstname='Michael', lastname='Rose', country='USA', state='NY'),
 Row(firstname='Robert', lastname='Williams', country='USA', state='CA'),
 Row(firstname='Maria', lastname='Jones', country='USA', state='FL')]

# Step 6: Convert Column to List Using RDD map

DataFrames can be converted to Resilient Distributed Datasets (RDDs), which are the fundamental data structures of Spark. The `rdd` property of a DataFrame returns the underlying RDD. The `map` function is an RDD transformation that applies a given function to each element of the RDD and returns a new RDD with the results. Let's break down the statement `states1 = df.rdd.map(lambda x: x[3]).collect()`:

**Breaking Down the Code**

1. **`df.rdd`**:
   - **Purpose**: Converts the DataFrame `df` to its underlying RDD.
   - **Meaning**:
     - `df` is your DataFrame. The `rdd` property accesses the RDD representation of this DataFrame.
   
2. **`.map(lambda x: x[3])`**:
   - **Purpose**: Applies a function to each element of the RDD.
   - **Meaning**:
     - `map` is a transformation operation in Spark that applies the given function (here, a lambda function) to each element of the RDD.
     - `lambda x: x[3]` is a lambda function that takes each row (`x`) of the RDD and extracts the element at index `3` (the fourth column in the DataFrame).
   
3. **`.collect()`**:
   - **Purpose**: Collects the results of the RDD transformations back to the driver program.
   - **Meaning**:
     - `collect` is an action in Spark that gathers all elements of the RDD and returns them as a list to the driver program.

**Summary**

The line `states1 = df.rdd.map(lambda x: x[3]).collect()`:

- Converts the DataFrame to an RDD.
- Maps each row of the RDD to its fourth column (state).
- Collects the list of states back to the driver program.

In essence, it extracts the "state" column from the DataFrame and returns it as a list.
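The access pattern can be tried without a Spark session. The sketch below is a plain-Python analogy, not Spark code: it assumes a `namedtuple` called `StateRow` as a stand-in for PySpark's `Row` (which is also a tuple subclass), and it uses the built-in `map` and `list` in place of the RDD `map` and `collect`. All names are chosen for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: a tuple with named fields.
StateRow = namedtuple("StateRow", ["firstname", "lastname", "country", "state"])

rows = [
    StateRow("James", "Smith", "USA", "CA"),
    StateRow("Michael", "Rose", "USA", "NY"),
    StateRow("Robert", "Williams", "USA", "CA"),
    StateRow("Maria", "Jones", "USA", "FL"),
]

# Positional access, mirroring `lambda x: x[3]`.
by_index = list(map(lambda x: x[3], rows))

# Name-based access, which keeps working if the column order changes.
by_name = list(map(lambda x: x.state, rows))

print(by_index)
print(by_name)
```

Positional access depends on the column order in `columns`, so a name-based lookup is usually the safer choice when the schema may change.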

In [10]:
states1 = df.rdd.map(lambda x: x[3]).collect()

In [11]:
states1  # Output: ['CA', 'NY', 'CA', 'FL']

['CA', 'NY', 'CA', 'FL']

# Step 7: Remove Duplicates Using OrderedDict

In [12]:
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))

print(res)  # Output: ['CA', 'NY', 'FL']

['CA', 'NY', 'FL']
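On Python 3.7 and later, a plain `dict` also preserves insertion order, so the same order-preserving de-duplication can be sketched without `OrderedDict`. The snippet hard-codes the list from Step 6 instead of reading it from Spark, and `unique_states` is a name chosen for illustration.

```python
states = ["CA", "NY", "CA", "FL"]  # same values as states1 in Step 6

# dict keys are unique and keep first-seen order (Python 3.7+).
unique_states = list(dict.fromkeys(states))
print(unique_states)  # ['CA', 'NY', 'FL']

# A set also removes duplicates, but its iteration order is not guaranteed,
# so sort it if a stable order is needed.
print(sorted(set(states)))  # ['CA', 'FL', 'NY']
```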


# Example 2: Convert Column to List Using RDD map and the Column Name

In [13]:
states2 = df.rdd.map(lambda x: x.state).collect()

print(states2)  # Output: ['CA', 'NY', 'CA', 'FL']

['CA', 'NY', 'CA', 'FL']


# Step 8: Convert Column to List Using select and collect

In [14]:
states3 = df.select(df.state).collect()
print(states3)  # Output: [Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]

[Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]
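Since `collect()` returns a list of Row objects, one way to get plain values is a list comprehension on the driver. The sketch below runs without Spark: `StateOnly` is a hypothetical `namedtuple` standing in for the one-field Row objects, and `collected` mimics the list printed above.

```python
from collections import namedtuple

# Hypothetical stand-in for the one-field Row objects returned by collect().
StateOnly = namedtuple("StateOnly", ["state"])
collected = [StateOnly("CA"), StateOnly("NY"), StateOnly("CA"), StateOnly("FL")]

# Pull the single field out of each row on the driver.
states_from_rows = [row.state for row in collected]
print(states_from_rows)
```

With real Row objects the same comprehension should apply, since a Row supports both `row.state` and `row[0]`.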


# Step 9: Flatten the List of Rows

The `flatMap` function in PySpark is an RDD transformation that applies the provided function to each element of the RDD, expects that function to return an iterable (e.g., a list or a tuple), and then concatenates the elements of all returned iterables into a single new RDD.

**Breaking Down the Code**

1. **`df.select(df.state)`**:
   - **Purpose**: Selects the `state` column from the DataFrame.
   - **Meaning**:
     - Creates a new DataFrame containing only the `state` column.

2. **`.rdd`**:
   - **Purpose**: Converts the DataFrame to its underlying RDD.
   - **Meaning**:
     - Each row in the new DataFrame (containing only the `state` column) becomes a Row object in the RDD.

3. **`.flatMap(lambda x: x)`**:
   - **Purpose**: Applies a function to each element of the RDD and flattens the results.
   - **Meaning**:
     - `flatMap` is a transformation that takes a function as an argument.
     - `lambda x: x` is a lambda function that takes each Row object (`x`) and returns it as-is.
     - A Row is a tuple subclass, so it is iterable. `flatMap` iterates over each returned Row and emits its fields one by one; because each Row here holds only the `state` value, the result is a flat sequence of state strings.

4. **`.collect()`**:
   - **Purpose**: Collects the results of the RDD transformations back to the driver program.
   - **Meaning**:
     - `collect` is an action that gathers all elements of the RDD and returns them as a list to the driver program.

**Summary**

The line `states4 = df.select(df.state).rdd.flatMap(lambda x: x).collect()`:

- Selects the `state` column from the DataFrame, resulting in a new DataFrame.
- Converts this new DataFrame to an RDD where each row is a Row object containing a single element (`state` value).
- Uses `flatMap` with the identity lambda; because each Row object is iterable, its single `state` value is unpacked into the output.
- Collects the list of state values back to the driver program.

In essence, `flatMap` is used to flatten the Row objects and extract the `state` values into a single list.
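To make the difference between `map` and `flatMap` concrete, here is a plain-Python model that needs no Spark session. It assumes tuples as stand-ins for Row objects and a helper named `flat_map` written for this illustration with `itertools.chain`; it mimics the semantics only and is not how Spark implements the transformation.

```python
from itertools import chain

# Each inner tuple stands in for a one-column Row such as Row(state='CA').
single_column_rows = [("CA",), ("NY",), ("CA",), ("FL",)]

def flat_map(func, items):
    """Apply func to every item, then concatenate the resulting iterables."""
    return list(chain.from_iterable(func(item) for item in items))

# map keeps one output per input, so the tuples survive intact.
mapped = list(map(lambda x: x, single_column_rows))
# flat_map unpacks each tuple into the output list.
flattened = flat_map(lambda x: x, single_column_rows)

print(mapped)     # [('CA',), ('NY',), ('CA',), ('FL',)]
print(flattened)  # ['CA', 'NY', 'CA', 'FL']

# With two columns, flattening interleaves the values of each row.
two_column_rows = [("CA", "James"), ("NY", "Michael")]
interleaved = flat_map(lambda x: x, two_column_rows)
print(interleaved)  # ['CA', 'James', 'NY', 'Michael']
```

The last case shows why this flattening approach suits a single selected column: with several columns, the values of different columns end up mixed in one list.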

In [15]:
states4 = df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)  # Output: ['CA', 'NY', 'CA', 'FL']

['CA', 'NY', 'CA', 'FL']


# Step 10: Convert Column to List Using toPandas

In [16]:
states5 = df.select(df.state).toPandas()['state']
states6 = list(states5)
print(states6)  # Output: ['CA', 'NY', 'CA', 'FL']


['CA', 'NY', 'CA', 'FL']


# Step 11: Convert Multiple Columns to List Using toPandas

In [18]:
pandDF = df.select(df.state, df.firstname).toPandas()
print(list(pandDF['state']))      # Output: ['CA', 'NY', 'CA', 'FL']
print(list(pandDF['firstname']))  # Output: ['James', 'Michael', 'Robert', 'Maria']

['CA', 'NY', 'CA', 'FL']
['James', 'Michael', 'Robert', 'Maria']
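Once the data is in pandas, every selected column can be turned into a list in one call. The sketch below assumes pandas is installed and builds `pand_df` by hand as a stand-in for the result of `toPandas()`, so it runs without a Spark session; the variable names are illustrative.

```python
import pandas as pd

# Hypothetical stand-in for the result of df.select(...).toPandas().
pand_df = pd.DataFrame({
    "state": ["CA", "NY", "CA", "FL"],
    "firstname": ["James", "Michael", "Robert", "Maria"],
})

# One dict entry per column, each value a plain Python list.
columns_as_lists = pand_df.to_dict(orient="list")
print(columns_as_lists["state"])
print(columns_as_lists["firstname"])

# Series.tolist() is the single-column equivalent of list(series).
state_list = pand_df["state"].tolist()
print(state_list)
```

Note that `toPandas()` brings the entire selected data to the driver, so this route fits best when the result is small enough to hold in memory.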
