In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark

conf = SparkConf().setAppName("building a warehouse")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

In [21]:
import os 
yelp_dir="/media/diego/QData/datasets/yelp"
business_file = os.path.join(yelp_dir, 'business.json')
checkin_file = os.path.join(yelp_dir, 'checkin.json')
review_file = os.path.join(yelp_dir, 'review.json')
tip_file = os.path.join(yelp_dir, 'tip.json')
user_file = os.path.join(yelp_dir, 'user.json')


In [22]:
df_business = sqlContext.read.json(business_file)
df_checkin = sqlContext.read.json(checkin_file)
df_review = sqlContext.read.json(review_file)
df_tip = sqlContext.read.json(tip_file)
df_user = sqlContext.read.json(user_file)

In [25]:
df_business_count = df_business.count()
df_checkin_count = df_checkin.count()
df_review_count = df_review.count()
df_tip_count = df_tip.count()
df_user_count = df_user.count()

In [27]:
print(f'{df_business_count} businesses, {df_checkin_count} checkins,  {df_review_count} reviews, {df_tip_count} tips, {df_user_count} users')



192609 businesses, 161950 checkins,  6685900 reviews, 1223094 tips, 1637138 users


In [31]:
import pyspark.sql.functions as sf

df_business_cities=df_business.groupBy('city').agg(sf.count('business_id').alias('city_business')).sort(sf.desc('city_business'))

In [45]:
df_business_cities.head(100)

[Row(city='Las Vegas', city_business=29370),
 Row(city='Toronto', city_business=18906),
 Row(city='Phoenix', city_business=18766),
 Row(city='Charlotte', city_business=9509),
 Row(city='Scottsdale', city_business=8837),
 Row(city='Calgary', city_business=7736),
 Row(city='Pittsburgh', city_business=7017),
 Row(city='Montréal', city_business=6449),
 Row(city='Mesa', city_business=6080),
 Row(city='Henderson', city_business=4892),
 Row(city='Tempe', city_business=4550),
 Row(city='Chandler', city_business=4309),
 Row(city='Cleveland', city_business=3605),
 Row(city='Glendale', city_business=3543),
 Row(city='Madison', city_business=3494),
 Row(city='Gilbert', city_business=3462),
 Row(city='Mississauga', city_business=3112),
 Row(city='Peoria', city_business=1919),
 Row(city='Markham', city_business=1766),
 Row(city='North Las Vegas', city_business=1548),
 Row(city='Champaign', city_business=1262),
 Row(city='North York', city_business=1205),
 Row(city='Surprise', city_business=1148),
 R

In [42]:
from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("latitude")),max(col("latitude")),min(col("longitude")),max(col("longitude"))]
df_business_city_locations = df_business.groupBy('city').agg(*expr)

In [47]:
df_business_locations = df_business_cities.join(df_business_city_locations, df_business_cities.city == df_business_city_locations.city).sort(sf.desc('city_business'))

In [48]:
df_business_locations.head(100)

[Row(city='Las Vegas', city_business=29370, city='Las Vegas', min(latitude)=35.9208835, max(latitude)=36.4382486207, min(longitude)=-115.4934712422, max(longitude)=-114.8875063285),
 Row(city='Toronto', city_business=18906, city='Toronto', min(latitude)=43.5603651, max(latitude)=43.8819420199, min(longitude)=-79.71393, max(longitude)=-79.019777165),
 Row(city='Phoenix', city_business=18766, city='Phoenix', min(latitude)=33.2562021195, max(latitude)=33.8484161, min(longitude)=-112.4410671, max(longitude)=-111.709554),
 Row(city='Charlotte', city_business=9509, city='Charlotte', min(latitude)=34.9274555, max(latitude)=35.4874459, min(longitude)=-81.036336, max(longitude)=-80.5216184),
 Row(city='Scottsdale', city_business=8837, city='Scottsdale', min(latitude)=33.2344417, max(latitude)=33.826752, min(longitude)=-112.3883099219, max(longitude)=-111.6828434127),
 Row(city='Calgary', city_business=7736, city='Calgary', min(latitude)=50.8640795, max(latitude)=51.20754, min(longitude)=-114.35

In [88]:
df_business_categories = df_business.select("categories")

In [89]:
df_business_categories.head(10)

[Row(categories='Golf, Active Life'),
 Row(categories='Specialty Food, Restaurants, Dim Sum, Imported Food, Food, Chinese, Ethnic Food, Seafood'),
 Row(categories='Sushi Bars, Restaurants, Japanese'),
 Row(categories='Insurance, Financial Services'),
 Row(categories='Plumbing, Shopping, Local Services, Home Services, Kitchen & Bath, Home & Garden, Water Heater Installation/Repair'),
 Row(categories='Shipping Centers, Couriers & Delivery Services, Local Services, Printing Services'),
 Row(categories='Beauty & Spas, Hair Salons'),
 Row(categories="Hair Salons, Hair Stylists, Barbers, Men's Hair Salons, Cosmetics & Beauty Supply, Shopping, Beauty & Spas"),
 Row(categories='Nail Salons, Beauty & Spas, Day Spas'),
 Row(categories='Beauty & Spas, Nail Salons, Day Spas, Massage')]

In [90]:
from pyspark.sql.functions import split, ltrim

In [91]:
split_categories = split(df_business_categories['categories'], ',')

In [99]:
df_split_categories = df_business_categories.select(split_categories).withColumnRenamed('split(categories, ,)','split_categories')

In [100]:
df_split_categories.head()

Row(split_categories=['Golf', ' Active Life'])

In [101]:
df_split_categories.columns

['split_categories']

In [85]:
from pyspark.sql.functions import explode

In [114]:
df_split_categories.select(explode('split_categories'))

[Row(col='Golf'),
 Row(col=' Active Life'),
 Row(col='Specialty Food'),
 Row(col=' Restaurants'),
 Row(col=' Dim Sum'),
 Row(col=' Imported Food'),
 Row(col=' Food'),
 Row(col=' Chinese'),
 Row(col=' Ethnic Food'),
 Row(col=' Seafood')]

In [117]:
df_exploded_categories = df_split_categories.select(explode('split_categories'))

In [118]:
df_exploded_categories.head(10)

[Row(col='Golf'),
 Row(col=' Active Life'),
 Row(col='Specialty Food'),
 Row(col=' Restaurants'),
 Row(col=' Dim Sum'),
 Row(col=' Imported Food'),
 Row(col=' Food'),
 Row(col=' Chinese'),
 Row(col=' Ethnic Food'),
 Row(col=' Seafood')]

In [119]:
df_exploded_categories.columns

['col']

In [127]:
df_expl_strip_cats = df_exploded_categories.withColumn("coll",ltrim(col("col")))

In [128]:
df_expl_strip_cats.head(20)

[Row(col='Golf', coll='Golf'),
 Row(col=' Active Life', coll='Active Life'),
 Row(col='Specialty Food', coll='Specialty Food'),
 Row(col=' Restaurants', coll='Restaurants'),
 Row(col=' Dim Sum', coll='Dim Sum'),
 Row(col=' Imported Food', coll='Imported Food'),
 Row(col=' Food', coll='Food'),
 Row(col=' Chinese', coll='Chinese'),
 Row(col=' Ethnic Food', coll='Ethnic Food'),
 Row(col=' Seafood', coll='Seafood'),
 Row(col='Sushi Bars', coll='Sushi Bars'),
 Row(col=' Restaurants', coll='Restaurants'),
 Row(col=' Japanese', coll='Japanese'),
 Row(col='Insurance', coll='Insurance'),
 Row(col=' Financial Services', coll='Financial Services'),
 Row(col='Plumbing', coll='Plumbing'),
 Row(col=' Shopping', coll='Shopping'),
 Row(col=' Local Services', coll='Local Services'),
 Row(col=' Home Services', coll='Home Services'),
 Row(col=' Kitchen & Bath', coll='Kitchen & Bath')]

In [135]:
from pyspark.sql.functions import desc
df_grouped_cats = df_expl_strip_cats.groupby('coll').agg({'coll':'count'}).sort(desc('count(coll)'))

In [136]:
df_grouped_cats.show(100)

+--------------------+-----------+
|                coll|count(coll)|
+--------------------+-----------+
|         Restaurants|      59371|
|            Shopping|      31878|
|                Food|      29989|
|       Home Services|      19729|
|       Beauty & Spas|      19370|
|    Health & Medical|      17171|
|      Local Services|      13932|
|          Automotive|      13203|
|           Nightlife|      13095|
|                Bars|      11341|
|Event Planning & ...|      10371|
|         Active Life|       9521|
|             Fashion|       7798|
|          Sandwiches|       7332|
|        Coffee & Tea|       7321|
|           Fast Food|       7257|
|American (Traditi...|       7107|
|         Hair Salons|       6955|
|               Pizza|       6804|
|       Home & Garden|       6489|
|Arts & Entertainment|       6304|
|Professional Serv...|       6276|
|         Auto Repair|       6140|
|     Hotels & Travel|       6033|
|             Doctors|       5867|
|         Real Estat

In [139]:
df_restaurants = df_business.where(df_business.categories.contains('Restaurants'))

In [141]:
df_restaurants_count= df_restaurants.count()

In [142]:
df_restaurants_count

59371