## Goals for this deliverable:
- Do data transformation
- Subset columns and filter rows by using .select() or .where()
- use a groupby() operation and an aggregate function .agg() ( sum by group, average by group etc)
- use distinct(), count() operation
- For those of you who know sql, do a sql operation on a table
    - Register the table view
    - run sql query using spark.sql("SELECT....")
- Create a new column using .withColumnNamed()
- Apply a user defined function (UDF)



### Do a bit more exploration after we have gotten the syntax. 

Think of what one might do with these data:
- What are properties of average Yelp users?
- Age, gender distribution, average number of checkin,…?
- What types of food restaurants are rated higher?
- vegetarian restaurant, pub, steak restaurant, family restaurant, etc?
- New Yorkers are more likely to give higher review scores than people in other states?
- Female users tends to give higher review scores (and / or) more review counts than male users?
- More restaurants with high review scores in New York than other states?

In [6]:

import findspark  # this needs to be the first import                                                                        
findspark.init()

# jupyter notebook &

from pyspark.sql import DataFrame
from pyspark.sql import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.tuning import *
from pyspark.ml.feature import Imputer


import numpy as np
import pandas as pd
import scipy as sc

In [7]:

from pyspark import SparkContext
from pyspark.sql.session import SparkSession
try:
    sc = SparkContext()     
except ValueError:
    pass
# spark = SparkSession(sc)


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
    
# from DaeWon
# establish a spark session with 50 executors
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master("yarn") \
        .appName("testing") \
        .config("spark.executor.instances", "70") \
        .config("spark.executor.memory","4g") \
        .config("spark.driver.memory","30g") \
        .config("spark.executor.cores",'1') \
        .config("spark.scheduler.mode","FIFO") \
        .getOrCreate()

In [8]:
#  spark.read.json?

In [9]:
# dat = spark.read.json('./dataset/checkin.json').repartition(150)     # from local machine
dat = spark.read.json('hdfs://localhost:9000/yelp/checkin.json').repartition(150)    # from hadoop; for port, see etc/hadoop/core-site.xml 
dat.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- time: struct (nullable = true)
 |    |-- Friday: struct (nullable = true)
 |    |    |-- 0:00: long (nullable = true)
 |    |    |-- 10:00: long (nullable = true)
 |    |    |-- 11:00: long (nullable = true)
 |    |    |-- 12:00: long (nullable = true)
 |    |    |-- 13:00: long (nullable = true)
 |    |    |-- 14:00: long (nullable = true)
 |    |    |-- 15:00: long (nullable = true)
 |    |    |-- 16:00: long (nullable = true)
 |    |    |-- 17:00: long (nullable = true)
 |    |    |-- 18:00: long (nullable = true)
 |    |    |-- 19:00: long (nullable = true)
 |    |    |-- 1:00: long (nullable = true)
 |    |    |-- 20:00: long (nullable = true)
 |    |    |-- 21:00: long (nullable = true)
 |    |    |-- 22:00: long (nullable = true)
 |    |    |-- 23:00: long (nullable = true)
 |    |    |-- 2:00: long (nullable = true)
 |    |    |-- 3:00: long (nullable = true)
 |    |    |-- 4:00: long (nullable = true)
 |    |    |-- 5:00: lo

In [10]:
# _dum = spark.read.json('yelp/checkin.json').respartition(150)

In [11]:
# bus = spark.read.json('./dataset/business.json').repartition(150)
# bus.printSchema()

# phot = spark.read.json('./dataset/photos.json').repartition(150)
# phot.printSchema()

bus = spark.read.json('hdfs://localhost:9000/yelp/business.json').repartition(150)
# bus.printSchema()

phot = spark.read.json('hdfs://localhost:9000/yelp/photos.json').repartition(150)
# phot.printScheme()

review= spark.read.json('hdfs://localhost:9000/yelp/review.json').repartition(150)
# phot.printScheme()

In [12]:
print dat.count()
print bus.count()
print phot.count()
print review.count()

135148
156639
196278
4736897


In [13]:
dat.rdd.getNumPartitions()

150

In [14]:
dat.show()

+--------------------+--------------------+
|         business_id|                time|
+--------------------+--------------------+
|b2596IQibXyco49ct...|[[1,null,null,nul...|
|1njD5p7-FvV8uoHzw...|[[null,null,null,...|
|gE80aKaybUL4SZZE4...|[[null,null,null,...|
|2tMuTCM-UxbCjyxTL...|[null,null,[null,...|
|mzGrMnuHJpPJEF7E4...|[null,null,[1,nul...|
|w9nB3j5ElUThCkgXQ...|[null,null,null,n...|
|Nwd3NkLVZD85zAV_n...|[[1,null,null,nul...|
|ZjwBMsW0tmS7sAYnM...|[null,[null,null,...|
|TzRK4L5ZpZv71cArM...|[[1,null,null,nul...|
|uLdJrH2rJwULKLopi...|[[null,null,null,...|
|d0_yLkH7u16tlcvvU...|[[null,null,null,...|
|DqNpyLa9CWN3YKaIG...|[[null,1,null,nul...|
|k0pZyGb_oBdFpcekM...|[null,[null,null,...|
|oH3V-79Cldz_6sWXi...|[[null,null,1,nul...|
|1y64UsTx6ikxoaCAY...|[[null,null,null,...|
|X209KCF2ex6VvNJcb...|[[1,1,1,null,3,nu...|
|0mj6WOkZnwrdcWYlG...|[[null,null,2,nul...|
|XG0twdTiMzUS3v3p9...|[[1,null,null,nul...|
|85ot7LGtfcr8pj5yK...|[null,[null,null,...|
|YhLrPdDWKd26l-fIG...|[[1,null,n

In [15]:
dat.columns


['business_id', 'time']

In [16]:
phot.columns

['business_id', 'caption', 'label', 'photo_id']

In [17]:
bus.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [18]:
review.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [19]:
bus.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|      city|               hours|is_open|     latitude|     longitude|                name|        neighborhood|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+
|  Silberburgstr. 171|[null,null,null,n...|IVtp8mznb2vRO0Ocw...|  [Shopping, Bridal]| Stuttgart|[10:00-18:00,10:0...|      1|   48.7714398|      9.168795|   Brautatelier Tara|                    |      70178|           3|  4.5|   BW|
|    7000 E Shea Blvd|[null,null,full_b...|XFQbescZxNRAb2U-N...|

In [20]:
bus.dtypes

[('address', 'string'),
 ('attributes',
  'struct<AcceptsInsurance:boolean,AgesAllowed:string,Alcohol:string,Ambience:struct<casual:boolean,classy:boolean,divey:boolean,hipster:boolean,intimate:boolean,romantic:boolean,touristy:boolean,trendy:boolean,upscale:boolean>,BYOB:boolean,BYOBCorkage:string,BestNights:struct<friday:boolean,monday:boolean,saturday:boolean,sunday:boolean,thursday:boolean,tuesday:boolean,wednesday:boolean>,BikeParking:boolean,BusinessAcceptsBitcoin:boolean,BusinessAcceptsCreditCards:boolean,BusinessParking:struct<garage:boolean,lot:boolean,street:boolean,valet:boolean,validated:boolean>,ByAppointmentOnly:boolean,Caters:boolean,CoatCheck:boolean,Corkage:boolean,DietaryRestrictions:struct<dairy-free:boolean,gluten-free:boolean,halal:boolean,kosher:boolean,soy-free:boolean,vegan:boolean,vegetarian:boolean>,DogsAllowed:boolean,DriveThru:boolean,GoodForDancing:boolean,GoodForKids:boolean,GoodForMeal:struct<breakfast:boolean,brunch:boolean,dessert:boolean,dinner:boolean

In [21]:
dat = dat.select('business_id','time')
print type(dat)

<class 'pyspark.sql.dataframe.DataFrame'>


In [22]:
print type(bus)

<class 'pyspark.sql.dataframe.DataFrame'>


In [23]:
# Comput summary stats
bus.describe().show() 

+-------+--------------------+--------------------+--------------+------------------+-----------------+------------------+------------------+------------+-----------------+-----------------+------------------+------------------+
|summary|             address|         business_id|          city|           is_open|         latitude|         longitude|              name|neighborhood|      postal_code|     review_count|             stars|             state|
+-------+--------------------+--------------------+--------------+------------------+-----------------+------------------+------------------+------------+-----------------+-----------------+------------------+------------------+
|  count|              156639|              156639|        156639|            156639|           156638|            156638|            156639|      156639|           156639|           156639|            156639|            156639|
|   mean|               100.0|                null|          null|0.8443746448840965

In [24]:
dat

DataFrame[business_id: string, time: struct<Friday:struct<0:00:bigint,10:00:bigint,11:00:bigint,12:00:bigint,13:00:bigint,14:00:bigint,15:00:bigint,16:00:bigint,17:00:bigint,18:00:bigint,19:00:bigint,1:00:bigint,20:00:bigint,21:00:bigint,22:00:bigint,23:00:bigint,2:00:bigint,3:00:bigint,4:00:bigint,5:00:bigint,6:00:bigint,7:00:bigint,8:00:bigint,9:00:bigint>,Monday:struct<0:00:bigint,10:00:bigint,11:00:bigint,12:00:bigint,13:00:bigint,14:00:bigint,15:00:bigint,16:00:bigint,17:00:bigint,18:00:bigint,19:00:bigint,1:00:bigint,20:00:bigint,21:00:bigint,22:00:bigint,23:00:bigint,2:00:bigint,3:00:bigint,4:00:bigint,5:00:bigint,6:00:bigint,7:00:bigint,8:00:bigint,9:00:bigint>,Saturday:struct<0:00:bigint,10:00:bigint,11:00:bigint,12:00:bigint,13:00:bigint,14:00:bigint,15:00:bigint,16:00:bigint,17:00:bigint,18:00:bigint,19:00:bigint,1:00:bigint,20:00:bigint,21:00:bigint,22:00:bigint,23:00:bigint,2:00:bigint,3:00:bigint,4:00:bigint,5:00:bigint,6:00:bigint,7:00:bigint,8:00:bigint,9:00:bigint>,Sun

In [25]:
# count num of unique classes (rows) in a column
dat.select(col("time")).distinct().count()

120782

In [26]:
# we can do an aggregate count to see the distribution of the classes
dat.groupBy(col("business_id")).count().collect()

[Row(business_id=u'y8e07i7HPWuYVQzg_2RbxA', count=1),
 Row(business_id=u'68htH_xVthYKRxvpTpu2ew', count=1),
 Row(business_id=u'yuBzzarZ13pt8UpspSxOBg', count=1),
 Row(business_id=u'CqTPLUHBM9AM3TEqPn0krg', count=1),
 Row(business_id=u'0859wfd1BQHG46Zpwhc0ZQ', count=1),
 Row(business_id=u'G-AW1PNolA2XnADKqqJO_A', count=1),
 Row(business_id=u'VmSrPPO2WXmOKjUW7pDGsQ', count=1),
 Row(business_id=u'FVgHb9w4pnkRBV5bZYJaZA', count=1),
 Row(business_id=u'xZpZvAqZSla5c72CuNKnJA', count=1),
 Row(business_id=u'zhBkNLn2KPnh5-NIueXVHA', count=1),
 Row(business_id=u'ARkNkb3EJzA42PqMC5VMbA', count=1),
 Row(business_id=u'OZJniEO9I-PqavZYuyNjng', count=1),
 Row(business_id=u'oF054vcRTDNx88qxBGzhFw', count=1),
 Row(business_id=u'4XMhO5LyfFHq5rdFHkXfXQ', count=1),
 Row(business_id=u'Cl-xl1vTUwHeaGgBxzdTRA', count=1),
 Row(business_id=u'XIAbir-WvErgW6xjB6YRlA', count=1),
 Row(business_id=u'rWxAgLSI0Sk1MCpQGd5HDQ', count=1),
 Row(business_id=u'SubDWJhpXWLs-TMNQxE67Q', count=1),
 Row(business_id=u'KkyOeKAro

In [27]:
bus.filter(bus["longitude"]>24).show()

+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+---------+----------+-----------+------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|     city|               hours|is_open| latitude| longitude|       name|neighborhood|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+---------+----------+-----------+------------+-----------+------------+-----+-----+
|Green Valley Ranc...|[null,null,full_b...|Zmp2_b2gpSloz4Dv2...|[Italian, Restaur...|Henderson|[17:00-23:00,17:0...|      0|36.019323|115.086769|Terra Verde|            |      89052|          70|  3.5|   NV|
+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+---------+----------+-----------+-----------

In [28]:
# find mean of "cool" col
review.select("cool").agg({"cool": "avg"}).collect()

[Row(avg(cool)=0.5096600158289277)]

In [29]:
# find mean of "cool", but this time only consider those with # of reviews $gtreq$ 10
review_rect = bus.filter("review_count >= 10")

In [30]:
review_rect.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [31]:
review_rect_stars4 = review_rect.filter("stars = 4")

In [32]:
review_rect_stars4.select("cool").agg({"cool": "avg"}).collect()
# --> Ok, need to join review and business table together.. 

AnalysisException: u"cannot resolve '`cool`' given input columns: [address, stars, neighborhood, hours, city, review_count, attributes, business_id, state, latitude, longitude, postal_code, is_open, name, categories];;\n'Project ['cool]\n+- Filter (stars#35 = cast(4 as double))\n   +- Filter (review_count#34L >= cast(10 as bigint))\n      +- Repartition 150, true\n         +- Relation[address#22,attributes#23,business_id#24,categories#25,city#26,hours#27,is_open#28L,latitude#29,longitude#30,name#31,neighborhood#32,postal_code#33,review_count#34L,stars#35,state#36] json\n"

## Play with join, inner, outer, cross, etc

In [None]:
bus.crossJoin?

In [None]:
func.col?
func.when?

In [38]:
review.join?

In [40]:
_xxx = review.join(bus, review.business_id == bus.business_id, 'inner')

In [44]:
_xxx.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id',
 'address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [43]:
print review.columns, bus.columns

['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id'] ['address', 'attributes', 'business_id', 'categories', 'city', 'hours', 'is_open', 'latitude', 'longitude', 'name', 'neighborhood', 'postal_code', 'review_count', 'stars', 'state']


In [42]:
print _xxx.show()

+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+---------+----------+--------------------+------------+-----------+------------+-----+-----+
|         business_id|cool|      date|funny|           review_id|stars|                text|useful|             user_id|             address|          attributes|         business_id|          categories|     city|               hours|is_open| latitude| longitude|                name|neighborhood|postal_code|review_count|stars|state|
+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+---------+----------+--------------------+------------+-----------+------------+-----

 So it "mergerd" fine but this is inner join, and therefore "business_id" (primary key) appear twice -- join duplicates the field. Bigger problem is we no longer easily call "business_id" because it will throw an error saying it doesn't know which field ("business_id") you are calling...

In [45]:
_xxx.select("business_id")

AnalysisException: u"Reference 'business_id' is ambiguous, could be: business_id#80, business_id#24.;"

### Let's experiment with other kinds of join
First, we will make business_id unique.

In [117]:
_test = review.join(bus, how='inner', on='business_id')

In [119]:
print _test.columns

['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id', 'address', 'attributes', 'categories', 'city', 'hours', 'is_open', 'latitude', 'longitude', 'name', 'neighborhood', 'postal_code', 'review_count', 'stars', 'state']


In [62]:
_yyy = review.join(bus, review.business_id == bus.business_id, 'inner').drop(bus.business_id).drop(bus.stars)

In [63]:
print _yyy.columns

['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id', 'address', 'attributes', 'categories', 'city', 'hours', 'is_open', 'latitude', 'longitude', 'name', 'neighborhood', 'postal_code', 'review_count', 'state']


In [50]:
# to test the syntax
_zzz = review.join(bus, 'business_id', 'inner')
print _zzz.columns

['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id', 'address', 'attributes', 'categories', 'city', 'hours', 'is_open', 'latitude', 'longitude', 'name', 'neighborhood', 'postal_code', 'review_count', 'stars', 'state']


In [53]:
%matplotlib inline
from IPython.display import display
display(phot)

DataFrame[business_id: string, caption: string, label: string, photo_id: string]

In [56]:
_zzz.take?

In [55]:
_zzz.take(5)

[Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', cool=0, date=u'2009-12-13', funny=0, review_id=u'JUiz5fSxFqAfpGZ7rAJPxg', stars=4, text=u"wonderful decor and ambiance.  simple, modern and elegant.  comfy chairs too.   as others have stated the restaurant is a little loud (which I liked) but the tables are spaced comfortably apart so you can still enjoy an intimate dinner.  \n\nstellar service.  John and his group were attentive without being snooty and overbearing.  we felt comfortable.  i don't think there was ever a moment when our glasses reached half empty glass before they addressed it.  any little crumbs on the table were quickly taken care of.  definitely the service I was expecting from a top notch Vegas establishment.\n\nexpect to spend about $100 / person.  everything is a la carte.  entrees were about $50, sides about $10, cocktails about $15 and wine $$$$.\n\nhuge portions.  if you had any food earlier in the day,  don't expect to finish your entree, especially if you order the

In [57]:
newbus_rev = _zzz
print newbus_rev.select("cool").show()

+----+
|cool|
+----+
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   1|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   1|
|   1|
|   1|
|   0|
+----+
only showing top 20 rows

None


In [59]:
# mean val of "cool"
newbus_rev.select("cool").agg({"cool": "avg"}).collect()

[Row(avg(cool)=0.5096600158289277)]

In [64]:
# cal mean val of "cool" again, after filter the good restaurants with >= 10 reviews
slim = _yyy.filter("review_count >= 10")
slim2 = slim.filter("stars = 4")
slim2.select("cool").agg({"cool": "avg"}).collect()

[Row(avg(cool)=0.7317252216817208)]

In [69]:
# cal again, but for different criteria
slim2 = slim.filter('is_open="\True\"')
slim3 = slim2.filter('stars >= 3')
slim3.select("cool").agg({"cool": "avg"}).collect()

[Row(avg(cool)=None)]

In [70]:
# something must be wrong here...
slim3.count()

0

In [71]:
slim2.count()

0

In [72]:
slim.count()

4324067

In [73]:
# ok, should be 1 and not True
slim2 = slim.filter('is_open = 1')
slim2.count()

3847676

In [74]:
slim3 = slim2.filter('stars >= 3')
slim3.select("cool").agg({"cool": "avg"}).collect()

[Row(avg(cool)=0.5738976337462125)]

In [76]:
# find which state has the most reviews 
slimstate = slim.groupBy("state")
slim_ct = slimstate.agg({"review_count": "sum"}).orderBy(desc("SUM(review_count)")).collect()

In [81]:
print slim_ct[:3]

[Row(state=u'NV', sum(review_count)=1112608328), Row(state=u'AZ', sum(review_count)=298839665), Row(state=u'ON', sum(review_count)=60893394)]


#### keep playing

In [82]:
_yyy.filter("useful >= 1").count()

1981990

In [83]:
_yyy.select("useful").agg({"useful": "max"}).collect()

[Row(max(useful)=1125)]

In [87]:
_dum = _yyy.select("useful")

In [90]:
_dum.count()

4736897

In [102]:
_dum.take(1125)[-5:]

[Row(useful=0), Row(useful=1), Row(useful=0), Row(useful=11), Row(useful=0)]

In [100]:
_yyy.select("business_id", (_yyy.stars/1*1).cast("int")).show(5)
print _yyy.stars

+--------------------+------------------------------+
|         business_id|CAST(((stars / 1) * 1) AS INT)|
+--------------------+------------------------------+
|dwQEZBFen2GdihLLf...|                             4|
|mr4FiPaXTWlJ3qGzp...|                             5|
|_mqUzNXs_sJ1EJYgY...|                             5|
|nFLjodhGSJ8B-KB_S...|                             5|
|e-hztV3mlfn5z8VmR...|                             5|
+--------------------+------------------------------+
only showing top 5 rows

Column<stars>


In [103]:
_yyy.select("business_id", (_yyy.stars*10).cast("int")).show(5)

+--------------------+-------------------------+
|         business_id|CAST((stars * 10) AS INT)|
+--------------------+-------------------------+
|dwQEZBFen2GdihLLf...|                       40|
|mr4FiPaXTWlJ3qGzp...|                       50|
|_mqUzNXs_sJ1EJYgY...|                       50|
|nFLjodhGSJ8B-KB_S...|                       50|
|e-hztV3mlfn5z8VmR...|                       50|
+--------------------+-------------------------+
only showing top 5 rows



### play with some attributes

In [107]:
import pyspark.sql.functions as func
xx = bus.groupBy("postal_code").agg(func.max("stars"), func.sum("review_count"))

In [108]:
xx

DataFrame[postal_code: string, max(stars): double, sum(review_count): bigint]

In [109]:
print xx.columns

['postal_code', 'max(stars)', 'sum(review_count)']


In [110]:
xx.select('postal_code').describe().show()

+-------+------------------+
|summary|       postal_code|
+-------+------------------+
|  count|             14508|
|   mean|58285.150715071504|
| stddev| 39517.14357170148|
|    min|                  |
|    max|          YO22 5LY|
+-------+------------------+



### Play with spark.sql

In [62]:
#we can do the same thing in sql code
# we must first create a "view" of the data in order to execute SQL functions
bus.createOrReplaceTempView("bus")
spark.sql("SELECT COUNT(*) FROM bus GROUP BY neighborhood").collect()

[Row(count(1)=70),
 Row(count(1)=5),
 Row(count(1)=51),
 Row(count(1)=239),
 Row(count(1)=122),
 Row(count(1)=5),
 Row(count(1)=81),
 Row(count(1)=4),
 Row(count(1)=2),
 Row(count(1)=3),
 Row(count(1)=3237),
 Row(count(1)=12),
 Row(count(1)=8),
 Row(count(1)=4),
 Row(count(1)=3),
 Row(count(1)=100),
 Row(count(1)=58),
 Row(count(1)=7),
 Row(count(1)=234),
 Row(count(1)=6),
 Row(count(1)=13),
 Row(count(1)=99),
 Row(count(1)=531),
 Row(count(1)=13),
 Row(count(1)=1),
 Row(count(1)=110),
 Row(count(1)=254),
 Row(count(1)=63),
 Row(count(1)=367),
 Row(count(1)=1),
 Row(count(1)=2),
 Row(count(1)=2),
 Row(count(1)=10),
 Row(count(1)=4),
 Row(count(1)=133),
 Row(count(1)=5),
 Row(count(1)=14),
 Row(count(1)=314),
 Row(count(1)=42),
 Row(count(1)=18),
 Row(count(1)=1751),
 Row(count(1)=269),
 Row(count(1)=149),
 Row(count(1)=18),
 Row(count(1)=23),
 Row(count(1)=8),
 Row(count(1)=20),
 Row(count(1)=276),
 Row(count(1)=24),
 Row(count(1)=34),
 Row(count(1)=118),
 Row(count(1)=5),
 Row(count(1

In [64]:
spark.sql("SELECT * FROM bus").show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|      city|               hours|is_open|     latitude|     longitude|                name|        neighborhood|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+
|  Silberburgstr. 171|[null,null,null,n...|IVtp8mznb2vRO0Ocw...|  [Shopping, Bridal]| Stuttgart|[10:00-18:00,10:0...|      1|   48.7714398|      9.168795|   Brautatelier Tara|                    |      70178|           3|  4.5|   BW|
|    7000 E Shea Blvd|[null,null,full_b...|XFQbescZxNRAb2U-N...|

In [28]:
#Convert string types(categorical) to integerTypes
types = [str(f.dataType) for f in dat.schema.fields]
ind_str = [i for i in range(len(types)) if types[i] is 'StringType']
ind_int = [i for i in range(len(types)) if types[i] is 'IntegerType']
stringcol= [dat.columns[i] for i in ind_str]
intcol = [dat.columns[i] for i in ind_int]
print(stringcol)
print(intcol)
del ind_str, ind_int, types

['business_id']
[]


In [33]:
dat = dat.drop('ID')
dat.drop?

In [34]:
# remove all string columns
for colname in stringcol:
    dat = dat.drop(colname)
    
# remove 'business_id' from inttype list
intcol = intcol[1:]

In [35]:
intcol

[]

In [69]:
print type('s')


<type 'str'>


In [80]:
# apply a user defined function (UDF)
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

taste_udf = udf(lambda stars: "haeven" if stars == 5 else "mediocore", StringType())

In [85]:
print bus['stars']

Column<stars>


In [86]:
print bus.select('stars').show()

+-----+
|stars|
+-----+
|  4.5|
|  3.5|
|  2.0|
|  3.5|
|  3.5|
|  5.0|
|  2.5|
|  3.0|
|  4.0|
|  4.5|
|  5.0|
|  4.5|
|  3.0|
|  3.5|
|  4.5|
|  2.5|
|  4.0|
|  3.5|
|  4.0|
|  4.0|
+-----+
only showing top 20 rows

None


In [90]:
# Add new column
print taste_udf(bus.stars)
# Add new column
buste = bus.withColumn('tastebud', taste_udf(bus.stars))

Column<<lambda>(stars)>


In [91]:
print buste.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+---------+
|             address|          attributes|         business_id|          categories|      city|               hours|is_open|     latitude|     longitude|                name|        neighborhood|postal_code|review_count|stars|state| tastebud|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+-------------+--------------+--------------------+--------------------+-----------+------------+-----+-----+---------+
|  Silberburgstr. 171|[null,null,null,n...|IVtp8mznb2vRO0Ocw...|  [Shopping, Bridal]| Stuttgart|[10:00-18:00,10:0...|      1|   48.7714398|      9.168795|   Brautatelier Tara|                    |      70178|           3|  4.5|   BW|mediocore|
|    7000 E Shea Blvd|[n

In [12]:
review.describe()

DataFrame[summary: string, business_id: string, cool: string, date: string, funny: string, review_id: string, stars: string, text: string, useful: string, user_id: string]

In [29]:
primarykey = bus.select('business_id','state')

In [30]:
print primarykey.show()

+--------------------+-----+
|         business_id|state|
+--------------------+-----+
|IVtp8mznb2vRO0Ocw...|   BW|
|XFQbescZxNRAb2U-N...|   AZ|
|1Ds8V2c7LlwSAA3O-...|   AZ|
|cNd3G6WB3D8BUp46U...|   NV|
|fGAb16Qaz64tnnqoD...|   AZ|
|LM-_1Ye4Ufn9zxEuZ...|   BW|
|v_NBQhFPHDNlEJbdw...|   AZ|
|oXNNwLFJM4PNHOpSb...|  EDH|
|4ZLwV5HhzBfzYw3Jy...|   NC|
|VRjMOWAzUIbM7Yd5Q...|   NC|
|JGJKGPH5H4k8K02Wv...|   NC|
|aEDPioOyd9YjaY8Y-...|   NV|
|DMrnb11dwFdgbcNYz...|  EDH|
|oc50MoXt3lg1a4QzC...|   NC|
|BLEahNnfVfSj6g8ef...|  MLN|
|FQ9C3ZF0NOfRyvFdx...|   ON|
|-XNodvHlq9oDu-kQB...|   ON|
|Lj6tX9QOf-uxLNOZ8...|   NV|
|-rLP6lfoQmOO_5Pf2...|   WI|
|t5vhRhogtC_K7PaMq...|   NV|
+--------------------+-----+
only showing top 20 rows

None


In [32]:
primarykey.describe().show()

+-------+--------------------+------------------+
|summary|         business_id|             state|
+-------+--------------------+------------------+
|  count|              156639|            156639|
|   mean|                null|              10.5|
| stddev|                null|26.071330723886835|
|    min|--6MefnULPED_I942...|                01|
|    max|zzzaIBwimxVej4tY6...|               ZET|
+-------+--------------------+------------------+



In [33]:
primarykey.show()

+--------------------+-----+
|         business_id|state|
+--------------------+-----+
|IVtp8mznb2vRO0Ocw...|   BW|
|XFQbescZxNRAb2U-N...|   AZ|
|1Ds8V2c7LlwSAA3O-...|   AZ|
|cNd3G6WB3D8BUp46U...|   NV|
|fGAb16Qaz64tnnqoD...|   AZ|
|LM-_1Ye4Ufn9zxEuZ...|   BW|
|v_NBQhFPHDNlEJbdw...|   AZ|
|oXNNwLFJM4PNHOpSb...|  EDH|
|4ZLwV5HhzBfzYw3Jy...|   NC|
|VRjMOWAzUIbM7Yd5Q...|   NC|
|JGJKGPH5H4k8K02Wv...|   NC|
|aEDPioOyd9YjaY8Y-...|   NV|
|DMrnb11dwFdgbcNYz...|  EDH|
|oc50MoXt3lg1a4QzC...|   NC|
|BLEahNnfVfSj6g8ef...|  MLN|
|FQ9C3ZF0NOfRyvFdx...|   ON|
|-XNodvHlq9oDu-kQB...|   ON|
|Lj6tX9QOf-uxLNOZ8...|   NV|
|-rLP6lfoQmOO_5Pf2...|   WI|
|t5vhRhogtC_K7PaMq...|   NV|
+--------------------+-----+
only showing top 20 rows



In [35]:
review.groupBy("business_id").count().orderBy("count").collect()

[Row(business_id=u'qj4KHXGgwyutHpvWRLI87g', count=1),
 Row(business_id=u'iK6JSngIDcToad8SBl04Fg', count=1),
 Row(business_id=u'h5csTpDRI7_oo1RUOeHOtQ', count=2),
 Row(business_id=u'BiQGSKfQJgmjx-HSgeO2Mg', count=2),
 Row(business_id=u'Egu3Q_gHX2NmfGMi5voS8A', count=2),
 Row(business_id=u'mhjSXQR9LvL10SSHH6aHIg', count=2),
 Row(business_id=u'JoZjCxjbS-01rjBz3KR8Nw', count=2),
 Row(business_id=u'o5cjHbxPMcfqauzklOyPQw', count=2),
 Row(business_id=u'1z_l90NydQFJp9s5eYWkTw', count=2),
 Row(business_id=u'_iTsyRdHmwDKT2pQLW3LaA', count=2),
 Row(business_id=u'IW1j1wqAdiBcj6fWGkyQCA', count=2),
 Row(business_id=u'Ar7lMMYZIo1ND3AUzhW-Gw', count=2),
 Row(business_id=u'roxnZ0kodKkj5iR72UlQSw', count=2),
 Row(business_id=u'RtlEHMt-96EROQ3du7B8dA', count=2),
 Row(business_id=u'1J3eSZk7SoXef_cbJJ5jHQ', count=2),
 Row(business_id=u'HHgHEtXyIO228Sybr6vdIg', count=2),
 Row(business_id=u'sih1hTxuGP0vtTD3tciKKg', count=2),
 Row(business_id=u'YHX61tYzUKlz9H4gdROlDA', count=2),
 Row(business_id=u'uPOCtXFMp

In [36]:
review.groupBy("business_id").count().orderBy("count")

DataFrame[business_id: string, count: bigint]

In [39]:
# to understand what collect() does
_dummy = review.groupBy("business_id").count().orderBy("count")
_dummy.collect?

In [None]:
# find the avg ratings by state
review.groupBy(col("state")).agg(func.max("stars"), func.sum("review_count")).collect()
.groupBy("state").agg({"review_count":"sum"}).orderBy(desc("SUM(review_count#16)")).collect()


#  Implement an ML model

In [111]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = bus['categories']

In [112]:
print categoricalColumns

Column<categories>


In [161]:
# join based on user and business keys
# other objects get garbage collected

# _yyy.select("stars")
review_business = _yyy.withColumnRenamed("stars", "business_avg_stars").withColumnRenamed("attributes", "business_type").withColumnRenamed(\
                                         "review_count", "business_review_count")

In [152]:
del user
user = spark.read.json('hdfs://localhost:9000/yelp/user.json').repartition(150)

In [162]:
review_business.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'business_avg_stars',
 'text',
 'useful',
 'user_id',
 'address',
 'business_type',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'business_review_count',
 'state']

In [154]:
_yyy.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id',
 'address',
 'attributes',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'review_count',
 'state']

In [156]:
# To avoid clashing ... beforer merging with review_business
user = user.withColumnRenamed("review_count", "user_review_count")


In [157]:
user.columns

['average_stars',
 'compliment_cool',
 'compliment_cute',
 'compliment_funny',
 'compliment_hot',
 'compliment_list',
 'compliment_more',
 'compliment_note',
 'compliment_photos',
 'compliment_plain',
 'compliment_profile',
 'compliment_writer',
 'cool',
 'elite',
 'fans',
 'friends',
 'funny',
 'name',
 'user_review_count',
 'useful',
 'user_id',
 'yelping_since']

In [158]:
user_review = _yyy.join(user, how=None, on='user_id').drop(user.name)

In [141]:
user_review.columns, user.columns

(['user_id',
  'business_id',
  'cool',
  'date',
  'funny',
  'review_id',
  'stars',
  'text',
  'useful',
  'address',
  'attributes',
  'categories',
  'city',
  'hours',
  'is_open',
  'latitude',
  'longitude',
  'name',
  'neighborhood',
  'postal_code',
  'review_count',
  'state',
  'average_stars',
  'compliment_cool',
  'compliment_cute',
  'compliment_funny',
  'compliment_hot',
  'compliment_list',
  'compliment_more',
  'compliment_note',
  'compliment_photos',
  'compliment_plain',
  'compliment_profile',
  'compliment_writer',
  'cool',
  'elite',
  'fans',
  'friends',
  'funny',
  'review_count',
  'useful',
  'yelping_since'],
 ['average_stars',
  'compliment_cool',
  'compliment_cute',
  'compliment_funny',
  'compliment_hot',
  'compliment_list',
  'compliment_more',
  'compliment_note',
  'compliment_photos',
  'compliment_plain',
  'compliment_profile',
  'compliment_writer',
  'cool',
  'elite',
  'fans',
  'friends',
  'funny',
  'name',
  'review_count',
  'us

In [160]:
user_review = user_review.withColumnRenamed("name", "user_name").withColumnRenamed("elite", "user_type").withColumnRenamed(\
                                "average_stars", "user_avg_stars").withColumnRenamed("review_count", "user_review_count")

In [179]:
yelp_review = user_review.join(review_business, on='review_id').drop(review_business.user_id).drop(review_business.business_id)

In [180]:
yelp_review.columns

['review_id',
 'cool',
 'date',
 'funny',
 'stars',
 'text',
 'useful',
 'address',
 'attributes',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'user_name',
 'neighborhood',
 'postal_code',
 'user_review_count',
 'state',
 'user_avg_stars',
 'compliment_cool',
 'compliment_cute',
 'compliment_funny',
 'compliment_hot',
 'compliment_list',
 'compliment_more',
 'compliment_note',
 'compliment_photos',
 'compliment_plain',
 'compliment_profile',
 'compliment_writer',
 'cool',
 'user_type',
 'fans',
 'friends',
 'funny',
 'user_review_count',
 'useful',
 'yelping_since',
 'business_id',
 'cool',
 'date',
 'funny',
 'business_avg_stars',
 'text',
 'useful',
 'user_id',
 'address',
 'business_type',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'business_review_count',
 'state']

### Linear Regression w/o cross-val

In [188]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [189]:
yelp_review.columns

['review_id',
 'cool',
 'date',
 'funny',
 'stars',
 'text',
 'useful',
 'address',
 'attributes',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'user_name',
 'neighborhood',
 'postal_code',
 'user_review_count',
 'state',
 'user_avg_stars',
 'compliment_cool',
 'compliment_cute',
 'compliment_funny',
 'compliment_hot',
 'compliment_list',
 'compliment_more',
 'compliment_note',
 'compliment_photos',
 'compliment_plain',
 'compliment_profile',
 'compliment_writer',
 'cool',
 'user_type',
 'fans',
 'friends',
 'funny',
 'user_review_count',
 'useful',
 'yelping_since',
 'business_id',
 'cool',
 'date',
 'funny',
 'business_avg_stars',
 'text',
 'useful',
 'user_id',
 'address',
 'business_type',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'neighborhood',
 'postal_code',
 'business_review_count',
 'state']

In [None]:
# transform data structure
from pyspark.ml.linalg import Vectors
yelp_df = yelp_review.rdd.map(lambda x: [Vector.dense(x[20],x[18],x[-10],x[-2],x[-17]), x[4]]).toDF(['features', 'label'])
yelp_df.show(3)

In [None]:
from pyspark.ml.regression import LinearRegression

### Linear regression + cross-val (k-fold)

In [171]:
train, test = yelp_review.randomSplit?

In [181]:
train, test = yelp_review.randomSplit([0.8, 0.2], seed=1123123)

In [175]:
train.count()

3790727

In [182]:
test.select("business_id").count()

KeyboardInterrupt: 

In [183]:
train.take(3)

KeyboardInterrupt: 

In [None]:
train.select('city').show()

In [None]:
from sklearn.model_selection import cross_val_predict
from sklearn import linear_model
from sklearni mport preprocessing

lr = linear_model.LinearRegression()
# how to proceed with dataFrame???

# yp = cross_val_predic(lr, x, y, cv=10)


In [None]:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

train = train.toDF()
lr = LinearRegression(maxIter=10, regParam=0.3, eleasticNetParam=0.8)

# fit
lrModel = lr.fit(train)

print("Weights: " + str(lrModel.weights)) 
print("Intercept: " + str(lrModel.intercept)) 

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')
evaluator.evaluate(pred, {evaluator.metricName: "r2"})

In [None]:
train.columns

In [None]:
lr = LinearRegression(featuresCol=['user_avg_stars', 'business_avg_stars', 'city', 'business_review_count', \
                                    'user_review_count'], labelCol='stars')
lrmode = lr.fit(train)

## system stuff

In [115]:
locals()

{'ArrayType': pyspark.sql.types.ArrayType,
 'BinaryClassificationEvaluator': pyspark.ml.evaluation.BinaryClassificationEvaluator,
 'BinaryLogisticRegressionSummary': pyspark.ml.classification.BinaryLogisticRegressionSummary,
 'BinaryLogisticRegressionTrainingSummary': pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary,
 'BinaryType': pyspark.sql.types.BinaryType,
 'BooleanType': pyspark.sql.types.BooleanType,
 'ByteType': pyspark.sql.types.ByteType,
 'Column': pyspark.sql.column.Column,
 'CrossValidator': pyspark.ml.tuning.CrossValidator,
 'CrossValidatorModel': pyspark.ml.tuning.CrossValidatorModel,
 'DataFrame': pyspark.sql.dataframe.DataFrame,
 'DataFrameNaFunctions': pyspark.sql.dataframe.DataFrameNaFunctions,
 'DataFrameReader': pyspark.sql.readwriter.DataFrameReader,
 'DataFrameStatFunctions': pyspark.sql.dataframe.DataFrameStatFunctions,
 'DataFrameWriter': pyspark.sql.readwriter.DataFrameWriter,
 'DataType': pyspark.sql.types.DataType,
 'DateType': pyspark.sql.ty

In [116]:
'sc' in locals()

True

In [None]:
spark.stop()