# Spark Tutorial
Hands-on, in-class session to get practical experience with one of the most popular and widely used Data Science platforms.
As you run Spark commands, try to identify when lazy evaluation is applied and when it is useful.

## Table of Contents

* Installation
* Euler sum, RDDs
* Spark DataFrames
* Column selection
* Filters
* Sorting
* Group-by aggregation
* Time series
* Joins, lazy evaluation

## Installation
I suggest following https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/

In essence, do:
```shell
conda create -n sparkds python=3.9
conda activate sparkds
conda install -y openjdk
conda install -y pyspark
conda install -c conda-forge findspark
conda install -y jupyter
```

In [20]:
## findspark (installed in the conda environment above) locates the PySpark installation and makes it importable
import findspark
findspark.init()
findspark.find()

'/Users/ahenschel/Applications/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("COSC608_spark").getOrCreate()

22/11/10 17:21:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark

In [75]:
## The RDD API (used for the Euler sum below) is accessed through the SparkContext; before Spark 2.0 introduced SparkSession, it was the main entry point
sc = spark.sparkContext

In [76]:
spark

## First steps
Creating the first DataFrames. Spark DataFrames are similar to pandas DataFrames, yet with subtle differences.

## Euler Sum
Implement Euler's sum, as per the slides, using parallelization, and experiment with it (e.g., with different values of $n$ and different numbers of partitions):
$$\lim_{n\to\infty}\sum_{i=1}^{n}\frac{1}{i^2} = \frac{\pi^2}{6}$$
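Why a finite $n$ is good enough here: the tail of the series can be bounded by an integral (a standard calculus argument, not taken from the slides), since $\frac{1}{i^2} < \int_{i-1}^{i}\frac{dx}{x^2}$ for every $i \ge 2$:

$$
\frac{\pi^2}{6} - \sum_{i=1}^{n}\frac{1}{i^2} = \sum_{i=n+1}^{\infty}\frac{1}{i^2} < \int_{n}^{\infty}\frac{dx}{x^2} = \frac{1}{n}
$$

With $n = 10^7$, as used below, the truncation error is below $10^{-7}$, so the computed sum agrees with $\pi^2/6 \approx 1.644934$ to the six decimals that are printed.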

In [77]:
import numpy as np
import pyspark
import time

In [78]:
n = 10000000
ar = np.arange(n)

In [14]:
type(spark)

pyspark.sql.session.SparkSession

In [211]:
numpartitions = 16
dat = sc.parallelize(ar, numpartitions)    # distribute the array over 16 partitions
sqrs = dat.map(lambda i: 1.0/(i+1)**2)     # transformation: only defined here, not yet computed

In [214]:
type(sqrs)

pyspark.rdd.PipelinedRDD

In [29]:
t0 = time.time()
x = sqrs.reduce(lambda a,b: a+b)    # action: triggers the actual computation
t1 = time.time()

22/11/01 13:42:26 WARN TaskSetManager: Stage 8 contains a task of very large size (11661 KiB). The maximum recommended task size is 1000 KiB.

In [30]:
print("x = %f"%x)
print("time=%f"%(t1-t0))

x = 1.644934
time=4.974174


In [31]:
x = 0.0
t0 = time.time()
for i in range(1, n+1):    # i = 1..n, the same terms as in the Spark version
    x += 1.0/(i**2)
t1 = time.time()
print("x = %f"%x)
print("time=%f"%(t1-t0))

x = 1.644934
time=3.844444
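The plain Python loop is likely competitive with Spark here because, in local mode, Spark adds scheduling and serialization overhead on top of the same arithmetic. A vectorized NumPy version is a further baseline worth timing; this is a sketch, and the timings you see will depend on your machine.

```python
import math
import time
import numpy as np

n = 10_000_000
t0 = time.time()
# float64 array 1..n, so i**2 and the division are computed in floating point
i = np.arange(1, n + 1, dtype=np.float64)
x = np.sum(1.0 / i**2)
t1 = time.time()
print("x = %f" % x)
print("time=%f" % (t1 - t0))
```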


In [37]:
spark

## Working with DataFrames
Spark has shifted its main programming paradigm from RDDs to DataFrames, which resemble data frames in R or Python/pandas.

### Self-created DataFrames


In [215]:
data = [("Java", 1200), ("Python", 10000), ("Scala", 3000)]
df = spark.createDataFrame(data)
df

DataFrame[_1: string, _2: bigint]

In [217]:
a = df.collect()
print(type(a), type(a[0]))
df

<class 'list'> <class 'pyspark.sql.types.Row'>


DataFrame[_1: string, _2: bigint]

In [6]:
df.show()


+------+-----+
|    _1|   _2|
+------+-----+
|  Java| 1200|
|Python|10000|
| Scala| 3000|
+------+-----+

In [218]:
tipsFile = "/Users/ahenschel/Dropbox/COSC101/Exercises/tips.csv"    # adjust to the location of your copy of tips.csv
tips = spark.read.csv(tipsFile)

In [219]:
tips.show(3)

+----------+----+------+------+---+------+----+
|       _c0| _c1|   _c2|   _c3|_c4|   _c5| _c6|
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



In [220]:
tips = spark.read.option('header', 'true').csv(tipsFile)    # use the first line as column names, not as data
tips

DataFrame[total_bill: string, tip: string, sex: string, smoker: string, day: string, time: string, size: string]

In [229]:
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [100]:
type(tips)

pyspark.sql.dataframe.DataFrame

In [101]:
tips.printSchema()

root
 |-- total_bill: string (nullable = true)
 |-- tip: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: string (nullable = true)



In [230]:
tips = spark.read.option('header', 'true').csv(tipsFile, inferSchema=True)    # inferSchema costs an extra pass over the data
tips

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: int]

In [104]:
tips.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [105]:
tips.head(3)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3)]

### Column selection

In [232]:
tips.select(['tip', 'sex']).show(4)

+----+------+
| tip|   sex|
+----+------+
|1.01|Female|
|1.66|  Male|
| 3.5|  Male|
|3.31|  Male|
+----+------+
only showing top 4 rows



In [107]:
tips.dtypes

[('total_bill', 'double'),
 ('tip', 'double'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'int')]

In [108]:
tips.describe().show()

+-------+------------------+------------------+------+------+----+------+------------------+
|summary|        total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+------------------+------------------+------+------+----+------+------------------+
|  count|               244|               244|   244|   244| 244|   244|               244|
|   mean|19.785942622950824|2.9982786885245902|  null|  null|null|  null| 2.569672131147541|
| stddev| 8.902411954856857|1.3836381890011815|  null|  null|null|  null|0.9510998047322347|
|    min|              3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+------------------+------+------+----+------+------------------+



In [111]:
tips1 = tips.withColumn("tipPP", tips['tip']/tips['size'])    # tip per person

In [117]:
tips1.show()

+----------+----+------+------+---+------+----+------------------+
|total_bill| tip|   sex|smoker|day|  time|size|             tipPP|
+----------+----+------+------+---+------+----+------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|             0.505|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.5533333333333333|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|1.1666666666666667|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|             1.655|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|            0.9025|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|            1.1775|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|               1.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|              0.78|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|              0.98|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|             1.615|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|             0.855|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|              1

In [118]:
tips1.sort('tipPP', ascending=False).show()

+----------+----+------+------+----+------+----+------------------+
|total_bill| tip|   sex|smoker| day|  time|size|             tipPP|
+----------+----+------+------+----+------+----+------------------+
|     50.81|10.0|  Male|   Yes| Sat|Dinner|   3|3.3333333333333335|
|     24.71|5.85|  Male|    No|Thur| Lunch|   2|             2.925|
|     23.33|5.65|  Male|   Yes| Sun|Dinner|   2|             2.825|
|      7.25|5.15|  Male|   Yes| Sun|Dinner|   2|             2.575|
|     22.23| 5.0|  Male|    No| Sun|Dinner|   2|               2.5|
|     25.28| 5.0|Female|   Yes| Sat|Dinner|   2|               2.5|
|     32.68| 5.0|  Male|   Yes|Thur| Lunch|   2|               2.5|
|     48.33| 9.0|  Male|    No| Sat|Dinner|   4|              2.25|
|     28.17| 6.5|Female|   Yes| Sat|Dinner|   3|2.1666666666666665|
|      21.7| 4.3|  Male|    No| Sat|Dinner|   2|              2.15|
|     16.32| 4.3|Female|   Yes| Fri|Dinner|   2|              2.15|
|     25.21|4.29|  Male|   Yes| Sat|Dinner|   2|

In [119]:
tips.show(3)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



In [123]:
tips.groupBy('sex').count().sort('count').show()

+------+-----+
|   sex|count|
+------+-----+
|Female|   87|
|  Male|  157|
+------+-----+



In [126]:
tips.groupBy('size').mean("tip").orderBy("avg(tip)").show()

+----+------------------+
|size|          avg(tip)|
+----+------------------+
|   1|            1.4375|
|   2| 2.582307692307693|
|   3| 3.393157894736842|
|   5|4.0280000000000005|
|   4| 4.135405405405407|
|   6|             5.225|
+----+------------------+



### Exercise - Aggregate functions
<https://sparkbyexamples.com/pyspark/pyspark-aggregate-functions/>
Determine whether men or women tip more, per person and as a percentage of the bill. To this end, calculate for each table the tip as a percentage of the total bill (multiplied by 100 for better readability) and divide it by the table size. Add a new column tipPctPP for this. Then calculate the average of this column for men and for women. Use the groupBy method, and confirm the result with the filter method.

In [138]:
tips2 = tips.withColumn("tipPctPP", (100*tips['tip']/tips['total_bill'])/tips['size'])
tips2.show(10)

+----------+----+------+------+---+------+----+------------------+
|total_bill| tip|   sex|smoker|day|  time|size|          tipPctPP|
+----------+----+------+------+---+------+----+------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|2.9723366686286052|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|5.3513862024500325|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3| 5.552911312073615|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|  6.98902027027027|
|     24.59|3.61|Female|    No|Sun|Dinner|   4| 3.670191134607564|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 4.655990510083037|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|11.402508551881414|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|2.9017857142857144|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2| 6.515957446808511|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|10.926928281461436|
+----------+----+------+------+---+------+----+------------------+
only showing top 10 rows



In [139]:
tips2.groupBy("sex").mean().show()

+------+------------------+------------------+------------------+-----------------+
|   sex|   avg(total_bill)|          avg(tip)|         avg(size)|    avg(tipPctPP)|
+------+------------------+------------------+------------------+-----------------+
|Female|18.056896551724137| 2.833448275862069|2.4597701149425286|7.739127915090128|
|  Male|20.744076433121034|3.0896178343949052|2.6305732484076434|6.742178656313673|
+------+------------------+------------------+------------------+-----------------+



In [166]:
from pyspark.sql.functions import mean
tips2.filter(tips2.sex=='Female').select(mean('tipPctPP')).show()

+-----------------+
|    avg(tipPctPP)|
+-----------------+
|7.739127915090128|
+-----------------+



In [169]:
tips2.filter(tips2.sex=='Female').select(mean('tip'), mean('tipPctPP')).collect()

[Row(avg(tip)=2.833448275862069, avg(tipPctPP)=7.739127915090128)]

In [167]:
tips2.filter(tips2.sex=='Male').select(mean('tipPctPP')).show()

+-----------------+
|    avg(tipPctPP)|
+-----------------+
|6.742178656313673|
+-----------------+



## Group-By aggregation


In [5]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [8]:
#https://sparkbyexamples.com/pyspark-tutorial/
df1 = df.groupBy("department").sum("salary")

In [9]:
df1.show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



### Exercises
1. Group by state and create a count for each state
2. Group by state and calculate the respective average salaries
3. Group by both department and state and calculate the mean bonus


In [11]:
df.groupBy("state").mean().show(truncate=False)

+-----+-----------+--------+----------+
|state|avg(salary)|avg(age)|avg(bonus)|
+-----+-----------+--------+----------+
|CA   |87500.0    |29.75   |22000.0   |
|NY   |85800.0    |45.8    |17000.0   |
+-----+-----------+--------+----------+



In [94]:
df.groupBy("state").count().show(truncate=False)

+-----+-----+
|state|count|
+-----+-----+
|CA   |4    |
|NY   |5    |
+-----+-----+



In [92]:
df.groupBy("department").max("salary")    # no action such as show(): only the resulting DataFrame is displayed, nothing is computed

DataFrame[department: string, max(salary): bigint]

In [14]:
df.groupBy("state", "department").mean("salary").show(truncate=False)

+-----+----------+-----------+
|state|department|avg(salary)|
+-----+----------+-----------+
|CA   |Sales     |81000.0    |
|CA   |Marketing |80000.0    |
|NY   |Sales     |88000.0    |
|CA   |Finance   |94500.0    |
|NY   |Finance   |81000.0    |
|NY   |Marketing |91000.0    |
+-----+----------+-----------+



### Filtering
Note the array types: identify the columns that are of ArrayType. This is different from most SQL databases, such as MySQL or SQLite, which have no array column type. It's a good idea to use column names that qualify as valid Python identifiers, so that they can be accessed as attributes, e.g. `df.state`.
Caveat: don't use column names that are methods or attributes of a PySpark DataFrame object, such as "count" or "schema".

In [15]:
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType


data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [None]:
df.filter(df.state == "OH").show(truncate=False)

## Working with Time Series
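Time series usually contain a timestamp, which in pandas is often used as the index for quickly selecting individual points or periods of time. Spark DataFrames have no index, so such selections are expressed as filters on the timestamp column.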

In [196]:
dates = [("1","2019-07-01 12:01:19.111"),
    ("2","2019-06-24 12:01:19.222"),
    ("3","2019-11-16 16:44:55.406"),
    ("4","2019-11-16 16:50:59.406")
    ]
ts = spark.createDataFrame(data=dates, schema=["id","input"])
ts.show()

+---+--------------------+
| id|               input|
+---+--------------------+
|  1|2019-07-01 12:01:...|
|  2|2019-06-24 12:01:...|
|  3|2019-11-16 16:44:...|
|  4|2019-11-16 16:50:...|
+---+--------------------+



In [186]:
print(ts)

DataFrame[id: string, input: string]


### Converting strings to dates
Note that we cannot easily select, for example, all entries from 2019, from morning hours, or from November while the data type is still a string. We need to convert it to a timestamp first.

In [177]:
from pyspark.sql.functions import *    # note: the wildcard import can shadow Python built-ins such as sum, max, round
current_timestamp?

In [178]:
current_timestamp()

Column<'current_timestamp()'>

In [145]:
to_timestamp?

In [197]:
ts = ts.withColumn('start_time',to_timestamp(col('input')))

In [198]:
ts.show()

+---+--------------------+--------------------+
| id|               input|          start_time|
+---+--------------------+--------------------+
|  1|2019-07-01 12:01:...|2019-07-01 12:01:...|
|  2|2019-06-24 12:01:...|2019-06-24 12:01:...|
|  3|2019-11-16 16:44:...|2019-11-16 16:44:...|
|  4|2019-11-16 16:50:...|2019-11-16 16:50:...|
+---+--------------------+--------------------+



In [199]:
print(ts)

DataFrame[id: string, input: string, start_time: timestamp]


In [200]:
ts = ts.withColumn('end_time', current_timestamp())
ts.show()

+---+--------------------+--------------------+--------------------+
| id|               input|          start_time|            end_time|
+---+--------------------+--------------------+--------------------+
|  1|2019-07-01 12:01:...|2019-07-01 12:01:...|2022-11-04 21:19:...|
|  2|2019-06-24 12:01:...|2019-06-24 12:01:...|2022-11-04 21:19:...|
|  3|2019-11-16 16:44:...|2019-11-16 16:44:...|2022-11-04 21:19:...|
|  4|2019-11-16 16:50:...|2019-11-16 16:50:...|2022-11-04 21:19:...|
+---+--------------------+--------------------+--------------------+



In [202]:
## Casting a timestamp to long yields whole seconds since the Unix epoch, so the difference of the two casts is the elapsed time in seconds
ts = ts.withColumn('DiffInSeconds',col("end_time").cast("long") - col('start_time').cast("long"))
ts.show()

+---+--------------------+--------------------+--------------------+-------------+
| id|               input|          start_time|            end_time|DiffInSeconds|
+---+--------------------+--------------------+--------------------+-------------+
|  1|2019-07-01 12:01:...|2019-07-01 12:01:...|2022-11-04 21:20:...|    105614337|
|  2|2019-06-24 12:01:...|2019-06-24 12:01:...|2022-11-04 21:20:...|    106219137|
|  3|2019-11-16 16:44:...|2019-11-16 16:44:...|2022-11-04 21:20:...|     93674121|
|  4|2019-11-16 16:50:...|2019-11-16 16:50:...|2022-11-04 21:20:...|     93673757|
+---+--------------------+--------------------+--------------------+-------------+



## Joining Tables
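Again, a potentially expensive operation, also in terms of $O$-notation: conceptually, a join filters the cross product (Cartesian product) of the two tables, which naively takes $O(n \cdot m)$ comparisons for tables with $n$ and $m$ rows. Databases speed up joins by indexing the columns that the join is performed on; Spark has no indexes and instead relies on strategies such as hash joins, sort-merge joins, and broadcast joins.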
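To make the complexity argument concrete, a plain-Python sketch (no Spark) contrasting a naive nested-loop join, which checks all $n \cdot m$ pairs, with a hash join, which builds a dictionary on one table in $O(m)$ and probes it once per row of the other, i.e. $O(n + m)$ plus the size of the output. The tuples are simplified versions of the employee and department tables below, with department ids as integers.

```python
# simplified rows: (emp_id, name, emp_dept_id) and (dept_name, dept_id)
emp = [(1, "Smith", 10), (2, "Rose", 20), (3, "Williams", 10),
       (4, "Jones", 10), (5, "Brown", 40), (6, "Brown", 50)]
dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]


def nested_loop_join(left, right):
    """Compare every pair of rows: O(n * m) comparisons."""
    return [(e, d) for e in left for d in right if e[2] == d[1]]


def hash_join(left, right):
    """Build a hash table on right's key (O(m)), then probe it per left row (O(n))."""
    table = {}
    for d in right:
        table.setdefault(d[1], []).append(d)
    return [(e, d) for e in left for d in table.get(e[2], [])]


nl = nested_loop_join(emp, dept)
hj = hash_join(emp, dept)
print(len(hj), "matching pairs")
```

Both functions return the same pairs; only the amount of work differs, which matters once the tables have millions of rows.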

### Types of Joins

Join string | Equivalent SQL join
---|---
inner | INNER JOIN
outer, full, fullouter, full_outer | FULL OUTER JOIN
left, leftouter, left_outer | LEFT JOIN
right, rightouter, right_outer | RIGHT JOIN

In [16]:
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

In [17]:
dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



Notice how fast the next command is. Explain why!

In [19]:
jempDF = empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "inner") 

Further operations can still be performed on the joined table, or rather, on the table "to be joined".

In [207]:
jempDF.filter(jempDF.name!='Smith').show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



