<a href="https://colab.research.google.com/github/Ddshsp/lab/blob/main/%D0%A8%D0%94%D0%94_%D0%9B%D0%A02_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

In [None]:
def func(n: int, seed: "int | None" = None) -> pd.DataFrame:
    """Build a synthetic people table with ``n`` random rows.

    Columns, each drawn independently:
      * ``gender``     - "Male" or "Female"
      * ``age``        - integer in [18, 65)
      * ``income``     - integer in [20000, 150000)
      * ``profession`` - "Engineer", "Doctor", "Teacher" or "Lawyer"

    Args:
        n: Number of rows to generate.
        seed: Optional seed. When given, a private ``RandomState`` is used, so
            the same ``(n, seed)`` always produces the same frame and the
            global NumPy random state is not touched. When ``None`` (the
            default) the global ``np.random`` state is used.

    Returns:
        A DataFrame with ``n`` rows and the four columns listed above.
    """
    # The np.random module and RandomState expose the same choice/randint
    # API, so a single code path serves both the seeded and unseeded case.
    rng = np.random if seed is None else np.random.RandomState(seed)
    data = {
        "gender": rng.choice(["Male", "Female"], n),
        "age": rng.randint(18, 65, n),
        "income": rng.randint(20000, 150000, n),
        "profession": rng.choice(["Engineer", "Doctor", "Teacher", "Lawyer"], n),
    }
    return pd.DataFrame(data)

In [None]:
# Benchmark input: a frame of 10 million synthetic rows that the write
# cells below serialise to each file format.
df = func(10_000_000)

In [None]:
from contextlib import contextmanager
from time import perf_counter, sleep
from typing import Callable, Iterator


@contextmanager
def catchtime() -> Iterator[Callable[[], float]]:
    """Time the body of a ``with`` block and print the elapsed seconds.

    Yields:
        A zero-argument callable that returns the number of seconds elapsed
        since the block was entered. It may be called inside the block or
        after it has finished.

    When the block exits normally, ``Time: <seconds> seconds`` is printed with
    three decimals. If the block raises, the exception propagates and nothing
    is printed.
    """
    start = perf_counter()
    # The callable takes no arguments, matching the annotation; a lambda with
    # a required parameter would make ``t()`` raise TypeError.
    yield lambda: perf_counter() - start
    print(f'Time: {perf_counter()-start:.3f} seconds')

In [None]:
# Time writing the frame to CSV without the row index. ``index`` is a
# boolean flag, so pass False explicitly instead of relying on None being
# falsy.
with catchtime() as t:
    df.to_csv('df.csv', index=False)

Time: 19.315 seconds


In [None]:
# Time parsing the CSV file back into a DataFrame; the result is discarded,
# only the elapsed time matters.
with catchtime() as t:
    pd.read_csv("df.csv")

Time: 4.958 seconds


In [None]:
# Time writing the frame to JSON with pandas' default orientation.
# NOTE(review): index=None looks like the pandas default here, so the row
# labels are presumably still emitted as keys - confirm against the pandas
# version in use.
with catchtime() as t:
    df.to_json('df.json',index=None)

Time: 14.465 seconds


In [None]:
# Time parsing the JSON file back into a DataFrame; the result is discarded.
with catchtime() as t:
    pd.read_json("df.json")

Time: 131.619 seconds


In [None]:
!pip install fastavro



In [None]:
from fastavro import writer, parse_schema, reader

In [None]:
# Avro record schema for the benchmark frame. Records are written from dicts,
# so the field order here does not have to match the DataFrame column order.
schema = {
    'doc': 'df',
    'name': 'df',
    'namespace': 'df',
    'type': 'record',
    'fields': [
        {'name': 'age', 'type': 'int'},
        {'name': 'gender', 'type': 'string'},
        # income is generated as whole numbers; declare it as an integer type
        # ('long') instead of 32-bit 'float' so values round-trip as integers.
        {'name': 'income', 'type': 'long'},
        {'name': 'profession', 'type': 'string'},
    ]
}

In [None]:
# Convert the frame into a list with one dict per row; this is the records
# argument handed to fastavro's writer. Done outside the timed block, so the
# conversion cost is not part of the Avro write measurement.
list_of_dict = df.to_dict(orient='records')

In [None]:
# Time writing all records to an Avro file. The timed region includes
# parsing the schema as well as the write itself.
with catchtime() as t:
    with open("df.avro",'wb') as out:
        writer(out,parse_schema(schema),list_of_dict)
# Drop the reference to the row-dict list once it has been written.
del list_of_dict

Time: 58.142 seconds


In [None]:
# Time a full read of the Avro file, materialising every record in memory.
with catchtime() as t:
    with open('df.avro', 'rb') as fo:
        avro_reader = reader(fo)
        # Consume the reader directly instead of appending in a Python loop.
        avro_records = list(avro_reader)
# Only the timing is of interest; drop the reference to the records.
del avro_records

Time: 44.830 seconds


In [None]:
# Time writing the frame to ORC. ``index`` is a bool/None flag, not an
# orientation: a string such as 'records' is merely truthy, which makes the
# RangeIndex get written as an extra column and skews both the file size and
# the timing. Pass False so only the four data columns are stored.
with catchtime() as t:
    df.to_orc('df.orc', index=False)

Time: 11.073 seconds


In [None]:
# Time reading the ORC file back into a DataFrame; the result is discarded.
with catchtime() as t:
    pd.read_orc("df.orc")

Time: 3.367 seconds


In [None]:
# Time writing the frame to Parquet with pandas' default settings (no
# engine, compression or index option is specified here).
with catchtime() as t:
    df.to_parquet('df.parquet')

Time: 7.981 seconds


In [None]:
# Time reading the Parquet file back into a DataFrame; the result is discarded.
with catchtime() as t:
    pd.read_parquet('df.parquet')

Time: 2.514 seconds


In [None]:
!pip install pyarrow



In [None]:
import pyarrow.feather as feather

In [None]:
# Time writing the frame to a Feather file via pyarrow.
with catchtime() as t:
    feather.write_feather(df, 'df.feather')

Time: 4.725 seconds


In [None]:
# Time reading the Feather file back; the result is discarded.
with catchtime() as t:
    feather.read_feather('df.feather')

Time: 4.728 seconds


In [None]:
# Compare on-disk sizes of everything matching df* (-h: human-readable,
# -S: do not add subdirectory sizes to their parent).
# NOTE(review): the saved output of this cell lists df100_data/, which is only
# created by a later cell, so the output was captured out of execution order.
!du -hS df*

84M	df100_data/Teacher/Female
84M	df100_data/Teacher/Male
4.0K	df100_data/Teacher
84M	df100_data/Lawyer/Female
84M	df100_data/Lawyer/Male
4.0K	df100_data/Lawyer
84M	df100_data/Doctor/Female
84M	df100_data/Doctor/Male
4.0K	df100_data/Doctor
84M	df100_data/Engineer/Female
84M	df100_data/Engineer/Male
4.0K	df100_data/Engineer
4.0K	df100_data
180M	df.avro
221M	df.csv
179M	df.feather
637M	df.json
164M	df.orc
37M	df.parquet


In [None]:
# Show column dtypes and the in-memory footprint of the frame, for comparison
# with the on-disk sizes above.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000000 entries, 0 to 9999999
Data columns (total 4 columns):
 #   Column      Dtype 
---  ------      ----- 
 0   gender      object
 1   age         int64 
 2   income      int64 
 3   profession  object
dtypes: int64(2), object(2)
memory usage: 305.2+ MB


In [None]:
import pyarrow.parquet as pq
import pyarrow as pa
# Open the Parquet file that the batch-generation cell appends to. Only the
# schema of `df` is needed, so derive it directly instead of first converting
# all rows into an Arrow table.
# NOTE: this rebinds the name `writer`, shadowing the `writer` function
# imported from fastavro; the Avro write cell cannot be re-run after this one.
writer = pq.ParquetWriter('df100.parquet', pa.Schema.from_pandas(df))
# The 10M-row frame is not used after this point; drop the reference.
del df

In [None]:
import numpy as np
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import concurrent
import threading

# Lock taken around every write to the shared Parquet `writer`, so that
# worker threads append their batches one at a time.
locker = threading.Lock()
# Number of threads in the pool that generates and writes batches.
WORKER = 4

def func_t(n:int):
  """Return a zero-argument task that generates ``n`` rows and appends them.

  The returned callable builds a fresh random frame with ``func(n)``,
  converts it to an Arrow table and writes it through the module-level
  Parquet ``writer`` while holding ``locker``.

  Args:
    n: Number of rows each invocation of the task generates.

  Returns:
    A callable taking no arguments and returning ``None``.
  """
  def wrap() -> None:
    # Generate and convert outside the lock so this work can overlap across
    # threads; only the write itself is serialised.
    table = pa.Table.from_pandas(func(n))
    with locker:
      # Write the whole table. Taking only ``to_batches()[0]`` would silently
      # drop rows whenever the table is split into more than one batch.
      writer.write_table(table)
  return wrap


# Generate num_rows rows in batch_size chunks on a thread pool and append
# each chunk to the shared Parquet writer.
num_rows = 100_000_000
batch_size = 1_000_000
# Exact integer division; any remainder smaller than one batch is not written.
num_batches = num_rows // batch_size

try:
    with ThreadPoolExecutor(max_workers=WORKER) as executor:
        futures = [executor.submit(func_t(batch_size)) for _ in range(num_batches)]
        # Track completion (not submission) and give tqdm the total so the bar
        # shows real progress.
        for future in tqdm(concurrent.futures.as_completed(futures), total=num_batches):
            # Tasks return None; result() is called only so that an exception
            # raised in a worker is re-raised here instead of being lost.
            future.result()
finally:
    # Always close the writer, even if a batch failed, so the file handle is
    # not leaked.
    writer.close()

100%|██████████| 100/100 [00:00<00:00, 16855.43it/s]
100it [02:19,  1.39s/it]


In [None]:
# Clear every name from the interactive namespace without asking for
# confirmation (-f); later cells re-import what they need.
%reset -f

In [None]:
# Re-check on-disk sizes of everything matching df*, now including the
# 100M-row df100.parquet (-h: human-readable, -S: exclude subdirectory sizes).
!du -hS df*

369M	df100.parquet
180M	df.avro
221M	df.csv
179M	df.feather
637M	df.json
164M	df.orc
37M	df.parquet


In [None]:
import pyarrow.dataset as ds

# Open the single large Parquet file as a dataset and rewrite it as a
# directory tree partitioned first by profession, then by gender.
# NOTE(review): write_dataset presumably refuses to write into a non-empty
# target directory by default, so re-running this cell may need df100_data
# removed first - confirm against the installed pyarrow version.
ds_df = ds.dataset('df100.parquet')
ds.write_dataset(ds_df, 'df100_data', format='parquet', partitioning=['profession','gender'])


In [None]:
# Clear every name from the interactive namespace without asking for
# confirmation (-f) before the polars analysis starts.
%reset -f

In [None]:
# List the Parquet files two directory levels below df100_data; awk keeps
# only the last field of each `ls -l` line, i.e. the file path.
!ls -l df100_data/*/*/*.parquet| awk '{print $NF}'

df100_data/Doctor/Female/part-0.parquet
df100_data/Doctor/Male/part-0.parquet
df100_data/Engineer/Female/part-0.parquet
df100_data/Engineer/Male/part-0.parquet
df100_data/Lawyer/Female/part-0.parquet
df100_data/Lawyer/Male/part-0.parquet
df100_data/Teacher/Female/part-0.parquet
df100_data/Teacher/Male/part-0.parquet


In [None]:
import polars as pl
from glob import glob
from concurrent.futures import ThreadPoolExecutor

In [None]:
# Collect the partition files two directory levels below df100_data, sorted
# so the processing order is stable between runs.
files = sorted(glob('df100_data/*/*/*.parquet'))

In [None]:
def process(path: str) -> tuple:
  """Aggregate one partition file of the partitioned dataset.

  Args:
    path: Path shaped like ``<root>/<profession>/<gender>/<file>.parquet``.

  Returns:
    A tuple ``(profession, gender, count, sum, max)``: the two directory
    names directly above the file, the count of ``age`` values, the sum of
    ``age`` and the maximum ``income`` in that file.
  """
  from pathlib import PurePath  # local import: used only for path splitting

  df = pl.read_parquet(path,low_memory=True)
  # Compute the three aggregates in one select instead of three separate
  # passes; aliases keep the two "age" aggregates from clashing by name.
  co2, su2, ma2 = df.select(
      pl.col("age").count().alias("count"),
      pl.col("age").sum().alias("sum"),
      pl.col("income").max().alias("max"),
  ).row(0)
  # Take the two directories directly above the file, counted from the end,
  # so a longer root prefix or another path separator does not shift them.
  prof, gen = PurePath(path).parts[-3:-1]
  return prof,gen,co2,su2,ma2

In [None]:
# Column names and dtypes of the per-partition summary table.
schema = dict(
    profession=pl.String,
    gender=pl.String,
    count=pl.Int32,
    sum=pl.Int64,
    max=pl.Int32,
)
# Empty frame carrying that schema; displayed to show the expected layout.
df = pl.DataFrame(schema=schema)
df

profession,gender,count,sum,max
str,str,i32,i64,i32


In [None]:
# Aggregate every partition file on a small thread pool and wrap each result
# tuple in a one-row frame that follows the summary schema.
with ThreadPoolExecutor(max_workers=4) as executor:
  df_list = [
      pl.DataFrame(dict(zip(schema.keys(), stats)), schema=schema)
      for stats in executor.map(process, files)
  ]

In [None]:
# Stack the one-row frames into a single summary table, one row per
# partition file.
df = pl.concat(df_list)

In [None]:
# Display the per-partition summary table.
df

profession,gender,count,sum,max
str,str,i32,i64,i32
"""Doctor""","""Female""",12500328,512464400,149999
"""Doctor""","""Male""",12510875,512937616,149999
"""Engineer""","""Female""",12504190,512677423,149999
"""Engineer""","""Male""",12499456,512482206,149999
"""Lawyer""","""Female""",12492766,512072931,149999
"""Lawyer""","""Male""",12501130,512573174,149999
"""Teacher""","""Female""",12499563,512576202,149999
"""Teacher""","""Male""",12491692,512206076,149999


In [None]:
# Overall mean age: the per-partition age sums added together, divided by
# the total number of rows.
df["sum"].sum() / df["count"].sum()

40.99990028

In [None]:
# Highest income across all partitions: the maximum of the per-partition maxima.
df["max"].max()

149999

In [None]:
# Total number of rows per gender, summed over the professions.
df.group_by("gender").agg(pl.col("count").sum())

gender,count
str,i32
"""Female""",49996847
"""Male""",50003153


In [None]:
import plotly.express as px
# Share of rows per gender. The title lets the figure stand on its own when
# the notebook is skimmed.
fig = px.pie(df, values="count", names="gender", title="Row count by gender")
fig.show()

In [None]:
# Total number of rows per profession, summed over the genders.
df.group_by("profession").agg(pl.col("count").sum())

profession,count
str,i32
"""Doctor""",25011203
"""Lawyer""",24993896
"""Teacher""",24991255
"""Engineer""",25003646


In [None]:
# Share of rows per profession. The title lets the figure stand on its own
# when the notebook is skimmed.
fig = px.pie(df, values="count", names="profession", title="Row count by profession")
fig.show()