In [37]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import getpass
import json
import os

import kagglehub
import pandas as pd
import pyodbc

yelp_dataset = kagglehub.dataset_download('yelp-dataset/yelp-dataset')

#load business data
business_data = pd.read_json(f"{yelp_dataset}/yelp_academic_dataset_business.json", lines=True)

#load reviews data in chunks
reviews_chunks = pd.read_json(f"{yelp_dataset}/yelp_academic_dataset_review.json", lines=True, chunksize=10000) 

#store chunks in list of chunks
reviews_df_list = []
for chunk in reviews_chunks:
    reviews_df_list.append(chunk)

#combine to one df
reviews_df = pd.concat(reviews_df_list, ignore_index=True)

#load checkin data
checkin_chunks = pd.read_json(f"{yelp_dataset}/yelp_academic_dataset_checkin.json", lines=True, chunksize=10000)
checkin_df_list = []

for chunk in checkin_chunks:
    checkin_df_list.append(chunk)
#combine to one df
checkin_df = pd.concat(checkin_df_list, ignore_index=True)

print('Data source import complete.')


Data source import complete.


### Flattening Business into 3 tables

In [None]:
def flatten_attributes(row):
    attributes = row.get('attributes', {})
    if attributes:
        attributes = json.loads(attributes) if isinstance(attributes, str) else attributes
        flattened = []
        for key, value in attributes.items():
            if key == 'RestaurantsPriceRange2':
                key = 'PriceRange'
            elif key == 'RestaurantsDelivery':
                key = 'Delivery'
            elif key == 'RestaurantsTakeOut':
                key = 'Takeout'
            flattened.append({'business_id': row['business_id'], 'attribute_key': key, 'attribute_value': value})
        return flattened
    return []

def flatten_hours(row):
    hours = row.get('hours', {})
    if hours:
        hours = json.loads(hours) if isinstance(hours, str) else hours
        flattened = []
        for day, time_range in hours.items():
            open_time, close_time = time_range.split('-')
            flattened.append({'business_id': row['business_id'], 'day': day, 'open_time': open_time, 'close_time': close_time})
        return flattened
    return []

# Flatten attributes and hours
attributes_data = []
hours_data = []

for _, row in business_data.iterrows():
    attributes_data.extend(flatten_attributes(row))
    hours_data.extend(flatten_hours(row))

# Convert to DataFrames
attributes_df = pd.DataFrame(attributes_data)
hours_df = pd.DataFrame(hours_data)

# Drop the original nested fields from the business table
business_flattened = business_data.drop(columns=['attributes', 'hours'])



### Create Tables Queries


In [39]:
create_business_table = """
IF OBJECT_ID('dbo.business','U') IS NULL
CREATE TABLE business (
    business_id VARCHAR(22) PRIMARY KEY,
    name VARCHAR(255),
    address VARCHAR(255),
    city VARCHAR(100),
    state VARCHAR(10),
    postal_code VARCHAR(20),
    latitude FLOAT,
    longitude FLOAT,
    stars FLOAT,
    review_count INT,
    is_open BIT,
    categories TEXT
);
"""

create_attributes_table = """
IF OBJECT_ID('dbo.attributes','U') IS NULL
CREATE TABLE attributes (
    attribute_id INT IDENTITY(1,1) PRIMARY KEY,
    business_id VARCHAR(22),
    attribute_key VARCHAR(100),
    attribute_value VARCHAR(MAX),
    FOREIGN KEY (business_id) REFERENCES business(business_id)
);
"""

create_hours_table = """
IF OBJECT_ID('dbo.hours','U') IS NULL
CREATE TABLE hours (
    hour_id INT IDENTITY(1,1) PRIMARY KEY,
    business_id VARCHAR(22),
    day VARCHAR(20),
    open_time VARCHAR(20),
    close_time VARCHAR(20),
    FOREIGN KEY (business_id) REFERENCES business(business_id)
);
"""
create_review_table = """
IF OBJECT_ID('dbo.review','U') IS NULL
CREATE TABLE review (
    review_id VARCHAR(22) PRIMARY KEY,
    business_id VARCHAR(22),
    user_id VARCHAR(22),
    stars INT,
    date DATETIME,
    text TEXT,
    useful INT,
    funny INT,
    cool INT,
    FOREIGN KEY (business_id) REFERENCES business(business_id)
);
"""

create_checkin_table = """
IF OBJECT_ID('dbo.checkin','U') IS NULL
CREATE TABLE checkin (
    business_id VARCHAR(22) PRIMARY KEY,
    date TEXT,
    FOREIGN KEY (business_id) REFERENCES business(business_id)
);
"""

### Database Information and connection

In [41]:
#connects to sql server
server = 'group15-server.database.windows.net'  
database = 'group15_Yelp_Database'  
username = 'group15'  
password = 'Badam15123'  
driver = '{ODBC Driver 18 for SQL Server}' 
conn = pyodbc.connect(
    f'DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password};'
    'Encrypt=yes;TrustServerCertificate=no;Connection Timeout=60;'
)
cursor = conn.cursor()
print("Connected.")

Connected.


### Create tables if they do not already exist

In [42]:
#creates tables using queries above
cur = conn.cursor()
for sql in [
    create_business_table,
    create_attributes_table,
    create_hours_table,
    create_review_table,
    create_checkin_table,
]:
    cur.execute(sql)
conn.commit()
print("Tables created (or already existed).")

Tables created (or already existed).


### Widening columns

In [43]:
#makes the table attributes large enough to contain the large text required
cur = conn.cursor()
ddl = [
    "ALTER TABLE business   ALTER COLUMN categories NVARCHAR(MAX) NULL;",
    "ALTER TABLE review     ALTER COLUMN text       NVARCHAR(MAX) NULL;",
    "ALTER TABLE checkin    ALTER COLUMN date       NVARCHAR(MAX) NULL;",
    "ALTER TABLE attributes ALTER COLUMN attribute_value NVARCHAR(MAX) NULL;",
]
for s in ddl:
    try:
        cur.execute(s)
        conn.commit()
    except Exception as e:
        print(s, "->", e)
print("DDL applied.")

DDL applied.


### Filter to Philadelphia, PA and Upload Data in Chunks

In [None]:
# Cell: Philadelphia, PA filtered DataFrames (robust)

# Choose the business source (flattened if available)
business_src = business_flattened if 'business_flattened' in globals() else business_data

bd = business_src.copy()
bd["city_norm"] = bd["city"].astype(str).str.strip()
bd["state_norm"] = bd["state"].astype(str).str.strip().str.upper()

# Strict Philadelphia, PA filter
philadelphia_business_df = bd[
    (bd["city_norm"].str.lower() == "philadelphia") &
    (bd["state_norm"] == "PA")
].drop(columns=["city_norm","state_norm"], errors="ignore")

# If nothing found, print quick diagnostics
if len(philadelphia_business_df) == 0:
    print("No Philadelphia, PA businesses found. Diagnostics:")
    print("Unique states (sample):", bd["state_norm"].dropna().unique()[:10])
    print("Top cities in PA:")
    print(bd[bd["state_norm"] == "PA"]["city_norm"].value_counts().head(20))

philadelphia_bids = set(philadelphia_business_df["business_id"])

# Related tables filtered by business_id
philadelphia_attributes_df = attributes_df[attributes_df["business_id"].isin(philadelphia_bids)].copy()
philadelphia_hours_df = hours_df[hours_df["business_id"].isin(philadelphia_bids)].copy()
philadelphia_checkin_df = checkin_df[checkin_df["business_id"].isin(philadelphia_bids)].copy()

# Filter reviews by Philadelphia business_ids
philadelphia_reviews_df = reviews_df[reviews_df["business_id"].isin(philadelphia_bids)].copy()

# De-duplicate on natural keys
if len(philadelphia_attributes_df):
    philadelphia_attributes_df = (
        philadelphia_attributes_df.sort_values(["business_id","attribute_key"])
        .drop_duplicates(["business_id","attribute_key"], keep="last")
    )
if len(philadelphia_hours_df):
    philadelphia_hours_df = (
        philadelphia_hours_df.sort_values(["business_id","day"])
        .drop_duplicates(["business_id","day"], keep="last")
    )

print(f"Philadelphia, PA businesses: {len(philadelphia_business_df):,}")
print(f"Philadelphia, PA attributes: {len(philadelphia_attributes_df):,}")
print(f"Philadelphia, PA hours: {len(philadelphia_hours_df):,}")
print(f"Philadelphia, PA checkins: {len(philadelphia_checkin_df):,}")
print(f"Philadelphia, PA reviews: {len(philadelphia_reviews_df):,}")


Philadelphia, PA businesses: 14,575
Philadelphia, PA attributes: 127,463
Philadelphia, PA hours: 74,168
Philadelphia, PA checkins: 12,899
Philadelphia, PA reviews: 967,871


In [None]:
# Configurable batch size for Azure SQL (recommend 500â€“1000 on free tier)
BATCH_SIZE = 1000

def truncate_review_table(conn, retries=3):
    """
    Empties the review table before uploading with retry on connection failure.
    """
    attempt = 0
    while attempt < retries:
        try:
            cur = conn.cursor()
            cur.execute("TRUNCATE TABLE review;")
            conn.commit()
            print("Truncated table: review")
            return
        except Exception as e:
            print(f"Attempt {attempt+1} to truncate review failed with error: {e}")
            conn.close()
            # Recreate connection (assuming a function get_connection() exists)
            conn = get_connection()
            attempt += 1
    raise RuntimeError("Failed to truncate review table after multiple retries.")


def upload_with_query(conn, df, insert_sql, param_order, batch_size=1000, table_name=""):
    """
    Generic batch uploader with progress logs.
    Converts NaN to None and uses fast_executemany for speed.
    """
    total = len(df)
    if total == 0:
        print(f"[{table_name}] Nothing to upload.")
        return
    cur = conn.cursor()
    cur.fast_executemany = True
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        part = df.iloc[start:end]
        part = part[param_order].astype(object)
        part = part.where(pd.notna(part), None)  # NaN -> None
        values = part.values.tolist()
        cur.executemany(insert_sql, values)
        conn.commit()
        print(f"[{table_name}] Uploaded {end}/{total} rows; remaining {total - end}.")

# MERGE/UPSERT for review on primary key review_id to avoid duplicates on retries
merge_review_sql = """
MERGE review AS tgt
USING (VALUES (?,?,?,?,?,?,?,?,?)) AS src (review_id,business_id,user_id,stars,[date],[text],useful,funny,cool)
ON tgt.review_id = src.review_id
WHEN MATCHED THEN
  UPDATE SET
    tgt.business_id = src.business_id,
    tgt.user_id     = src.user_id,
    tgt.stars       = src.stars,
    tgt.[date]      = src.[date],
    tgt.[text]      = src.[text],
    tgt.useful      = src.useful,
    tgt.funny       = src.funny,
    tgt.cool        = src.cool
WHEN NOT MATCHED THEN
  INSERT (review_id,business_id,user_id,stars,[date],[text],useful,funny,cool)
  VALUES (src.review_id,src.business_id,src.user_id,src.stars,src.[date],src.[text],src.useful,src.funny,src.cool);
"""

def upload_reviews_upsert(conn, df, batch_size=1000):
    """
    Batch UPSERT for reviews using MERGE to prevent duplicates.
    """
    total = len(df)
    if total == 0:
        print("[review] Nothing to upload.")
        return
    cur = conn.cursor()
    cur.fast_executemany = True
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        part = df.iloc[start:end]
        part = part[["review_id","business_id","user_id","stars","date","text","useful","funny","cool"]].astype(object)
        part = part.where(pd.notna(part), None)
        values = part.values.tolist()
        cur.executemany(merge_review_sql, values)  # executes per-row merge efficiently
        conn.commit()
        print(f"[review] Upserted {end}/{total} rows; remaining {total - end}.")

def upload_business_upsert(conn, df, batch_size=BATCH_SIZE):
    merge_sql = """
    MERGE business AS tgt
    USING (VALUES (?,?,?,?,?,?,?,?,?,?,?,?)) AS src
      (business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories)
    ON tgt.business_id = src.business_id
    WHEN MATCHED THEN UPDATE SET
      name = src.name,
      address = src.address,
      city = src.city,
      state = src.state,
      postal_code = src.postal_code,
      latitude = src.latitude,
      longitude = src.longitude,
      stars = src.stars,
      review_count = src.review_count,
      is_open = src.is_open,
      categories = src.categories
    WHEN NOT MATCHED THEN
      INSERT (business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories)
      VALUES (src.business_id,src.name,src.address,src.city,src.state,src.postal_code,src.latitude,src.longitude,src.stars,src.review_count,src.is_open,src.categories);
    """
    cols = ["business_id","name","address","city","state","postal_code","latitude","longitude","stars","review_count","is_open","categories"]
    total = len(df)
    if total == 0:
        print("[business] Nothing to upsert.")
        return
    cur = conn.cursor()
    cur.fast_executemany = True
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        part_df = df.iloc[start:end][cols]
        part = part_df.astype(object).where(pd.notna(part_df), None)
        cur.executemany(merge_sql, part.values.tolist())
        conn.commit()
        print(f"[business] Upserted {end}/{total}; remaining {total - end}.")

def upload_checkin_upsert(conn, df, batch_size=BATCH_SIZE):
    merge_sql = """
    MERGE checkin AS tgt
    USING (VALUES (?,?)) AS src (business_id,[date])
    ON tgt.business_id = src.business_id
    WHEN MATCHED THEN UPDATE SET [date] = src.[date]
    WHEN NOT MATCHED THEN INSERT (business_id,[date]) VALUES (src.business_id,src.[date]);
    """
    cols = ["business_id","date"]
    total = len(df)
    if total == 0:
        print("[checkin] Nothing to upsert.")
        return
    cur = conn.cursor()
    cur.fast_executemany = True
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        part_df = df.iloc[start:end][cols]
        part = part_df.astype(object).where(pd.notna(part_df), None)
        cur.executemany(merge_sql, part.values.tolist())
        conn.commit()
        print(f"[checkin] Upserted {end}/{total}; remaining {total - end}.")

def delete_rows_for_business_ids(conn, table, business_ids, batch_size=BATCH_SIZE):
    assert table in {"attributes", "hours"}, "Only attributes/hours deletion is allowed."
    ids = list(set(business_ids))
    total = len(ids)
    if total == 0:
        print(f"[{table}] No rows to delete.")
        return
    cur = conn.cursor()
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        chunk = ids[start:end]
        cur.executemany(f"DELETE FROM {table} WHERE business_id = ?", [(x,) for x in chunk])
        conn.commit()
        print(f"[{table}] Deleted for {end}/{total} business_ids; remaining {total - end}.")

business_insert_sql = """
INSERT INTO business (
  business_id, name, address, city, state, postal_code,
  latitude, longitude, stars, review_count, is_open, categories
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""

attributes_insert_sql = "INSERT INTO attributes (business_id, attribute_key, attribute_value) VALUES (?,?,?)"
hours_insert_sql = "INSERT INTO hours (business_id, day, open_time, close_time) VALUES (?,?,?,?)"

# Bracket [date] and [text]
review_insert_sql = """
INSERT INTO review (
  review_id, business_id, user_id, stars, [date], [text], useful, funny, cool
) VALUES (?,?,?,?,?,?,?,?,?)
"""

# use business id as primary key
checkin_insert_sql = "INSERT INTO checkin (business_id, [date]) VALUES (?,?)"
checkin_params = ["business_id","date"]

# Param orders (column order for binding)
business_params = ["business_id","name","address","city","state","postal_code",
                   "latitude","longitude","stars","review_count","is_open","categories"]
attributes_params = ["business_id","attribute_key","attribute_value"]
hours_params = ["business_id","day","open_time","close_time"]
review_params = ["review_id","business_id","user_id","stars","date","text","useful","funny","cool"]
checkin_params = ["business_id","date"]


In [64]:

# Truncate review table before upload
truncate_review_table(conn)

# Upload (upsert) business and related filtered data
upload_business_upsert(conn, philadelphia_business_df, batch_size=BATCH_SIZE)
upload_checkin_upsert(conn, philadelphia_checkin_df, batch_size=BATCH_SIZE)
upload_with_query(conn, philadelphia_attributes_df, attributes_insert_sql, attributes_params, batch_size=BATCH_SIZE, table_name="attributes")
upload_with_query(conn, philadelphia_hours_df, hours_insert_sql, hours_params, batch_size=BATCH_SIZE, table_name="hours")
upload_reviews_upsert(conn, philadelphia_reviews_df, batch_size=BATCH_SIZE)

# Print sizes for verification
print(f"Philadelphia, PA businesses: {len(philadelphia_business_df):,}")
print(f"Philadelphia, PA attributes: {len(philadelphia_attributes_df):,}")
print(f"Philadelphia, PA hours: {len(philadelphia_hours_df):,}")
print(f"Philadelphia, PA checkins: {len(philadelphia_checkin_df):,}")
print(f"Philadelphia, PA reviews: {len(philadelphia_reviews_df):,}")


OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 18 for SQL Server]Communication link failure (0) (SQLExecDirectW)')