#Intro To Spark SQL

In [1]:
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

##pyspark sessions automatically create a spark context, and sql context


In [3]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_temp_dir',
 '_unbatched_serializer',
 '_writeToFile',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'clearFiles',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_profiles',
 'environment',
 'getLocalProperty',
 'ha

In [4]:
dir(sqlCtx)

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_get_hive_ctx',
 '_inferSchema',
 '_jsc',
 '_jvm',
 '_sc',
 '_scala_SQLContext',
 '_ssql_ctx',
 'applySchema',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'getConf',
 'inferSchema',
 'jsonFile',
 'jsonRDD',
 'load',
 'parquetFile',
 'registerDataFrameAsTable',
 'registerFunction',
 'setConf',
 'sql',
 'table',
 'tableNames',
 'tables',
 'uncacheTable']

##1. Working with dataframes


load a json file directly into a dataframe

In [5]:
reviews = sqlCtx.jsonFile("/Users/elliottcordo/Projects/Caserta/spark-techcon15/data/yelp/yelp_academic_dataset_review.json")

how many partitions do we have?

In [6]:
reviews.rdd.getNumPartitions()

43

let's repartition

In [7]:
reviews = reviews.repartition(2)
reviews.rdd.getNumPartitions()

2

let's see what the schema looks like

In [8]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- votes: struct (nullable = true)
 |    |-- cool: long (nullable = true)
 |    |-- funny: long (nullable = true)
 |    |-- useful: long (nullable = true)



take a sample of 5 rows

In [9]:
reviews.take(5)

[Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', date=u'2007-05-17', review_id=u'15SdjuK7DmYqUAj6rjGowg', stars=5, text=u"dr. goldberg offers everything i look for in a general practitioner.  he's nice and easy to talk to without being patronizing; he's always on time in seeing his patients; he's affiliated with a top-notch hospital (nyu) which my parents have explained to me is very important in case something happens and you need surgery; and you can get referrals to see specialists without having to see him first.  really, what more do you need?  i'm sitting here trying to think of any complaints i have about him, but i'm really drawing a blank.", type=u'review', user_id=u'Xqd0DzHaiyRqVH3WRG7hzg', votes=Row(cool=1, funny=0, useful=2)),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', date=u'2012-02-14', review_id=u'-TsVN230RCkLYKBeLsuz7A', stars=4, text=u"Dr. Goldberg has been my doctor for years and I like him.  I've found his office to be fairly efficient.  Today I actually got to see the d

take a sample of 5 rows where users have voted a business "cool"

In [13]:
reviews.filter("votes.cool = 1").take(5)

[Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', date=u'2007-05-17', review_id=u'15SdjuK7DmYqUAj6rjGowg', stars=5, text=u"dr. goldberg offers everything i look for in a general practitioner.  he's nice and easy to talk to without being patronizing; he's always on time in seeing his patients; he's affiliated with a top-notch hospital (nyu) which my parents have explained to me is very important in case something happens and you need surgery; and you can get referrals to see specialists without having to see him first.  really, what more do you need?  i'm sitting here trying to think of any complaints i have about him, but i'm really drawing a blank.", type=u'review', user_id=u'Xqd0DzHaiyRqVH3WRG7hzg', votes=Row(cool=1, funny=0, useful=2)),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', date=u'2012-02-14', review_id=u'-TsVN230RCkLYKBeLsuz7A', stars=4, text=u"Dr. Goldberg has been my doctor for years and I like him.  I've found his office to be fairly efficient.  Today I actually got to see the d

let's define a udf to use in our dataframe operation

In [14]:
calc_weight = udf(lambda votes, stars: stars if votes == 1 else 0, IntegerType())

create a new dataframe using this udf

In [15]:
cool_business = reviews.select("business_id", "votes.cool", "stars", calc_weight("votes.cool", "stars").alias("cool_weight"))

cool_business.take(5)

[Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', cool=1, stars=5, cool_weight=5),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', cool=1, stars=4, cool_weight=4),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', cool=1, stars=4, cool_weight=4),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', cool=0, stars=5, cool_weight=0),
 Row(business_id=u'vcNAWiLM4dR7D2nwwJ7nCA', cool=0, stars=1, cool_weight=0)]

group by and sum

In [16]:
cool_business.select("business_id", "cool_weight").groupBy("business_id").sum("cool_weight").take(5)

[Row(business_id=u'05l84gPaZX2IWaIaoAq3hg', SUM(cool_weight#22)=5),
 Row(business_id=u'rLqvHfvtojxd_DtM9kqw8g', SUM(cool_weight#22)=8),
 Row(business_id=u'uy8jqFcLAA-dve66REHmRg', SUM(cool_weight#22)=64),
 Row(business_id=u'ZCE_ZHlZGpWPGICWpGNrdQ', SUM(cool_weight#22)=62),
 Row(business_id=u'UxiSHVZxMdey7vRwm1fQyA', SUM(cool_weight#22)=49)]

##2. Lets add a second dataframe

In [17]:
business = sqlCtx.jsonFile("/Users/elliottcordo/Projects/Caserta/spark-techcon15/data/yelp/yelp_academic_dataset_business.json")

lets cache business in memory

In [18]:
business.cache()

DataFrame[attributes: struct<Accepts Credit Cards:string,Accepts Insurance:boolean,Ages Allowed:string,Alcohol:string,Ambience:struct<casual:boolean,classy:boolean,divey:boolean,hipster:boolean,intimate:boolean,romantic:boolean,touristy:boolean,trendy:boolean,upscale:boolean>,Attire:string,BYOB:boolean,BYOB/Corkage:string,By Appointment Only:boolean,Caters:boolean,Coat Check:boolean,Corkage:boolean,Delivery:boolean,Dietary Restrictions:struct<dairy-free:boolean,gluten-free:boolean,halal:boolean,kosher:boolean,soy-free:boolean,vegan:boolean,vegetarian:boolean>,Dogs Allowed:boolean,Drive-Thru:boolean,Good For:struct<breakfast:boolean,brunch:boolean,dessert:boolean,dinner:boolean,latenight:boolean,lunch:boolean>,Good For Dancing:boolean,Good For Groups:boolean,Good For Kids:boolean,Good for Kids:boolean,Hair Types Specialized In:struct<africanamerican:boolean,asian:boolean,coloring:boolean,curly:boolean,extensions:boolean,kids:boolean,perms:boolean,straightperms:boolean>,Happy Hour:boolea

reviews the schema

In [19]:
business.printSchema()

root
 |-- attributes: struct (nullable = true)
 |    |-- Accepts Credit Cards: string (nullable = true)
 |    |-- Accepts Insurance: boolean (nullable = true)
 |    |-- Ages Allowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- Attire: string (nullable = true)
 |    |-- BYOB: boolean (nullable = true)
 |    |-- BYOB/Corkage: string (nullable = true)
 |    |-- By Appointment Only: boolean (nullable = true)
 |    |-- Caters: boolean (nullable = true)
 |    |-- Coat Check

join the data

In [20]:
joined = cool_business.join(business, reviews.business_id == business.business_id, "left_outer")

group by city

In [21]:
joined.select(joined.city, joined.cool_weight).groupBy("city").sum("cool_weight").take(10)

[Row(city=u'Summerlin South', SUM(cool_weight#22)=8),
 Row(city=u'South Hills', SUM(cool_weight#22)=0),
 Row(city=u'Ahwatukee', SUM(cool_weight#22)=271),
 Row(city=u'Newbridge', SUM(cool_weight#22)=9),
 Row(city=u'Green Tree', SUM(cool_weight#22)=12),
 Row(city=u'Roslin', SUM(cool_weight#22)=4),
 Row(city=u'Castle Shannon', SUM(cool_weight#22)=3),
 Row(city=u'Pittsburgh/Waterfront', SUM(cool_weight#22)=38),
 Row(city=u'Stanfield', SUM(cool_weight#22)=0),
 Row(city=u'De Forest', SUM(cool_weight#22)=26)]

#3. Finally some sql

first register our dataframes as temporary tables:

In [22]:
business.registerTempTable("business")
reviews.registerTempTable("reviews")

now a SQL statement!

In [24]:
sql = """
    select city, count(1) as cnt
    from reviews r
      join business b on b.business_id = r.business_id
    group by city """

sqlCtx.sql(sql).take(5)

[Row(city=u'Summerlin South', cnt=9),
 Row(city=u'South Hills', cnt=4),
 Row(city=u'Ahwatukee', cnt=291),
 Row(city=u'Newbridge', cnt=16),
 Row(city=u'Green Tree', cnt=44)]

create a udf that can be leveraged in SQL

In [25]:
sqlCtx.registerFunction("good_or_bad", lambda x: 'good' if x >=3 else 'bad')


run SQL leverating this SQL statement

In [26]:
sql = """
    select name, cnt
    from (
      select b.name, count(1) as cnt
      from reviews r
        join business b on b.business_id = r.business_id
      where good_or_bad(r.stars) = "good"
      group by b.name) a
    order by cnt desc"""

sqlCtx.sql(sql).take(5)


[Row(name=u'Starbucks', cnt=4169),
 Row(name=u'Hash House A Go Go', cnt=3905),
 Row(name=u'Mon Ami Gabi', cnt=3830),
 Row(name=u'Earl of Sandwich', cnt=3498),
 Row(name=u'Wicked Spoon', cnt=2841)]