## SparkSQL

In [1]:
import os
from os import path as filepath
from pyspark.sql import SQLContext

In [2]:
# SQL/DataFrame entry point used by the read.json() and sql() calls below.
# NOTE(review): `sc` is not defined in any visible cell; presumably the
# SparkContext pre-created by the PySpark kernel/shell -- confirm.
sqlContext = SQLContext(sc)

In [3]:
# Build the HDFS URI of the input file. The part after "hdfs://" is read from
# the HDFS environment variable; a KeyError is raised here if it is not set.
HDFS = "hdfs://{}".format(os.environ["HDFS"])
# NOTE(review): `filepath` is os.path, whose join() uses the local OS path
# separator; the result is a valid URI only where that separator is "/" --
# confirm the platform the driver runs on.
USER = filepath.join(HDFS, "user", "ec2-user")
FILE = filepath.join(USER, "sf_parking/sf_parking_clean.json")

In [13]:
# Read the JSON file at FILE into a DataFrame.
parking = sqlContext.read.json(FILE)

In [14]:
# Preview the first 10 rows of the freshly loaded data.
parking.show(10)

+--------------------+--------+----------+--------------------+-----+----------+---------+------+----------+--------+
|             address|garorlot|landusetyp|          location_1|mccap|     owner|primetype|regcap|secondtype|valetcap|
+--------------------+--------+----------+--------------------+-----+----------+---------+------+----------+--------+
|      2110 Market St|       L|restaurant|[37.767378,-122.4...|    0|   Private|      PPA|    13|          |       0|
|         993 Potrero|       L|          |[37.757272,-122.4...|    0|     SFMTA|      PPA|    34|          |       0|
|601 Terry A Franc...|       L|          |[37.770135,-122.3...|    0|Port of SF|      PPA|    72|          |       0|
|   11 SOUTH VAN NESS|       G|          |[37.77415,-122.41...|    0|   Private|      PHO|   130|       CPO|       0|
|   101 CALIFORNIA ST|       G|          |[37.793243,-122.3...|    0|   Private|      PPA|   250|          |       0|
|        2000 POST ST|       G|          |[37.785078,-12

### Examine Schema and Change Data Types 

In [15]:
# Inspect the column names and types of the loaded DataFrame.
parking.printSchema()

root
 |-- address: string (nullable = true)
 |-- garorlot: string (nullable = true)
 |-- landusetyp: string (nullable = true)
 |-- location_1: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- needs_recoding: boolean (nullable = true)
 |-- mccap: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- primetype: string (nullable = true)
 |-- regcap: string (nullable = true)
 |-- secondtype: string (nullable = true)
 |-- valetcap: string (nullable = true)



In [16]:
def convert_column(df, col, new_type):
    """
    Return a DataFrame in which column ``col`` has been cast to ``new_type``.

    The source column is first renamed to ``<col>_old``, a new ``col`` column
    is derived from it with ``cast``, and the renamed original is then
    dropped. As a result the converted column is placed after the remaining
    columns rather than in its original position.
    """
    backup_name = '%s_old' % col
    renamed = df.withColumnRenamed(col, backup_name)
    recast = renamed.withColumn(col, renamed[backup_name].cast(new_type))
    return recast.drop(backup_name)

# The three capacity columns were loaded as strings (see the schema printed
# above), so cast each one to an integer. `parking` is rebound on every
# iteration to the DataFrame returned by convert_column.
int_columns = ['regcap', 'valetcap', 'mccap']

for col in int_columns:
    parking = convert_column(parking, col, 'int')

In [17]:
# Verify the cast: regcap, valetcap and mccap should now be reported as integer.
parking.printSchema()

root
 |-- address: string (nullable = true)
 |-- garorlot: string (nullable = true)
 |-- landusetyp: string (nullable = true)
 |-- location_1: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- needs_recoding: boolean (nullable = true)
 |-- owner: string (nullable = true)
 |-- primetype: string (nullable = true)
 |-- secondtype: string (nullable = true)
 |-- regcap: integer (nullable = true)
 |-- valetcap: integer (nullable = true)
 |-- mccap: integer (nullable = true)



In [18]:
# Preview the first 10 rows again after the type conversion.
parking.show(10)

+--------------------+--------+----------+--------------------+----------+---------+----------+------+--------+-----+
|             address|garorlot|landusetyp|          location_1|     owner|primetype|secondtype|regcap|valetcap|mccap|
+--------------------+--------+----------+--------------------+----------+---------+----------+------+--------+-----+
|      2110 Market St|       L|restaurant|[37.767378,-122.4...|   Private|      PPA|          |    13|       0|    0|
|         993 Potrero|       L|          |[37.757272,-122.4...|     SFMTA|      PPA|          |    34|       0|    0|
|601 Terry A Franc...|       L|          |[37.770135,-122.3...|Port of SF|      PPA|          |    72|       0|    0|
|   11 SOUTH VAN NESS|       G|          |[37.77415,-122.41...|   Private|      PHO|       CPO|   130|       0|    0|
|   101 CALIFORNIA ST|       G|          |[37.793243,-122.3...|   Private|      PPA|          |   250|       0|    0|
|        2000 POST ST|       G|          |[37.785078,-12

### Create and Query Temp Table

In [20]:
# Expose the DataFrame to SQL queries under the table name "park".
# NOTE(review): registerTempTable is deprecated in newer Spark releases in
# favour of createOrReplaceTempView -- confirm the target Spark version
# before switching.
parking.registerTempTable("park")

In [21]:
# Count facilities and average the regular capacity for every
# (primetype, secondtype) pair, skipping groups whose primetype is blank,
# most frequent pair first. The adjacent string literals are joined by the
# parser into a single SQL statement.
aggr_by_type = sqlContext.sql(
    "SELECT primetype, secondtype, count(1) AS count, round(avg(regcap), 0) AS avg_spaces "
    "FROM park "
    "GROUP BY primetype, secondtype "
    "HAVING trim(primetype) != '' "
    "ORDER BY count DESC"
)

aggr_by_type.show(10)

+---------+----------+-----+----------+
|primetype|secondtype|count|avg_spaces|
+---------+----------+-----+----------+
|      PPA|          |  462|     210.0|
|      PHO|          |  300|      69.0|
|      CPO|          |  163|      53.0|
|      CGO|          |   49|     135.0|
|      PPA|       PHO|   19|     178.0|
|      PPA|       CPO|    2|     263.0|
|      PHO|       CPO|    1|     130.0|
|      PPA|       RPO|    1|      87.0|
|      CPO|       PPA|    1|      12.0|
+---------+----------+-----+----------+



We can rewrite the SQL query in the previous example by chaining several simple DataFrame operations.

In [22]:
from pyspark.sql import functions as F

# DataFrame-API version of the aggregate: drop rows with a blank primetype,
# group by (primetype, secondtype), then count the rows and average the
# regular capacity, most frequent pair first.
aggr_by_type = (
    parking
    .select("primetype", "secondtype", "regcap")
    .where("trim(primetype) != ''")
    .groupBy("primetype", "secondtype")
    .agg(
        F.count("*").alias("count"),
        F.round(F.avg("regcap"), 0).alias("avg_spaces")
    )
    .sort("count", ascending=False)
)

aggr_by_type.show(10)

+---------+----------+-----+----------+
|primetype|secondtype|count|avg_spaces|
+---------+----------+-----+----------+
|      PPA|          |  462|     210.0|
|      PHO|          |  300|      69.0|
|      CPO|          |  163|      53.0|
|      CGO|          |   49|     135.0|
|      PPA|       PHO|   19|     178.0|
|      PPA|       CPO|    2|     263.0|
|      PHO|       CPO|    1|     130.0|
|      PPA|       RPO|    1|      87.0|
|      CPO|       PPA|    1|      12.0|
+---------+----------+-----+----------+



### Using Describe and Crosstab to Summarize

In [23]:
# Summary statistics (count, mean, stddev, min, max) for the three capacity columns.
parking.describe("regcap", "valetcap", "mccap").show()

+-------+------------------+------------------+------------------+
|summary|            regcap|          valetcap|             mccap|
+-------+------------------+------------------+------------------+
|  count|              1000|              1000|              1000|
|   mean|           137.294|             3.297|             0.184|
| stddev|361.05120902655824|22.624824279398823|1.9015151221485882|
|    min|                 0|                 0|                 0|
|    max|              9000|               430|                47|
+-------+------------------+------------------+------------------+



In [25]:
# Contingency table of owner against primetype; show(10) prints only the first 10 rows.
parking.stat.crosstab("owner", "primetype").show(10)

+-------------------+---+---+---+---+---+
|    owner_primetype|PPA|PHO|CPO|CGO|   |
+-------------------+---+---+---+---+---+
|         Port of SF|  7|  7|  0|  4|  0|
|               SFPD|  0|  3|  0|  6|  0|
|              SFMTA| 42| 14|  0|  0|  0|
|GG Bridge Authority|  2|  0|  0|  0|  0|
|               SFSU|  2|  6|  0|  0|  0|
|               SFRA|  2|  0|  0|  0|  0|
|                LHH|  0|  5|  0|  0|  0|
|                DMV|  0|  0|  1|  0|  0|
|           Caltrans|  0|  0|  0|  1|  0|
|           Presidio|  5|  1|  1|  2|  0|
+-------------------+---+---+---+---+---+
only showing top 10 rows



### Adding Neighborhood Name

Define another function that takes a “location_1” struct and uses Google’s Geocoding API to perform a reverse lookup on its latitude and longitude, returning the neighborhood name.

In [26]:
import requests

def to_neighborhood(location, timeout=10):
    """
    Reverse-geocode a location to a neighborhood name using Google's
    Geocoding API.
    https://developers.google.com/maps/documentation/geocoding/intro#reverse-example

    The lookup is best-effort: any failure yields the placeholder 'N/A'
    instead of raising, so a single bad row or a failed request does not
    abort the caller.

    NOTE(review): the request carries no API key; Google may reject or
    throttle unauthenticated calls, which would surface here as 'N/A' --
    confirm.

    Args:
        location: object exposing ``latitude`` and ``longitude`` attributes
            (such as a value of the ``location_1`` struct column), or None.
        timeout: seconds to wait for the HTTP response before giving up.

    Returns:
        The text before the first comma of the first result whose types
        include 'neighborhood', or 'N/A' when the location is missing, the
        request fails, the response is not HTTP 200 / not JSON, or no
        neighborhood result is present.
    """
    name = 'N/A'

    if location is None:
        # The struct column is nullable, so a row may carry no location.
        return name

    lat = location.latitude
    # Named `lng` because `long` shadows a builtin on Python 2.
    lng = location.longitude

    try:
        r = requests.get(
            'https://maps.googleapis.com/maps/api/geocode/json?latlng=%s,%s' % (lat, lng),
            timeout=timeout)
    except requests.RequestException:
        # Network errors and timeouts fall back to the placeholder.
        return name

    if r.status_code != 200:
        # Previously this path fell through to an unbound `neighborhoods`.
        return name

    try:
        content = r.json()
    except ValueError:
        # Body was not valid JSON.
        return name

    # results is a list of matching places
    places = content.get('results', [])
    neighborhoods = [
        p['formatted_address'] for p in places
        if 'neighborhood' in p.get('types', ())
    ]

    if neighborhoods:
        # Addresses are formatted as Japantown, San Francisco, CA
        # so split on comma and just return neighborhood name
        name = neighborhoods[0].split(',')[0]

    return name

The `pyspark.sql.functions` module provides the `udf` function to register a user-defined function (UDF). We declare an inline UDF by passing `udf` a callable Python function and the Spark SQL data type that corresponds to its return type.

In this case, we are returning a string, so we will use the `StringType` data type from `pyspark.sql.types`. Once registered, we can use the UDF to reformat the “location_1” column into a neighborhood name with a `withColumn` expression:

In [27]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap to_neighborhood as a UDF whose result type is a Spark string.
location_to_neighborhood = udf(to_neighborhood, StringType())

# SFMTA-owned facilities only, with the location struct replaced by its
# neighborhood name and the largest regular capacity first. Every row the UDF
# is evaluated on triggers one HTTP request inside to_neighborhood.
sfmta_parking = (
    parking
    .filter(parking.owner == 'SFMTA')
    .select("location_1", "primetype", "landusetyp", "garorlot", "regcap", "valetcap", "mccap")
    .withColumn("location_1", location_to_neighborhood("location_1"))
    .sort("regcap", ascending=False)
)

sfmta_parking.show()

+---------------+---------+----------+--------+------+--------+-----+
|     location_1|primetype|landusetyp|garorlot|regcap|valetcap|mccap|
+---------------+---------+----------+--------+------+--------+-----+
|South of Market|      PPA|          |       G|  2585|       0|   47|
|            N/A|      PPA|          |       G|  1865|       0|    0|
|            N/A|      PPA|          |       G|  1095|       0|    0|
|            N/A|      PPA|          |       G|   985|       0|    0|
|     Tenderloin|      PPA|          |       G|   925|       0|    0|
|            N/A|      PPA|          |       G|   850|       0|    0|
|            N/A|      PPA|          |       G|   843|       0|    0|
|            N/A|      PPA|          |       G|   807|       0|    0|
|            N/A|      PPA|          |       G|   752|       0|    0|
|      Japantown|      PPA|          |       G|   747|       0|    0|
|      Chinatown|      PPA|          |       G|   700|       0|    0|
|            N/A|   