Data Source: https://business.yelp.com/data/resources/open-dataset/

This code filters the data contained in the JSON files to only include items that relate to specific businesses in specific cities.

It is based on a simplified version of Chris's data loaders.

Dictionary-based data structures are sufficient for our purposes and avoid over-complicating things. Bear in mind that the ultimate goal is to convert these data into PyTorch Geometric data structures on which to run GNN models.

## Data Load and filtering

In [1]:
from itertools import islice
import json
from tqdm import tqdm
import pandas as pd
import torch
from datetime import datetime
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

Quick summary stats to work out what filters to apply

In [3]:
import json
from collections import Counter
from tqdm import tqdm
from itertools import islice
import heapq

# Tally businesses per city, per state and per category so we can decide which
# filters to apply further down.
city_count = Counter()
state_count = Counter()
category_count = Counter()
total_count = 0

# The dump is one JSON document per line. The encoding is given explicitly so
# the result does not depend on the platform's locale default.
with open("../../Data/yelp_academic_dataset_business.json", "r", encoding="utf-8") as file:
    for line in tqdm(file):
        total_count += 1
        data = json.loads(line)

        city_count[data['city']] += 1
        state_count[data['state']] += 1

        # 'categories' is only tallied when it is a comma-separated string;
        # missing or null values are skipped.
        # NOTE: kept as an explicit loop because the clean-up cell later
        # deletes the loop variable `category` by name.
        if isinstance(data.get('categories'), str):
            for category in data['categories'].split(', '):
                category_count[category] += 1

print("Total number of businesses:", total_count)
print("=== Top 5 by count... ===")
# Counter.most_common(n) returns the n highest (key, count) pairs.
print("Cities:", dict(city_count.most_common(5)))
print("States:", dict(state_count.most_common(5)))
print("Categories:", dict(category_count.most_common(5)))


150346it [00:00, 207052.78it/s]

Total number of businesses: 150346
=== Top 5 by count... ===
Cities: {'Philadelphia': 14569, 'Tucson': 9250, 'Tampa': 9050, 'Indianapolis': 7540, 'Nashville': 6971}
States: {'PA': 34039, 'FL': 26330, 'TN': 12056, 'IN': 11247, 'MO': 10913}
Categories: {'Restaurants': 52268, 'Food': 27781, 'Shopping': 24395, 'Home Services': 14356, 'Beauty & Spas': 14292}





### Load and Filter Businesses

In [5]:
# Bare-bones data structure: a list of dictionaries representing the businesses
# in the selected city, plus the set of their business_ids which the review
# filter uses to decide which reviews to keep.
business_limit = None  # maximum number of lines to read; None reads the whole file
city='Philadelphia'
business_list = []
business_id_set=set()

# Upper-case the target once instead of once per input line.
city_upper = city.upper()

# Explicit encoding so the read does not depend on the platform's locale default.
with open("../../Data/yelp_academic_dataset_business.json", "r", encoding="utf-8") as file:
    for line in tqdm(islice(file, business_limit)):
        business=json.loads(line)

        # Case-insensitive city match AND exclude duplicate business_ids
        if business['city'].upper() == city_upper and business['business_id'] not in business_id_set:
            business_list.append(business)
            business_id_set.add(business['business_id'])

print(f'{len(business_id_set)} unique businesses from {city} loaded from yelp_academic_dataset_business.json')

# Load the filtered list into a pandas dataframe for easier variable processing and file saving
business_df=pd.DataFrame(business_list)
business_df.to_parquet('../../Data/business.parquet', engine='pyarrow', compression='snappy')
del business_df, city_upper

150346it [00:00, 263168.43it/s]


14576 unique businesses from Philadelphia loaded from yelp_academic_dataset_business.json


### Load and filter reviews

In [6]:
# Stream the review dump, keep the reviews that belong to the selected
# businesses, and append them to review.parquet in batches so the whole file is
# never held in memory at once.
review_user_id_set = set()  # authors of kept reviews; used by the user filter
review_id_set = set()       # ids of kept reviews; used to drop duplicates
buffer = []

parquet_writer = None       # created lazily with the schema of the first batch
batch_size = 10000
error_count = 0
max_errors_to_show = 5

try:
    # Explicit encoding so the read does not depend on the platform's locale default.
    with open("../../Data/yelp_academic_dataset_review.json", "r", encoding="utf-8") as file:
        for i, line in enumerate(tqdm(file)):
            # Per-line best effort: a malformed record is counted and skipped.
            # Only parsing and filtering live inside this handler.
            try:
                review = json.loads(line)
                # Safeguard access to required keys
                if not all(k in review for k in ('business_id', 'review_id', 'user_id')):
                    continue
                if review['business_id'] not in business_id_set or review['review_id'] in review_id_set:
                    continue
                review_user_id_set.add(review['user_id'])
                review_id_set.add(review['review_id'])
                buffer.append(review)
            except Exception as e:
                error_count += 1
                if error_count <= max_errors_to_show:
                    print(f'Line {i} failed: {e}')
                elif error_count == max_errors_to_show + 1:
                    print("More errors encountered, suppressing further error messages...")
                continue

            # The flush is deliberately outside the handler above: a failed
            # Parquet write must stop the run instead of being logged as a bad
            # line while the buffer keeps growing.
            if len(buffer) >= batch_size:
                df = pd.DataFrame(buffer)
                table = pa.Table.from_pandas(df)
                if parquet_writer is None:
                    parquet_writer = pq.ParquetWriter('../../Data/review.parquet', table.schema, compression='snappy')
                parquet_writer.write_table(table)
                buffer.clear()

    # Write remaining records
    if buffer:
        df = pd.DataFrame(buffer)
        table = pa.Table.from_pandas(df)
        if parquet_writer is None:
            parquet_writer = pq.ParquetWriter('../../Data/review.parquet', table.schema, compression='snappy')
        parquet_writer.write_table(table)
finally:
    # Always close the writer, even when the loop above raised.
    if parquet_writer is not None:
        parquet_writer.close()

print(f'{len(review_id_set)} unique reviews of businesses from {city} posted from {len(review_user_id_set)} unique users have been loaded and saved to review.parquet')


4920154it [00:16, 293069.41it/s]

Line 4920153 failed: Expecting ',' delimiter: line 1 column 135 (char 134)
686422 unique reviews of businesses from Philadelphia posted from 227407 unique users have been loaded and saved to review.parquet





### Load and filter users

In [7]:
# Stream the user dump, keep the users who wrote at least one kept review, and
# append them to user.parquet in batches.
user_id_set = set()    # ids of kept users; used to drop duplicates
buffer = []

parquet_writer = None  # created lazily with the schema of the first batch
batch_size = 10000
error_count = 0
max_errors_to_show = 5

try:
    # Explicit encoding so the read does not depend on the platform's locale default.
    with open("../../Data/yelp_academic_dataset_user.json", "r", encoding="utf-8") as file:
        for i, line in enumerate(tqdm(file)):
            # Per-line best effort: a malformed record is counted and skipped.
            # Only parsing and filtering live inside this handler.
            try:
                user = json.loads(line)
                if 'user_id' not in user:
                    continue
                if user['user_id'] not in review_user_id_set or user['user_id'] in user_id_set:
                    continue
                user_id_set.add(user['user_id'])
                buffer.append(user)
            except Exception as e:
                error_count += 1
                if error_count <= max_errors_to_show:
                    print(f'Line {i} failed: {e}')
                elif error_count == max_errors_to_show + 1:
                    print("More errors encountered, suppressing further error messages...")
                continue

            # The flush is deliberately outside the handler above: a failed
            # Parquet write must stop the run instead of being logged as a bad
            # line while the buffer keeps growing.
            if len(buffer) >= batch_size:
                df = pd.DataFrame(buffer)
                table = pa.Table.from_pandas(df)
                if parquet_writer is None:
                    parquet_writer = pq.ParquetWriter('../../Data/user.parquet', table.schema, compression='snappy')
                parquet_writer.write_table(table)
                buffer.clear()

    # Write any remaining users
    if buffer:
        df = pd.DataFrame(buffer)
        table = pa.Table.from_pandas(df)
        if parquet_writer is None:
            parquet_writer = pq.ParquetWriter('../../Data/user.parquet', table.schema, compression='snappy')
        parquet_writer.write_table(table)
finally:
    # Always close the writer, even when the loop above raised.
    if parquet_writer is not None:
        parquet_writer.close()

print(f'{len(user_id_set)} unique users that have posted at least one review for a business in {city} have been loaded and saved to user.parquet')


1987897it [00:09, 204091.38it/s]

227407 unique users that have posted at least one review for a business in Philadelphia have been loaded and saved to user.parquet





In [8]:
# Drop every global left behind by the load-and-filter cells before the feature
# processing stage reloads the data from the .parquet files.
# Summary-statistics cell
del city_count, state_count, category_count, total_count, data, category
# Business filter cell
del business_limit, city, business_list, business_id_set, business
# Review and user filter cells
del review_user_id_set, review_id_set, review
del user_id_set, user
# Names shared by the streaming loops
del buffer, parquet_writer, batch_size, error_count, max_errors_to_show
del file, i, line, df, table
# Clean up any remaining variables
#del globals()[name]  # Uncomment to delete all variables


## Feature processing

### Identify most recent timestamp for normalisation

In [9]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

# Reload the filtered tables written by the load-and-filter stage
review_df = pd.read_parquet('../../Data/review.parquet', engine='pyarrow')
user_df = pd.read_parquet('../../Data/user.parquet', engine='pyarrow')

# Parse the timestamp columns into datetimes
review_df['date'] = pd.to_datetime(review_df['date'])
user_df['yelping_since'] = pd.to_datetime(user_df['yelping_since'])

# Pool every timestamp (user sign-ups first, then review dates)
ts_list = [*user_df['yelping_since'].to_list(), *review_df['date'].to_list()]

# Oldest / newest record and the mean age in whole days relative to the newest
min_ts = min(ts_list)
max_ts = max(ts_list)
age_ts_list = [max_ts - stamp for stamp in ts_list]
age_days_list = [age.days for age in age_ts_list]
avg_age = sum(age_days_list) / len(age_days_list)

print(f"""
=== Record Age Summary ===
max_ts:         {max_ts}
min_ts:         {min_ts}
avg_age(days):  {avg_age}
""")

# Persist the reference timestamp as text; the feature cells read it back to
# compute record ages.
with open('../../Data/max_ts.txt', 'w') as file:
    file.write(str(max_ts))

del user_df, review_df, max_ts


=== Record Age Summary ===
max_ts:         2022-01-19 19:46:34
min_ts:         2004-10-12 09:36:53
avg_age(days):  2402.0728801559153



In [10]:
# Spot check: age in whole days of the first pooled timestamp (the first user's
# yelping_since, since user timestamps come first in ts_list).
age_ts_list[0].days

5473

### Pre-processing: User Features

In [11]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from datetime import datetime

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

#=== Reload from .parquet files ===
user_df = pd.read_parquet('../../Data/user.parquet', engine='pyarrow')

#=== Reload max_ts from text file ===
with open('../../Data/max_ts.txt', "r") as file:
    max_ts=pd.to_datetime(file.read().strip())

#=== Calculate record age (whole days between sign-up and max_ts) ===
user_df['yelping_since']=pd.to_datetime(user_df['yelping_since'])
user_df['record_age']=(max_ts-user_df['yelping_since']).dt.days

#=== Call reset_index() to provide index mapping between [user_id] and a machine readable integer ===
user_df.reset_index(inplace=True)
user_df.rename(columns={'index':'user_id_index'},inplace=True)

#=== Convert friends to list of indices, keeping only friends that are themselves in user_df ===
# The mapping's keys are exactly the known user ids, so it doubles as the membership test.
user_id_map=pd.Series(user_df['user_id_index'].values, index=user_df['user_id']).to_dict()
user_df['friends']=user_df['friends'].apply(
    lambda friend_list: [user_id_map[friend] for friend in friend_list.split(', ') if friend in user_id_map]
)
user_df['friend_count']=user_df['friends'].apply(len)

#=== Derive elite variables ===
def _parse_elite_years(elite_years):
    """Return a 1 January timestamp for every usable year in a comma-separated string.

    Only four-digit, all-digit fragments that convert to a valid timestamp are
    kept. Anything else is dropped, so the list never contains NaT or an
    implausible year; either would corrupt elite_count, latest_elite and the
    z-scored elite_age derived below.

    NOTE(review): some releases of the dataset reportedly encode the year 2020
    as the two fragments '20,20' -- confirm against the raw file. Such
    fragments are dropped here, not repaired.
    """
    year_starts = []
    for year in elite_years.split(","):
        if len(year) != 4 or not year.isdigit():
            continue
        try:
            year_start = pd.to_datetime(datetime(year=int(year), month=1, day=1), errors='coerce')
        except ValueError:
            # e.g. '0000' is not a valid calendar year
            continue
        if pd.notna(year_start):
            year_starts.append(year_start)
    return year_starts

user_df['elite']=user_df['elite'].apply(_parse_elite_years)
user_df['elite_count']=user_df['elite'].apply(len)
# Users without any elite year get NaT, which becomes a missing elite_age.
user_df['latest_elite']=pd.to_datetime(
    user_df['elite'].apply(lambda elite_years: max(elite_years) if elite_years else pd.NaT)
)
user_df['elite_age']=(max_ts-user_df['latest_elite']).dt.days

#=== Drop columns that are no longer needed ===
user_df.drop(columns=['latest_elite','yelping_since'], inplace=True)

#=== Apply z-score scaling to numeric variables besides user_id_index ===
cols_to_scale=user_df.select_dtypes(exclude=['object']).columns.tolist()
cols_to_scale.remove('user_id_index')
scaler = StandardScaler()
user_df[cols_to_scale]=scaler.fit_transform(user_df[cols_to_scale])

#=== Write to .parquet file ===
user_df.to_parquet('../../Data/preprocessed_user.parquet', engine='pyarrow', compression='snappy')

#=== remove unneeded variables ===
del cols_to_scale, scaler, user_id_map, user_df

### Pre-processing: Business features

In [12]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sentence_transformers import SentenceTransformer
import re
import torch
import pickle

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

#=== Reload from .parquet files ===
# business.parquet is the city-filtered business table written by the load-and-filter stage.
business_df = pd.read_parquet('../../Data/business.parquet', engine='pyarrow')

#=== Process categories ===
# Two competing methods applied here
# 1) Categories are treated as graph nodes, and relationships are recorded as edges
# 2) Categories are text embeddings, and the full list is transformed via pretrained LLM

#=== 1) Graph nodes + edges
def split_text(string_var):
    """
    Split a ', '-separated string into its parts.

    Anything that is not a string (None, NaN, numbers, ...) yields an empty
    list, so the function can be applied to a column with missing values.
    isinstance() is used so that str subclasses (e.g. numpy.str_) are split
    too, not treated as missing.
    """
    if isinstance(string_var, str):
        return string_var.split(', ')
    return []

business_df['category_list']=business_df['categories'].apply(split_text)

# Unique category labels across all businesses
categories={category
            for category_list in business_df['category_list'].to_list()
            for category in category_list}
# Enumerate in sorted order: iterating a set of strings directly gives an order
# that can change between interpreter runs, which would make the saved index
# (and the category_index column) differ from run to run.
category_index={category:idx for idx,category in enumerate(sorted(categories))}

# Saving business_category_index for later use
with open('../../Data/business_category_index.pickle', 'wb') as handle:
    pickle.dump(category_index, handle, protocol=pickle.HIGHEST_PROTOCOL)

business_df['category_index']=business_df['category_list'].apply(
    lambda category_list: [category_index[category] for category in category_list]
)

#=== 2) Category Embeddings
print("=== Test business_df['categories'] for suitability of 'all-MiniLM-L6-v2' "
,"\n=== for embedding calculation")
# default=0 keeps the report working when no business has a string 'categories' value.
print('Max num Characters:',max((len(category_list)
     for category_list in business_df['categories'].to_list()
     if isinstance(category_list, str)), default=0))
# "Tokens" here means the number of ', '-separated category labels, not model tokens.
print('Max num Tokens:',max((len(category_list.split(', '))
     for category_list in business_df['categories'].to_list()
     if isinstance(category_list, str)), default=0))

print('Num unique categories:',len(categories))

del categories, category_index

=== Test business_df['categories'] for suitability of 'all-MiniLM-L6-v2'  
=== for embedding calculation
Max num Characters: 503
Max num Tokens: 36
Num unique categories: 1027


**NOTE:** I'm using **all-MiniLM-L6-v2** like a hammer and all business_df columns as nails.  
Some other treatments we can do are:
* Create a 1-hot-encoded vector embedding for the 1027 categories - **EASY** - Vectors will be a sparse representation and would likely not be very predictive
* Create a dense vector representation for categories - **HARD** - Will need to create a separate predictive model (possibly ALBERT/BERT/ETC) to come up with this, value may or may not be better than just using **all-MiniLM-L6-v2**
* Add a Geo-encoding vector model for latitude & longitude - **MODERATE** - I haven't found one yet, probably straightforward if found
* Add a Geo-location hierarchy to the heterogeneous graph network - **HARD** - This has been done in some papers. Interesting direction to take AFTER building POC
* Add another one-hot-encoded attribute vector from 'attributes' - **EASY** - This may be better off as a set of post model update features

In [13]:
# Text pre-processing
def preprocess_text(string_var):
    """
    Lower-case a string and strip every character that is not an ASCII letter,
    a digit or whitespace.

    Non-string input (None, NaN, numbers, ...) yields ''. isinstance() is used
    so that str subclasses (e.g. numpy.str_) are cleaned too, not blanked.
    Note that accented letters are removed as well, and that whitespace is left
    as-is, so removing a symbol between two spaces leaves a double space.
    """
    if not isinstance(string_var, str):
        return ''
    return re.sub(r'[^a-zA-Z0-9\s]', '', string_var.lower())
    
# Cleaned copies of the three text columns that get embedded
business_df['categories-cleaned'] = business_df['categories'].apply(preprocess_text)
business_df['name-cleaned'] = business_df['name'].apply(preprocess_text)

# Must append address + city if including multiple cities in this dataset
business_df['address-cleaned'] = business_df['address'].apply(preprocess_text)

model = SentenceTransformer('all-MiniLM-L6-v2')
encode_options = dict(convert_to_tensor=True, show_progress_bar=True)

# Encode categories, then names, then addresses; each result is moved to the
# CPU so it can be turned into plain Python lists below.
with torch.no_grad():
    category_embeddings = model.encode(business_df['categories-cleaned'].to_list(), **encode_options).cpu()
    name_embeddings = model.encode(business_df['name-cleaned'].to_list(), **encode_options).cpu()
    address_embeddings = model.encode(business_df['address-cleaned'].to_list(), **encode_options).cpu()

# One embedding (a list of floats) per business row
business_df['category_embeddings'] = category_embeddings.tolist()
business_df['name_embeddings'] = name_embeddings.tolist()
business_df['address_embeddings'] = address_embeddings.tolist()

del model, encode_options, category_embeddings, name_embeddings, address_embeddings



Batches:   0%|          | 0/456 [00:00<?, ?it/s]

Batches:   0%|          | 0/456 [00:00<?, ?it/s]

Batches:   0%|          | 0/456 [00:00<?, ?it/s]

In [14]:
# Standardise review counts (z-score)
scaler = StandardScaler()
business_df['review_count'] = scaler.fit_transform(business_df[['review_count']])

# Map the average rating through (stars - 1) / 4
business_df['stars'] = business_df['stars'].sub(1.0).div(4.0)

# Expose the row position as business_id_index, the integer id for each business
business_df = business_df.reset_index().rename(columns={'index': 'business_id_index'})
business_df.head()

del scaler

In [15]:
# Persist the processed business table; the review stage and the final summary reload it from this file.
business_df.to_parquet('../../Data/preprocessed_business.parquet', engine='pyarrow', compression='snappy')

In [16]:
# The table has been written to disk above, so the in-memory copy is no longer needed.
del business_df

### Pre-processing: Review features

In [17]:
import pandas as pd
from datetime import datetime

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

#=== Reload from .parquet files (only the id -> index columns of users and businesses) ===
review_df = pd.read_parquet('../../Data/review.parquet', engine='pyarrow')
user_df = pd.read_parquet('../../Data/preprocessed_user.parquet', engine='pyarrow')[['user_id','user_id_index']]
business_df = pd.read_parquet('../../Data/preprocessed_business.parquet', engine='pyarrow')[['business_id','business_id_index']]

# Reload max_ts from text file
with open('../../Data/max_ts.txt', "r") as file:
    max_ts = pd.to_datetime(file.read().strip())

#=== Record age: whole days between the review date and max_ts ===
review_df['date'] = pd.to_datetime(review_df['date'])
review_df['record_age'] = (max_ts - review_df['date']).dt.days

#=== Attach the integer indices of the author and the business, then number the reviews ===
# merge() defaults to an inner join, so a review whose user or business is
# missing from the lookup tables is dropped here.
review_df = (
    review_df
    .merge(user_df[['user_id', 'user_id_index']], on='user_id')
    .merge(business_df[['business_id', 'business_id_index']], on='business_id')
    .reset_index()
    .rename(columns={'index': 'review_id_index'})
)

#=== Map the star rating through (stars - 1) / 4 ===
review_df['stars'] = (review_df['stars'] - 1.0) / 4.0

# !!! CANNOT PERFORM Z-SCORE NORMALISATION YET AS IT WOULD BIAS THE STAR REGRESSION PREDICTIONS !!!
# from sklearn.preprocessing import StandardScaler
# Z-score normalisation for review counts
# cols_to_scale=['useful','funny','cool']

# scaler = StandardScaler()
# business_df['review_count']=scaler.fit_transform(pd.DataFrame(business_df['review_count']))

print("=== Test review_df['text'] for suitability of 'all-MiniLM-L6-v2' "
,"\n=== for embedding calculation, must be 512 tokens or less...")
# Only genuine strings are measured; "tokens" here means space-separated words.
review_texts = [text for text in review_df['text'].to_list() if type(text) is str]
print('Max num Characters:', max(len(text) for text in review_texts))
print('Max num Tokens:', max(len(text.split(' ')) for text in review_texts))
del review_texts

#=== Apply SentenceTransformer BERT model to generate embeddings of ===
# from sentence_transformers import SentenceTransformer
# import re
# Text pre-processing
# def preprocess_text(string_var):
#     if type(string_var)==str:
#         string_var=string_var.lower()
#         string_var=re.sub(r'[^a-zA-Z0-9\s]', '', string_var)
#         return string_var
#     else:
#         return ''
    
# review_df['text-cleaned']=review_df['text'].apply(lambda x: preprocess_text(x))

# model = SentenceTransformer('all-MiniLM-L6-v2')

# with torch.no_grad():
#     text_embeddings = model.encode(review_df['text-cleaned'].to_list(), convert_to_tensor=True, show_progress_bar=True)
#     text_embeddings = text_embeddings.cpu()

# review_df['text_embeddings']=text_embeddings.tolist()

# del model, text_embeddings

=== Test review_df['text'] for suitability of 'all-MiniLM-L6-v2'  
=== for embedding calculation, must be 512 tokens or less...
Max num Characters: 5000
Max num Tokens: 3079


As seen above, the longest reviews are too long to be encoded in full by **all-MiniLM-L6-v2**. That said, a truncated review is better than nothing, hence the following block. We can potentially fix this later via a chunking methodology, a long-context model, or topic modelling.


In [18]:
# Preview the first processed reviews, including the record_age and index columns added above.
review_df.head()

Unnamed: 0,review_id_index,review_id,user_id,business_id,stars,useful,funny,cool,text,date,record_age,user_id_index,business_id_index
0,0,BiTunyQ73aT9WBnpR9DZGw,OyoGAe7OKpv6SyGZT5g77Q,7ATYjTIgM3jUlt4UM3IypQ,1.0,1,0,1,I've taken a lot of spin classes over the year...,2012-01-03 15:28:18,3669,14759,1281
1,1,AqPFMleE6RsU23_auESxiA,_7bHUi9Uuf5__HHc_Q8guQ,kxX2SOes4o-D3ZQBkiMRfA,1.0,1,0,1,"Wow! Yummy, different, delicious. Our favo...",2015-01-04 00:01:03,2572,37700,622
2,2,JrIxlS1TzJ-iCu79ul40cQ,eUta8W_HdHMXPzLBBZhL1A,04UD14gamNjLY0IDYVhHJg,0.0,1,2,1,I am a long term frequent customer of this est...,2015-09-23 23:10:31,2309,31304,256
3,3,8JFGBuHMoiNDyfcxuWNtrA,smOvOajNG0lS4Pq7d8g4JQ,RZtGWDLCAtuipwaZ-UfjmQ,0.75,0,0,0,Good food--loved the gnocchi with marinara\nth...,2009-10-14 19:57:14,4479,7721,820
4,4,oyaMhzBSwfGgemSGuZCdwQ,Dd1jQj7S-BFGqRbApFzCFw,YtSqYv1Q_pOltsVPSx54SA,1.0,0,0,0,Tremendous service (Big shout out to Douglas) ...,2013-06-24 11:21:25,3131,29013,1130


In [19]:
# Persist the processed reviews, then release the three frames held by this stage.
review_df.to_parquet('../../Data/preprocessed_review.parquet', engine='pyarrow', compression='snappy')
del review_df, business_df, user_df

## Final Summary Statistics

In [20]:
import pandas as pd
# Reload the three preprocessed tables from disk for the final summary.
review_df = pd.read_parquet('../../Data/preprocessed_review.parquet', engine='pyarrow')
user_df = pd.read_parquet('../../Data/preprocessed_user.parquet', engine='pyarrow')
business_df = pd.read_parquet('../../Data/preprocessed_business.parquet', engine='pyarrow')

In [21]:
# Row counts of the preprocessed tables; the labels name the raw dump each table was filtered from.
print(f"""=== Total Counts post-filtering ===
business.json:{len(business_df)}
user.json:    {len(user_df)}
review.json:  {len(review_df)}
""")

=== Total Counts post-filtering ===
business.json:14576
user.json:    227407
review.json:  686422



In [22]:
# Total number of friend references left after restricting friends to users in
# the dataset (presumably a mutual friendship is counted once per side -- confirm).
sum(len(friends) for friends in user_df['friends'].to_list())

1345962