Data Source

https://www.kaggle.com/yelp-dataset/yelp-dataset

In [1]:
import findspark
# Order matters: findspark.init() is called before `import pyspark` so that the
# local Spark installation is on the import path by the time pyspark is imported
# (behaviour as described in the findspark README).
findspark.init()
import pyspark


In [2]:
# Start the Spark context for this notebook under the application name 'big-data'.
sc = pyspark.SparkContext(appName='big-data')

In [3]:
from pyspark.sql import SparkSession
# Session used for all DataFrame reads below; it shares the 'big-data' app name
# with the context created above.
spark = (
    SparkSession.builder
    .appName('big-data')
    .getOrCreate()
)

## Reviews

In [4]:
# Load the Yelp review dump into a DataFrame (schema is inferred from the JSON).
reviews_df = spark.read.format('json').load('data/yelp_academic_dataset_review.json')

In [6]:
# Show the inferred column names and types of the review data.
reviews_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [7]:
# Peek at a single review record to see what the fields look like.
reviews_df.first()

Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')

In [8]:
# Total number of review rows as loaded.
reviews_df.count()

6685900

In [9]:
# Spark DataFrames are immutable: na.drop() returns a NEW frame and leaves the
# receiver untouched. The result therefore has to be bound to a name, otherwise
# the null filtering is silently discarded. Re-running this cell is harmless
# because dropping nulls twice yields the same frame.
reviews_df = reviews_df.na.drop()

DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: double, text: string, useful: bigint, user_id: string]

In [10]:
# Count again for comparison with the row count taken before the null-drop cell.
reviews_df.count()

6685900

In [11]:
# 80/10/10 train/validation/test split of the reviews; the fixed seed keeps the
# split repeatable between runs.
train, val, test = reviews_df.randomSplit(
    weights=[0.8, 0.1, 0.1],
    seed=12,
)

DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: double, text: string, useful: bigint, user_id: string]

Column<b'cool'>

In [32]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Work on small subsets so the feature pipeline and the classifier stay tractable.
train_small = train.limit(100000)
# The validation features must come from the validation split. Building them
# from `test` leaks the held-out test data into model selection.
val_small = val.limit(2000)
test_small = test.limit(2000)  # kept for the final, untouched evaluation

# text -> whitespace tokens -> hashed term frequencies -> TF-IDF features
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
hashtf = HashingTF(numFeatures=2**16, inputCol="tokens", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

# Fit the IDF statistics on the training subset only, then apply the fitted
# pipeline to both the training and the validation subset.
idf_model = pipeline.fit(train_small)
train_df = idf_model.transform(train_small)
val_df = idf_model.transform(val_small)
train_df.show(5)


+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|              tokens|                  tf|            features|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|--Gc998IMjLn8yr-H...|   0|2014-07-01 01:22:49|    1|PE8Uzlpx9fFQjrlqn...|  5.0|Delicious! 

Came...|     0|Ws6z36Ffk4X8B6XHD...|[delicious!, , , ...|(65536,[308,2071,...|(65536,[308,2071,...|
|--I7YYLada0tSLkOR...|   0|2014-11-14 21:29:09|    0|3RmVmn37Z0zEtx3Ww...|  4.0|My husband and I ...|     3|STcrjP1twU03Vk6sy...|[my, husband, and...|(65536,[248,2071,...|(65536,[248,2071,...|
|--I7YYLada0tSLkOR...|   0|2014-12-

In [33]:
# Logistic regression with the star rating as the label; no featuresCol is
# given, so the estimator's default feature column is used.
lr = LogisticRegression(
    labelCol="stars",
    maxIter=100,
)
lr_model = lr.fit(train_df)



KeyboardInterrupt: 

In [None]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the validation features with the fitted classifier.
predictions = lr_model.transform(val_df)

# No metricName is passed, so the evaluator reports its default metric
# (F1 according to the PySpark docs -- confirm for the installed version).
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="stars",
)
evaluator.evaluate(predictions)

## Businesses

In [5]:
# Load the Yelp business dump into a DataFrame (schema is inferred from the JSON).
business_df = spark.read.format('json').load('data/yelp_academic_dataset_business.json')

In [7]:
# Show the inferred column names and types of the business data.
business_df.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [12]:
# Look at the `categories` value of one business record.
business_df.first()['categories']

'Golf, Active Life'

In [2]:
# Print a tabular preview of the business rows.
business_df.show()

NameError: name 'business_df' is not defined

In [1]:
# first() returns a single Row, and Row has no show() method (show() exists on
# DataFrames only). Ending the cell with the Row displays it.
business_df.first()

NameError: name 'business_df' is not defined

In [6]:
# Give both frames short aliases; the join that builds `examples` uses these names.
reviews = reviews_df.alias('reviews')
business = business_df.alias('business')

In [13]:
# Attach the matching business record to every review via the shared business_id key.
j = reviews_df.join(
    business_df,
    on=reviews_df['business_id'] == business_df['business_id'],
)

In [15]:
# Inspect one joined review/business row.
j.first()

Row(business_id='--9e1ONYQuAa-CB_Rrw7Tw', cool=0, date='2015-10-23 13:23:34', funny=0, review_id='eCJObv4SZupXH5TKWaUhSg', stars=2.0, text='Very busy and noisy restaurant.\nAsparagas was cooked perfectly, however quite flavorless. The mashed potatoes were tasty.  \nFor the price, the spinach should have been fresh and the cream sauce needs improvement. \nMy organic filet was good and nicely cooked to medium rare, however not near as tasty as other organic beef I have had for half the price.\nThe New Orleans gumbo was a tad too salty.  The yorkshire style buns were average and were cold.  \nThe key lime pie was average.  The tartness was lacking.  The apple pie was a disappointment, with a doughy flavoured crust.\nAnother thing that  high end restaurants need to learn is how to choose great coffees like good wines.  I asked where the beans were from and they had no idea.  I would expect excellence in all areas of my food consumption and yes, even with my coffee. The espresso was extreme

In [7]:
from pyspark.sql.functions import concat_ws

# Build (context, review) pairs: `context` is the review's star rating, the
# business's city and its categories joined by single spaces; `review` is the
# review text.
examples = (
    reviews
    .join(business, on=reviews.business_id == business.business_id)
    .select(
        concat_ws(' ', reviews.stars, business.city, business.categories).alias('context'),
        reviews.text.alias('review'),
    )
)
    


In [24]:
# Preview the generated context/review pairs.
examples.show()

+--------------------+--------------------+
|             context|              review|
+--------------------+--------------------+
|1.0 Las Vegas Fit...|Total bill for th...|
|5.0 Las Vegas Bea...|I *adore* Travis ...|
|5.0 Chandler Heal...|I have to say tha...|
|5.0 Calgary Bars,...|Went in for a lun...|
|1.0 Scottsdale Te...|Today was my seco...|
|4.0 Pittsburgh Re...|I'll be the first...|
|3.0 Markham Food,...|Tracy dessert had...|
|1.0 Scottsdale Sp...|This place has go...|
|2.0 Cleveland Bre...|I was really look...|
|3.0 Las Vegas Sho...|It's a giant Best...|
|4.0 Las Vegas Per...|Like walking back...|
|1.0 Mesa Restaura...|Walked in around ...|
|4.0 Pittsburgh It...|Wow. So surprised...|
|4.0 Las Vegas Hot...|Michael from Red ...|
|1.0 Toronto Asian...|I cannot believe ...|
|5.0 Toronto Sandw...|You can't really ...|
|4.0 Orange Villag...|Great lunch today...|
|3.0 Phoenix Carib...|I love chinese fo...|
|5.0 Chandler Sand...|We've been a huge...|
|3.0 Toronto Resta...|Good selec

In [25]:
# Number of context/review pairs produced by the join.
examples.count()

6685900

In [31]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml import Pipeline

# Try the built-in ML Tokenizer on a 1000-row sample, one tokenizer per column.
ex = examples.limit(1000)

context_tokenizer = Tokenizer(
    inputCol="context",
    outputCol="context_tokens",
)
reviews_tokenizer = Tokenizer(
    inputCol="review",
    outputCol="review_tokens",
)
pipeline = Pipeline(stages=[context_tokenizer, reviews_tokenizer])

token_model = pipeline.fit(ex)
tokenized_examples = token_model.transform(ex)

In [32]:
# Inspect the token lists produced for one example.
tokenized_examples.first()

Row(context='1.0 Las Vegas Fitness & Instruction, Doctors, Health & Medical, Active Life, Gyms, Emergency Rooms, Medical Centers, Hospitals', review='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', context_tokens=['1.0', 'las', 'vegas', 'fitness', '&', 'instruction,', 'doctors,', 'health', '&', 'medical,', 'active', 'life,', 'gyms,', 'emergency', 'rooms,', 'medical', 'centers,', 'hospitals'], review_tokens=['total', 'bill', 'for', 'this', 'horrible', 'service?', 'over', '$8gs.', 'these', 'crooks', 'actually', 'had', 'the', 'nerve', 'to', 'charge', 'us', '$69', 'for', '3', 'pills.', 'i', 'checked', 'online', 'the', 'pills', 'can', 'be', 'had', 'for', '19', 'cents', 'each!', 'avoid', 'hospital', 'ers', 'at', 'all', 'costs.'])

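Unfortunately, the default ML Tokenizer is quite naive: as the output above shows, it only lowercases the text and splits on whitespace, so punctuation stays attached to the words (e.g. `service?`, `costs.`). There is a RegexTokenizer, but we can instead try the industrial-strength
tokenizer from spaCy.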

Implementing an ML Transformer would require a Java counterpart on top of the Python implementation, so let's fall back to UDFs instead.

In [10]:
import spacy
from pyspark.sql.functions import pandas_udf

nlp = spacy.load("en_core_web_sm")

def tokenize_with_spacy(s):
    """Return `s` re-written as its spaCy tokens separated by single spaces.

    Only the tokenizer is run (`nlp.make_doc`), not the full pipeline. Only
    `token.text` is consumed here, so running the remaining pipeline
    components on every review would be wasted work.

    Non-string input (a null cell arrives as None/NaN) is returned as None
    instead of being passed to spaCy, which raises on non-text input. The
    null then survives into the result column, where a later `na.drop()` can
    remove the row.
    """
    if not isinstance(s, str):
        return None
    return ' '.join(token.text for token in nlp.make_doc(s))

@pandas_udf('string')
def tokenize(x):
    """Pandas UDF: apply `tokenize_with_spacy` to every value of a string column."""
    return x.apply(tokenize_with_spacy)


In [13]:
# Run the spaCy tokenizer UDF over both columns, keeping the original column names.
tokenized = examples.select(
    tokenize(examples['context']).alias('context'),
    tokenize(examples['review']).alias('review'),
)

In [None]:
# Inspect one spaCy-tokenized example.
tokenized.first()

In [None]:
# Take a small sample of the tokenized examples for the export steps below.
# NOTE(review): the recorded output of sub.count() in the next cell is 5000,
# which does not match limit(50) -- confirm the intended sample size.
sub = tokenized.limit(50)

In [11]:
# Number of rows in the sample.
sub.count()

5000

In [12]:
# Drop sample rows in which the context or the review is null.
clean = sub.na.drop()

In [None]:
# Number of sample rows left after dropping nulls.
clean.count()

In [19]:
# Split the null-free frame (`clean`); splitting `sub` would bypass the
# na.drop() step and leave `clean` unused.
# Weights: 90% train, with the remaining 10% divided 75/25 between validation
# and test. randomSplit normalises weights that do not sum to 1, so the earlier
# [0.9, 0.75, 0.25] actually produced roughly a 47/39/13 split.
train, val, test = clean.randomSplit([0.9, 0.075, 0.025], seed=12)


In [None]:
# A notebook cell displays only its last expression, so three separate
# statements would compute all three counts but show just the test count.
# Returning them as one tuple shows (train, val, test) sizes together.
train.count(), val.count(), test.count()

In [24]:
def write_df(df, file_name, mode='errorifexists'):
    """Write the `context` and `review` columns of `df` as a parallel text corpus.

    The context column goes to `<file_name>_src.txt` and the review column to
    `<file_name>_tgt.txt`.

    The plain `text` format is used instead of `csv`. The CSV writer wraps any
    value containing the delimiter in quotes, and the comma-separated category
    lists in `context` always do, so the CSV output would contain quoted and
    escaped lines rather than the raw token strings the `.txt` names promise.

    Args:
        df: DataFrame with string columns `context` and `review`.
        file_name: Output path prefix, e.g. 'train'.
        mode: Save mode handed to the DataFrameWriter. The default keeps the
            writer's usual behaviour of failing when the path already exists;
            pass 'overwrite' to make the cell re-runnable.

    NOTE(review): the two columns are written by two independent jobs. Confirm
    that line i of the source output still corresponds to line i of the target
    output for the frames passed in.
    """
    for column, suffix in (('context', '_src.txt'), ('review', '_tgt.txt')):
        df.select(column).write.format('text').mode(mode).save(file_name + suffix)
    


In [None]:
# Export every split under its own file-name prefix.
for split_df, split_name in ((train, 'train'), (val, 'val'), (test, 'test')):
    write_df(split_df, split_name)