In [1]:
import pandas as pd
url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv"
df = pd.read_csv(url)
print(df.shape)
print(df.dtypes)
df.head()


(244, 7)
total_bill    float64
tip           float64
sex            object
smoker         object
day            object
time           object
size            int64
dtype: object


Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
0,16.99,1.01,Female,No,Sun,Dinner,2
1,10.34,1.66,Male,No,Sun,Dinner,3
2,21.01,3.5,Male,No,Sun,Dinner,3
3,23.68,3.31,Male,No,Sun,Dinner,2
4,24.59,3.61,Female,No,Sun,Dinner,4


In [2]:
clean = df.copy()
clean = clean.dropna()
for col in ["total_bill", "tip", "size"]:
    clean = clean[clean[col] >= 0]
clean["tip_ratio"] = (clean["tip"] / clean["total_bill"]).clip(0, 1)
clean["day_std"] = clean["day"].str.title().str.strip()
clean = clean.reset_index(drop=True)
print(clean.shape)
clean.head()


(244, 9)


Unnamed: 0,total_bill,tip,sex,smoker,day,time,size,tip_ratio,day_std
0,16.99,1.01,Female,No,Sun,Dinner,2,0.059447,Sun
1,10.34,1.66,Male,No,Sun,Dinner,3,0.160542,Sun
2,21.01,3.5,Male,No,Sun,Dinner,3,0.166587,Sun
3,23.68,3.31,Male,No,Sun,Dinner,2,0.13978,Sun
4,24.59,3.61,Female,No,Sun,Dinner,4,0.146808,Sun


In [3]:
!pip -q install pyarrow duckdb
from pathlib import Path

base_path = Path("/content/lakehouse_parquet")
data_path = base_path / "tips_partitioned"
data_path.mkdir(parents=True, exist_ok=True)

for sex_value, chunk in clean.groupby("sex"):
    out_file = data_path / f"sex={sex_value}/part-000.parquet"
    out_file.parent.mkdir(parents=True, exist_ok=True)
    chunk.to_parquet(out_file, index=False)

sorted(str(p) for p in data_path.rglob("*.parquet"))[:5]


['/content/lakehouse_parquet/tips_partitioned/sex=Female/part-000.parquet',
 '/content/lakehouse_parquet/tips_partitioned/sex=Male/part-000.parquet']

In [4]:
import duckdb

con = duckdb.connect(database=":memory:")
parquet_glob = str(data_path / "**/*.parquet")

con.execute(f"CREATE OR REPLACE VIEW tips_v AS SELECT * FROM read_parquet('{parquet_glob}');")

res_summary = con.execute("""
    SELECT
        sex,
        ROUND(AVG(total_bill), 2) AS avg_total_bill,
        ROUND(AVG(tip), 2)        AS avg_tip,
        ROUND(AVG(tip_ratio), 3)  AS avg_tip_ratio,
        COUNT(*)                  AS n
    FROM tips_v
    GROUP BY sex
    ORDER BY avg_total_bill DESC
""").fetchdf()

res_summary


Unnamed: 0,sex,avg_total_bill,avg_tip,avg_tip_ratio,n
0,Male,20.74,3.09,0.158,157
1,Female,18.06,2.83,0.166,87


In [5]:
import json, hashlib, time
from pathlib import Path

lineage = {
    "source_url": url,
    "partitions": sorted({p.parent.name for p in data_path.rglob("*.parquet")}),
    "rows": int(clean.shape[0]),
    "columns": list(clean.columns),
    "snapshot_ts": int(time.time()),
    "hash": hashlib.md5(clean.to_json().encode()).hexdigest()
}

meta_path = base_path / "metadata"
meta_path.mkdir(parents=True, exist_ok=True)
snap_file = meta_path / f"snapshot_{lineage['snapshot_ts']}.json"

with open(snap_file, "w") as f:
    json.dump(lineage, f, indent=2)

print(str(snap_file))
lineage


/content/lakehouse_parquet/metadata/snapshot_1761941171.json


{'source_url': 'https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv',
 'partitions': ['sex=Female', 'sex=Male'],
 'rows': 244,
 'columns': ['total_bill',
  'tip',
  'sex',
  'smoker',
  'day',
  'time',
  'size',
  'tip_ratio',
  'day_std'],
 'snapshot_ts': 1761941171,
 'hash': '9ba11f11b7ea939f8cb97205c404016f'}

In [6]:
new_batch = clean.sample(20, replace=True, random_state=42).copy()
new_batch["tip"] = (new_batch["tip"] * 1.05).round(2)
new_batch["total_bill"] = (new_batch["total_bill"] * 1.02).round(2)

for sex_value, chunk in new_batch.groupby("sex"):
    existing_parts = list((data_path / f"sex={sex_value}").glob("part-*.parquet"))
    next_idx = len(existing_parts)
    out_file = data_path / f"sex={sex_value}/part-{next_idx:03d}.parquet"
    chunk.to_parquet(out_file, index=False)

parquet_glob = str(data_path / "**/*.parquet")
con.execute("DROP VIEW IF EXISTS tips_v;")
con.execute(f"CREATE VIEW tips_v AS SELECT * FROM read_parquet('{parquet_glob}');")

res_after = con.execute("""
    SELECT
        sex,
        ROUND(AVG(total_bill), 2) AS avg_total_bill,
        ROUND(AVG(tip), 2)        AS avg_tip,
        ROUND(AVG(tip_ratio), 3)  AS avg_tip_ratio,
        COUNT(*)                  AS n
    FROM tips_v
    GROUP BY sex
    ORDER BY avg_total_bill DESC
""").fetchdf()

res_after


Unnamed: 0,sex,avg_total_bill,avg_tip,avg_tip_ratio,n
0,Male,20.82,3.1,0.157,166
1,Female,18.49,2.85,0.165,98
