In [98]:
import re
import pdb
from numpy import *
from pandas import *
import pyspark
from pyspark.sql.functions import split
from pyspark.sql import SQLContext
# Legacy SQL entry point wrapped around the SparkContext `sc`.
# NOTE(review): `sc` is not defined anywhere in this notebook, so it is
# presumably injected by the PySpark kernel — confirm. None of the cells
# visible here use `sqlContext`; they all go through `spark`.
sqlContext = SQLContext(sparkContext=sc)

In [99]:
# Read the tip CSV from the bucket, treating the first line as the header row,
# then preview the first rows.
tipDF = spark.read.option("header", "true").csv("gs://pda-yelp-bucket/input/tip/tip.csv")
tipDF.show()

+--------------------+----------+-----+--------------------+--------------------+--------------------+
|                text|      date|likes|         business_id|             user_id|       business_name|
+--------------------+----------+-----+--------------------+--------------------+--------------------+
|Happy hour 5 7 Mo...|2016-10-12|    0|dAa0hB2yrnHzVmsCk...|ulQ8Nyj7jCUR8M83S...|        Rock Run Inn|
|Come early on Sun...|2016-07-03|    0|SqW3igh1_Png336VI...|ulQ8Nyj7jCUR8M83S...|           Bob Evans|
|    Love their soup |2016-01-07|    0|KNpcPGqDORDdvtekX...|ulQ8Nyj7jCUR8M83S...|Carriage Inn Rest...|
|Soups are fantastic |2016-05-22|    0|KNpcPGqDORDdvtekX...|ulQ8Nyj7jCUR8M83S...|Carriage Inn Rest...|
|Thursday night is...|2016-06-09|    0|KNpcPGqDORDdvtekX...|ulQ8Nyj7jCUR8M83S...|Carriage Inn Rest...|
|Very good for lun...|2016-06-01|    0|8qNOI6Q1-rJrvWWD5...|ulQ8Nyj7jCUR8M83S...|Luciano's Italian...|
|Tuesday spaghetti...|2016-06-14|    0|8qNOI6Q1-rJrvWWD5...|ulQ8Nyj7jCUR8

In [100]:
# Keep only the business key, the business name and the free-text tip.
tipDF = tipDF.select(['business_id', 'business_name', 'text'])

In [None]:
# A notebook cell only echoes its last expression, so the column list is
# printed explicitly; the row count remains the cell's displayed value.
print(tipDF.columns)
tipDF.select('business_id').count()

In [103]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Clean text: strip every character that is not an ASCII letter or whitespace,
# then lower-case what is left. The cleaned column keeps the name `text`.
cleaned_text = lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('text')
tipDF = tipDF.select('business_id', 'business_name', cleaned_text)

tipDF.show(5)

+--------------------+--------------------+--------------------+
|         business_id|       business_name|                text|
+--------------------+--------------------+--------------------+
|dAa0hB2yrnHzVmsCk...|        Rock Run Inn|happy hour   mond...|
|SqW3igh1_Png336VI...|           Bob Evans|come early on sun...|
|KNpcPGqDORDdvtekX...|Carriage Inn Rest...|    love their soup |
|KNpcPGqDORDdvtekX...|Carriage Inn Rest...|soups are fantastic |
|KNpcPGqDORDdvtekX...|Carriage Inn Rest...|thursday night is...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [130]:
# Load the business CSV (first line is the header) and keep only the join key
# and the city, then preview five rows.
businessDF = (
    spark.read
    .option("header", "true")
    .csv("gs://pda-yelp-bucket/input/business/business.csv")
    .select('business_id', 'city')
)
businessDF.show(5)

+--------------------+--------------+
|         business_id|          city|
+--------------------+--------------+
|FYWN1wneV18bWNgQj...|     Ahwatukee|
|He-G7vWjzVUysIKrf...|      McMurray|
|KQPW8lFf1y5BT2Mxi...|       Phoenix|
|8DShNS-LuFqpEWIp0...|         Tempe|
|PfOCPjBrlQAnz__NX...|Cuyahoga Falls|
+--------------------+--------------+
only showing top 5 rows



In [131]:
# Dish names looked for in the cleaned (lower-case, letters-only) tip text.
# The entries are joined with `|` into a regex alternation by later cells, so
# each name is listed exactly once; the original relative order is preserved.
food_list = [
    'pasta', 'pizza', 'burger', 'banana cream pie', 'salad', 'truffle',
    'salmon steak', 'steak', 'chips', 'sizzler', 'icecream', 'spaghetti',
    'ice cream', 'chinese', 'dimsum', 'pie', 'fries', 'noodle', 'pudding',
    'pancake', 'brownie', 'sushi', 'eggroll', 'bacon and eggs',
    'french toast', 'porridge', 'biryani', 'apple pie', 'hamburger',
    'clam chowder', 'barbecue', 'taco', 'fajita', 'salmon', 'meatloaf',
    'macaroni', 'crabcake', 'sandwich', 'popcorn', 'waffle', 'enchiladas',
    'lobster', 'buffalo wing', 'cookie', 'soups', 'nacho', 'hot dog',
    'cheeseburger', 'soup',
]

In [132]:
# Left outer join on business_id: every business row is kept, with its tips
# attached where tipDF has a matching business_id.
new_df = businessDF.join(tipDF, 'business_id', 'left_outer')
new_df.show(5)

+--------------------+---------+--------------------+--------------------+
|         business_id|     city|       business_name|                text|
+--------------------+---------+--------------------+--------------------+
|--9e1ONYQuAa-CB_R...|Las Vegas|Delmonico Steakhouse|don t leave witho...|
|--9e1ONYQuAa-CB_R...|Las Vegas|Delmonico Steakhouse| currently closed...|
|--9e1ONYQuAa-CB_R...|Las Vegas|Delmonico Steakhouse|truffle   parmesa...|
|--9e1ONYQuAa-CB_R...|Las Vegas|Delmonico Steakhouse|make sure you sav...|
|--9e1ONYQuAa-CB_R...|Las Vegas|Delmonico Steakhouse|restaurant week  ...|
+--------------------+---------+--------------------+--------------------+
only showing top 5 rows



In [133]:
import pyspark.sql.functions as psf

# Preview tips that mention a dish as a whole word/phrase: the dish must be
# preceded by start-of-text or whitespace and followed by whitespace or
# end-of-text. Raw strings keep `\s` as a regex escape rather than an invalid
# Python string escape.
dish_pattern = r'(^|\s)(' + '|'.join(food_list) + r')(\s|$)'
new_df.filter(psf.col('text').rlike(dish_pattern)).show(5)

+--------------------+---------+--------------------+--------------------+
|         business_id|     city|       business_name|                text|
+--------------------+---------+--------------------+--------------------+
|KNpcPGqDORDdvtekX...|Elizabeth|Carriage Inn Rest...|    love their soup |
|KNpcPGqDORDdvtekX...|Elizabeth|Carriage Inn Rest...|soups are fantastic |
|KNpcPGqDORDdvtekX...|Elizabeth|Carriage Inn Rest...|thursday night is...|
|8qNOI6Q1-rJrvWWD5...|White Oak|Luciano's Italian...|very good for lun...|
|8qNOI6Q1-rJrvWWD5...|White Oak|Luciano's Italian...|tuesday spaghetti...|
+--------------------+---------+--------------------+--------------------+
only showing top 5 rows



In [136]:
# Add `top_dishes`: the first dish from food_list that appears in the tip as a
# whole word/phrase.
# The leading boundary is a consuming group `(^|\s)` rather than a lookahead.
# A lookahead at that position tests the very character the dish has to start
# on, so a whitespace alternative can never succeed there and only dishes at
# the start of the text would be found. Group 2 is the dish itself, without
# the surrounding whitespace. Tips with no match yield an empty string.
dish_pattern = r'(^|\s)(' + '|'.join(food_list) + r')(\s|$)'
final_df = new_df.withColumn(
    'top_dishes',
    psf.regexp_extract('text', dish_pattern, 2),
)

In [137]:
# Put the columns in presentation order and preview five rows.
final_df = final_df.select(['business_id', 'business_name', 'city', 'text', 'top_dishes'])
final_df.show(5)

+--------------------+--------------------+---------+--------------------+----------+
|         business_id|       business_name|     city|                text|top_dishes|
+--------------------+--------------------+---------+--------------------+----------+
|--9e1ONYQuAa-CB_R...|Delmonico Steakhouse|Las Vegas|don t leave witho...|          |
|--9e1ONYQuAa-CB_R...|Delmonico Steakhouse|Las Vegas| currently closed...|          |
|--9e1ONYQuAa-CB_R...|Delmonico Steakhouse|Las Vegas|truffle   parmesa...|   truffle|
|--9e1ONYQuAa-CB_R...|Delmonico Steakhouse|Las Vegas|make sure you sav...|          |
|--9e1ONYQuAa-CB_R...|Delmonico Steakhouse|Las Vegas|restaurant week  ...|          |
+--------------------+--------------------+---------+--------------------+----------+
only showing top 5 rows



In [138]:
# Destination folder for the export, and the export columns (the raw tip text
# is dropped here).
outputpath = "gs://pda-yelp-bucket/output/topDishes/"
final_df = final_df.select(['business_id', 'business_name', 'city', 'top_dishes'])

In [139]:
# Collapse to one partition and write the result as comma-separated CSV with a
# header row.
# NOTE(review): mode "append" adds to whatever already exists under
# `outputpath`, so re-running the notebook would duplicate rows in the output
# folder — confirm this is intended.
(
    final_df.coalesce(1)
    .write
    .format("csv")
    .mode("append")
    .options(sep=",", header="TRUE")
    .save(outputpath)
)