**Using SQL in Spark: data cleaning and merging**
- Set up PySpark and the SQLContext

In [2]:
# Install PySpark on the local machine
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=28023bdfc3449b40c157e63ef4fbc043c0c909ae8563713f5b45da53bf8592fe
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [9]:
import pyspark
pyspark.__version__


'3.1.2'

In [10]:
try:
    print(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')

<SparkContext master=local[*] appName=pyspark-shell>


In [11]:
# Create a SQL context
from pyspark.sql import SQLContext
try:
    print(sqlContext)
except NameError:
    sqlContext = SQLContext(sc)

<pyspark.sql.context.SQLContext object at 0x7f024cde9390>


In [13]:
from pyspark.sql import SparkSession
try:
    print(spark)
except NameError:
    # Reuse the SparkContext created above, or start a new session if none exists
    spark = SparkSession.builder.getOrCreate()

**Use the California housing dataset to demonstrate simple SQL queries: SELECT to pick columns, WHERE to subset rows, and JOIN to merge datasets**

In [15]:
# Load the CSV file; without inferSchema, every column is read as a string
df1 = spark.read.format('csv').load("california_housing_train.csv", header=True)
df1.show(5)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|
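|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000|262.000000|     1.925000|      65500.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
only showing top 5 rows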

In [16]:
# Register df1 as a temporary view so it can be queried with SQL
# (createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0)
df1.createOrReplaceTempView("housing")

**Run SQL queries to create a subset of dataset**

---
Select all columns for rows where `latitude` is less than 34 and `longitude` is greater than -115, combining the two conditions with the `AND` operator.



In [18]:
data = sqlContext.sql("""
                         SELECT *
                         FROM housing
                         WHERE (latitude < 34 AND longitude > -115) """)

data.show(5)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000|262.000000|     1.925000|      65500.000000|
|-114.580000|33.630000|         29.000000|1387.000000|    236.000000| 671.000000|239.000000|     3.343800|      74000.000000|
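|-114.580000|33.610000|         25.000000|2907.000000|    680.000000|1841.000000|633.000000|     2.676800|      82400.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
only showing top 5 rows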

**Create a subset containing only the `households` column for rows where `population` is greater than or equal to 10000.**

In [19]:
data2 = sqlContext.sql("""
                         SELECT households 
                         FROM housing
                         WHERE population >= 10000 """)

data2.show(5)

+-----------+
| households|
+-----------+
|4339.000000|
|4769.000000|
|3832.000000|
|5189.000000|
|3258.000000|
+-----------+
only showing top 5 rows




Load the US state population, area, and abbreviation datasets and register each one as a temporary table. Data source: http://github.com/jakevdp/data-USstates/

In [20]:
!curl -O https://raw.githubusercontent.com/jakevdp/data-USstates/master/state-population.csv
!curl -O https://raw.githubusercontent.com/jakevdp/data-USstates/master/state-areas.csv
!curl -O https://raw.githubusercontent.com/jakevdp/data-USstates/master/state-abbrevs.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 57935  100 57935    0     0   342k      0 --:--:-- --:--:-- --:--:--  344k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   835  100   835    0     0   6139      0 --:--:-- --:--:-- --:--:--  6139
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   872  100   872    0     0   6140      0 --:--:-- --:--:-- --:--:--  6140


In [21]:
# Register the tables for SQL commands:
# 1. Create a DataFrame from each CSV file (using spark.read and .load() as shown above)
# 2. Register each DataFrame as a temporary view so it can be queried with SQL

# Load the datasets
pop = spark.read.format('csv').load("state-population.csv", header=True)
area = spark.read.format('csv').load("state-areas.csv", header=True)
abbr = spark.read.format('csv').load("state-abbrevs.csv", header=True)

# Register the views (createOrReplaceTempView replaces the deprecated registerTempTable)
pop.createOrReplaceTempView("population")
area.createOrReplaceTempView("area")
abbr.createOrReplaceTempView("abbr")

pop.show(5)
area.show(5)
abbr.show(5)

+------------+-------+----+----------+
|state/region|   ages|year|population|
+------------+-------+----+----------+
|          AL|under18|2012|   1117489|
|          AL|  total|2012|   4817528|
|          AL|under18|2010|   1130966|
|          AL|  total|2010|   4785570|
|          AL|under18|2011|   1125763|
+------------+-------+----+----------+
only showing top 5 rows

+----------+-------------+
|     state|area (sq. mi)|
+----------+-------------+
|   Alabama|        52423|
|    Alaska|       656425|
|   Arizona|       114006|
|  Arkansas|        53182|
|California|       163707|
+----------+-------------+
only showing top 5 rows

+----------+------------+
|     state|abbreviation|
+----------+------------+
|   Alabama|          AL|
|    Alaska|          AK|
|   Arizona|          AZ|
|  Arkansas|          AR|
|California|          CA|
+----------+------------+
only showing top 5 rows



# JOINS
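Use a join to merge `state-abbrevs` with `state-population`, matching the `abbreviation` column of `state-abbrevs` against the `state/region` column of `state-population`. A `FULL OUTER JOIN` keeps unmatched rows from both tables; the backticks around `state/region` are needed because the column name contains a slash.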

In [22]:
prob5 = sqlContext.sql("""
                         SELECT *                             
                         FROM abbr
                         FULL OUTER JOIN population
                         ON abbr.abbreviation = population.`state/region`
                          """)

prob5.show(5)

+-------+------------+------------+-------+----+----------+
|  state|abbreviation|state/region|   ages|year|population|
+-------+------------+------------+-------+----+----------+
|Arizona|          AZ|          AZ|under18|2012|   1617149|
|Arizona|          AZ|          AZ|  total|2012|   6551149|
|Arizona|          AZ|          AZ|under18|2011|   1616353|
|Arizona|          AZ|          AZ|  total|2011|   6468796|
|Arizona|          AZ|          AZ|under18|2010|   1628563|
+-------+------------+------------+-------+----+----------+
only showing top 5 rows

