# Concatenating Tables with Set-Like Operations in `pyspark`

Now let's look at combining tables with `union`, `intersect`, and `except` in `pyspark`.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').config('spark.driver.host', 'localhost').getOrCreate()

## Example - Auto Sales in Spark

In [3]:
from more_pyspark import to_pandas
sales_aprk = spark.read.csv("./data/auto_sales_apr.csv",  header=True, inferSchema=True)
sales_aprk.collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,20,14,6,24
2,Yolanda,19,10,28,17
3,Xerxes,11,27,17,9


In [4]:
sales_mayk = spark.read.csv("./data/auto_sales_may.csv",  header=True, inferSchema=True)
sales_mayk.collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,19,12,17,20
2,Yolanda,19,8,32,15
3,Xerxes,12,23,18,9


## `UNION ALL` in `pyspark`

Both `union` and `unionAll` are actually `UNION ALL`: duplicate rows are kept, and `unionAll` is simply an alias of `union`.

In [5]:
(sales_aprk
 .union(sales_mayk)
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,20,14,6,24
2,Yolanda,19,10,28,17
3,Xerxes,11,27,17,9
4,Ann,22,18,15,12
5,Bob,19,12,17,20
6,Yolanda,19,8,32,15
7,Xerxes,12,23,18,9


## `UNION/UNION DISTINCT` in `pyspark`

Use `distinct` after `union` to get the usual `UNION/UNION DISTINCT`. Note that the row order of the result is not guaranteed.

In [6]:
(sales_aprk
 .union(sales_mayk)
 .distinct()
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Xerxes,12,23,18,9
1,Bob,20,14,6,24
2,Yolanda,19,8,32,15
3,Ann,22,18,15,12
4,Bob,19,12,17,20
5,Yolanda,19,10,28,17
6,Xerxes,11,27,17,9
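
To make the row counts above easier to check, here is a minimal plain-Python sketch of the two behaviors. It is only a model of the semantics, not Spark code: the tables are represented as lists of tuples (an assumption for illustration), and the variable names are hypothetical.

```python
# Rows as tuples: (Salesperson, Compact, Sedan, SUV, Truck)
april = [
    ("Ann", 22, 18, 15, 12),
    ("Bob", 20, 14, 6, 24),
    ("Yolanda", 19, 10, 28, 17),
    ("Xerxes", 11, 27, 17, 9),
]
may = [
    ("Ann", 22, 18, 15, 12),
    ("Bob", 19, 12, 17, 20),
    ("Yolanda", 19, 8, 32, 15),
    ("Xerxes", 12, 23, 18, 9),
]

# UNION ALL: plain concatenation, duplicates kept
union_all = april + may

# UNION DISTINCT: drop repeated rows (dict.fromkeys keeps the first occurrence)
union_distinct = list(dict.fromkeys(union_all))

print(len(union_all), len(union_distinct))
```

Ann's row is identical in both months, which is why the distinct result has one row fewer than the `UNION ALL` result. Unlike this sketch, Spark does not promise any particular row order after `distinct`.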


## Adding a `month` column

As mentioned before, we really should add a month column here. Note that we need to use `lit` to add a *literal constant*.

In [7]:
from pyspark.sql.functions import lit

(sales_aprk.withColumn('month', lit('April'))
 .union(sales_mayk.withColumn('month', lit('May')))
 .collect()
) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,month
0,Ann,22,18,15,12,April
1,Bob,20,14,6,24,April
2,Yolanda,19,10,28,17,April
3,Xerxes,11,27,17,9,April
4,Ann,22,18,15,12,May
5,Bob,19,12,17,20,May
6,Yolanda,19,8,32,15,May
7,Xerxes,12,23,18,9,May


## Performing `INTERSECT`

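Note that `intersect` and `intersectAll` are not synonymous: `intersect` corresponds to `INTERSECT DISTINCT`, while `intersectAll` preserves duplicates (`INTERSECT ALL`). Neither table here contains duplicate rows, so both methods return the same result for this example.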

In [8]:
sales_aprk.intersect(sales_mayk).collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
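
The difference between the distinct and the duplicate-preserving intersection only shows up when rows repeat. The following is a plain-Python sketch of the two semantics using `set` and `collections.Counter`; the toy data and variable names are assumptions for illustration, and this models the SQL behavior rather than calling Spark.

```python
from collections import Counter

# Toy "tables" with repeated rows (single-letter rows for brevity)
left = ["a", "a", "b", "c"]
right = ["a", "a", "a", "b", "d"]

# INTERSECT DISTINCT: a row appears once if it occurs in both inputs
intersect_distinct = sorted(set(left) & set(right))

# INTERSECT ALL: a row appears min(count_left, count_right) times
intersect_all = sorted((Counter(left) & Counter(right)).elements())

print(intersect_distinct)
print(intersect_all)
```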


## Performing a set difference with `exceptAll`

In [9]:
sales_aprk.exceptAll(sales_mayk).collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Yolanda,19,10,28,17
1,Xerxes,11,27,17,9
2,Bob,20,14,6,24
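
`exceptAll` follows `EXCEPT ALL` semantics, so each matching row on the right cancels only one copy on the left; the `subtract` method is the `EXCEPT DISTINCT` counterpart. A plain-Python sketch of the two behaviors, again with hypothetical toy data and `collections.Counter` standing in for the tables:

```python
from collections import Counter

left = ["a", "a", "a", "b", "c"]
right = ["a", "b", "d"]

# EXCEPT ALL: subtract counts, so two of the three "a" rows survive
except_all = sorted((Counter(left) - Counter(right)).elements())

# EXCEPT DISTINCT: any row present on the right is removed entirely
except_distinct = sorted(set(left) - set(right))

print(except_all)
print(except_distinct)
```

For the April and May tables the two versions agree, because no row is repeated within a table.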


## <font color="red"> Exercise 2 </font>

In the data folder, you will find 6 files that contain a sample of 100,000 rows from the Uber data for the months apr14 through sep14. Perform the following tasks:

1. Use `glob` to get all 6 file paths.
2. Use a regular expression to create a `lambda` function that pulls the month from the files.
3. Read the 6 `pyspark` dataframes into a `dict` with keys equal to the month name and values containing the corresponding data frame.
4. Use a dictionary comprehension to add a month column to each `df`.
5. Use the accumulator pattern and the `union` method to combine these 6 data frames into one combined `pyspark df`.
6. Inspect the head and compute the number of rows (use the `count` method).

In [68]:
# Your code here
from glob import glob 
import os

filePath = "./Data/uber/uber-trip-data"

os.listdir(filePath)

['taxi-zone-lookup.csv',
 '.DS_Store',
 'uber-raw-data-janjune-15.csv.zip',
 'uber-raw-data-janjune-15.csv',
 'uber-raw-data-apr14.csv',
 'uber-raw-data-aug14.csv',
 'uber-raw-data-sep14.csv',
 'uber-raw-data-jul14.csv',
 'uber-raw-data-jun14.csv',
 'uber-raw-data-may14.csv']

In [79]:
files = glob("./Data/uber/uber-trip-data/*")
files

['./Data/uber/uber-trip-data/taxi-zone-lookup.csv',
 './Data/uber/uber-trip-data/uber-raw-data-janjune-15.csv.zip',
 './Data/uber/uber-trip-data/uber-raw-data-janjune-15.csv',
 './Data/uber/uber-trip-data/uber-raw-data-apr14.csv',
 './Data/uber/uber-trip-data/uber-raw-data-aug14.csv',
 './Data/uber/uber-trip-data/uber-raw-data-sep14.csv',
 './Data/uber/uber-trip-data/uber-raw-data-jul14.csv',
 './Data/uber/uber-trip-data/uber-raw-data-jun14.csv',
 './Data/uber/uber-trip-data/uber-raw-data-may14.csv']

In [77]:
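import re  # required for re.match below

s = "./Data/uber/uber-trip-data/uber-raw-data-sep14.csv"
# Capture the month tag (letters followed by digits) between the prefix and '.csv'
match = re.match(r'./Data/uber/uber-trip-data/uber-raw-data-(\w+\d+)\.csv$',s)
match.group(1)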


'sep14'

In [78]:
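# We only need the monthly files, apr14 through sep14.

import re
# Capture the month tag (e.g. 'sep14'); the hyphen in 'janjune-15' keeps that file from matching.
FILE_NAME_RE = re.compile(r'./Data/uber/uber-trip-data/uber-raw-data-(\w+\d+)\.csv$')
file_name = lambda p: FILE_NAME_RE.match(p).group(1)  # month tag of a matching path
match = lambda p: FILE_NAME_RE.match(p)  # match object, or None for any other file
file_names = lambda files: [file_name(p) for p in files if match(p)]
file_names(files)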

['apr14', 'aug14', 'sep14', 'jul14', 'jun14', 'may14']
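
The pattern above hard-codes the directory, so it stops matching if the files move. One possible alternative, sketched here under the assumption that the monthly files are always named `uber-raw-data-<mon><yy>.csv`, is to match on the base name only. The names `MONTH_RE` and `month_of` are hypothetical, not part of the exercise.

```python
import os
import re

# Hypothetical pattern: three lowercase letters plus a two-digit year, e.g. 'apr14'
MONTH_RE = re.compile(r'^uber-raw-data-([a-z]{3}\d{2})\.csv$')

def month_of(path):
    """Return the month tag for a monthly file, or None for any other file."""
    m = MONTH_RE.match(os.path.basename(path))
    return m.group(1) if m else None

paths = [
    './Data/uber/uber-trip-data/taxi-zone-lookup.csv',
    './Data/uber/uber-trip-data/uber-raw-data-janjune-15.csv.zip',
    './Data/uber/uber-trip-data/uber-raw-data-janjune-15.csv',
    './Data/uber/uber-trip-data/uber-raw-data-apr14.csv',
    './Data/uber/uber-trip-data/uber-raw-data-sep14.csv',
]

# Keep only the paths that carry a month tag
months = [month_of(p) for p in paths if month_of(p)]
print(months)
```

A named function is used instead of a `lambda` only so that the non-matching case can return `None` explicitly; a `lambda` wrapping the same expression would satisfy the exercise as written.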

In [80]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').config('spark.driver.host', 'localhost').getOrCreate()

from more_pyspark import to_pandas
month_df_dict ={ file_name(file):spark.read.csv(file, header=True, inferSchema=True) for file in files if match(file)}
month_df_dict

{'apr14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string],
 'aug14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string],
 'sep14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string],
 'jul14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string],
 'jun14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string],
 'may14': DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string]}

In [81]:
from pyspark.sql.functions import lit

df_month_renamed = [df.withColumn('month', lit(month)) for month,df in month_df_dict.items()]
df_month_renamed

[DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string],
 DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string],
 DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string],
 DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string],
 DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string],
 DataFrame[Date/Time: string, Lat: double, Lon: double, Base: string, month: string]]

In [85]:
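from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    # Fold the frames pairwise, left to right: ((df1 + df2) + df3) + ...
    # DataFrame.unionAll is an alias of DataFrame.union (UNION ALL semantics).
    return reduce(DataFrame.unionAll, dfs)

df = unionAll(*df_month_renamed)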
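
The `reduce` call above is a compact form of the accumulator pattern that step 5 of the exercise asks for. The sketch below spells out the explicit loop next to the `reduce` version. To keep it runnable without a Spark session it uses `FakeFrame`, a hypothetical stand-in class that only mimics `union` and `count`; with real data frames the two helper functions would be called on the list of `pyspark` data frames instead.

```python
from functools import reduce

class FakeFrame:
    """Hypothetical stand-in for a DataFrame: holds rows and supports union/count."""
    def __init__(self, rows):
        self.rows = list(rows)

    def union(self, other):
        # UNION ALL semantics: concatenate, keep duplicates
        return FakeFrame(self.rows + other.rows)

    def count(self):
        return len(self.rows)

def union_with_loop(frames):
    """Accumulator pattern: start from the first frame and union in the rest."""
    frames = list(frames)
    combined = frames[0]
    for frame in frames[1:]:
        combined = combined.union(frame)
    return combined

def union_with_reduce(frames):
    """Same fold, expressed with functools.reduce."""
    return reduce(lambda acc, frame: acc.union(frame), frames)

frames = [FakeFrame([1, 2]), FakeFrame([3]), FakeFrame([4, 5, 6])]
print(union_with_loop(frames).count(), union_with_reduce(frames).count())
```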


In [86]:
df.show()

+----------------+-------+--------+------+-----+
|       Date/Time|    Lat|     Lon|  Base|month|
+----------------+-------+--------+------+-----+
|4/1/2014 0:11:00| 40.769|-73.9549|B02512|apr14|
|4/1/2014 0:17:00|40.7267|-74.0345|B02512|apr14|
|4/1/2014 0:21:00|40.7316|-73.9873|B02512|apr14|
|4/1/2014 0:28:00|40.7588|-73.9776|B02512|apr14|
|4/1/2014 0:33:00|40.7594|-73.9722|B02512|apr14|
|4/1/2014 0:33:00|40.7383|-74.0403|B02512|apr14|
|4/1/2014 0:39:00|40.7223|-73.9887|B02512|apr14|
|4/1/2014 0:45:00| 40.762| -73.979|B02512|apr14|
|4/1/2014 0:55:00|40.7524| -73.996|B02512|apr14|
|4/1/2014 1:01:00|40.7575|-73.9846|B02512|apr14|
|4/1/2014 1:19:00|40.7256|-73.9869|B02512|apr14|
|4/1/2014 1:48:00|40.7591|-73.9684|B02512|apr14|
|4/1/2014 1:49:00|40.7271|-73.9803|B02512|apr14|
|4/1/2014 2:11:00|40.6463|-73.7896|B02512|apr14|
|4/1/2014 2:25:00|40.7564|-73.9167|B02512|apr14|
|4/1/2014 2:31:00|40.7666|-73.9531|B02512|apr14|
|4/1/2014 2:43:00| 40.758|-73.9761|B02512|apr14|
|4/1/2014 3:22:00|40

In [87]:
df.count()

4534327
