In [1]:
# Imports
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.cluster import DBSCAN
import numpy as np
from datetime import datetime
from hashlib import md5
from collections import Counter
from scipy.stats import zscore
from langdetect import detect
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [2]:
# business.json
# Helpers
def parse_and_clean_cats(cat_field):
    """
    - If cat_field is a str, split on commas and strip each piece.
    - Otherwise return [].
    - Then lowercase & remove trailing “ food” if desired.
    """
    if not isinstance(cat_field, str):
        return []
    # split & strip
    raw = [c.strip() for c in cat_field.split(",") if c.strip()]
    # normalize
    out = []
    for c in raw:
        c2 = c.lower()
        # e.g. unify "mexican food" -> "mexican"
        if c2.endswith(" food"):
            c2 = c2[: -len(" food")]
        out.append(c2)
    return out

# 1. Load
df = pd.read_json("Data/business.json", lines=True)

# 2. Drop incomplete
mandatory = ["business_id","name","city","state","latitude","longitude","stars","review_count"]
df.dropna(subset=mandatory, inplace=True)

# 3. Unique IDs
df.sort_values("review_count", ascending=False, inplace=True)
df = df.drop_duplicates("business_id", keep="first")

# 4. Geographic box
'''
lat_ok = df.latitude.between(32, 42)
lon_ok = df.longitude.between(-125, -114)
df = df[lat_ok & lon_ok]
'''

# 5. DBSCAN for geo‐outliers
'''
coords = df[["latitude","longitude"]].to_numpy()
clust = DBSCAN(eps=0.1, min_samples=5).fit(coords)
df = df[clust.labels_ != -1]
'''

# 6. is_open & review_count
# df = df[df.is_open == 1]
df = df[df.review_count >= 3]

# 7. Stars bounds & rounding
df = df[df.stars.between(0, 5)]
df["stars"] = (df.stars * 2).round() / 2.0

# 8. Hours validation
def valid_hours(day_hours):
    try:
        start,end = day_hours.split("-")
        return int(start.replace(":", "")) < int(end.replace(":", ""))
    except:
        return False

def valid_hours_dict(h):
    """
    Returns True if h is a dict and every value string parses to start<end.
    """
    if not isinstance(h, dict):
        return False
    # all() over its values (e.g. "10:00-21:00")
    return all(
        isinstance(v, str) and valid_hours(v)
        for v in h.values()
    )

# Now filter:
df = df[df.hours.apply(valid_hours_dict)]

df = df[df.hours.apply(lambda h: all(valid_hours(v) for v in h.values()))]

# 9. Outlier numeric
for col in ["review_count","stars"]:
    z = (df[col] - df[col].mean())/df[col].std()
    df = df[np.abs(z) <= 3]

# 10. Normalize categories

# normalize
df["categories"] = df.categories.apply(parse_and_clean_cats)
def clean_cats(cat_list):
    return [c.strip().lower().replace("food","") for c in cat_list]
df["categories"] = df.categories.apply(clean_cats)


# Final clean subset
clean_df = df.reset_index(drop=True)
clean_df.index += 1
clean_df.head()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
1,GEuzwtdp2DX9c9HyhOT9KQ,The Landing Restaurant,22 N Main St,New Hope,PA,18938,40.365223,-74.95118,3.0,377,1,"{'OutdoorSeating': 'True', 'RestaurantsReserva...","[american (new), restaurants]","{'Monday': '11:0-22:0', 'Tuesday': '11:0-22:0'..."
2,a62d7e_xXeljJcOUHkJjkg,Circles Contemporary Asian,1516 Tasker St,Philadelphia,PA,19145,39.930647,-75.170729,3.5,377,1,"{'RestaurantsDelivery': 'True', 'RestaurantsAt...","[vegetarian, asian fusion, thai, restaurants]","{'Monday': '17:0-22:45', 'Tuesday': '11:0-22:4..."
3,56TulnfOZy0JI96zwVv27w,Gioia's Deli,1934 Macklind Ave,Saint Louis,MO,63110,38.617402,-90.276806,4.5,376,1,"{'BikeParking': 'True', 'RestaurantsAttire': '...","[sandwiches, delis, italian, restaurants]","{'Monday': '10:0-16:0', 'Tuesday': '10:0-16:0'..."
4,140WNflw_n01GVGtB-PwJQ,Tarpon Turtle Grill & Marina,1513 Lake Tarpon Ave,Tarpon Springs,FL,34689,28.146169,-82.728302,3.5,376,1,"{'BusinessParking': '{'garage': False, 'street...","[american (new), restaurants, sea, caribbean]","{'Monday': '11:0-21:0', 'Tuesday': '11:0-21:0'..."
5,4klUc-NroGEM1vGzp2-0Qg,Rudie's Seafood & Sausage,1402 McGavock Pike,Nashville,TN,37216,36.204719,-86.723909,4.5,376,0,"{'BikeParking': 'True', 'RestaurantsTakeOut': ...","[sea, restaurants, american (new), cajun/creole]","{'Monday': '16:0-22:0', 'Tuesday': '16:0-22:0'..."


In [3]:
# checkin.json

# 1. Load & split/merge any duplicate business rows
raw = pd.read_json("Data/checkin.json", lines=True)
# If there are multiple rows per business, merge:
raw = raw.groupby("business_id").agg({"date": lambda L: ",".join(L)}).reset_index()

# 2. Parse dates
def parse_dates(s):
    out = []
    for part in s.split(","):
        try:
            out.append(datetime.strptime(part.strip(), "%Y-%m-%d %H:%M:%S"))
        except ValueError:
            continue
    return sorted(set(out))  # dedupe & sort

raw["checkins"] = raw.date.apply(parse_dates)
raw = raw[raw.checkins.str.len() > 0]  # drop empty

# 3. Expand to one row per event
rows = []
for _, r in raw.iterrows():
    for ts in r.checkins:
        rows.append({"business_id": r.business_id, "checkin_time": ts})
events = pd.DataFrame(rows)

# 4. Aggregate & filter
agg = events.groupby("business_id").checkin_time.count().rename("n_checkins")
# drop too‑few
good = agg[agg >= 5].index
events = events[events.business_id.isin(good)]

# 5. Save clean set
events.reset_index(drop=True).to_parquet("clean_checkins.parquet")
events.index += 1

events.head()

Unnamed: 0,business_id,checkin_time
1,---kPU91CF4Lq2-WlRu9Lw,2020-03-13 21:10:56
2,---kPU91CF4Lq2-WlRu9Lw,2020-06-02 22:18:06
3,---kPU91CF4Lq2-WlRu9Lw,2020-07-24 22:42:27
4,---kPU91CF4Lq2-WlRu9Lw,2020-10-24 21:36:13
5,---kPU91CF4Lq2-WlRu9Lw,2020-12-09 21:23:33


In [4]:
# review.json
# set up your output writer
schema = None
writer = None
analyzer = SentimentIntensityAnalyzer()

def parse_date(s):
    try:
        return datetime.strptime(s, "%Y-%m-%d")
    except:
        return pd.NaT

def sentiment_ok(row):
    s = analyzer.polarity_scores(row.text)["compound"]
    if row.stars >= 4 and s < 0:   return False
    if row.stars <= 2 and s > 0.5: return False
    return True

chunksize = 100_000
reader = pd.read_json("Data/review.json", lines=True, chunksize=chunksize)

for i, chunk in enumerate(reader):
    # 1. mandatory columns + dedupe by review_id within chunk
    mandatory = ["review_id","user_id","business_id","stars","date","text"]
    chunk = chunk.dropna(subset=mandatory)
    chunk = chunk.drop_duplicates("review_id", keep="first")
    
    # 2. parse + date range
    chunk["date_parsed"] = chunk.date.map(parse_date)
    mask = chunk.date_parsed.between("2010-01-01", "2020-12-31")
    chunk = chunk[mask]

    # 3. stars bounds, text length
    chunk = chunk[chunk.stars.between(1,5)]
    chunk = chunk[chunk.text.str.len() >= 5]

    # 4. hash‑based dedupe (within chunk)
    chunk["text_hash"] = chunk.text.map(lambda t: md5(t.encode("utf8")).hexdigest())
    chunk = chunk.drop_duplicates("text_hash", keep="first")

    # 5. vote counts ≥0
    for col in ["useful","funny","cool"]:
        chunk = chunk[chunk[col] >= 0]

    # 6. z‑score outliers — approximate by computing on chunk
    for col in ["useful","funny","cool"]:
        zs = zscore(chunk[col])
        chunk = chunk[zs < 3]

    # 7. sentiment mismatch
    chunk = chunk[chunk.apply(sentiment_ok, axis=1)]

    # 8. append to parquet
    table = pa.Table.from_pandas(chunk.drop(columns=["text_hash"]), preserve_index=False)
    if writer is None:
        writer = pq.ParquetWriter("clean_reviews.parquet", table.schema)
    writer.write_table(table)

writer.close()

In [5]:
# tip.json
# Configuration
INPUT_FILE = "Data/tip.json"
OUTPUT_FILE = "clean_tips.parquet"
DATE_MIN = "2010-01-01"
DATE_MAX = "2020-12-31"
TEXT_MIN_LEN = 5
OUTLIER_Z = 3

# 1. Load the full JSON into a DataFrame
df = pd.read_json(INPUT_FILE, lines=True)

# 2. Drop rows with missing mandatory fields
mandatory = ["text", "date", "compliment_count", "business_id", "user_id"]
df.dropna(subset=mandatory, inplace=True)

# 3. Parse dates and filter by range
df["date_parsed"] = pd.to_datetime(df["date"], format="%Y-%m-%d", errors="coerce")
mask_date = df["date_parsed"].notna() & (
    df["date_parsed"] >= pd.to_datetime(DATE_MIN)
) & (
    df["date_parsed"] <= pd.to_datetime(DATE_MAX)
)
df = df[mask_date]

# 4. Enforce non-negative compliments and remove outliers
df = df[df["compliment_count"] >= 0]
if not df.empty:
    # compute z-scores and filter
    z_scores = zscore(df["compliment_count"].astype(float), nan_policy='omit')
    mask_outlier = np.abs(z_scores) < OUTLIER_Z
    df = df[mask_outlier]

# 5. Filter by minimum text length
df["text_len"] = df["text"].str.len().fillna(0)
df = df[df["text_len"] >= TEXT_MIN_LEN]

# 6. De-duplicate by text hash
df["text_hash"] = df["text"].apply(lambda t: md5(t.encode("utf-8")).hexdigest())
df.drop_duplicates(subset=["text_hash"], inplace=True)

# 7. Export cleaned tips
df.drop(columns=["text_hash", "text_len"], inplace=True)
df.to_parquet(OUTPUT_FILE, index=False)

print(f"Cleaning complete! Saved to {OUTPUT_FILE} with {len(df)} records.")

df.index += 1
df.head()

Cleaning complete! Saved to clean_tips.parquet with 806995 records.


Unnamed: 0,user_id,business_id,text,date,compliment_count,date_parsed
1,AGNUgVwnZUey3gcPCJ76iw,3uLgwr0qeCNMjKenHJwPGQ,Avengers time with the ladies.,2012-05-18 02:17:21,0,2012-05-18 02:17:21
2,NBN4MgHP9D3cw--SnauTkA,QoezRbYQncpRqyrLH6Iqjg,They have lots of good deserts and tasty cuban...,2013-02-05 18:35:10,0,2013-02-05 18:35:10
3,-copOvldyKh1qr-vzkDEvw,MYoRNLb5chwjQe3c_k37Gg,It's open even when you think it isn't,2013-08-18 00:56:08,0,2013-08-18 00:56:08
4,FjMQVZjSqY8syIO-53KFKw,hV-bABTK-glh5wj31ps_Jw,Very decent fried chicken,2017-06-27 23:05:38,0,2017-06-27 23:05:38
5,ld0AperBXk1h6UbqmM80zw,_uN0OudeJ3Zl_tf6nxg5ww,Appetizers.. platter special for lunch,2012-10-06 19:43:09,0,2012-10-06 19:43:09


In [6]:
# Configuration
INPUT_FILE = "Data/user.json"
OUTPUT_FILE = "clean_users.parquet"
DATE_MIN = "2010-01-01"
DATE_MAX = "2020-12-31"
OUTLIER_Z = 3
CHUNK_SIZE = 100_000  # adjust based on available memory

# Mandatory and numeric fields
mandatory = [
    "user_id", "review_count", "yelping_since",
    "useful", "funny", "cool", "fans", "average_stars"
]
numeric_cols = [
    "review_count", "useful", "funny", "cool", "fans", "average_stars",
    "compliment_hot", "compliment_more", "compliment_profile", "compliment_cute",
    "compliment_list", "compliment_note", "compliment_plain", "compliment_cool",
    "compliment_funny", "compliment_writer", "compliment_photos"
]

# Helper: remove outliers by z-score
def remove_outliers(df, cols, z=OUTLIER_Z):
    for c in cols:
        if c in df:
            scores = zscore(df[c].astype(float), nan_policy='omit')
            df = df[np.abs(scores) < z]
    return df

# Prepare Parquet writer and dedupe tracking
writer = None
seen_ids = set()

# Process in chunks
for chunk in pd.read_json(INPUT_FILE, lines=True, chunksize=CHUNK_SIZE):
    # 1. Drop missing mandatory fields
    chunk.dropna(subset=mandatory, inplace=True)

    # 2. Ensure non-negative numeric values
    for col in numeric_cols:
        if col in chunk:
            chunk = chunk[chunk[col] >= 0]

    # 3. Parse dates and filter by range
    chunk['joined'] = pd.to_datetime(chunk['yelping_since'], format="%Y-%m-%d", errors='coerce')
    mask = (
        chunk['joined'].notna() &
        (chunk['joined'] >= pd.to_datetime(DATE_MIN)) &
        (chunk['joined'] <= pd.to_datetime(DATE_MAX))
    )
    chunk = chunk[mask]

    # 4. Remove outliers
    chunk = remove_outliers(chunk, numeric_cols)

    # 5. Validate average_stars
    chunk = chunk[(chunk['average_stars'] >= 1) & (chunk['average_stars'] <= 5)]

    # 6. De-duplicate across chunks
    mask_new = ~chunk['user_id'].isin(seen_ids)
    chunk = chunk[mask_new]
    seen_ids.update(chunk['user_id'].tolist())

    if chunk.empty:
        continue

    # 7. Drop and reorder columns
    chunk.drop(columns=['yelping_since'], inplace=True)

    # 8. Write to Parquet (append if writer exists)
    table = pa.Table.from_pandas(chunk, preserve_index=False)
    if writer is None:
        writer = pq.ParquetWriter(OUTPUT_FILE, table.schema)
    writer.write_table(table)

# Finalize
if writer:
    writer.close()
print(f"Cleaning complete! Saved to {OUTPUT_FILE} with {len(seen_ids)} records.")


Cleaning complete! Saved to clean_users.parquet with 0 records.
