### Apache Spark with Python: Running SQL Queries on Spark DataFrames

This notebook is designed to introduce some basic concepts and help get you familiar with using Spark in Python.

In this notebook, we will load and explore the Titanic dataset. Specifically, this tutorial covers:

  - Loading data in memory
  - Creating a SQLContext
  - Creating a Spark DataFrame
  - Grouping data by columns
  - Operating on columns
  - Running SQL queries on a Spark DataFrame

##### by John Ryan using IBM Data Science Workbench.

#### Load packages

In [47]:
#First, we import some Python packages that we need:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from IPython.display import display
from ipywidgets import interact
import sys
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib.pyplot as plt
import os.path
%matplotlib inline

#### Loading Data

In [48]:
import pandas as pd
data = pd.read_csv('/resources/data/titanic.csv')

In [49]:
#view the first five rows of the pandas dataframe
data.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


**Pre-processing: Missing Value Imputation**

Count the missing values in each column, then fill them in.

In [68]:
#Data Munging
#Checking for missing values in the data
data.apply(lambda x: sum(x.isnull()),axis=0)

PassengerId      0
Survived         0
Pclass           0
Name             0
Sex              0
Age            177
SibSp            0
Parch            0
Ticket           0
Fare             0
Cabin          687
Embarked         2
dtype: int64
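The per-column count above can also be written with the vectorised `isnull().sum()`, which avoids the Python-level `lambda`. A minimal sketch on a small made-up frame (the `toy` DataFrame and its values are hypothetical, not rows from the Titanic file):

```python
import numpy as np
import pandas as pd

# Hypothetical toy data standing in for the Titanic frame
toy = pd.DataFrame({
    "Age": [22.0, np.nan, 26.0, np.nan],
    "Cabin": [None, "C85", None, None],
    "Fare": [7.25, 71.2833, 7.925, 53.1],
})

# isnull() marks missing cells as True; sum() counts them per column
missing_counts = toy.isnull().sum()
print(missing_counts)
```

Applied to the notebook's `data`, `data.isnull().sum()` should give the same counts as the `apply` call.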

In [72]:
#Display of the total Cabin value counts
data['Cabin'].value_counts()

C23 C25 C27        4
G6                 4
B96 B98            4
D                  3
C22 C26            3
E101               3
F2                 3
F33                3
B57 B59 B63 B66    2
C68                2
B58 B60            2
E121               2
D20                2
E8                 2
E44                2
B77                2
C65                2
D26                2
E24                2
E25                2
B20                2
C93                2
D33                2
E67                2
D35                2
D36                2
C52                2
F4                 2
C125               2
C124               2
                  ..
F G63              1
A6                 1
D45                1
D6                 1
D56                1
C101               1
C54                1
D28                1
D37                1
B102               1
D30                1
E17                1
E58                1
F E69              1
D10 D12            1
E50                1
A14          

In [74]:
#Fill in the NaN values with one of the most common cabin values (G6 ties for the top count)
data['Cabin'] = data['Cabin'].fillna('G6')

In [75]:
#Display of the total Embarked value counts
data['Embarked'].value_counts()

S    644
C    168
Q     77
Name: Embarked, dtype: int64

In [76]:
#Fill in the NaN values with the most common Embarked value in the data ('S', upper case)
data['Embarked'] = data['Embarked'].fillna('S')
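Rather than reading the most frequent value off the `value_counts()` output and typing it in by hand, it can be looked up with `Series.mode()`. A minimal sketch, assuming a hypothetical toy column rather than the real Titanic data:

```python
import pandas as pd

# Hypothetical toy column with one missing port of embarkation
toy = pd.DataFrame({"Embarked": ["S", "C", "S", None, "Q", "S"]})

# mode() returns the most frequent value(s), ignoring NaN; take the first
most_common = toy["Embarked"].mode()[0]

# Assign the filled column back to the frame
toy["Embarked"] = toy["Embarked"].fillna(most_common)
print(toy["Embarked"].value_counts())
```

Looking the value up in code avoids case mistakes such as filling with `'s'` when the category is `'S'`.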

In [78]:
#Display of the total Age value counts
data['Age'].value_counts()

24.00    30
22.00    27
18.00    26
30.00    25
28.00    25
19.00    25
21.00    24
25.00    23
36.00    22
29.00    20
32.00    18
27.00    18
35.00    18
26.00    18
16.00    17
31.00    17
23.00    15
34.00    15
33.00    15
20.00    15
39.00    14
17.00    13
40.00    13
42.00    13
45.00    12
38.00    11
50.00    10
2.00     10
4.00     10
47.00     9
         ..
28.50     2
40.50     2
63.00     2
13.00     2
10.00     2
45.50     2
70.00     2
30.50     2
71.00     2
59.00     2
57.00     2
55.00     2
0.75      2
64.00     2
23.50     1
14.50     1
0.67      1
53.00     1
0.92      1
0.42      1
70.50     1
36.50     1
80.00     1
66.00     1
74.00     1
12.00     1
55.50     1
34.50     1
24.50     1
20.50     1
Name: Age, dtype: int64

In [79]:
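#Preview Age with NaN replaced by the most common age (24.0);
#the result is not assigned back, so data itself is unchanged
data['Age'].fillna(24.00)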

0      22.0
1      38.0
2      26.0
3      35.0
4      35.0
5      24.0
6      54.0
7       2.0
8      27.0
9      14.0
10      4.0
11     58.0
12     20.0
13     39.0
14     14.0
15     55.0
16      2.0
17     24.0
18     31.0
19     24.0
20     35.0
21     34.0
22     15.0
23     28.0
24      8.0
25     38.0
26     24.0
27     19.0
28     24.0
29     24.0
       ... 
861    21.0
862    48.0
863    24.0
864    24.0
865    42.0
866    27.0
867    31.0
868    24.0
869     4.0
870    26.0
871    47.0
872    33.0
873    47.0
874    28.0
875    15.0
876    20.0
877    19.0
878    24.0
879    56.0
880    25.0
881    33.0
882    22.0
883    28.0
884    25.0
885    39.0
886    27.0
887    19.0
888    24.0
889    26.0
890    32.0
Name: Age, dtype: float64

In [84]:
#fill the remaining missing numeric values (Age) with the column mean
data = data.fillna(data.mean(numeric_only=True))
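Mean imputation only makes sense for numeric columns, and on recent pandas versions calling `mean()` on a frame that also holds string columns may raise an error unless `numeric_only=True` is passed. A minimal sketch of the idea on a hypothetical toy frame (names and values are made up for illustration):

```python
import numpy as np
import pandas as pd

# Hypothetical toy frame mixing a string column and a numeric column
toy = pd.DataFrame({
    "Name": ["a", "b", "c", "d"],
    "Age": [20.0, np.nan, 40.0, np.nan],
})

# Compute means for numeric columns only; string columns are skipped
numeric_means = toy.mean(numeric_only=True)

# fillna with a Series fills each column whose name matches an index label
filled = toy.fillna(numeric_means)
print(filled)
```

Here the two missing ages are replaced by 30.0, the mean of the observed values, while `Name` is left untouched.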

#### Checking for missing values in the data

In [85]:
#Checking for missing values in the data
data.apply(lambda x: sum(x.isnull()),axis=0)

PassengerId    0
Survived       0
Pclass         0
Name           0
Sex            0
Age            0
SibSp          0
Parch          0
Ticket         0
Fare           0
Cabin          0
Embarked       0
dtype: int64

#### Create a Spark DataFrame

To work with Spark DataFrames, we first need to initialize a SQLContext.

In [86]:
# the Spark SQLContext is created from the SparkContext (sc) that the notebook environment provides
sqlContext = SQLContext(sc)

In [87]:
#With the SQLContext and the pandas dataframe we can now create the
#Spark dataframe
sdata = sqlContext.createDataFrame(data)
sdata.printSchema()  #prints out the data schema for the new data frame

root
 |-- PassengerId: long (nullable = true)
 |-- Survived: long (nullable = true)
 |-- Pclass: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: long (nullable = true)
 |-- Parch: long (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



__Selecting Data Columns__

In [88]:
#Taking a look at some of the columns
sdata.select('Fare','Ticket', 'Survived').show()

+-------+----------------+--------+
|   Fare|          Ticket|Survived|
+-------+----------------+--------+
|   7.25|       A/5 21171|       0|
|71.2833|        PC 17599|       1|
|  7.925|STON/O2. 3101282|       1|
|   53.1|          113803|       1|
|   8.05|          373450|       0|
| 8.4583|          330877|       0|
|51.8625|           17463|       0|
| 21.075|          349909|       0|
|11.1333|          347742|       1|
|30.0708|          237736|       1|
|   16.7|         PP 9549|       1|
|  26.55|          113783|       1|
|   8.05|       A/5. 2151|       0|
| 31.275|          347082|       0|
| 7.8542|          350406|       0|
|   16.0|          248706|       1|
| 29.125|          382652|       0|
|   13.0|          244373|       1|
|   18.0|          345763|       0|
|  7.225|            2649|       1|
+-------+----------------+--------+
only showing top 20 rows



__Grouping by and Aggregation__

Using a SQL-style group by, we can dig further into the data: here we group the Titanic passengers into survivors and non-survivors and compute the average fare paid by each group.

In [89]:
#Grouping by survivor column and aggregating the fare column to return the average fare.
sdata.groupby(['Survived'])\
.agg({"Fare": "AVG"})\
.show(5)

+--------+-----------------+
|Survived|        avg(Fare)|
+--------+-----------------+
|       0| 22.1178868852459|
|       1|48.39540760233918|
+--------+-----------------+



In [90]:
#Grouping by the Survived column, counting the passengers in each group, and sorting by that count.
sdata.groupby(['Survived'])\
.agg({"Fare": "count"})\
.sort("count(Fare)", ascending=True)\
.show(5)

+--------+-----------+
|Survived|count(Fare)|
+--------+-----------+
|       1|        342|
|       0|        549|
+--------+-----------+
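As a cross-check on the Spark results, the same two aggregations can be computed on the pandas side. A minimal sketch, assuming a hypothetical toy frame with the same `Survived` and `Fare` column names (the values are made up):

```python
import pandas as pd

# Hypothetical toy data with the same column names as the Titanic frame
toy = pd.DataFrame({
    "Survived": [0, 1, 1, 0, 1],
    "Fare": [7.25, 71.0, 8.0, 8.75, 53.0],
})

# One pass over the groups computes both the average fare and the row count
summary = toy.groupby("Survived")["Fare"].agg(["mean", "count"])
print(summary)
```

Running the same `groupby` on the notebook's `data` frame should reproduce the averages and counts that Spark reports.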



__SQL Queries using Spark__

In [91]:
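#Step 1: register the Spark dataframe as a temporary table so it can be queried with SQL
#(on Spark 2.0+, createOrReplaceTempView is the non-deprecated equivalent)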
sdata.registerTempTable("titanic")

In [92]:
#Step 2: use the sql method of the SQLContext to query the table
query1 = sqlContext.sql("SELECT Ticket FROM titanic WHERE Fare < 1000")
query1.show()

+----------------+
|          Ticket|
+----------------+
|       A/5 21171|
|        PC 17599|
|STON/O2. 3101282|
|          113803|
|          373450|
|          330877|
|           17463|
|          349909|
|          347742|
|          237736|
|         PP 9549|
|          113783|
|       A/5. 2151|
|          347082|
|          350406|
|          248706|
|          382652|
|          244373|
|          345763|
|            2649|
+----------------+
only showing top 20 rows
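The earlier group-by aggregation can also be expressed as a SQL query. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster; this is an assumption made for illustration, and in the notebook the same `SELECT` text would be passed to `sqlContext.sql(...)`. The rows and ticket values are hypothetical.

```python
import sqlite3

# In-memory database standing in for the registered "titanic" table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE titanic (Ticket TEXT, Fare REAL, Survived INTEGER)")

# Hypothetical rows, not taken from the real dataset
rows = [("T1", 7.25, 0), ("T2", 71.0, 1), ("T3", 8.75, 0), ("T4", 53.0, 1)]
conn.executemany("INSERT INTO titanic VALUES (?, ?, ?)", rows)

# Filter, group, and aggregate in a single statement
query = """
    SELECT Survived, AVG(Fare) AS avg_fare, COUNT(*) AS passengers
    FROM titanic
    WHERE Fare < 1000
    GROUP BY Survived
    ORDER BY Survived
"""
result = conn.execute(query).fetchall()
conn.close()
print(result)
```

The query uses only standard `SELECT`, `WHERE`, `GROUP BY`, and `ORDER BY` clauses, which Spark SQL also supports.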

