# Concatenating Tables with Set-Like Operations in `pyspark`

Now let's look at combining tables with the set-like operations `union`, `intersect`, and `exceptAll` in `pyspark`.

In [1]:
from pyspark.sql import SparkSession
from more_pyspark import to_pandas
from pyspark.sql.functions import lit
from glob import glob
import re

In [4]:
spark = SparkSession.builder.appName('Ops').config('spark.driver.host', 'localhost').getOrCreate()

## Example - Auto Sales in Spark

In [2]:
sales_aprk = spark.read.csv("./data/auto_sales_apr.csv",  header=True, inferSchema=True)
sales_aprk.collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,20,14,6,24
2,Yolanda,19,10,28,17
3,Xerxes,11,27,17,9


In [3]:
sales_mayk = spark.read.csv("./data/auto_sales_may.csv",  header=True, inferSchema=True)
sales_mayk.collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,19,12,17,20
2,Yolanda,19,8,32,15
3,Xerxes,12,23,18,9


## `UNION ALL` in `pyspark`

Both `union` and `unionAll` are actually `UNION ALL`: duplicate rows are kept, and `unionAll` is simply an alias for `union`.

In [4]:
(sales_aprk
 .union(sales_mayk)
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,20,14,6,24
2,Yolanda,19,10,28,17
3,Xerxes,11,27,17,9
4,Ann,22,18,15,12
5,Bob,19,12,17,20
6,Yolanda,19,8,32,15
7,Xerxes,12,23,18,9


## `UNION/UNION DISTINCT` in `pyspark`

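Chain `distinct` after `union` to drop duplicate rows, which gives the usual `UNION/UNION DISTINCT`. Note that the row order of the result is not guaranteed.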

In [5]:
(sales_aprk
 .union(sales_mayk)
 .distinct()
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Xerxes,12,23,18,9
1,Bob,20,14,6,24
2,Yolanda,19,8,32,15
3,Ann,22,18,15,12
4,Bob,19,12,17,20
5,Yolanda,19,10,28,17
6,Xerxes,11,27,17,9


## Adding a `month` column

As mentioned before, we really should add a month column here. Note that we need to use `lit` to add a *literal constant*, since `withColumn` expects a `Column` expression rather than a plain Python value.

In [7]:
(sales_aprk.withColumn('month', lit('April'))
 .union(sales_mayk.withColumn('month', lit('May')))
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,month
0,Ann,22,18,15,12,April
1,Bob,20,14,6,24,April
2,Yolanda,19,10,28,17,April
3,Xerxes,11,27,17,9,April
4,Ann,22,18,15,12,May
5,Bob,19,12,17,20,May
6,Yolanda,19,8,32,15,May
7,Xerxes,12,23,18,9,May


## Performing `INTERSECT`

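Note that `intersect` and `intersectAll` are not synonymous: `intersect` returns distinct rows (`INTERSECT`), while `intersectAll` preserves duplicate rows (`INTERSECT ALL`).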

In [8]:
sales_aprk.intersect(sales_mayk).collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12


## Performing a set difference with `exceptAll`

In [9]:
sales_aprk.exceptAll(sales_mayk).collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Yolanda,19,10,28,17
1,Xerxes,11,27,17,9
2,Bob,20,14,6,24
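
To make the distinct versus "all" semantics concrete, here is a small pure-Python model of the operations above. It is only an analogy, not how Spark implements them: a `Counter` stands in for a table with duplicates (a multiset) and a `set` for its distinct rows. The rows are copied from the April and May tables, without the index column.

```python
from collections import Counter

# Rows as (Salesperson, Compact, Sedan, SUV, Truck).
april = [
    ("Ann", 22, 18, 15, 12),
    ("Bob", 20, 14, 6, 24),
    ("Yolanda", 19, 10, 28, 17),
    ("Xerxes", 11, 27, 17, 9),
]
may = [
    ("Ann", 22, 18, 15, 12),
    ("Bob", 19, 12, 17, 20),
    ("Yolanda", 19, 8, 32, 15),
    ("Xerxes", 12, 23, 18, 9),
]

a, m = Counter(april), Counter(may)

union_all = a + m                   # like union(): duplicates are kept
union_distinct = set(a) | set(m)    # like union().distinct()
intersect = set(a) & set(m)         # like intersect(): distinct common rows
intersect_all = a & m               # like intersectAll(): minimum multiplicity
except_all = a - m                  # like exceptAll(): multiplicities subtracted
except_distinct = set(a) - set(m)   # like subtract(): EXCEPT DISTINCT

print(sum(union_all.values()), len(union_distinct), len(intersect), len(except_all))
```

The counts printed at the end line up with the row counts of the Spark outputs shown above: 8 rows for `union`, 7 after `distinct`, 1 for `intersect`, and 3 for `exceptAll`.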


## <font color="red"> Exercise 2 </font>

In the data folder, you will find 6 files, each containing a sample of 100,000 rows from the Uber data for one of the months April through September 2014 (`apr14` to `sep14`).  Perform the following tasks:

1. Use `glob` to get all 6 file paths.
2. Use a regular expression to create a `lambda` function that pulls the month from each file path.
3. Read the 6 `pyspark` dataframes into a `dict` with keys equal to the month name and values containing the corresponding data frame.
4. Use a dictionary comprehension to add a month column to each `df`.
5. Use the accumulator pattern and the `union` method to combine these 6 data frames into one combined `pyspark` data frame.
6. Inspect the head and compute the number of rows (use the `count` method)

In [2]:
files = glob('./data/uber/uber-trip-data/uber-raw-data-[a-z][a-z][a-z]14-sample.csv')
FILE_NAME_RE = re.compile(r'^\./data/uber/uber-trip-data/uber-raw-data-([a-z]+)14-sample\.csv$')
file_name = lambda p: FILE_NAME_RE.match(p).group(1) 
file_names = lambda files: [file_name(p) for p in files]
months = file_names(files)
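
The regular expression above is anchored to the full relative path, so it returns `None` (and `.group(1)` then raises) if `glob` is run from a different directory. A slightly more forgiving sketch matches on the file name only. The helper name `month_from_path` and the pattern constant `MONTH_RE` are hypothetical, and the pattern assumes the `uber-raw-data-<mon>14-sample.csv` naming used in the glob.

```python
import re
from pathlib import Path

# Assumed file-name pattern: uber-raw-data-<three-letter month>14-sample.csv
MONTH_RE = re.compile(r"uber-raw-data-([a-z]{3})14-sample\.csv")

def month_from_path(path):
    """Return the three-letter month in the file name, or None if it does not match."""
    match = MONTH_RE.fullmatch(Path(path).name)
    return match.group(1) if match else None

example = month_from_path("./data/uber/uber-trip-data/uber-raw-data-apr14-sample.csv")
print(example)
```

Returning `None` for non-matching names makes it easy to filter out stray files before building the dictionary.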

In [5]:
dfs = {name:spark.read.csv(path, header=True, inferSchema=True) for name, path in zip(months, files)}

In [6]:
dfs_with_month = {name:df.withColumn('month', lit(name)) for name,df in dfs.items()}

In [7]:
for i,df in enumerate(dfs_with_month.values()):
    if i == 0:
        df_union = df
    else:
        df_union=df_union.union(df)
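
The loop above is a fold, so the same accumulation can be written with `functools.reduce`. This is an equivalent alternative rather than a required change; the helper name `union_all` is hypothetical, and it works on any objects that expose a `.union()` method.

```python
from functools import reduce

def union_all(frames):
    """Fold an iterable of DataFrames into one by repeated .union() calls.

    Raises TypeError if `frames` is empty, because there is no starting value.
    """
    return reduce(lambda acc, df: acc.union(df), frames)

# In this notebook the call would be:
# df_union = union_all(dfs_with_month.values())
```

As with the loop, columns are combined by position, which is fine here because every monthly file shares the same layout.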

In [8]:
df_union.head(10)

[Row(Date/Time='4/18/2014 21:38:00', Lat=40.7359, Lon=-73.9852, Base='B02682', month='apr'),
 Row(Date/Time='4/23/2014 15:19:00', Lat=40.7642, Lon=-73.9543, Base='B02598', month='apr'),
 Row(Date/Time='4/10/2014 7:15:00', Lat=40.7138, Lon=-74.0103, Base='B02598', month='apr'),
 Row(Date/Time='4/11/2014 15:23:00', Lat=40.7847, Lon=-73.9698, Base='B02682', month='apr'),
 Row(Date/Time='4/7/2014 17:26:00', Lat=40.646, Lon=-73.7767, Base='B02598', month='apr'),
 Row(Date/Time='4/10/2014 16:56:00', Lat=40.7414, Lon=-74.0071, Base='B02682', month='apr'),
 Row(Date/Time='4/24/2014 23:26:00', Lat=40.7181, Lon=-73.95, Base='B02682', month='apr'),
 Row(Date/Time='4/23/2014 13:44:00', Lat=40.7634, Lon=-73.9964, Base='B02598', month='apr'),
 Row(Date/Time='4/9/2014 7:05:00', Lat=40.7167, Lon=-73.9635, Base='B02682', month='apr'),
 Row(Date/Time='4/27/2014 15:07:00', Lat=40.7216, Lon=-73.9977, Base='B02598', month='apr')]

In [9]:
df_union.count()

600000

## Up Next

Stuff