# SparkSQL and DataFrames 

In [5]:
from pyspark.sql import SparkSession

session = SparkSession.builder.getOrCreate()
session

## Creating DataFrames

In [161]:
import random

random.seed(42)

ids = range(12)

positions = [random.choice(['mechanic', 'sales', 'manager']) for id_ in ids]
positions

['manager',
 'mechanic',
 'mechanic',
 'manager',
 'sales',
 'mechanic',
 'mechanic',
 'mechanic',
 'manager',
 'mechanic',
 'manager',
 'manager']

In [190]:
df = session.createDataFrame(zip(ids, positions), schema=['id', 'position'])
df.show(5)

+---+--------+
| id|position|
+---+--------+
|  0| manager|
|  1|mechanic|
|  2|mechanic|
|  3| manager|
|  4|   sales|
+---+--------+
only showing top 5 rows



### From RDDs

In [13]:
rdd = session.sparkContext.textFile('coupon150720.csv')
rdd

coupon150720.csv MapPartitionsRDD[12] at textFile at NativeMethodAccessorImpl.java:0

In [14]:
rdd.take(5)

['79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0',
 '79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0',
 '79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0',
 '79065668570385,1,DEL,DXB,9W,9W,160.63,USD,2,S,S,0546,150804,OK,INA0',
 '79065668737021,1,AUH,IXE,9W,9W,152.46,USD,1,V,V,0501,150803,OK,INA0']

In [15]:
split_rdd = rdd.map(lambda line: line.split(','))
split_rdd.take(1)

[['79062005698500',
  '1',
  'MAA',
  'AUH',
  '9W',
  '9W',
  '56.79',
  'USD',
  '1',
  'H',
  'H',
  '0526',
  '150904',
  'OK',
  'IAF0']]

In [17]:
new_df = session.createDataFrame(split_rdd)
new_df.show(3)

+--------------+---+---+---+---+---+-----+---+---+---+---+----+------+---+----+
|            _1| _2| _3| _4| _5| _6|   _7| _8| _9|_10|_11| _12|   _13|_14| _15|
+--------------+---+---+---+---+---+-----+---+---+---+---+----+------+---+----+
|79062005698500|  1|MAA|AUH| 9W| 9W|56.79|USD|  1|  H|  H|0526|150904| OK|IAF0|
|79062005698500|  2|AUH|CDG| 9W| 9W|84.34|USD|  1|  H|  H|6120|150905| OK|IAF0|
|79062005924069|  1|CJB|MAA| 9W| 9W| 60.0|USD|  1|  H|  H|2768|150721| OK|IAA0|
+--------------+---+---+---+---+---+-----+---+---+---+---+----+------+---+----+
only showing top 3 rows



### From csv files

In [21]:
# We can either read them directly into dataframes or read them as RDDs and transform that into a DataFrame
# RDDs are usfeul when working with unstructured data
df_from_csv = session.read.csv('coupon150720.csv')
df_from_csv.show(3)

+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
|           _c0|_c1|_c2|_c3|_c4|_c5|  _c6|_c7|_c8|_c9|_c10|_c11|  _c12|_c13|_c14|
+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
|79062005698500|  1|MAA|AUH| 9W| 9W|56.79|USD|  1|  H|   H|0526|150904|  OK|IAF0|
|79062005698500|  2|AUH|CDG| 9W| 9W|84.34|USD|  1|  H|   H|6120|150905|  OK|IAF0|
|79062005924069|  1|CJB|MAA| 9W| 9W| 60.0|USD|  1|  H|   H|2768|150721|  OK|IAA0|
+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
only showing top 3 rows



### SQL + csv

In [23]:
df_with_sql = session.sql('SELECT * FROM csv.`coupon150720.csv` where _c2="MAA"')
df_with_sql.show(3)

+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
|           _c0|_c1|_c2|_c3|_c4|_c5|  _c6|_c7|_c8|_c9|_c10|_c11|  _c12|_c13|_c14|
+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
|79062005698500|  1|MAA|AUH| 9W| 9W|56.79|USD|  1|  H|   H|0526|150904|  OK|IAF0|
|79062005305018|  1|MAA|BOM| 9W| 9W| 48.5|USD|  1|  O|   O|0460|150821|  OK|IAE0|
|79062005348349|  1|MAA|BOM| 9W| 9W| 56.0|USD|  1|  V|   V|0466|150808|  OK|IAE0|
+--------------+---+---+---+---+---+-----+---+---+---+----+----+------+----+----+
only showing top 3 rows



## Basic operations with DataFrames

In [28]:
df.show(2)

+---+--------+
| id|position|
+---+--------+
|  0| manager|
|  1|mechanic|
+---+--------+
only showing top 2 rows



### Filtering and selecting

In [27]:
# where can also be used instead of filter
filtered = df.filter(df['id'] > 5)
filtered.show()

+---+--------+
| id|position|
+---+--------+
|  6|mechanic|
|  7|mechanic|
|  8| manager|
|  9|mechanic|
| 10| manager|
| 11| manager|
+---+--------+



### Exercise

Extract all employee ids which correspond to managers

In [33]:
col = df['id']

In [36]:
df.where(df['position'] == 'manager').show()

+---+--------+
| id|position|
+---+--------+
|  0| manager|
|  3| manager|
|  8| manager|
| 10| manager|
| 11| manager|
+---+--------+



In [37]:
df.where(df['position'] == 'manager').select(col).show()

+---+
| id|
+---+
|  0|
|  3|
|  8|
| 10|
| 11|
+---+



### Adding columns

In [38]:
from pyspark.sql import functions

df.select(functions.sqrt('id')).show(3)

+------------------+
|          SQRT(id)|
+------------------+
|               0.0|
|               1.0|
|1.4142135623730951|
+------------------+
only showing top 3 rows



In [39]:
# a new df needs to be creating when adding new columns
df2 = df.select('id',
                'position',
                functions.sqrt('id'))

df2.show(3)

+---+--------+------------------+
| id|position|          SQRT(id)|
+---+--------+------------------+
|  0| manager|               0.0|
|  1|mechanic|               1.0|
|  2|mechanic|1.4142135623730951|
+---+--------+------------------+
only showing top 3 rows



In [40]:
def odd_letters(word):
    return word[::2]
    
odd_letters('manager')

'mngr'

In [43]:
# to use a function we need to create an object first
odd_udf = functions.udf(odd_letters)
df.select(odd_udf('position')).show(3)

+---------------------+
|odd_letters(position)|
+---------------------+
|                 mngr|
|                 mcai|
|                 mcai|
+---------------------+
only showing top 3 rows



### Exercise

Create a 'salary' field in our df. make it 30000 for mechanics, 40000 for salespeople and 70000 for managers.

In [44]:
def salary_from(position): 
    salaries = {'manager': 70000, 'sales' : 40000, 'mechanic' : 30000}
    return salaries.get(position)

salary_from('manager')

70000

If we have a column that is not the desired type, we can convert it with `cast`.

In [58]:
salary_udf = functions.udf(salary_from)

exercise_result = df.select('id',
                            'position',
                            salary_udf('position').alias('salary').cast("integer"))

exercise_result.show(3)

+---+--------+------+
| id|position|salary|
+---+--------+------+
|  0| manager| 70000|
|  1|mechanic| 30000|
|  2|mechanic| 30000|
+---+--------+------+
only showing top 3 rows



### Summary statistics

In [62]:
exercise_result.stat.corr('id', 'salary')

0.20742413513940736

In [63]:
exercise_result.stat.cov('id', 'salary')

14999.999999999995

### .crosstab()

In [211]:
location_udf = functions.udf(lambda : random.choice(['Madrid', 'Barcelona']))
df4 = exercise_result.withColumn('location', location_udf()).cache() #cache() saves df up to this point

In [214]:
df4.show(5)

+---+--------+------+---------+
| id|position|salary| location|
+---+--------+------+---------+
|  0| manager| 70000|   Madrid|
|  1|mechanic| 30000|   Madrid|
|  2|mechanic| 30000|Barcelona|
|  3| manager| 70000|   Madrid|
|  4|   sales| 40000|Barcelona|
+---+--------+------+---------+
only showing top 5 rows



In [216]:
df4.crosstab('position', 'location').show()

+-----------------+---------+------+
|position_location|Barcelona|Madrid|
+-----------------+---------+------+
|            sales|        1|     0|
|          manager|        2|     3|
|         mechanic|        3|     3|
+-----------------+---------+------+



### Grouping

In [217]:
df4.groupby('location').mean('salary').show()

+---------+-----------+
| location|avg(salary)|
+---------+-----------+
|   Madrid|    50000.0|
|Barcelona|    45000.0|
+---------+-----------+



In [218]:
stats = df4.groupby('location').agg(functions.mean('salary').alias('average'), 
                                    functions.stddev_pop('salary').alias('stddev')
                                   )
stats.show()

+---------+-------+------------------+
| location|average|            stddev|
+---------+-------+------------------+
|   Madrid|50000.0|           20000.0|
|Barcelona|45000.0|18027.756377319947|
+---------+-------+------------------+



### Intersections

In [219]:
random.seed(42)

data = list(zip([10, 11, 9, 10, 9],
                [5000, 10000, 2000, 0, 1000],
                [random.choice(['Madrid', 'Barcelona', 'Sevilla']) for _ in range(5)]))

new_df = session.createDataFrame(data, schema=['id', 'bonus', 'location'])
new_df.show()

+---+-----+---------+
| id|bonus| location|
+---+-----+---------+
| 10| 5000|  Sevilla|
| 11|10000|   Madrid|
|  9| 2000|   Madrid|
| 10|    0|  Sevilla|
|  9| 1000|Barcelona|
+---+-----+---------+



In [220]:
with_bonuses = df4.join(new_df, on=['id', 'location'], how='left')
with_bonuses.show(5)

+---+---------+--------+------+-----+
| id| location|position|salary|bonus|
+---+---------+--------+------+-----+
|  9|   Madrid|mechanic| 30000| 2000|
|  5|Barcelona|mechanic| 30000| null|
|  0|   Madrid| manager| 70000| null|
|  8|   Madrid| manager| 70000| null|
|  7|   Madrid|mechanic| 30000| null|
+---+---------+--------+------+-----+
only showing top 5 rows



### Exercise

Calculate the [z-score](http://www.statisticshowto.com/probability-and-statistics/z-score/) of each employee's salary for their location

In [174]:
from pyspark.sql.functions import round

df4.show(3)

+---+--------+------+---------+
| id|position|salary| location|
+---+--------+------+---------+
|  0| manager| 70000|Barcelona|
|  1|mechanic| 30000|   Madrid|
|  2|mechanic| 30000|Barcelona|
+---+--------+------+---------+
only showing top 3 rows



In [221]:
# calculate the mean and std of salary for each location 
stats = df4.groupby('location').agg(
    round(functions.mean('salary'), 2).alias('avg_salary'),
    round(functions.stddev_pop('salary'), 2).alias('std_salary')
    )

stats.show()

+---------+----------+----------+
| location|avg_salary|std_salary|
+---------+----------+----------+
|   Madrid|   50000.0|   20000.0|
|Barcelona|   45000.0|  18027.76|
+---------+----------+----------+



In [223]:
# annotate each employee with the stats corresponding to their location
df5 = df4.join(stats, on='location', how='left')
df5.show(3)

+---------+---+--------+------+----------+----------+
| location| id|position|salary|avg_salary|std_salary|
+---------+---+--------+------+----------+----------+
|   Madrid|  0| manager| 70000|   50000.0|   20000.0|
|   Madrid|  1|mechanic| 30000|   50000.0|   20000.0|
|Barcelona|  2|mechanic| 30000|   45000.0|  18027.76|
+---------+---+--------+------+----------+----------+
only showing top 3 rows



In [226]:
result = df5.select('id',
                    'position',
                    'location',
                    'salary',
                    round(((df5['salary'] - df5['avg_salary']) / df5['std_salary']), 3).alias('z-score'))

result.show(5)

+---+--------+---------+------+-------+
| id|position| location|salary|z-score|
+---+--------+---------+------+-------+
|  0| manager|   Madrid| 70000|    1.0|
|  1|mechanic|   Madrid| 30000|   -1.0|
|  2|mechanic|Barcelona| 30000| -0.832|
|  3| manager|   Madrid| 70000|    1.0|
|  4|   sales|Barcelona| 40000| -0.277|
+---+--------+---------+------+-------+
only showing top 5 rows



## SQL querying

In [237]:
df6.registerTempTable('df6_table')

session.sql('SELECT * FROM df6_table WHERE location IS NOT NULL AND position="manager"').show()

+---+--------+------+---------+
| id|position|salary| location|
+---+--------+------+---------+
|  0| manager| 70000|   Madrid|
|  3| manager| 70000|   Madrid|
|  8| manager| 70000|   Madrid|
| 10| manager| 70000|Barcelona|
| 11| manager| 70000|Barcelona|
+---+--------+------+---------+



In [238]:
session.sql("""SELECT location, 
                      avg(salary) as avg_salary, 
                      stddev_pop(salary) AS std_salary 
               FROM df6_table 
               GROUP BY location""").show()

+---------+----------+------------------+
| location|avg_salary|        std_salary|
+---------+----------+------------------+
|   Madrid|   50000.0|           20000.0|
|     null| 1200000.0|               0.0|
|Barcelona|   45000.0|18027.756377319947|
|    Haiti| 1500000.0|               0.0|
+---------+----------+------------------+



## Interoperation with Pandas

In [246]:
df5.show(3)

+---------+---+--------+------+----------+----------+
| location| id|position|salary|avg_salary|std_salary|
+---------+---+--------+------+----------+----------+
|   Madrid|  0| manager| 70000|   50000.0|   20000.0|
|   Madrid|  1|mechanic| 30000|   50000.0|   20000.0|
|Barcelona|  2|mechanic| 30000|   45000.0|  18027.76|
+---------+---+--------+------+----------+----------+
only showing top 3 rows



In [241]:
pd_df = df5.toPandas()
pd_df.head(5)

Unnamed: 0,location,id,position,salary,avg_salary,std_salary
0,Madrid,0,manager,70000,50000.0,20000.0
1,Madrid,1,mechanic,30000,50000.0,20000.0
2,Barcelona,2,mechanic,30000,45000.0,18027.76
3,Madrid,3,manager,70000,50000.0,20000.0
4,Barcelona,4,sales,40000,45000.0,18027.76


### Writing out


In [248]:
# we can write out our PySpark dataframe as a distributed file
df5.write.csv('df5')

## Exercise

Repeat the exercise from the previous notebook, but this time with DataFrames.

Get stats for all tickets with destination MAD from `coupons150720.csv`.

You will need to extract ticket amounts with destination MAD, and then calculate:

1. Total ticket amounts per origin
2. Top 10 airlines by average amount

1) Total ticket amounts per origin

In [250]:
raw_coupons = session.read.csv('coupon150720.csv')
raw_coupons.show(5)

+--------------+---+---+---+---+---+------+---+---+---+----+----+------+----+----+
|           _c0|_c1|_c2|_c3|_c4|_c5|   _c6|_c7|_c8|_c9|_c10|_c11|  _c12|_c13|_c14|
+--------------+---+---+---+---+---+------+---+---+---+----+----+------+----+----+
|79062005698500|  1|MAA|AUH| 9W| 9W| 56.79|USD|  1|  H|   H|0526|150904|  OK|IAF0|
|79062005698500|  2|AUH|CDG| 9W| 9W| 84.34|USD|  1|  H|   H|6120|150905|  OK|IAF0|
|79062005924069|  1|CJB|MAA| 9W| 9W|  60.0|USD|  1|  H|   H|2768|150721|  OK|IAA0|
|79065668570385|  1|DEL|DXB| 9W| 9W|160.63|USD|  2|  S|   S|0546|150804|  OK|INA0|
|79065668737021|  1|AUH|IXE| 9W| 9W|152.46|USD|  1|  V|   V|0501|150803|  OK|INA0|
+--------------+---+---+---+---+---+------+---+---+---+----+----+------+----+----+
only showing top 5 rows



In [255]:
coupons = raw_coupons.select(raw_coupons['_c0'].alias('tkt_number'),
                             raw_coupons['_c1'].alias('cpn_number').cast("integer"),
                             raw_coupons['_c2'].alias('origin'),
                             raw_coupons['_c3'].alias('destination'),
                             raw_coupons['_c4'].alias('airline'),
                             raw_coupons['_c6'].alias('amount').cast("float")
                            )
coupons.show(3)

+--------------+----------+------+-----------+-------+------+
|    tkt_number|cpn_number|origin|destination|airline|amount|
+--------------+----------+------+-----------+-------+------+
|79062005698500|         1|   MAA|        AUH|     9W| 56.79|
|79062005698500|         2|   AUH|        CDG|     9W| 84.34|
|79062005924069|         1|   CJB|        MAA|     9W|  60.0|
+--------------+----------+------+-----------+-------+------+
only showing top 3 rows



In [257]:
# same action using SQL
raw_coupons.registerTempTable('raw_coupons')

coupons = session.sql('''SELECT 
                            CAST(_c0 AS LONG) AS tkt_number,
                            CAST(_c1 AS INT) AS cpn_number,
                            _c2 AS origin,
                            _c3 AS destination,
                            _c4 AS airline,
                            CAST(_c6 AS FLOAT) AS amount
                         FROM raw_coupons''')

coupons.show(3)

+--------------+----------+------+-----------+-------+------+
|    tkt_number|cpn_number|origin|destination|airline|amount|
+--------------+----------+------+-----------+-------+------+
|79062005698500|         1|   MAA|        AUH|     9W| 56.79|
|79062005698500|         2|   AUH|        CDG|     9W| 84.34|
|79062005924069|         1|   CJB|        MAA|     9W|  60.0|
+--------------+----------+------+-----------+-------+------+
only showing top 3 rows



In [258]:
coupons[coupons['destination'] == 'MAD'].groupby('origin').sum('amount').show(3)

+------+------------------+
|origin|       sum(amount)|
+------+------------------+
|   PMI| 40547.17005729675|
|   YUL|284.44000244140625|
|   HEL| 8195.760055541992|
+------+------------------+
only showing top 3 rows



2) Top 10 Airlines by average amount



In [261]:
airlines = coupons[coupons['destination'] == 'MAD'].groupby('airline').mean('amount')
airlines.show(3)

+-------+-----------------+
|airline|      avg(amount)|
+-------+-----------------+
|     PC|64.16499996185303|
|     AZ|68.80148202401621|
|     UA|291.9203567504883|
+-------+-----------------+
only showing top 3 rows



In [262]:
airlines.sort('avg(amount)', ascending=False).show(10)

+-------+------------------+
|airline|       avg(amount)|
+-------+------------------+
|     V0| 5418.098665364583|
|     AC|  740.619985961914|
|     KE| 688.5261500431941|
|     SV|  553.174259916265|
|     OB| 535.5044420030382|
|     AR| 513.5304808843704|
|     AV| 450.1950941518613|
|     AM| 440.7342111687911|
|     C2| 397.8699951171875|
|     LA|379.95370341954606|
+-------+------------------+
only showing top 10 rows

