# Select, Filter, and Mutate

In this lecture, we will look at three important actions used to process data frames.  While each framework uses different names for these functions, we will use the names from the `R` library `dplyr`, namely `select`, `mutate`, and `filter`.  The most important takeaway will be that, regardless of framework or scale, we can process data frames in the same way by applying the same sequence of data verbs.

## R and Python can interact!

In [1]:
#!pip install rpy2 tzlocal
import rpy2
%load_ext rpy2.ipython

ModuleNotFoundError: No module named 'rpy2'

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
%%R
rnorm(5, 2, 3)

UsageError: Cell magic `%%R` not found.


## We love dplyr!

In [38]:
%%R 
library(dplyr)
artists <- read.csv('./data/Artists.csv')

(artists %>%
  select(BeginDate, 
         DisplayName, 
         Nationality) %>%
  filter(BeginDate > 0) %>%
  head) -> output
output

  BeginDate     DisplayName Nationality
1      1930  Robert Arneson    American
2      1936  Doroteo Arnaiz     Spanish
3      1941     Bill Arnold    American
4      1946 Charles Arnoldi    American
5      1941     Per Arnoldi      Danish
6      1925   Danilo Aroldi     Italian


## What makes `dplyr` so great?

* Focus on data verbs
* Pipes lead to code that is
    * More readable
    * Easy to compose and debug

## Set up

Let's read in a data set in each of the three frameworks.

#### `pandas` and `dfply`

In [24]:
import pandas as pd
from dfply import *
heroes = pd.read_csv('./data/heroes_information.csv')

#### `sqlalchemy` 

In [5]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, func
from sqlalchemy.ext.automap import automap_base

engine = create_engine("sqlite:///databases/heroes_2_1.db")

Base = automap_base()
Base.prepare(engine, reflect=True)
Hero = Base.classes.heroes

Session = sessionmaker(bind=engine)
session = Session()

#### `pyspark`

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

spark1 = SparkSession.builder.appName('Ops').getOrCreate()
df_spark = spark1.read.csv('data/heroes_information.csv', inferSchema=True, header=True)

## Selecting Columns

The first verb, `select` 

* filters the *columns*
* is at the core of `SQL` `SELECT` statements

## How to select

* `pandas`: pipe (`>>`) into `select`
* `sqlalchemy`: Use `session.query` or a `select` expression
* `pyspark`: Use the `select` method
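
To make the link between the `select` verb and SQL concrete, here is a minimal sketch using Python's built-in `sqlite3` module rather than any of the three frameworks above. The in-memory table and its column names are assumptions for illustration; the three rows are copied from the heroes output shown in this lecture.

```python
import sqlite3

# In-memory database with a tiny, illustrative heroes table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE heroes (name TEXT, gender TEXT, height REAL, weight REAL)")
conn.executemany(
    "INSERT INTO heroes VALUES (?, ?, ?, ?)",
    [
        ("A-Bomb", "Male", 203.0, 441.0),
        ("Abe Sapien", "Male", 191.0, 65.0),
        ("Abin Sur", "Male", 185.0, 90.0),
    ],
)

# Selecting columns: the table has four columns, the result keeps two.
cursor = conn.execute("SELECT name, weight FROM heroes ORDER BY name")
selected_columns = [description[0] for description in cursor.description]
selected_rows = cursor.fetchall()

print(selected_columns)
print(selected_rows)
```

Every row is still present; only the set of columns has shrunk, which is exactly what `select` does in each framework.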

#### `select` in `pandas` + `dfply`

In [26]:
from dfply import select as select_dfply
(heroes >>
   select_dfply(X.name, 
                X.Gender, 
                X.Weight) >>
   head
)

Unnamed: 0,name,Gender,Weight
0,A-Bomb,Male,441.0
1,Abe Sapien,Male,65.0
2,Abin Sur,Male,90.0
3,Abomination,Male,441.0
4,Abraxas,Male,-99.0


#### `select` expression in `sqlalchemy`

In [27]:
from sqlalchemy import select as select_sql
stmt = (select_sql([Hero.name, Hero.gender, Hero.weight]).
          select_from(Hero).
          limit(5))
print(stmt)

SELECT heroes.name, heroes.gender, heroes.weight 
FROM heroes
 LIMIT :param_1
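
The `:param_1` in the printed statement is a placeholder for a bound parameter: the value `5` is passed to the database alongside the SQL text instead of being pasted into it. The sketch below shows the same idea with the standard-library `sqlite3` module; the table and the parameter name `n_rows` are assumptions made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE heroes (name TEXT, weight REAL)")
conn.executemany(
    "INSERT INTO heroes VALUES (?, ?)",
    [("A-Bomb", 441.0), ("Abe Sapien", 65.0), ("Abin Sur", 90.0)],
)

# The SQL text contains a named placeholder; the value is supplied separately.
query = "SELECT name FROM heroes ORDER BY name LIMIT :n_rows"
first_two = conn.execute(query, {"n_rows": 2}).fetchall()
print(first_two)
```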


In [28]:
session.execute(stmt).fetchall()

[('A-Bomb', 'Male', 441.0),
 ('Abe Sapien', 'Male', 65.0),
 ('Abin Sur', 'Male', 90.0),
 ('Abomination', 'Male', 441.0),
 ('Abraxas', 'Male', None)]

#### Convert the result to a `pandas.DataFrame`

In [29]:
pd.read_sql_query(stmt, con=engine)

Unnamed: 0,name,gender,weight
0,A-Bomb,Male,441.0
1,Abe Sapien,Male,65.0
2,Abin Sur,Male,90.0
3,Abomination,Male,441.0
4,Abraxas,Male,


#### `select` in `pyspark`

In [30]:
(df_spark.
    select(df_spark.name, 
           df_spark.Gender, 
           df_spark.Weight).
    take(5))

[Row(name='A-Bomb', Gender='Male', Weight=441.0),
 Row(name='Abe Sapien', Gender='Male', Weight=65.0),
 Row(name='Abin Sur', Gender='Male', Weight=90.0),
 Row(name='Abomination', Gender='Male', Weight=441.0),
 Row(name='Abraxas', Gender='Male', Weight=-99.0)]

#### Convert the result to a `pandas.DataFrame`

In [31]:
result = (df_spark.
            select(df_spark.name, 
                   df_spark.Gender, 
                   df_spark.Weight).
            take(5))
spark1.createDataFrame(result).toPandas()

Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 29 more


## Filtering Rows

The next verb, `filter` 

* filters the *rows*
* is related to the `SQL` `WHERE` clause

## How to filter

* `pandas`: pipe (`>>`) into `filter_by`
* `sqlalchemy`: Use `session.query(...).filter` or `session.query(...).filter_by`, or `where` on a `select` expression
* `pyspark`: Use the `where` method
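
For comparison, the same row filter can be written in plain `pandas` without `dfply`. This is a minimal sketch on a small hand-built frame; the `Example Heroine` row is made up so that the filter has something to remove.

```python
import pandas as pd

# Small illustrative frame; the last row is hypothetical.
heroes_small = pd.DataFrame({
    "name": ["A-Bomb", "Abe Sapien", "Example Heroine"],
    "Gender": ["Male", "Male", "Female"],
    "Weight": [441.0, 65.0, 60.0],
})

# Option 1: index with a boolean mask.
mask = heroes_small["Gender"] == "Male"
by_mask = heroes_small[mask]

# Option 2: express the same condition as a query string.
by_query = heroes_small.query("Gender == 'Male'")

print(by_mask)
```

Both forms keep all columns and drop the rows that fail the condition.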

#### `filter_by` in `pandas` + `dfply`

In [32]:
(heroes >>
  filter_by(X.Gender == 'Male') >>
  head)

Unnamed: 0,Id,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,-,good,441.0
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65.0
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90.0
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,-,bad,441.0
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99.0,Marvel Comics,-,bad,-99.0


#### `where` in a `sqlalchemy` `select` expression

In [33]:
# Every SQL query starts with a SELECT
f_stmt = (select_sql('*').
           where(Hero.gender == 'Male').
           limit(5))
session.execute(f_stmt).fetchall()

[(0, 'A-Bomb', 'Male', 'yellow', 'Human', 'No Hair', 203.0, 'Marvel Comics', None, 'good', 441.0),
 (1, 'Abe Sapien', 'Male', 'blue', 'Icthyo Sapien', 'No Hair', 191.0, 'Dark Horse Comics', 'blue', 'good', 65.0),
 (2, 'Abin Sur', 'Male', 'blue', 'Ungaran', 'No Hair', 185.0, 'DC Comics', 'red', 'good', 90.0),
 (3, 'Abomination', 'Male', 'green', 'Human / Radiation', 'No Hair', 203.0, 'Marvel Comics', None, 'bad', 441.0),
 (4, 'Abraxas', 'Male', 'blue', 'Cosmic Entity', 'Black', None, 'Marvel Comics', None, 'bad', None)]

#### Convert the result to a `pandas.DataFrame`

In [34]:
pd.read_sql_query(f_stmt, con = engine)

Unnamed: 0,id,name,gender,eye_color,race,hair_color,height,publisher,skin_color,alignment,weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,,good,441.0
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65.0
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90.0
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,,bad,441.0
4,4,Abraxas,Male,blue,Cosmic Entity,Black,,Marvel Comics,,bad,


#### `where` in `pyspark`

In [35]:
f_result = (df_spark.
            where(df_spark.Gender == 'Male').
            take(5))
f_result

[Row(Id=0, name='A-Bomb', Gender='Male', Eye color='yellow', Race='Human', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color='-', Alignment='good', Weight=441.0),
 Row(Id=1, name='Abe Sapien', Gender='Male', Eye color='blue', Race='Icthyo Sapien', Hair color='No Hair', Height=191.0, Publisher='Dark Horse Comics', Skin color='blue', Alignment='good', Weight=65.0),
 Row(Id=2, name='Abin Sur', Gender='Male', Eye color='blue', Race='Ungaran', Hair color='No Hair', Height=185.0, Publisher='DC Comics', Skin color='red', Alignment='good', Weight=90.0),
 Row(Id=3, name='Abomination', Gender='Male', Eye color='green', Race='Human / Radiation', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color='-', Alignment='bad', Weight=441.0),
 Row(Id=4, name='Abraxas', Gender='Male', Eye color='blue', Race='Cosmic Entity', Hair color='Black', Height=-99.0, Publisher='Marvel Comics', Skin color='-', Alignment='bad', Weight=-99.0)]

In [None]:
spark1.createDataFrame(f_result).toPandas()

## Chaining Data Verbs

* Processing a data frame $\rightarrow$ chaining data verbs
* Accomplished through pipes/dot-chains
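
To show how a `>>` pipe can work in Python at all, here is a minimal sketch built on the reflected right-shift operator. It is not `dfply`'s actual implementation: the `Pipeable` class and the two verbs are hypothetical stand-ins that operate on a list of dictionaries, and the rows are made up.

```python
class Pipeable:
    """Wrap a function so that `data >> Pipeable(f)` calls `f(data)`."""

    def __init__(self, func):
        self.func = func

    def __rrshift__(self, data):
        # Python falls back to this reflected method when the left operand
        # (here a plain list) does not implement `>>` itself.
        return self.func(data)


def select(*columns):
    """Hypothetical verb: keep only the named keys of each row."""
    return Pipeable(lambda rows: [{c: row[c] for c in columns} for row in rows])


def filter_by(predicate):
    """Hypothetical verb: keep only the rows for which predicate(row) is true."""
    return Pipeable(lambda rows: [row for row in rows if predicate(row)])


# Made-up rows for illustration only.
rows = [
    {"name": "A", "group": "x", "value": 3},
    {"name": "B", "group": "y", "value": 7},
    {"name": "C", "group": "x", "value": 9},
]

# Read top to bottom: start with rows, then filter, then select.
result = (rows
          >> filter_by(lambda row: row["group"] == "x")
          >> select("name", "value"))
print(result)
```

Because `>>` is left-associative, each verb receives the output of the previous one, so the chain reads in the order the steps are applied.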

## Example 1 - `select` + `filter`

#### `pandas` + `dfply`

In [36]:
(heroes >>
   filter_by(X.Gender == 'Male') >>
   select_dfply(X.name, X.Gender, X.Weight) >>
   head)

Unnamed: 0,name,Gender,Weight
0,A-Bomb,Male,441.0
1,Abe Sapien,Male,65.0
2,Abin Sur,Male,90.0
3,Abomination,Male,441.0
4,Abraxas,Male,-99.0


#### `select` expression in `sqlalchemy`

In [37]:
# Make an SQL expression (select_sql is sqlalchemy's select, imported earlier)
sel_filt_stmt = (select_sql([Hero.name, 
                             Hero.gender, 
                             Hero.weight]).
                   where(Hero.gender == 'Male').
                   limit(5))
# Execute the expression
pd.read_sql_query(sel_filt_stmt, con=engine)

Unnamed: 0,name,gender,weight
0,A-Bomb,Male,441.0
1,Abe Sapien,Male,65.0
2,Abin Sur,Male,90.0
3,Abomination,Male,441.0
4,Abraxas,Male,


#### `pyspark`

In [38]:
sf_result = (df_spark.
            where(df_spark.Gender == 'Male').
            select(df_spark.name, 
                   df_spark.Gender, 
                   df_spark.Weight).
            take(5))
spark1.createDataFrame(sf_result).toPandas()

Py4JJavaError: An error occurred while calling o253.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 17, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.


## Example 2 - `filter` + `filter`

Note that chaining `filter`s is an `and` operation.
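
The claim can be checked directly. The sketch below uses plain Python lists of dictionaries rather than any of the three frameworks; the `keep` helper is hypothetical, and the `Example Heroine` row is made up.

```python
rows = [
    {"name": "A-Bomb", "Gender": "Male", "Weight": 441.0},
    {"name": "Abraxas", "Gender": "Male", "Weight": -99.0},
    {"name": "Example Heroine", "Gender": "Female", "Weight": 60.0},  # made-up row
]


def keep(rows, predicate):
    """Hypothetical filter verb: keep the rows for which predicate(row) is true."""
    return [row for row in rows if predicate(row)]


# Two filters applied one after the other ...
chained = keep(keep(rows, lambda r: r["Gender"] == "Male"),
               lambda r: r["Weight"] > 0)

# ... give the same rows as one filter that combines both conditions with `and`.
combined = keep(rows, lambda r: r["Gender"] == "Male" and r["Weight"] > 0)

print(chained == combined)
```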

#### `pandas` + `dfply`

In [None]:
(heroes >>
   select_dfply(X.name, X.Gender, X.Weight) >>
   filter_by(X.Gender == 'Male') >>
   filter_by(X.Weight > 0) >>
   head)

#### `select` expression in `sqlalchemy`

In [None]:
# Make an SQL expression (select_sql is sqlalchemy's select, imported earlier)
ff_stmt = (select_sql([Hero.name, 
                       Hero.gender, 
                       Hero.weight]).
            where(Hero.gender == 'Male').
            where(Hero.weight > 0).
            limit(5))
# Execute the expression
pd.read_sql_query(ff_stmt, con=engine)

#### `pyspark`

In [None]:
ff_result = (df_spark.
               select(df_spark.name, 
                      df_spark.Gender, 
                      df_spark.Weight).
               where(df_spark.Gender == 'Male').
               where(df_spark.Weight > 0).
               take(5))
ff_result

In [14]:
example_row = ff_result[0]
row_dict = [r.asDict() for r in ff_result]
row_dict

[{'name': 'A-Bomb', 'Gender': 'Male', 'Weight': 441.0},
 {'name': 'Abe Sapien', 'Gender': 'Male', 'Weight': 65.0},
 {'name': 'Abin Sur', 'Gender': 'Male', 'Weight': 90.0},
 {'name': 'Abomination', 'Gender': 'Male', 'Weight': 441.0},
 {'name': 'Absorbing Man', 'Gender': 'Male', 'Weight': 122.0}]
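
The conversion works because each row becomes a dictionary keyed by column name, and a list of such dictionaries already has a tabular shape. The sketch below illustrates this without Spark: a `namedtuple` stands in for `pyspark`'s `Row` (an assumption made only so the example runs anywhere), and its `_asdict()` method plays the role of `asDict()`.

```python
from collections import namedtuple

# Stand-in for pyspark's Row; this is an analogy, not the real class.
Row = namedtuple("Row", ["name", "Gender", "Weight"])

rows = [
    Row("A-Bomb", "Male", 441.0),
    Row("Abe Sapien", "Male", 65.0),
]

# One dictionary per row, keyed by column name.
records = [row._asdict() for row in rows]

# Regroup the same values column by column, which is how a data frame stores them.
columns = {key: [record[key] for record in records] for key in records[0]}

print(records)
print(columns)
```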

In [16]:
pd.DataFrame(row_dict)

Unnamed: 0,Gender,Weight,name
0,Male,441.0,A-Bomb
1,Male,65.0,Abe Sapien
2,Male,90.0,Abin Sur
3,Male,441.0,Abomination
4,Male,122.0,Absorbing Man


In [17]:
pd.DataFrame([r.asDict() for r in ff_result])

Unnamed: 0,Gender,Weight,name
0,Male,441.0,A-Bomb
1,Male,65.0,Abe Sapien
2,Male,90.0,Abin Sur
3,Male,441.0,Abomination
4,Male,122.0,Absorbing Man


In [19]:
to_pandas = lambda rows: pd.DataFrame([r.asDict() for r in rows])
to_pandas(ff_result)

Unnamed: 0,Gender,Weight,name
0,Male,441.0,A-Bomb
1,Male,65.0,Abe Sapien
2,Male,90.0,Abin Sur
3,Male,441.0,Abomination
4,Male,122.0,Absorbing Man


In [21]:
from functoolz import pipeable
to_pandas = pipeable(lambda rows: pd.DataFrame([r.asDict() for r in rows]))
ff_result >> to_pandas

Unnamed: 0,Gender,Weight,name
0,Male,441.0,A-Bomb
1,Male,65.0,Abe Sapien
2,Male,90.0,Abin Sur
3,Male,441.0,Abomination
4,Male,122.0,Absorbing Man


In [23]:
from more_pyspark import to_pandas
ff_result >> to_pandas

Unnamed: 0,Gender,Weight,name
0,Male,441.0,A-Bomb
1,Male,65.0,Abe Sapien
2,Male,90.0,Abin Sur
3,Male,441.0,Abomination
4,Male,122.0,Absorbing Man


In [7]:
spark1.createDataFrame(ff_result).toPandas()

Py4JJavaError: An error occurred while calling o61.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 3.0 failed 1 times, most recent failure: Lost task 3.0 in stage 3.0 (TID 6, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)

## <font color="red"> Exercise 1: Blue-eyed Heroes </font>

Create a query that

1. Selects the name, Gender, and Eye Color columns
2. Filters on eye_color == 'blue'

####  `pandas` + `dfply`

In [42]:
from dfply import select as select_dfply
(heroes >>
    select_dfply(X.name, X.Gender, X['Eye color']) >>
    filter_by(X['Eye color'] == 'blue') >>
   head)

Unnamed: 0,name,Gender,Eye color
1,Abe Sapien,Male,blue
2,Abin Sur,Male,blue
4,Abraxas,Male,blue
5,Absorbing Man,Male,blue
6,Adam Monroe,Male,blue


#### `query`  in `sqlalchemy`

#### `select`  expression in `sqlalchemy`

In [None]:
from sqlalchemy import select as select_sql
# Make an SQL expression
ff_stmt = (select_sql([Hero.name,
                       Hero.gender,
                       Hero.eye_color]).
           where(Hero.eye_color == 'blue').
           limit(5))

####  `pyspark`

In [48]:
ff_result = (df_spark.
               select(df_spark.name, 
                      df_spark.Gender, 
                      df_spark['Eye color']).
               where(df_spark['Eye color'] == 'blue').
               where(df_spark.Weight > 0).
               take(5))
from more_pyspark import to_pandas
ff_result >> to_pandas

Unnamed: 0,Eye color,Gender,name
0,blue,Male,Abe Sapien
1,blue,Male,Abin Sur
2,blue,Male,Absorbing Man
3,blue,Male,Adam Strange
4,blue,Female,Agent 13


## Constructing New Columns

The third verb, `mutate` 

* Creates new columns
* Changes existing columns

## How to mutate

* `pandas` + `dfply`: Pipe (`>>`) into `mutate`
* `sqlalchemy`: Use a formula and a label (alias) in `session.query` or `select`
* `pyspark`: Use the `withColumn` method

## Example 3 - Converting Weight to kilograms

Currently, the weight column is in pounds.  Let's convert to kilograms.
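
Before looking at the three frameworks, here is a minimal sketch of the same mutation in plain `pandas` using `DataFrame.assign`. The toy data frame and the names `toy_heroes`, `toy_kg`, and `LB_PER_KG` are made up for illustration; only the conversion factor 2.2046 comes from the lecture.

```python
import pandas as pd

LB_PER_KG = 2.2046  # same conversion factor used in the examples of this lecture

# Toy stand-in for the heroes table (hypothetical, three rows only)
toy_heroes = pd.DataFrame({
    "name": ["A-Bomb", "Abe Sapien", "Abin Sur"],
    "Weight": [441.0, 65.0, 90.0],
})

# assign returns a new data frame with the extra column; toy_heroes is unchanged
toy_kg = toy_heroes.assign(Weight_kg=toy_heroes["Weight"] / LB_PER_KG)
print(toy_kg)
```

Because `assign` returns a new data frame, it chains in much the same way as piping into `mutate`.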

####  `pandas` + `dfply`

In [44]:
(heroes >>
   select_dfply(X.name, 
                X.Gender, 
                X.Weight) >>
   mutate(Weight_kg = X.Weight/2.2046) >>
   head)

Unnamed: 0,name,Gender,Weight,Weight_kg
0,A-Bomb,Male,441.0,200.036288
1,Abe Sapien,Male,65.0,29.483807
2,Abin Sur,Male,90.0,40.823732
3,Abomination,Male,441.0,200.036288
4,Abraxas,Male,-99.0,-44.906105


#### `select`  expression in `sqlalchemy`

In [45]:
from sqlalchemy import select as select_sql
m_stmt = (select_sql([Hero.name, 
                      Hero.gender, 
                      Hero.weight, 
                      (Hero.weight/2.2046).label('Weight_kg')]).
            limit(5))
pd.read_sql_query(m_stmt, con=engine)

Unnamed: 0,name,gender,weight,Weight_kg
0,A-Bomb,Male,441.0,200.036288
1,Abe Sapien,Male,65.0,29.483807
2,Abin Sur,Male,90.0,40.823732
3,Abomination,Male,441.0,200.036288
4,Abraxas,Male,,


####  `pyspark`

In [49]:
m_result = (df_spark.
              select(df_spark.name, 
                     df_spark.Gender, 
                     df_spark.Weight).
              withColumn('Weight_kg', df_spark.Weight/2.2046).
              take(5))
from more_pyspark import to_pandas
m_result >> to_pandas

Unnamed: 0,Gender,Weight,Weight_kg,name
0,Male,441.0,200.036288,A-Bomb
1,Male,65.0,29.483807,Abe Sapien
2,Male,90.0,40.823732,Abin Sur
3,Male,441.0,200.036288,Abomination
4,Male,-99.0,-44.906105,Abraxas


## Referencing a new column

Each framework provides a way to reference a new column.

* `pandas` + `dfply`: Use the `X` `Intention`
* `sqlalchemy`: Use `column` function with the label from `select`
* `pyspark`: Use the `col` function with the label from `withColumn`
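
To see what referencing a label means on the SQL side, the sketch below runs a hand-written query against an in-memory SQLite database using only the standard library. The SQL text is our approximation of what a `label` plus `column` pair expresses, not the statement `sqlalchemy` actually emits, and the toy table is hypothetical. Note that SQLite accepts a column alias in the `WHERE` clause; other databases may reject it.

```python
import sqlite3

# In-memory database with a tiny, made-up heroes table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE heroes (name TEXT, weight REAL)")
conn.executemany(
    "INSERT INTO heroes VALUES (?, ?)",
    [("A-Bomb", 441.0), ("Abe Sapien", 65.0), ("Abin Sur", 90.0)],
)

# The alias Weight_kg plays the role of .label('Weight_kg');
# the WHERE clause then refers to it by name, like column('Weight_kg').
rows = conn.execute(
    """
    SELECT name, weight, weight / 2.2046 AS Weight_kg
    FROM heroes
    WHERE Weight_kg < 100
    ORDER BY name
    """
).fetchall()
conn.close()

light_names = [name for name, _, _ in rows]
print(light_names)
```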

## Example 4 - Converting Weight to kilograms and filtering

Let's find all heroes with a weight under 100 kg.

####  `pandas` + `dfply`

In [50]:
(heroes >>
   select_dfply(X.name, X.Gender, X.Weight) >>
   mutate(Weight_kg = X.Weight/2.2046) >>
   filter_by(X.Weight_kg < 100) >>
   head)

Unnamed: 0,name,Gender,Weight,Weight_kg
1,Abe Sapien,Male,65.0,29.483807
2,Abin Sur,Male,90.0,40.823732
4,Abraxas,Male,-99.0,-44.906105
5,Absorbing Man,Male,122.0,55.338837
6,Adam Monroe,Male,-99.0,-44.906105


#### `select`  expression in `sqlalchemy`

In [51]:
from sqlalchemy import column
new_col_stmt = (select_sql([Hero.name, 
                            Hero.gender, 
                            Hero.weight, 
                            (Hero.weight/2.2046).label('Weight_kg')]).
                  where(column('Weight_kg') < 100).
                  limit(5))
pd.read_sql_query(new_col_stmt, con=engine)

Unnamed: 0,name,gender,weight,Weight_kg
0,Abe Sapien,Male,65.0,29.483807
1,Abin Sur,Male,90.0,40.823732
2,Absorbing Man,Male,122.0,55.338837
3,Adam Strange,Male,88.0,39.916538
4,Agent 13,Female,61.0,27.669418


####  `pyspark`

In [52]:
from pyspark.sql.functions import col
new_col_result = (df_spark.
                   select(df_spark.name, df_spark.Gender, df_spark.Weight).
                   withColumn('Weight_kg', df_spark.Weight/2.2046).
                   where(col('Weight_kg') < 100 ).
                   take(5))

new_col_result >> to_pandas

Unnamed: 0,Gender,Weight,Weight_kg,name
0,Male,65.0,29.483807,Abe Sapien
1,Male,90.0,40.823732,Abin Sur
2,Male,-99.0,-44.906105,Abraxas
3,Male,122.0,55.338837,Absorbing Man
4,Male,-99.0,-44.906105,Adam Monroe
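
Notice that the `pandas` and `pyspark` results include heroes with a weight of -99, which also pass the `< 100` filter, while the `sqlalchemy` result does not. Assuming -99 is a placeholder for a missing weight (the lecture does not say so explicitly, but the database shows a blank weight for Abraxas in Example 3), one option is to filter on a positive weight first, much like the extra `where(df_spark.Weight > 0)` in the `pyspark` cell of Exercise 1. A minimal sketch in plain `pandas` on made-up data:

```python
import pandas as pd

# Hypothetical toy data; -99.0 is assumed to mark a missing weight
toy_heroes = pd.DataFrame({
    "name": ["Abe Sapien", "Abin Sur", "Abraxas", "Absorbing Man", "A-Bomb"],
    "Weight": [65.0, 90.0, -99.0, 122.0, 441.0],
})

light_heroes = (
    toy_heroes
    .loc[lambda df: df["Weight"] > 0]                     # drop the -99 placeholders
    .assign(Weight_kg=lambda df: df["Weight"] / 2.2046)  # mutate: pounds to kilograms
    .loc[lambda df: df["Weight_kg"] < 100]                # filter on the new column
)
print(light_heroes["name"].tolist())
```

The lambdas receive the intermediate data frame, so the last step can refer to `Weight_kg` even though that column did not exist in `toy_heroes`.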


## <font color="red"> Exercise 2: Tall Heroes </font>

Create a query that

1. Selects the name, Gender, and Height columns
2. Computes the height in inches
3. Filters on height_in > 72

####  `pandas` + `dfply`

#### `query`  in `sqlalchemy`

#### `select`  expression in `sqlalchemy`

####  `pyspark`

# <font color="red"> TODO </font>

* More complicated mutations
    * Add many similar transforms with `**kwarg` unpacking (example below)

In [58]:
# Students will likely come up with a solution like this; we will fix it next
# regex=True makes the character-class pattern explicit (newer pandas defaults to literal matching)
fix_parentheses = lambda df: (df >> 
                             mutate(
                                 ArtistBio = X.ArtistBio.str.replace('[()]', '', regex=True),
                                 Nationality = X.Nationality.str.replace('[()]', '', regex=True),
                                 BeginDate = X.BeginDate.str.replace('[()]', '', regex=True),
                                 EndDate = X.EndDate.str.replace('[()]', '', regex=True),
                                 Gender = X.Gender.str.replace('[()]', '', regex=True),
                             )
                            )
fix_parentheses(first_chuck).head()

NameError: name 'first_chuck' is not defined
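
One possible way to remove the repetition in the cell above is to build the new columns in a dictionary comprehension and unpack it with `**`. The sketch below uses plain `pandas` `assign` rather than `dfply`'s `mutate`, and runs on a small made-up data frame; `toy_artists` and `strip_parentheses` are hypothetical names, not part of the lecture code.

```python
import pandas as pd

# Hypothetical stand-in for a chunk of the artists data
toy_artists = pd.DataFrame({
    "ArtistBio": ["(American, 1930-1992)", "(Spanish, born 1936)"],
    "Nationality": ["(American)", "(Spanish)"],
    "Gender": ["(Male)", "(Male)"],
})

def strip_parentheses(df, columns):
    """Return a copy of df with '(' and ')' removed from the given string columns."""
    cleaned = {c: df[c].str.replace(r"[()]", "", regex=True) for c in columns}
    # **cleaned unpacks to ArtistBio=..., Nationality=..., Gender=...
    return df.assign(**cleaned)

fixed = strip_parentheses(toy_artists, ["ArtistBio", "Nationality", "Gender"])
print(fixed)
```

The list of column names is now the only thing that changes when another column needs the same cleanup.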