# Quickstart: DataFrame

This is a tutorial for the PySpark [DataFrame API](https://spark.apache.org/docs/3.4.2/api/python/index.html). 

PySpark DataFrames are lazily evaluated. They are implemented on top of RDDs. When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as `collect()` are explicitly called, the computation starts. 
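
To make lazy evaluation concrete, here is a minimal sketch in plain Python (this is not Spark code). The `LazyPipeline` class and its `map` and `collect` methods are hypothetical names invented for this illustration. Spark's real planner is far more sophisticated, but the principle is the same: transformations are only recorded, and an action runs them.

```python
class LazyPipeline:
    """Toy model of lazy evaluation: transformations are recorded, not executed."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)

    def map(self, fn):
        # A transformation only extends the plan and returns a new pipeline
        return LazyPipeline(self._data, self._steps + (fn,))

    def collect(self):
        # An action finally runs every recorded step, in order
        rows = list(self._data)
        for fn in self._steps:
            rows = [fn(row) for row in rows]
        return rows


calls = []


def double(x):
    calls.append(x)  # record that the function really ran
    return 2 * x


pipeline = LazyPipeline([1, 2, 3]).map(double)
calls_before_action = len(calls)  # 0: nothing has been computed yet

result = pipeline.collect()  # the action triggers the computation
print(calls_before_action, result)  # 0 [2, 4, 6]
```

Because the whole plan is known before anything runs, an engine like Spark gets a chance to reorder or merge steps before executing them.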

This notebook shows the basic usage of the DataFrame API and is geared mainly toward new users. You can run the latest version of these examples yourself in ‘Live Notebook: DataFrame’ on the quickstart page. It is mostly inspired by [this tutorial](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html).

Refer to https://spark.apache.org/docs/latest/api/python/reference/index.html for the API reference (bookmark it!).

PySpark applications start by initializing a `SparkSession`, which is the entry point of PySpark, as shown below. When running in the PySpark shell via the `pyspark` executable, the shell automatically creates the session in the variable `spark` for you.


In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import pyarrow
import pyspark
import sys
import altair as alt

# Set rendering options
alt.renderers.enable("html")

print("Pandas version: ", pd.__version__)
print("Pyarrow version: ", pyarrow.__version__)
print("Pyspark version: ", pyspark.__version__)
print("Python version: ", sys.version)


# You may also use Spark Connect to connect to a remote cluster, but this may be limiting for this specific tutorial
# spark = SparkSession.builder.remote("sc://vlenpmod302spk1.hevs.ch:15002").getOrCreate()
spark = SparkSession.builder.master("local[4]").getOrCreate()

We also set up [black](https://github.com/psf/black), which is a highly encouraged best-practice for all your Python projects. That way, you never have to worry and debate about code formatting anymore. By using it, you agree to cede control over minutiae of hand-formatting. In return, Black gives you speed, determinism, and freedom from `pycodestyle` nagging about formatting. You will save time and mental energy for more important matters.

In [2]:
import jupyter_black

# Ensure that the black formatter is loaded
jupyter_black.load()

A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame` typically by passing a list of lists, tuples, dictionaries and `pyspark.sql.Row`s, a pandas DataFrame and an RDD consisting of such a list. `pyspark.sql.SparkSession.createDataFrame` takes the schema argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.
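
As a rough picture of what schema inference does, the following plain-Python sketch maps the Python type of each value in the first row to the Spark type name that `printSchema()` reports in this tutorial. The `infer_schema` helper and the `PYTHON_TO_SPARK` table are hypothetical names for this illustration only; real inference is more involved (it has to deal with `None`, nested values, and conflicting rows, for example).

```python
from datetime import date, datetime

# Python type -> Spark SQL type name, as printed by printSchema() in this tutorial
PYTHON_TO_SPARK = {
    int: "long",
    float: "double",
    str: "string",
    date: "date",
    datetime: "timestamp",
}


def infer_schema(rows):
    """Toy inference: look only at the first row (a dict) and map each value's type."""
    first = rows[0]
    return ", ".join(
        f"{name} {PYTHON_TO_SPARK[type(value)]}" for name, value in first.items()
    )


rows = [
    {
        "a": 1,
        "b": 2.0,
        "c": "string1",
        "d": date(2000, 1, 1),
        "e": datetime(2000, 1, 1, 12, 0),
    }
]
schema = infer_schema(rows)
print(schema)  # a long, b double, c string, d date, e timestamp
```

The resulting string has the same shape as the DDL-style `schema` argument accepted by `createDataFrame`.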

Firstly, you can create a PySpark DataFrame from a list of rows:

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c="string2", d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c="string3", d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df

Create a PySpark DataFrame with an explicit schema:

In [None]:
df2 = spark.createDataFrame(
    [
        (1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3.0, "string2", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4.0, "string3", date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
    ],
    schema="a long, b double, c string, d date, e timestamp",
)
df2

What happens if you pass data that are not compatible with the schema?

In [None]:
import pyarrow as pa

# Try here different types of data and describe what happens.

# The following leads to a `PySparkTypeError` because "4.0" cannot be cast to a double.
# Note however that passing a float to a string column will not raise an error, but the value will be cast to a string.
try:
    df2 = spark.createDataFrame(
        [
            (1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
            (2, 3.0, "string2", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
            (3, "4.0", "string3", date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
        ],
        schema="a long, b double, c string, d date, e timestamp",
    )
except pyspark.errors.PySparkTypeError as e:
    print("PySparkTypeError: ", e)
except pa.lib.ArrowInvalid as e:
    print("ArrowInvalid: ", e)

Create a PySpark DataFrame from a pandas DataFrame:

In [None]:
df_pd = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [2.0, 3.0, 4.0],
        "c": ["string1", "string2", "string3"],
        "d": [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        "e": [
            datetime(2000, 1, 1, 12, 0),
            datetime(2000, 1, 2, 12, 0),
            datetime(2000, 1, 3, 12, 0),
        ],
    }
)
df3 = spark.createDataFrame(df_pd)
df3

The DataFrames created above all have the same schema, and the same contents apart from the last row of `df` (`a=4`, `b=5.0` instead of `a=3`, `b=4.0`).

In [None]:
# All DataFrames above share the same schema.
df.show()
df.printSchema()
df2.printSchema()
df3.printSchema()

## Viewing Data

The top rows of a DataFrame can be displayed using `DataFrame.show()`.

In [None]:
df.show(1)

Alternatively, you can enable `spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration.

**Note**: we set this configuration for the purpose of this tutorial, but it is a good practice to explicitly use `.show()`.

In [None]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 5)
df

The rows can also be shown vertically. This is useful when rows are too long to show horizontally.

In [None]:
df.show(1, vertical=True)

You can see the DataFrame’s schema and column names as follows:

In [None]:
df.columns

In [None]:
df.printSchema()

There is a parallelism between the Python API and SQL. Given the above schema, if you wanted to only select columns `a`, `b`, and `c`, you would write the following SQL:

```sql
SELECT a, b, c FROM df;
```

Equivalently, you can write the following Python code using the DataFrame API:

In [None]:
df.select("a", "b", "c")

DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register the DataFrame as a temporary view and run a SQL query against it as below:

In [None]:
df.createOrReplaceTempView("myTable")
spark.sql("SELECT a, b, c FROM myTable")

You can do all the good stuff using SQL (and we will see later how to write an equivalent query using the Dataframe API):

In [None]:
spark.sql("SELECT sum(a) as sumA, avg(b) as avgB FROM myTable GROUP BY c")

When you have a large dataframe and you want to understand what it contains, the `describe()` function comes in handy:

In [None]:
df.select("a", "b", "c").describe().show()

`DataFrame.collect()` collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side.

In [None]:
df.collect()

Remember that `collect()` will fetch **all** data from **all** workers, thus leading to a large memory load. In order to avoid throwing an out-of-memory exception, use `DataFrame.take()` or `DataFrame.tail()`.

In [None]:
df.take(1)

A PySpark DataFrame can also be converted back to a pandas DataFrame to leverage the pandas API. Note that `toPandas` also collects all data on the driver side, which can easily cause an out-of-memory error when the data is too large to fit there.

In [None]:
df.toPandas()

## Selecting and Accessing Data

PySpark DataFrame is lazily evaluated and simply selecting a column does not trigger the computation but it returns a `Column` instance.

In [None]:
df.a

In [None]:
from pyspark.sql import Column
import pyspark.sql.functions as F

type(df.c) == type(F.upper(df.c)) == type(df.c.isNull())
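
Why does an expression like `F.col("a") == 1` not simply evaluate to `True` or `False`? The toy sketch below, in plain Python, shows the general idea of an expression object whose operators build a description of the computation instead of running it. The `Expr` class and this `col` helper are hypothetical and only mimic the behaviour; they are not how `pyspark.sql.Column` is implemented.

```python
class Expr:
    """Toy stand-in for a column expression: it stores a description, not data."""

    def __init__(self, sql):
        self.sql = sql

    def __eq__(self, other):
        # Comparing does not return True/False; it builds a bigger expression
        return Expr(f"({self.sql} = {other!r})")

    def __add__(self, other):
        return Expr(f"({self.sql} + {other!r})")

    def __repr__(self):
        return f"Expr<{self.sql}>"


def col(name):
    # Hypothetical helper, named after pyspark.sql.functions.col for readability
    return Expr(name)


condition = (col("a") + 1) == 2
print(condition)  # Expr<((a + 1) = 2)>
```

The result is still an expression object, which is why it can be handed to something like `filter()` and evaluated later against the actual data.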

These `Column`s can be used to select columns from a DataFrame. For example, `DataFrame.select()` takes `Column` instances and returns another DataFrame.

In [None]:
df.select(df.c).show()

You may also refer to columns by their name, or using `pyspark.sql.functions.col` when there is ambiguity (e.g., when complex expressions must be built):

In [None]:
# Three ways to select columns
df.select(F.col("a"), df.b, "c").show()

You can use `withColumn` to add a new column to a dataframe, e.g., computed from another one:

In [None]:
df.withColumn("upper_c", F.upper(df.c))

To select a subset of rows, use `DataFrame.filter()`.

In [None]:
df.filter(F.col("a") == 1)

## Manipulating Data

Up to now, we have seen very basic manipulations. To go a bit deeper into data manipulation, we first create a slightly more complex dataset.

In [None]:
import random

colors = ["red", "blue", "black", "green"]
fruits = ["banana", "grape", "carrot", "pear", "apple"]

df = spark.createDataFrame(
    [
        [random.choice(colors), random.choice(fruits), idx, random.randint(0, 1000)]
        for idx in range(100)
    ],
    schema="color string, fruit string, id long, calories int",
)

df.take(2)

We can start by looking at transforming columns.

Say you want to create a new column that contains the concatenation of the color and the fruit:

In [None]:
df.select(F.concat(F.col("color"), F.lit(" "), F.col("fruit")))

Now, this is what we want, but the column name isn't really nice. We actually want to `alias` the column, as you would do in SQL with `SELECT CONCAT(color, ' ', fruit) AS name`:

In [None]:
df.select(F.concat(F.col("color"), F.lit(" "), F.col("fruit")).alias("name"))

OK, this looks better, but we lost all other columns. Let's fix this and overwrite the working dataframe:

In [None]:
df = df.select("*", F.concat(F.col("color"), F.lit(" "), F.col("fruit")).alias("name"))

df

There is a myriad of different [functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) that one can use. Just as an example, here are a few things you can do:

In [None]:
df.select(
    # How many characters are in the `name` column
    F.length("name").alias("name_length"),
    # Convert the `color` column to uppercase
    F.upper("color").alias("COLOR"),
    # Levenshtein distance between the `color` and `fruit`
    F.levenshtein("color", "fruit").alias("color_to_fruit_dist"),
    # Logarithm of the `calories` column
    F.log2("calories").alias("log2_calories"),
    # Product of the number of characters in the `name` column and the `calories` column
    (F.length("name") * F.col("calories")).alias("complex_calc"),
)

Most of the above calculations are plain useless, but the point is that you can perform complex calculations on very large datasets, as all these operations are going to be automatically parallelized across all the workers.

### Applying custom functions

> **NOTE**: this section requires a local cluster as Pandas is not installed on the workers. 

There will be situations where there is no available built-in function to do what you want. In this case, you can use what are called User-Defined Functions (UDFs), which can be written in plain Python or in Pandas.

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import udf


@udf("long")
def plus_one(x):
    return x + 1


@pandas_udf("long")
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to the series
    return series + 1


df_udfs = df.select(
    # Use a plain UDF to add one to the `calories` column
    plus_one(F.col("calories").cast("long")).alias("calories_plus_one_udf"),
    # Use a Pandas UDF to add one to the `calories` column
    pandas_plus_one(F.col("calories").cast("long")).alias(
        "calories_plus_one_pandas_udf"
    ),
    # Which is of course equivalent to this:
    (F.col("calories") + 1).alias("calories_plus_one_sql"),
)

df_udfs.show()

We can test whether this is equivalent by filtering on rows that differ in the resulting dataframe:

In [None]:
num_rows_diff = df_udfs.filter(
    (F.col("calories_plus_one_udf") != F.col("calories_plus_one_sql"))
    | (F.col("calories_plus_one_pandas_udf") != F.col("calories_plus_one_sql"))
).count()

print(
    f"As expected, there are {num_rows_diff} rows where the UDF and SQL results differ."
)

This was pretty boring. Imagine instead that you want to have some dedicated logic to convert calories to string descriptions:

In [None]:
# Function to convert calories to a description
@udf("string")
def calories_to_descr_udf(calories: int) -> str:
    if calories < 100:
        return "low"
    elif calories < 500:
        return "medium"
    else:
        return "high"


@pandas_udf("string")
def calories_to_descr_pandas_udf(calories: pd.Series) -> pd.Series:
    # Preallocate a Series of the same length as the input and of type string
    descr = pd.Series(data="high", index=calories.index, dtype=str)

    # Assign the description based on the calorie content
    descr[calories < 100] = "low"
    descr[(calories >= 100) & (calories < 500)] = "medium"

    return descr


df.select(
    calories_to_descr_udf("calories").alias("calories_descr_udf"),
    calories_to_descr_pandas_udf("calories").alias("calories_descr_pandas_udf"),
).show()

Remember, it's generally recommended to use Spark's built-in functions when possible, and only resort to UDFs when necessary for custom logic that can't be achieved otherwise. In reality, the above can be achieved using Spark built-in functions as well:

In [None]:
df.select(
    F.when(F.col("calories") < 100, "low")
    .when((F.col("calories") >= 100) & (F.col("calories") < 500), "medium")
    .otherwise("high")
    .alias("calories_descr")
).show()

Let's time all solutions on a much bigger dataset.

In [None]:
import timeit

# Build a dataset with 50,000 rows
df = spark.createDataFrame(
    [
        [random.choice(colors), random.choice(fruits), idx, random.randint(0, 1000)]
        for idx in range(50_000)
    ],
    schema="color string, fruit string, id long, calories int",
)

# Time the plain UDF solution
num_secs = timeit.timeit(
    stmt="df.select(calories_to_descr_udf('calories').alias('calories_descr')).collect()",
    globals=globals(),
    number=10,
)
print(f"UDF solution took {num_secs:.2f} seconds")

# Time the Pandas UDF solution
num_secs = timeit.timeit(
    stmt="df.select(calories_to_descr_pandas_udf('calories').alias('calories_descr')).collect()",
    globals=globals(),
    number=10,
)
print(f"Pandas UDF solution took {num_secs:.2f} seconds")

# Now let's see how long the SQL solution takes
stmt = """
df.select(
    F.when(F.col("calories") < 100, "low")
    .when((F.col("calories") >= 100) & (F.col("calories") < 500), "medium")
    .otherwise("high")
    .alias("calories_descr")
).collect()
""".strip()

num_secs = timeit.timeit(stmt=stmt, globals=globals(), number=10)
print(f"SQL solution took {num_secs:.2f} seconds")

Can you figure out what is going on here? Read [this article](https://bryancutler.github.io/vectorizedUDFs/) to understand a bit more about what's happening under the hood.

Again, there will be situations where you will want to use UDFs, but that should be a last resort. You should also benchmark your solutions before making a decision.
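
One way to picture part of the difference is to count how often the Python function is invoked. A plain UDF is called once per row, whereas a Pandas UDF is called once per batch of rows. The sketch below is plain Python, not Spark: the function names and the batch size of 10,000 are assumptions made for this illustration, and it ignores the serialization costs that also matter in practice.

```python
from collections import Counter

calls = Counter()


def describe_row(calories):
    """Row-at-a-time function: invoked once per value, like a plain Python UDF."""
    calls["row"] += 1
    return "low" if calories < 100 else "medium" if calories < 500 else "high"


def describe_batch(batch):
    """Batch function: invoked once per chunk of values, like a Pandas UDF."""
    calls["batch"] += 1
    return ["low" if c < 100 else "medium" if c < 500 else "high" for c in batch]


values = [i % 1000 for i in range(50_000)]
batch_size = 10_000  # illustrative chunk size, an assumption of this sketch

row_result = [describe_row(v) for v in values]

batch_result = []
for start in range(0, len(values), batch_size):
    batch_result.extend(describe_batch(values[start : start + batch_size]))

print(calls["row"], calls["batch"])  # 50000 5
```

Both approaches produce the same labels; what changes is how many times the engine has to cross into Python-level function calls.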

## Grouping and Joining Data

PySpark DataFrames also support grouped data through the common split-apply-combine strategy: the data is grouped by a certain condition, a function is applied to each group, and the results are combined back into a DataFrame.

For example, we can group by `color` and then apply the `avg()` function to the resulting groups:

In [None]:
df.groupby("color").avg().show()
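
The three phases of split-apply-combine can be spelled out in plain Python. This is only a sketch on a tiny, made-up list of `(color, calories)` pairs; Spark performs the same logical steps, but distributed across workers.

```python
from collections import defaultdict

rows = [("red", 100), ("blue", 300), ("red", 200), ("blue", 500)]

# Split: bucket the calorie values by color
groups = defaultdict(list)
for color, calories in rows:
    groups[color].append(calories)

# Apply: compute one average per group
averages = {color: sum(vals) / len(vals) for color, vals in groups.items()}

# Combine: turn the per-group results back into rows
result = sorted(averages.items())
print(result)  # [('blue', 400.0), ('red', 150.0)]
```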

You can also apply a native Python function to each group by using the pandas API.

For instance, here we subtract the group mean from the `calories` value of each row.

In [None]:
def remove_mean(data: pd.DataFrame) -> pd.DataFrame:
    return data.assign(calories=data.calories - data.calories.mean())


df.groupby("color").applyInPandas(remove_mean, schema=df.schema).show()

You can also perform joins. For instance, say you have a dataframe `orders` that contains orders from different customers with the amount, and a dataset `customers` that contains the name and location of each customer. 

In [None]:
orders = spark.createDataFrame(
    [(1, "A", 100), (2, "B", 200), (3, "A", 150), (4, "C", 300), (5, "B", 250)],
    ["order_id", "customer_id", "amount"],
)

customers = spark.createDataFrame(
    [
        ("A", "Alice", "New York"),
        ("B", "Bob", "Los Angeles"),
        ("C", "Charlie", "Chicago"),
        ("D", "David", "Houston"),
    ],
    ["customer_id", "name", "city"],
)

orders.show()
customers.show()

Let's say we want the total amount spent in each city:

In [None]:
# Join orders with customers, using the `customer_id` column, only keep the `amount` and `city` columns
orders_with_city = orders.select("customer_id", "amount").join(
    customers.select("customer_id", "city"), on="customer_id"
)

# Now group by `city` and calculate the sum of the `amount` column
orders_with_city.groupBy("city").sum().show()
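
To see what the join followed by the group-by computes, here is the same logic on plain Python lists. It is a sketch of the semantics of an inner join (the default join type), not of how Spark executes it; the `customers` list is reduced to `(customer_id, city)` pairs for brevity.

```python
from collections import defaultdict

orders = [(1, "A", 100), (2, "B", 200), (3, "A", 150), (4, "C", 300), (5, "B", 250)]
customers = [("A", "New York"), ("B", "Los Angeles"), ("C", "Chicago"), ("D", "Houston")]

# Build a lookup table on the join key
city_by_customer = dict(customers)

# Inner join: keep an order only if its customer_id has a match
joined = [
    (city_by_customer[customer_id], amount)
    for _, customer_id, amount in orders
    if customer_id in city_by_customer
]

# Group by city and sum the amounts
totals = defaultdict(int)
for city, amount in joined:
    totals[city] += amount

print(dict(totals))  # {'New York': 250, 'Los Angeles': 450, 'Chicago': 300}
```

Note that Houston does not appear: David has no orders, and an inner join drops keys without a match on both sides.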

Now create a dataset that has the following schema:

```
root
 |-- customer_id: string (nullable = true)
 |-- num_orders: long (nullable = false)
 |-- orders: array (nullable = false)
 |    |-- element: long (containsNull = false)
 |-- total_amount: long (nullable = false)
```

where `num_orders` is the number of orders for each customer, `orders` is a list of `order_id`s, and `total_amount` is the total amount spent by the customer (it should be zero if no order has been made by the customer).

Hint: you will need the aggregation function [`collect_list`](https://spark.apache.org/docs/3.5.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_list.html).

In [40]:
# Write your solution here...

## Getting Data In and Out

CSV is straightforward and easy to use. Parquet and ORC are efficient, compact file formats that are faster to read and write.

There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in Apache Spark documentation.

The following command will allow you to read a dataframe that is actually the result that you should have obtained before:

In [None]:
df = spark.read.parquet("customers_summary.parquet")
df.show()

Check that the results are the same. 

Now, let's write it to CSV:

In [None]:
df.write.csv("customers_summary.csv")

Oops! This doesn't work. Can you figure out why?

Anyway, this is why CSV should generally be avoided for storing data. It isn't compressed, and it is not _columnar_. Over the years, a plethora of open-source data formats have been designed to support the needs of various applications. These formats can be _row_ or _column_ oriented and can support various forms of serialization and compression.

The columnar data formats such as Parquet are a popular choice for fast analytics workloads. As opposed to row-oriented storage, columnar storage can significantly reduce the amount of data fetched from disk by allowing access to only the columns that are relevant for the particular query or workload. Moreover, columnar storage combined with efficient encoding and compression techniques can drastically reduce the storage requirements without sacrificing query performance.

For instance, with the following, Spark will perform column pruning (also called projection pushdown) and only read the columns `customer_id` and `total_amount` from disk:

In [None]:
df = spark.read.parquet("customers_summary.parquet").select(
    "customer_id", "total_amount"
)
df.show()

With CSV, Spark would have to read and parse every row in full, and only then drop the columns that are not needed.
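
The difference between the two layouts can be sketched with the standard library alone. The data below is made up for this illustration (it is not the content of `customers_summary.parquet`), and the dictionary of lists is only a stand-in for a real columnar file.

```python
import csv
import io

csv_text = (
    "customer_id,name,city,total_amount\n"
    "A,Alice,New York,250\n"
    "B,Bob,Los Angeles,450\n"
)

# Row-oriented: every field of every row is parsed, even the ones we discard
fields_parsed = 0
selected_from_rows = []
for row in csv.DictReader(io.StringIO(csv_text)):
    fields_parsed += len(row)
    selected_from_rows.append((row["customer_id"], int(row["total_amount"])))

# Column-oriented: each column is stored on its own, so we touch only what we need
columns = {
    "customer_id": ["A", "B"],
    "name": ["Alice", "Bob"],
    "city": ["New York", "Los Angeles"],
    "total_amount": [250, 450],
}
selected_from_columns = list(zip(columns["customer_id"], columns["total_amount"]))

print(fields_parsed)  # 8
print(selected_from_rows == selected_from_columns)  # True
```

Both paths yield the same two columns, but the row-oriented path had to parse all eight fields to get them.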

## Using SQL

DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably. If you remember, we registered a table earlier. Let's see if it is still there:

In [None]:
spark.sql("SHOW VIEWS;").show()

Great, it should be there under `mytable`. Let's see its schema:

In [None]:
spark.sql("DESCRIBE TABLE mytable;").show()

We can now run a query:

In [None]:
spark.sql("SELECT sum(a) as sumA, avg(b) as avgB FROM myTable GROUP BY c").show()

We can also get a reference to the table in Python world and then use the Dataframe API:

In [None]:
df = spark.table("myTable")
df.groupBy("c").agg(F.sum("a").alias("sumA"), F.avg("b").alias("avgB")).show()

We see a difference here: the DataFrame API automatically includes the column(s) used in the group-by, whereas SQL requires an explicit selection. To get an exact match, we need to run:

In [None]:
spark.sql("SELECT c, sum(a) as sumA, avg(b) as avgB FROM myTable GROUP BY c").show()

## Working with window functions

Window functions are more advanced constructs, but they are critical when dealing with time-series data, which is going to be the case in the rest of this module. Window functions are powerful tools in PySpark that allow you to perform calculations across a set of rows that are related to the current row. They are particularly useful for tasks like running totals, rankings, and moving averages.

Assume for instance that we have a dataframe of employees with their department and salary:

In [49]:
# Create a sample DataFrame
data = [
    ("Alice", "Sales", 5000),
    ("Bob", "Sales", 4000),
    ("Charlie", "Marketing", 4500),
    ("David", "Sales", 6000),
    ("Eve", "Marketing", 5500),
    ("Lisa", "Marketing", 5500),
    ("Meghan", "Sales", 5000),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])

### Ranking Functions

Let's start with a simple window function to assign row numbers:

In [None]:
from pyspark.sql.window import Window

# Define the window: partition by department, order by salary
windowSpec = Window.partitionBy("department").orderBy("salary")

# Compute the row number in the window for each row
df_with_row_number = df.withColumn("row_number", F.row_number().over(windowSpec))

df_with_row_number.show()

What did we do here? We computed, for each row, its position within the window it belongs to:

```
+-------+----------+------+----------+
|   name|department|salary|row_number|
+-------+----------+------+----------+
|Charlie| Marketing|  4500|         1| <-- this row is the 1st in the 'Marketing' window, ordered by (ascending) salaries
|    Eve| Marketing|  5500|         2| <-- this row is the 2nd in the 'Marketing' window, ordered by (ascending) salaries
|   Lisa| Marketing|  5500|         3|
|    Bob|     Sales|  4000|         1|
                  ...
+-------+----------+------+----------+
```

Now, this is extremely useful if you are asked to report the top salaries by department in an organization. Write the query that would answer this question:

In [51]:
# Write a query that reports the top employee by salary in each department

There are actually several ranking functions:
- `row_number` just enumerates the row. If two rows have the same value used by the `orderBy`, they will be assigned different values in **a non-deterministic fashion**.
- `rank` calculates the rank of the row in the window in a similar way you would rank runners in a race. It means that if two rows have the same rank `n`, the next row will have the rank `n+2`.
- `dense_rank` is the same as `rank`, but it doesn't leave any gap: if two rows have the same rank `n`, the next row will have the rank `n+1`.

You can check that this is correct here:

In [None]:
(
    df.withColumn("row_number", F.row_number().over(windowSpec))
    .withColumn("rank", F.rank().over(windowSpec))
    .withColumn("dense_rank", F.dense_rank().over(windowSpec))
).show()
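
If the three definitions are still hard to tell apart, the following plain-Python sketch computes them by hand for a single, already sorted partition (the salaries of the Sales department). The `ranking_columns` helper is a hypothetical name for this illustration, and ties are numbered here in list order, whereas Spark gives no such guarantee for `row_number`.

```python
def ranking_columns(sorted_values):
    """Return (row_number, rank, dense_rank) for each value of an already sorted list."""
    out = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number  # jumps past any ties, leaving gaps
            dense_rank += 1  # increases by exactly one, no gaps
            previous = value
        out.append((row_number, rank, dense_rank))
    return out


sales_salaries = [4000, 5000, 5000, 6000]
ranks = ranking_columns(sales_salaries)
print(ranks)  # [(1, 1, 1), (2, 2, 2), (3, 2, 2), (4, 4, 3)]
```

The last tuple shows the difference: after the tie at rank 2, `rank` continues at 4 while `dense_rank` continues at 3.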

### Aggregate Functions

Similarly to a group-by, one can use `avg`, `sum`, etc. as window functions. For instance, if you want to compare the salary of a given person with the average salary in their department:

In [None]:
windowSpec = Window.partitionBy("department")

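df_with_avg = df.withColumn(
    "avg_salary_in_department", F.avg("salary").over(windowSpec)
)
# Call show() separately: it returns None, so chaining it would set df_with_avg to None
df_with_avg.show()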
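
The key difference with a group-by is that a window does not collapse rows. The sketch below reproduces this in plain Python on a subset of the employees: the aggregate is computed per partition, then attached to every original row.

```python
from collections import defaultdict

employees = [
    ("Alice", "Sales", 5000),
    ("Bob", "Sales", 4000),
    ("Charlie", "Marketing", 4500),
    ("Eve", "Marketing", 5500),
]

# Collect the salaries of each partition (here, each department)
salaries = defaultdict(list)
for _, department, salary in employees:
    salaries[department].append(salary)
avg_by_department = {dept: sum(vals) / len(vals) for dept, vals in salaries.items()}

# A group-by would stop here, with one row per department.
# A window keeps every input row and attaches the aggregate to it.
windowed = [
    (name, dept, salary, avg_by_department[dept]) for name, dept, salary in employees
]
print(len(avg_by_department), len(windowed))  # 2 4
print(windowed[1])  # ('Bob', 'Sales', 4000, 4500.0)
```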

Now, write a query to identify people whose salary is more than one standard deviation below the average salary of their department:

In [None]:
# Write your query here

Your query should return `Charlie` and `Bob` as being the two underpaid employees.

### Lead and Lag Functions

Lead and lag functions allow you to access data from other rows relative to the current row. This requires an `orderBy` clause in the window definition:

In [None]:
windowSpec = Window.partitionBy("department").orderBy("salary")
df_with_lead_lag = df.withColumn(
    "next_salary", F.lead("salary", 1).over(windowSpec)
).withColumn("prev_salary", F.lag("salary", 1).over(windowSpec))

df_with_lead_lag.show()
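
For a single ordered partition, `lag` and `lead` boil down to looking one position backward or forward, with a missing value at the edges. Here is a plain-Python sketch; these two helper functions are written for this illustration and only mimic the default behaviour of the Spark functions.

```python
def lag(values, offset=1):
    """Value `offset` rows before the current one, or None at the start of the partition."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]


def lead(values, offset=1):
    """Value `offset` rows after the current one, or None at the end of the partition."""
    return [
        values[i + offset] if i + offset < len(values) else None
        for i in range(len(values))
    ]


sales_salaries = [4000, 5000, 5000, 6000]  # one partition, already ordered by salary
print(lag(sales_salaries))  # [None, 4000, 5000, 5000]
print(lead(sales_salaries))  # [5000, 5000, 6000, None]
```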

These `lead` and `lag` functions are particularly useful when dealing with time-series, as one can compute differences between two successive samples.

In [None]:
# Create a time-indexed dataframe with a value and category column
data = [
    (datetime.fromisoformat("2020-01-01"), 100, "A"),
    (datetime.fromisoformat("2020-01-02"), 120, "A"),
    (datetime.fromisoformat("2020-01-03"), 130, "A"),
    (datetime.fromisoformat("2020-01-01"), 200, "B"),
    (datetime.fromisoformat("2020-01-02"), 210, "B"),
    (datetime.fromisoformat("2020-01-03"), 220, "B"),
]

df = spark.createDataFrame(data, ["date", "measurement", "sensor"])
df.show()

Write a query that computes the average daily variation of each sensor.

In [None]:
# Write your query here

The resulting dataframe should look like this:
```
+------+-------------+
|sensor|avg_variation|
+------+-------------+
|     A|         15.0|
|     B|         10.0|
+------+-------------+
```

## Visualizing data

Unfortunately, Spark does not offer any built-in visualization. One must first materialize the data as a Pandas dataframe and then plot it:

In [None]:
df.toPandas().plot.line(x="date", y="measurement")

There is a big problem with the above plot. What is it? Can you try to fix it and have a nicer plot?

In [60]:
# Explain what's wrong with this?

# Any idea how to fix it?

## Pivoting data

One problem here is that we have a dataframe in a so-called 'long' format: there is one row per date and sensor, and a column `sensor` that denotes which sensor each row belongs to. Plotting these data isn't very easy, as the points overlap.

Instead, we should have a dataframe in a so-called 'wide' format, where one row corresponds to one date, with one column per sensor. To achieve this, we need to perform a `pivot`, which consists of four steps:

1. Choose a pivot column: This column's unique values will become the new column headers. In our case, this is the `sensor` column.
2. Select a value column: The data from this column will fill the cells in the new table. In our case, this is the `measurement` column.
3. Identify a grouping column: This column (or columns) will define the rows of the new table. In our case, this is the `date` column.
4. Reshape the data: The pivoting operation then reorganizes your data based on these choices.

In PySpark, pivoting is expressed as an operation on a `GroupedData` object, where the grouping key is the `date`. We then `pivot` on the `sensor` column, and aggregate the `measurement` values that correspond to a unique combination of `date` and `sensor`. Note that we use `F.first` here as we expect only one measurement per date and sensor, but we could in principle average, sum, or take the standard deviation across multiple measurements.

Before plotting the data, we must re-order by `date`, as pivoting does not preserve the order of the rows. Finally, we can simply plot the resulting Pandas dataframe.

In [None]:
df_pd = (
    df.groupBy("date")
    .pivot("sensor")
    .agg(F.first("measurement"))
    .orderBy("date")
    .toPandas()
)

df_pd.plot.line(x="date")
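
The four pivoting steps can also be followed on a plain Python list, which may help to see what ends up in each cell. This is a sketch on a shortened copy of the sensor data, with dates kept as strings for brevity.

```python
long_rows = [
    ("2020-01-01", "A", 100),
    ("2020-01-01", "B", 200),
    ("2020-01-02", "A", 120),
    ("2020-01-02", "B", 210),
]

# Grouping column -> {pivot value -> cell value}
wide = {}
for day, sensor, measurement in long_rows:
    # setdefault keeps the first value seen for a (day, sensor) pair, like F.first
    wide.setdefault(day, {}).setdefault(sensor, measurement)

# The unique values of the pivot column become the new column headers
sensors = sorted({sensor for _, sensor, _ in long_rows})

# One output row per date, with one cell per sensor
table = [(day, *(cells.get(s) for s in sensors)) for day, cells in sorted(wide.items())]
print(sensors)  # ['A', 'B']
print(table)  # [('2020-01-01', 100, 200), ('2020-01-02', 120, 210)]
```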

The above plot used [Matplotlib](https://matplotlib.org/stable/) as backend, and it is rendered as a static image. Other backends are supported by Pandas and can be helpful for interactive visualization, such as [Plotly](https://plotly.com/python/pandas-backend/):

In [None]:
pd.options.plotting.backend = "plotly"

# Different backends might require different arguments
df_pd.plot.line(x="date", y=["A", "B"])

## A quick word about Pandas 

This tutorial has been using very small datasets, for which Spark isn't really fit-for-purpose. Very often, you might be tempted to simply do `toPandas()` and then work with the Pandas API. This is a **very bad idea** for at least two reasons:

1. The code that you produce will **NOT** scale to larger datasets. If you picked Spark in the first place, it is probably because you intend your code to scale up to hundreds of GBs, maybe dozens of TBs. Remember that whenever you do a `toPandas()`, you essentially bring back the result of your query to a single node (the driver), which can cause Out-Of-Memory (OOM) errors and will massively slow down your computation.
2. If you know that the result of your query will fit in memory, you might be tempted to work with Pandas anyway. Even then, I would strongly suggest you don't. Instead, use [Polars](https://pola.rs/), which is a much faster, easier-to-use, and more robust library than Pandas. It is written in Rust (which makes it **a lot** faster than Pandas) and its [API is much closer to PySpark](https://docs.pola.rs/user-guide/migration/spark/) (and SQL) than the [rather poorly designed Pandas API](https://docs.pola.rs/user-guide/migration/pandas/).

In particular, Polars supports lazy queries, similar to Spark, which allows it to perform query optimizations (in the same way as the Catalyst optimizer does), work with larger-than-memory datasets using streaming, or catch schema errors before processing data. Generally speaking, its API is cleaner and better constructed than Pandas'.

So, instead of a plain `toPandas()`, do the following:

In [None]:
import polars as pl

print("Polars version: ", pl.__version__)

# Convert a Spark dataframe to Polars
df_pl = pl.from_pandas(df.toPandas())
df_pl

In [None]:
# Plot the data using Polars and Altair, which doesn't require a pivot
df_pl.plot.line(x="date", y="measurement", color="sensor")

Polars can be a very effective solution for smaller datasets (up to a few GBs). If you are on a beefy server with lots of RAM and CPU cores, it can crunch datasets up to a few hundred GBs.

### GPU acceleration

More recently, Polars and NVIDIA engineers started to collaborate on bringing [GPU acceleration to Polars DataFrames](https://pola.rs/posts/polars-on-gpu/), building on [RAPIDS](https://rapids.ai/). Using this strategy, Polars is able to utilize both the CPU and GPU (if available), significantly speeding up certain workloads. Note that a similar integration exists for [Apache Spark](https://nvidia.github.io/spark-rapids/).

For a list of curated resources about Polars, check out [Awesome Polars](https://github.com/ddotta/awesome-polars).

That's it, thanks a lot for following this tutorial!

# Improve me!

Now, the goal is that you extend or improve this tutorial so that the students have a better experience next year! Here are five things you can choose and focus on:

- **Optimizing Data Processing with Catalyst Optimizer and Tungsten**: Extend the tutorial to illustrate the optimization performed by the query optimizer (Catalyst) and the execution engine (Tungsten), in particular looking at the logical and physical plans.
- **Performance Tuning and Partitioning Strategies**: Explore and discuss performance tuning and partitioning strategies, especially when data are skewed. Generate scenarios with data skews and explore what happens in terms of performance with multiple workers.
- **Structured Streaming with Spark**: How to process real-time data by ingesting a streaming dataset and applying real-time transformations, aggregations, or windowing operations.
- **Window functions**: Improve the section on window functions with more didactic or more complex examples. 
- **Spark vs competitors**: Explore and discuss the differences between Apache Spark, [Polars](https://github.com/pola-rs/polars), and [DuckDB](https://duckdb.org/docs/installation/?version=stable&environment=cli&platform=linux&download_method=direct&architecture=x86_64). Compare the performance between these on the same queries over a benchmark dataset.

You will then get the opportunity to review the tutorial of a fellow student and learn something new in the process!