Today, we're going to dive into the basics of PySpark and the DataFrame API. Our goal is to set up PySpark and get familiar with its API, focusing on the DataFrame API and on data operations such as filtering, joining, aggregating, and grouping. We'll also discuss which use cases PySpark is a good fit for.

In [0]:
# Starting a SparkSession

# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("PySpark Advanced Basics") \
#     .getOrCreate()

In [0]:
# Demonstrate entrypoint via autocomplete
# spark.

In Databricks, a SparkSession is already available as the `spark` variable. It's our entry point for using Spark.

Now, let's load a dataset from the Databricks File System (DBFS) as a DataFrame.

The "Wine Quality" dataset consists of two datasets, one for red and one for white vinho verde wine samples from the north of Portugal. It was originally used to model wine quality based on physicochemical tests.

Input variables (based on physicochemical tests):
1. fixed acidity
2. volatile acidity
3. citric acid
4. residual sugar
5. chlorides
6. free sulfur dioxide
7. total sulfur dioxide
8. density
9. pH
10. sulphates
11. alcohol  

Output variable (based on sensory data):  
12. quality (score between 0 and 10)

*Source:  
Paulo Cortez, University of Minho, Guimarães, Portugal, http://www3.dsi.uminho.pt/pcortez  
A. Cerdeira, F. Almeida, T. Matos and J. Reis, Viticulture Commission of the Vinho Verde Region(CVRVV), Porto, Portugal
@2009*

In [0]:
dbutils.fs.ls('/databricks-datasets')

In [0]:
%fs ls /databricks-datasets/wine-quality

path,name,size,modificationTime
dbfs:/databricks-datasets/wine-quality/README.md,README.md,1066,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-red.csv,winequality-red.csv,84199,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-white.csv,winequality-white.csv,264426,1594262736000


In [0]:
data_path = "dbfs:/databricks-datasets/wine-quality/winequality-red.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True, sep=";")

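The code above reads the semicolon-separated CSV file at the given path into a DataFrame called `df`. `header=True` takes the column names from the first line of the file, and `inferSchema=True` asks Spark to infer each column's type from the data instead of reading everything as strings.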

In [0]:
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

#### What are DataFrames and why are they important in PySpark?

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python's pandas library, but with optimizations for distributed processing and the ability to scale to big data.

DataFrames provide a high-level API for distributed data processing in PySpark. They allow us to perform complex data manipulations and analysis using concise and expressive syntax. Additionally, DataFrames in PySpark benefit from the Spark engine's optimizations, resulting in better performance compared to the lower-level RDD (Resilient Distributed Datasets) API.

**Nulls**

In [0]:
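# Import some standard functions.
# Note: this `sum` is Spark's column aggregate; it shadows Python's built-in sum() in this notebook.
from pyspark.sql.functions import col, sum

# df.select(col('quality')).show(2)
# Count the nulls in a single column: flag each null as 1, then sum the flags.
df.select(sum( col('quality').isNull().cast('int') ).alias('quality')).collect()[0]['quality']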

Out[11]: 0

In [0]:
null_counts = df.select([sum( col(c).isNull().cast('int') ).alias(c) for c in df.columns]).collect()
null_counts = {c: null_counts[0][c] for c in df.columns}
print(null_counts)

{'fixed acidity': 0, 'volatile acidity': 0, 'citric acid': 0, 'residual sugar': 0, 'chlorides': 0, 'free sulfur dioxide': 0, 'total sulfur dioxide': 0, 'density': 0, 'pH': 0, 'sulphates': 0, 'alcohol': 0, 'quality': 0}


1. `col(c).isNull().cast('int')`: For each column `c`, `isNull()` returns a boolean column expression indicating whether each value is null. We then cast this boolean to an integer, so `True` becomes `1` and `False` becomes `0`. The result is a column expression (not yet a DataFrame) in which null values are represented as `1` and non-null values as `0`.

2. `[sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]`: We use a list comprehension to apply the previous step to every column in the DataFrame `df`. The `sum()` function aggregates the 1s and 0s, giving the total number of null values in each column. The `alias(c)` call names each aggregated column after the original column.

3. `df.select(...)`: We use the `select()` function to create a new DataFrame with the aggregated null counts for each column.

4. `null_counts = ...collect()`: The `collect()` function is used to retrieve the result of the null count calculation as a list of Row objects. In this case, there will only be one Row object because we've aggregated the data.

5. `{c: null_counts[0][c] for c in df.columns}`: We use a dictionary comprehension to convert the Row object into a dictionary, where the keys are the column names and the values are the null counts for each column.

6. `print(null_counts)`: Finally, we print the dictionary containing the null counts for each column.

**Sorting**

Sort the records by pH and then by residual sugar, both in descending order.

In [0]:
from pyspark.sql.functions import desc

sorted_df = df.orderBy(desc("pH"), desc("residual sugar"))
sorted_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          5.4|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          5.0|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          4.6|            0.52|       0.15|           2.1|    0.054|                8.0|                65.0| 0.9934| 3.9|     0.56|   13.1|      4|
|          5.1|            0.47|       0.02|           1.3|    0.034|               18.0|           

**Filtering**

Filter the records to keep only those with a pH between 3.4 and 3.6 (inclusive), which we treat here as the ideal range.

In [0]:
filtered_df = df.filter((df["pH"] >= 3.4) & (df["pH"] <= 3.6))
filtered_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|            0.66|        0.0|           1.8|    0.075|               13.0|                40.0| 0.9978|3.51|     0.56|    9.4|      5|
|          5.6|           0.615|        0.0|           1.6|    0.089|               16.0|           

**Window functions**

Next, let's look at window functions. They allow us to perform calculations across sets of rows that are related to the current row.  
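In this example, we partition the rows by `quality`, order each partition by `alcohol` in descending order, and number the rows within each partition using `row_number()`: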

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("quality").orderBy(desc("alcohol"))
ranked_df = df.withColumn("rank", row_number().over(window_spec))
ranked_df.show()


+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|rank|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----+
|          8.3|            1.02|       0.02|           3.4|    0.084|                6.0|                11.0|0.99892|3.48|     0.49|   11.0|      3|   1|
|          7.6|            1.58|        0.0|           2.1|    0.137|                5.0|                 9.0|0.99476| 3.5|      0.4|   10.9|      3|   2|
|          7.4|           1.185|        0.0|          4.25|    0.097|                5.0|                14.0| 0.9966|3.63|     0.54|   10.7|      3|   3|
|          7.1|           0.875|       0.05|           5.7|    0.082| 

The output DataFrame contains an additional "rank" column. `row_number()` assigns a unique, consecutive number to each row within the group of rows that share the same "quality". Rows are ordered by the "alcohol" column in descending order, so the wine with the highest alcohol content in each group gets rank 1. Rows with equal alcohol values still receive different numbers, in no guaranteed order.

**Summary statistics**

In [0]:
summary = df.describe()

summary.select("summary", "fixed acidity", "residual sugar").show()

+-------+------------------+------------------+
|summary|     fixed acidity|    residual sugar|
+-------+------------------+------------------+
|  count|              1599|              1599|
|   mean| 8.319637273295838|2.5388055034396517|
| stddev|1.7410963181276948|  1.40992805950728|
|    min|               4.6|               0.9|
|    max|              15.9|              15.5|
+-------+------------------+------------------+



**Using SQL queries**

Lastly, we'll cover how to use SQL queries with DataFrames.  
Here, we create a temporary view called "wine" from our DataFrame df and then execute a SQL query to count the number of wines for each quality score.

In [0]:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("wine")

# Example: counting the number of wines for each quality score
results = spark.sql("SELECT quality, COUNT(*) as count FROM wine GROUP BY quality")
results.show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  638|
|      3|   10|
|      5|  681|
|      4|   53|
|      8|   18|
|      7|  199|
+-------+-----+



In this module, we've covered the basics of PySpark and the DataFrame API. We've learned how to set up a SparkSession, load data, perform data operations, and use SQL queries. Now that you're familiar with the DataFrame API, we'll move on to using PySpark for natural language processing tasks in the next module.