# Spark Dataframes

- look like pandas dataframes
- share some of the same methods and syntax
- but they are 2 separate types of objects

Create Spark Session

In [None]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Create Dataframes

Create a spark dataframe from a pandas dataframe. 

As a reminder, there are multiple ways to create a pandas dataframe. 
Below, we call out 2 methods: 

1. From a dictionary-like object, where we provide the values by columns. 
2. From an array-like object, where we provide values by rows. 

In [None]:
import pandas as pd
import numpy as np

# Create pandas dataframe by columns using dictionary-like object

pd_df = pd.DataFrame({'col1': ['r1c1', 'r2c1', 'r3c1'], 
                      'col2': ['r1c2', 'r2c2', 'r3c2'], 
                      'col3': ['r1c3', 'r2c3', 'r3c3']}, 
                     index = [1, 2, 3])

pd_df

In [None]:
# create pandas dataframe by rows
pd_df = pd.DataFrame([['r1c1', 'r1c2', 'r1c3'], 
                      ['r2c1', 'r2c2', 'r2c3'], 
                      ['r3c1', 'r3c2', 'r3c3']], 
                     index = [1, 2, 3], 
                     columns = ['col1', 'col2', 'col3'])

pd_df

In [None]:
# set random seed for reproducing our dataframe creation using np.random.choice
np.random.seed(456)

# create dataframe from dictionary, where column 1, 'n', are numbers 0 - 19 and 
# column 2, 'group' is a random letter of a, b, or c. 
col1 = np.arange(20)
col2 = np.random.choice(list("abc"), 20)
my_dict = dict(n=col1, group=col2)

# create pandas dataframe from the dictionary, my_dict. 
pd_df = pd.DataFrame(my_dict)
pd_df.head()

Create Spark Dataframe from Pandas Dataframe

In [None]:
sp_df = spark.createDataFrame(pd_df)
sp_df

In [None]:
pd_df

- We do see the column names and types, but we don't see the data. Why?
- Because spark is lazy: it won't compute or show values until it has to. 
- To peek, use `.show`
- `.show` displays the first 20 rows by default

In [None]:
sp_df.show(2)

Read Data from files

In [None]:

pd_v_spark = pd.DataFrame([['pd.read_csv("myfile.csv")', 
                            'spark.read.load("myfile.csv", format = "csv", sep = ",")'], 
                           ['pd.read_json("myfile.json")', 
                            'spark.read.load("myfile.json", format = "json") OR spark.read.json("myfile.json")']], 
                          index = ['csv', 'json'], 
                          columns = ['pandas', 'spark'])

# to display and see all text in dataframe
pd.set_option('display.max_colwidth', 10000)


pd_v_spark

Summarize Data

In [None]:
# DataFrame.append was removed in pandas 2.0, so we add rows with pd.concat
pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['pd_df.head()', 'sp_df.show(), .head(), .take()'],
                                      ['pd_df.head(1)', 'sp_df.first()'],
                                      ['pd_df.describe()', 'sp_df.describe()'],
                                      ['pd_df.columns', 'sp_df.columns'],
                                      ['len(pd_df)', 'sp_df.count()'],
                                      ['len(pd_df.drop_duplicates())', 'sp_df.distinct().count()'],
                                      ['pd_df.info()', 'sp_df.printSchema()']
                                     ],
                                     index = ['1st n rows', '1st row', 'summary statistics', 
                                              'column names', '# rows', '# distinct rows', 
                                              'df schema info'], 
                                     columns = ['pandas', 'spark'])])

In [None]:
pd_v_spark

Let's use a dataset with more realistic looking data to explore...

In [None]:
from pydataset import data

mpg_pd = data("mpg")
mpg_pd.head(5)

In [None]:
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

## Columns

- The following will create a series from a pandas dataframe, but a column object from a spark dataframe. 
- A column object represents a vertical slice of a dataframe, but does not contain the data itself. 
- You will use it to perform functions on and reference that column. 

In [None]:
mpg_pd.year

In [None]:
mpg.year

Select columns

In [None]:
pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['pd_df[["col1", "col2"]]', 'sp_df.select(sp_df.col1, sp_df.col2)']],
                                     index = ['select columns'], 
                                     columns = ['pandas', 'spark'])])
pd_v_spark

In [None]:
mpg.select(mpg.hwy, mpg.cty, mpg.model)

Why can't I see the data?

In [None]:
mpg.select(mpg.hwy, mpg.cty, mpg.model).show()

Column objects support operations such as arithmetic operators

In [None]:
mpg.hwy + 1

In [None]:
mpg.select(mpg.hwy, mpg.hwy + 1).show(2)

Once we have a column object, we can use the .alias method to rename it. 

In [None]:
mpg.select(mpg.hwy.alias('highway_mileage'), 
           (mpg.hwy + 1).alias("hwy_mileage_plus1")).show(2)

We can also store column objects in variables and reference them

In [None]:
col1 = mpg.hwy.alias("highway_mileage")
col2 = (mpg.hwy / 2).alias("highway_mileage_halved")
mpg.select(col1, col2).show(5)

In addition to the syntax we've seen above, we can create columns with the `col` and `expr` functions from the `pyspark.sql.functions` module.

**col**

In [None]:
from pyspark.sql.functions import col, expr
col("hwy")
# mpg.hwy

In [None]:
col("class")

The column object produced by the col function is the same as the previous column object we saw.

In [None]:
avg_column = (col("hwy") + col("cty")) / 2
avg_column

In [None]:
mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

`avg_mileage` is created by using the col function to produce pyspark Column objects and using the arithmetic operators to combine them.

**expr**

- Does everything col does and more
- Returns the same type of column object
- But also allows us to express manipulations to the column within the string that defines the column.

In [None]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression col("hwy") + 1
    expr("hwy AS highway_mileage"),  # using an alias col("hwy").alias("highway_mileage")
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above (col("hwy") + 1).alias()
).show(5)

Note that all the columns created below are identical, and which syntax to use is merely a style choice.

In [None]:
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

## Spark SQL

- Spark SQL allows us to write SQL queries against our spark dataframes.  
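- We'll first "register" the dataframe with spark as a temporary view using `.createOrReplaceTempView`, for example `sp_df.createOrReplaceTempView('sp_df')`. Below, we register `mpg` under the name `mpg_view`.  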

In [None]:
mpg.createOrReplaceTempView("mpg_view")

- Now we can write a sql query against the `mpg_view` view.  

In [None]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg_view
"""
).show()

- The resulting value is another dataframe. 
- To see the values, we have to call `.show`.

In [None]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg_view
"""
).show()

**Note:** All of these methods for creating / manipulating dataframes are the same in terms of performance. The resulting dataframes get turned into the same spark code that gets executed on the JVM, so it really is just a style choice as to which to use.

## Type Casting

View column datatypes:  

In [None]:
mpg.dtypes

In [None]:
mpg.printSchema()

To convert from one type to another use the `.cast` method on a column.

In [None]:
mpg.select(mpg.hwy.cast("string")).printSchema()

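If a value cannot be converted, it will be replaced with null. This is the behavior when ANSI mode (`spark.sql.ansi.enabled`) is off; with ANSI mode on, an invalid cast raises an error instead.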

In [None]:
mpg.select(mpg.model, mpg.model.cast("int")).show(5)

## Basic Built-in Functions

There are many other functions beyond col and expr within the pyspark.sql.functions module for operating on pyspark dataframe columns.

- `concat`: to concatenate strings  
- `sum`: to sum a group  
- `avg`: to take the average of a group  
- `min`: to find the minimum  
- `max`: to find the maximum  

**Note**: importing the sum, min and max functions directly will shadow Python's built-in sum, min and max functions. This means you will get an error if you try to sum a list of numbers, because sum will then refer to the pyspark function, which works with pyspark dataframe columns, whereas the built-in function works with lists of numbers.

In [None]:
# Note: The pyspark avg and mean functions are aliases of each other

from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

It is very common to see something like:  

`import pyspark.sql.functions as F`

which makes every function in the `pyspark.sql.functions` module available under the `F` prefix (for example, `F.sum`), so Python's built-in functions are not shadowed.

In [None]:
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

In [None]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

In order to use a string literal as part of our select, we'll need to use the `lit` function, otherwise spark will try to resolve our string as a column.

In [None]:
from pyspark.sql.functions import lit
mpg.select(concat(mpg.cyl, lit(" cylinders")).alias("cylinders")).show(5)

## String Manipulation PySpark Functions

In order to demonstrate these functions we'll create a dataframe with some text data.

In [None]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [None]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

`regexp_extract`: specify a regular expression with at least one capture group, and create a new column based on the contents of a capture group.


- first argument: the name of the string column to extract from.  
- second argument: the regular expression itself.  
- last argument: specifies which capture group we want to use. If, for example, our regular expression had 2 capture groups in it and we wanted the contents of the 2nd group, we would specify a 2 here.
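
To make the capture-group argument concrete, here is a rough sketch that uses Python's built-in `re` module instead of Spark. It assumes that, for simple patterns like these, Python's regex engine behaves the same as the Java engine Spark uses. `extract_group` is a hypothetical helper, not part of either library; like `regexp_extract`, it returns an empty string when the pattern does not match.

```python
import re


def extract_group(text, pattern, group):
    """Hypothetical stand-in for regexp_extract on a single Python string."""
    match = re.search(pattern, text)
    # Fall back to an empty string when nothing matches
    return match.group(group) if match else ""


address = "600 Navarro St ste 600, San Antonio, TX 78205"

# One capture group: the leading digits
street_no = extract_group(address, r"^(\d+)", 1)

# Two capture groups: group 1 is the street number, group 2 the street name
street = extract_group(address, r"^(\d+)\s([\w\s]+?),", 2)

print(street_no)
print(street)
```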


In [None]:
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

`regexp_replace` lets us make substitutions based on a regular expression.

Below, we obtain just the city, state, and zip code of the address by replacing everything up to the first comma with an empty string.
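
The same substitution can be tried on a single string with Python's `re.sub`, as a quick way to check the pattern (again assuming the Python and Java regex engines agree on a pattern this simple). The lazy `.*?` stops at the first comma, so only the street portion is removed.

```python
import re

address = "600 Navarro St ste 600, San Antonio, TX 78205"

# ^.*?,  lazily matches everything up to and including the first comma
# \s*    also consumes the whitespace that follows it
city_state_zip = re.sub(r"^.*?,\s*", "", address)

print(city_state_zip)
```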

In [None]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

## Conditional Subsetting and Filtering of Dataframes

`.filter` and `.where` both allow us to select a subset of the rows of our dataframe.

In [None]:
pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['pd_df[pd_df.c1 > 0]', 'sp_df.filter(sp_df.c1 > 0), sp_df.where(sp_df.c1 > 0)']],
                                     index = ['conditional filtering'], 
                                     columns = ['pandas', 'spark'])])
pd_v_spark

In [None]:
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

## Conditional Assigning of Values

Spark => when :  Excel => IF : SQL => CASE...WHEN : Python => numpy.where

- Specify a condition, and a value to produce if that condition is true

In [None]:
pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['np.where(pd_df.c1 > 0, "positive", None)', 
                                       'sp_df.select(sp_df.c1, when(sp_df.c1 > 0, "positive").alias("number_sign"))']],
                                     index = ['conditional assigning'], 
                                     columns = ['pandas', 'spark'])])
pd_v_spark

In [None]:
from pyspark.sql.functions import when

mpg.select(mpg.hwy, when(mpg.hwy > 25, "good_mileage").alias("mpg_desc")).show(12)

- If the condition we specified is false, null will be produced.   
- Use the `.otherwise` method to specify a value to use if our condition is false  

In [None]:
pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['np.where(pd_df.c1.array > 0, "pos", "neg")', 
                                       'sp_df.select(sp_df.c1, when(sp_df.c1 > 0, "pos").otherwise("neg").alias("number_sign"))']],
                                     index = ['conditional assigning with else'], 
                                     columns = ['pandas', 'spark'])])
pd_v_spark

In [None]:
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage")
    .otherwise("bad_mileage")
    .alias("mpg_desc"),
).show(12)

- To specify multiple conditions, we can chain `.when` calls.   
- The first condition that is met will be the value that is used.  
- If none of the conditions are met the value specified in the .otherwise will be used (or null if you don't provide a .otherwise).  

In [None]:
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

Notice here that a car with a displ of 1.8 matches both conditions we specified, but small is produced because it is associated with the first matching condition. For any value from 2 up to (but not including) 3, medium will be produced, and anything 3 or larger will produce large.
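
The first-match-wins rule behaves like an `if` / `elif` / `else` chain. A plain-Python sketch of the same logic follows; the function `engine_size` is hypothetical and ignores nulls, which Spark handles separately.

```python
def engine_size(displ):
    """Mirror when(displ < 2, ...).when(displ < 3, ...).otherwise(...)."""
    if displ < 2:  # checked first, so it wins whenever it is true
        return "small"
    elif displ < 3:  # only reached when the first condition was false
        return "medium"
    else:  # plays the role of .otherwise
        return "large"


labels = [engine_size(d) for d in [1.8, 2.0, 2.8, 3.0, 5.7]]
print(labels)
```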

## Sorting and Ordering

- Sort the rows by one or more columns with two methods: `.sort` and `.orderBy`. 
- `.sort` and `.orderBy` are aliases of each other and do the exact same thing. 
- Both accept a Column object or a string that is the name of a column.
- By default, values are sorted in ascending order.    

In [None]:
mpg.sort(mpg.hwy).show(8)

- To sort in descending order, we can use the `.desc` method on any Column object, or the `desc` function from `pyspark.sql.functions`

In [None]:
from pyspark.sql.functions import asc, desc

mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

- To specify sorting by multiple columns, we provide each column as a separate argument to `.sort`.  

In the example below: 

1. Reverse alphabetically by the vehicle's class   
2. By the number of cylinders from lowest to highest  
3. By the vehicle's highway mileage, from greatest to smallest  

In [None]:
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

In [None]:


pd_v_spark = pd.concat([pd_v_spark,
                        pd.DataFrame([['pd_df.sort_values(by=["c1"])', 
                                       'sp_df.sort(sp_df.c1)'],
                                      ['pd_df.sort_values(by=["c1","c2"])',
                                       'sp_df.sort(sp_df.c1, sp_df.c2)'],
                                      ['pd_df.sort_values(by=["c1","c2"], ascending=[False, True])',
                                       'sp_df.sort(sp_df.c1.desc(), sp_df.c2)'],
                                      ['pd_df.sort_values(by=["c1","c2"], ascending=False)', 
                                       'sp_df.sort(desc("c1"), desc("c2")) OR sp_df.sort(col("c1").desc(), col("c2").desc())']
                                     ],
                                     index = ['sort 1 col asc', 'sort 2+ cols asc', 'sort 2+ cols desc/asc', 'sort 2+ cols desc'], 
                                     columns = ['pandas', 'spark'])])
pd_v_spark

## Grouping and Aggregating

- To aggregate our data by group, use the `.groupBy` method.  
- Like with .select and .sort, we can pass either Column objects or strings that are column names to .groupBy.  
- All of the expressions below are equivalent.

In [None]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

- Once the data is grouped, specify an aggregation.    
- We can use one of the aggregate functions we imported earlier, along with a column  

In [None]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

- To group by multiple columns, pass each of the columns as a separate argument to .groupBy.   
- This is different from pandas, where we would need to pass a list.  

In [None]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

- In addition to `.groupBy`, we can use `.rollup`, which will do the same aggregations, but will also include the overall total.  
- Below the null value in cyl indicates the total count.  
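
Conceptually, a one-column rollup count is the per-group count plus one extra row for the grand total. A plain-Python sketch of that idea, using a made-up list of cylinder values rather than the real `mpg` data:

```python
from collections import Counter

# Hypothetical cylinder values, one per car
cyl = [4, 4, 6, 8, 6, 4, 8, 4]

# Per-group counts, comparable to what groupBy("cyl").count() produces
rollup_counts = dict(Counter(cyl))

# rollup adds a grand-total row whose group key is null (None here)
rollup_counts[None] = len(cyl)

print(rollup_counts)
```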

In [None]:
mpg.rollup("cyl").count().sort("cyl").show()

- Use `.rollup` to compute average by group with an overall average
- The null row represents the overall average highway mileage.

In [None]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

mpg.rollup("cyl").agg(avg(mpg.hwy)).sort("cyl").show()

- You can roll up by multiple columns.  
- Where cyl = null you see the overall average.  
- Where cyl = n and class = null, you have the average across all classes for each cylinder value.  

In [None]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()

## Crosstabs and Pivot Tables  

- Another way to aggregate is by `.crosstab`.    
- Similar to the pandas `crosstab` function, in that it calculates the number of occurrences of each unique value from the two passed columns.    
- `.crosstab` does counts.  
- For a different aggregation, use `.pivot`.  

In [None]:
mpg.crosstab("class", "cyl").show()

To find the average highway mileage for each combination of car class and number of cylinders, we could use `.pivot`.  

In [None]:
mpg.groupby("class").pivot("cyl").mean("hwy").sort(col("class")).show()

You can see how this is a reshape of the following: 

In [None]:
mpg.groupBy("class", "cyl").mean("hwy").sort(col("class"), col("cyl")).show()

You can see from above:   
- The unique values from the column we group by will be the rows in the resulting dataframe.  
- The unique values from the column we pivot on will become the columns.  
- The values in each cell will be equal to the aggregation we specified over the group of values defined by the intersection of the rows and the columns.  
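
The reshape can be sketched in plain Python. Starting from long-format rows of (class, cyl, mean hwy), which are invented values here, pivoting turns each distinct `cyl` into a column and leaves missing combinations as null (`None`).

```python
# Hypothetical result of grouping by class and cyl, in long format
long_rows = [
    ("compact", 4, 29.0),
    ("compact", 6, 25.0),
    ("suv", 6, 18.5),
    ("suv", 8, 16.0),
]

# The distinct pivot values become the columns of the wide table
cyl_values = sorted({cyl for _, cyl, _ in long_rows})

# One row per group key, one cell per pivot value; absent pairs stay None
wide = {}
for car_class, cyl, avg_hwy in long_rows:
    row = wide.setdefault(car_class, {c: None for c in cyl_values})
    row[cyl] = avg_hwy

for car_class, row in sorted(wide.items()):
    print(car_class, row)
```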

## Handling Missing Data  

Let's take a look at how spark handles missing data. First we'll create a dataframe that has a few missing values:  

In [None]:
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

Spark provides two main ways to deal with missing values:

- `.fill`: to replace missing values with a specified value  
- `.drop`: to drop rows containing missing values  

Both methods are accessed through the `.na` property. We'll look at some examples below:  

In [None]:
df.na.drop().show()

In [None]:
df.na.fill(0).show()

For both methods, we can restrict the operation to specific columns with the `subset` argument:

In [None]:
df.na.fill(0, subset="x").na.fill(-1, subset="y").show()

Notice that above the na values in the x column were filled with 0, and the na values in the y column were filled with -1.

In the example below, the rows with an na value for the y column will be dropped, but the rows with na values for only the x column will remain.  

In [None]:
df.na.drop(subset="y").show()

## DataFrame Transformations

The .explain method will show us how spark is thinking about our dataframe.

In [None]:
mpg.explain()

For our basic example, we see that there is only a single step.

In [None]:
mpg.select(mpg.cyl, mpg.hwy).explain()

Here we select just two of the columns, but this is still just a single step to spark.

In [None]:
mpg.filter(mpg.cyl == 6).explain()

Notice that our filter is also a single step.

Without reading ahead, do you think the execution plan for the two dataframes below will be the same or not?

In [None]:
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()

Notice that even though we specified the transformations (.select and .filter) in a different order, we end up with the same output when we call .explain. This is because spark's optimizer rewrites the transformations we specify into an equivalent, more efficient plan before executing anything.

In [None]:
mpg.selectExpr("cyl + 3 * 16 / 4 + 19 AS unused", "hwy").select(
    "hwy"
).explain()

Notice here that we have 2 separate select statements, but spark will condense this down to a single Project, as it is smart enough to realize that it doesn't actually need to do all the arithmetic we specified in the first select, since we aren't using that value later on.

In [None]:
mpg.select(min(mpg.cyl)).explain()

Notice now that the execution plan gets much more complicated. This is because in steps prior, we were applying transformations that applied to each row individually. To calculate a minimum, we have to look at all the rows in the dataset to find the smallest.
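
One way to picture why an aggregate needs more steps: the rows live in separate partitions, so a typical plan computes a partial result inside each partition and then combines those partial results. The plain-Python sketch below illustrates that two-stage idea with invented partitions. It is an analogy, not Spark's actual implementation, and it calls `builtins.min` explicitly because `min` may have been replaced by the pyspark function earlier in a session.

```python
import builtins

# Hypothetical partitions of the cyl column, as if spread across workers
partitions = [
    [4, 6, 8],
    [6, 6, 4],
    [8, 5, 6],
]

# Stage 1: each partition computes its own minimum independently
partial_mins = [builtins.min(part) for part in partitions]

# Stage 2: the partial results are brought together and reduced once more
global_min = builtins.min(partial_mins)

print(partial_mins, global_min)
```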

In [None]:
mpg.groupby(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()

In [None]:
(
    mpg.select(col("cyl"), expr("(cty + hwy) / 2 AS avg_mpg"))
    .filter(expr('class == "compact"'))
    .groupby("cyl")
    .agg(min("avg_mpg"), avg("avg_mpg"), max("avg_mpg"))
    .explain()
)

## More Dataframe Manipulation Examples

Let's take a look at some more examples of working with spark dataframes. For these examples, we'll be working with a dataset of observations of the weather in Seattle.

In [None]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

Let's first find the dates where the data starts and stops:

In [None]:
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date


- `.select` to select the minimum date and the maximum date. 
- `.first` returns us the first row of our results, which consists of two values, and so can be unpacked into the min_date and max_date variables.  

Next, we combine the temp max and min columns into a single column, temp_avg.  

In [None]:
weather = weather.withColumn(
    "temp_avg", expr("ROUND(temp_min + temp_max) / 2")
).drop("temp_max", "temp_min")
weather.show(6)

Now we will calculate the total amount of rainfall for each month. We'll do this by first creating a month column, then grouping by the month, and finally, aggregating by taking the sum of the precipitation. To do this we will need to use the month function.


In [None]:
from pyspark.sql.functions import month, year, quarter

(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

Let's now take a look at the average temperature for each type of weather in December 2013:

In [None]:
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

Here we first have a couple of .filter calls in order to restrict our data to December of 2013. We then group by the weather column, and lastly, aggregate by taking the average of our temp_avg column. The combination of group by and agg will calculate the average temperature for each unique value of the weather column.

Let's now find out how many days had freezing temperatures in each month of 2013.
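
The query that follows relies on a small trick: casting a boolean condition to an integer gives 1 for true and 0 for false, so summing the result counts the rows where the condition holds. In plain Python, with invented temperatures (and `builtins.sum` spelled out, since `sum` may have been replaced by the pyspark function earlier in a session):

```python
import builtins

# Hypothetical daily average temperatures
temps = [3.5, -1.0, 0.0, 7.2, -4.3]

# int(True) == 1 and int(False) == 0, mirroring (condition).cast("int")
freezing_flags = [int(t <= 0) for t in temps]

# Summing the 0/1 flags counts the freezing days
freezing_days = builtins.sum(freezing_flags)

print(freezing_flags, freezing_days)
```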


In [None]:

(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

## Joins

Like pandas and sql, spark has functionality that lets us combine two tabular datasets, known as a join.

We'll start by creating some data that we can join together:


In [None]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

- To join two dataframes together, we'll need to call the `.join` method on one of them and supply the other as an argument.  
- In addition, we'll need to supply the condition on which we are joining.   
- In this case, we are joining where the role_id column on the users table is equal to the id column on the roles table.  
- By default, spark will perform an inner join

In [None]:
users.join(roles, on=users.role_id == roles.id).show()

In [None]:
users.join(roles, on=users.role_id == roles.id, how="left").show()

Notice that the result contains duplicate id and name columns. There are several ways we could go about dealing with this:

- alias each dataframe and explicitly select columns after joining (this could also be implemented with spark SQL).  
- rename duplicated columns before joining.  
- drop duplicated columns after the join (for example, `.drop(roles.id)`).  

## Visualization (or Lack Thereof)

Spark does not provide a way to visualize its dataframes. To visualize data from spark, use the `.toPandas` method on a spark dataframe to convert it to a pandas dataframe, then visualize as you normally would. Because `.toPandas` collects all of the rows onto the driver, filter or aggregate large dataframes first.

In [None]:
users.toPandas()