In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input mount.
for file_path in (
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input')
    for name in names
):
    print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/b-tree-dataset/dataset_1000000000.csv


In [2]:
# --- Imports and utilities ---
import os, time, gzip, shutil
import pandas as pd
import psutil
import multiprocessing
from pathlib import Path

# Third-party libs imported below (install any that are missing)
# pip install "dask[complete]" polars pyarrow duckdb psutil memory-profiler

import dask.dataframe as dd
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import duckdb

# Input CSV under the Kaggle input directory, plus a local directory for outputs.
CSV_PATH = "/kaggle/input/b-tree-dataset/dataset_1000000000.csv"
OUT_DIR = "./output_bigdata"
os.makedirs(OUT_DIR, exist_ok=True)  # exist_ok=True keeps this cell safe to re-run

def file_size_mb(path):
    """Return the size of the file at ``path`` in MB (bytes / 1024**2)."""
    size_bytes = os.path.getsize(path)
    return size_bytes / (1024 * 1024)

# Sanity-check the environment before running the benchmarks below.
print("File exists:", os.path.exists(CSV_PATH))
print("CSV size (MB):", round(file_size_mb(CSV_PATH),2))  # os.path.getsize raises if the file is missing
print("CPU cores:", multiprocessing.cpu_count())
# Module-level handle to the current process, usable for memory_info() readings.
proc = psutil.Process(os.getpid())


File exists: True
CSV size (MB): 13722.46
CPU cores: 4


The CSV is about 14 GB (13,722 MiB ≈ 14.4 × 10^9 bytes).

In [3]:
# --- 1) Smart dtype inference on a small sample ---
# Only the first SAMPLE_ROWS rows are read, so the inferred dtypes describe the
# head of the file; later rows are not inspected here.
SAMPLE_ROWS = 200000  # adjust: small sample to infer types
sample = pd.read_csv(CSV_PATH, nrows=SAMPLE_ROWS)
print("Sample shape:", sample.shape)

# Infer dtypes with pandas -> map to efficient dtypes
def optimize_dtypes_from_df(df):
    """Map every column of ``df`` to the name of a compact pandas dtype.

    Parameters
    ----------
    df : pandas.DataFrame
        Sample frame whose already-inferred column dtypes drive the choice.

    Returns
    -------
    dict
        ``{column_name: dtype_name}`` where ``dtype_name`` is one of
        ``'Int64'``, ``'float32'``, ``'bool'``, ``'category'`` or ``'string'``.

    Notes
    -----
    * Integer columns always map to the nullable ``'Int64'``. No narrower
      integer type is chosen, because the value range seen in a sample does
      not bound the values in the rest of the file.
    * Non-numeric columns become ``'category'`` only when fewer than half of
      the rows are distinct and there are fewer than 100000 distinct values.
    * A column with zero rows has no meaningful uniqueness ratio and falls
      back to ``'string'`` instead of raising ``ZeroDivisionError``.
    """
    dtype_map = {}
    for col in df.columns:
        ser = df[col]
        # dropna() never changes a Series' dtype, so one check on `ser` suffices.
        if pd.api.types.is_integer_dtype(ser):
            dtype_map[col] = 'Int64'
        elif pd.api.types.is_float_dtype(ser):
            dtype_map[col] = 'float32'
        elif pd.api.types.is_bool_dtype(ser):
            dtype_map[col] = 'bool'
        else:
            # for object columns, check unique ratio and cardinality
            n_rows = len(ser)
            n_unique = ser.nunique()  # computed once; it scans the whole column
            low_cardinality = (
                n_rows > 0
                and n_unique / n_rows < 0.5
                and n_unique < 100000
            )
            # 'string' is the pandas nullable string dtype
            dtype_map[col] = 'category' if low_cardinality else 'string'
    return dtype_map

# Build the dtype map from the sample and print at most its first 10 entries.
dtype_map = optimize_dtypes_from_df(sample)
print("Inferred dtype map (preview):")
for k,v in list(dtype_map.items())[:10]:
    print(k, "->", v)


Sample shape: (200000, 2)
Inferred dtype map (preview):
numeric_column -> Int64
alphabet_column -> string


In [4]:
# Convert dtype_map to pandas-friendly dtypes for read_csv
# Only the 'Int64' and 'string' entries are forwarded; columns mapped to any
# other kind are left out, so read_csv infers their dtype on its own.
pandas_dtypes = {
    column: dtype_name
    for column, dtype_name in dtype_map.items()
    if dtype_name in ('Int64', 'string')
}


In [5]:
def method_pandas_chunks(path, dtypes, chunksize=200_000, max_rows=None):
    """Stream a CSV through pandas in fixed-size chunks and count its rows.

    Parameters
    ----------
    path : str or path-like
        CSV file to read.
    dtypes : dict or None
        Column -> dtype mapping handed to ``pd.read_csv(dtype=...)``.
    chunksize : int, default 200_000
        Number of rows parsed per chunk.
    max_rows : int or None, default None
        Upper bound on the number of rows read. ``None`` (or ``0``) reads the
        whole file. The bound is exact: the last chunk is shortened by pandas
        rather than letting the count overshoot by up to one chunk.

    Returns
    -------
    dict
        ``{"method", "time_s", "peak_ram_mb", "rows"}``. ``peak_ram_mb`` is the
        largest process RSS observed *between* chunks (including the reading
        taken before parsing starts), so short spikes inside a single
        ``read_csv`` step are not captured.
    """
    proc = psutil.Process(os.getpid())
    bytes_per_mb = 1024 * 1024
    peak_mem = proc.memory_info().rss / bytes_per_mb  # baseline before any parsing
    total = 0
    # perf_counter is monotonic, so the elapsed time cannot be skewed by clock adjustments.
    start = time.perf_counter()
    # The context manager closes the underlying file handle even if iteration
    # stops early or a chunk fails to parse.
    with pd.read_csv(
        path,
        dtype=dtypes,
        chunksize=chunksize,
        nrows=max_rows if max_rows else None,
        low_memory=False,
        on_bad_lines='skip',
    ) as reader:
        for chunk in reader:
            total += len(chunk)
            peak_mem = max(peak_mem, proc.memory_info().rss / bytes_per_mb)
    elapsed = time.perf_counter() - start
    return {"method":"pandas_chunks", "time_s":elapsed, "peak_ram_mb":peak_mem, "rows":total}

# Full pass over the CSV: max_rows is left at its default (None), so no row cap is applied.
res_a = method_pandas_chunks(CSV_PATH, pandas_dtypes, chunksize=200_000)
print(res_a)

{'method': 'pandas_chunks', 'time_s': 1841.608158826828, 'peak_ram_mb': 353.90625, 'rows': 1000000000}


In [6]:
# -------------------------
# Method B: Dask
# -------------------------
def method_dask(path, dtypes=None, blocksize="64MB"):
    """Count the rows of a CSV with Dask and report time and peak memory.

    Parameters
    ----------
    path : str
        CSV file (or glob) handed to ``dd.read_csv``.
    dtypes : dict or None, default None
        Column -> dtype mapping; ``None`` lets Dask infer the types.
    blocksize : str or int, default "64MB"
        Partition size passed to ``dd.read_csv``.

    Returns
    -------
    dict
        ``{"method", "time_s", "peak_ram_mb", "rows"}``. ``peak_ram_mb`` is the
        highest RSS of *this* process sampled while the computation ran. A
        single reading taken after ``compute()`` returns would only show what
        is still resident once the partitions have been released.

    Notes
    -----
    Only the current process is sampled. NOTE(review): if Dask is configured
    with a process-based or distributed scheduler, worker memory is not
    included — confirm the scheduler in use.
    """
    import threading  # local import: needed only by the RSS sampler below

    proc = psutil.Process(os.getpid())
    peak_rss = [proc.memory_info().rss]  # one-element list so the sampler thread can update it
    finished = threading.Event()

    def _track_peak(interval_s=0.1):
        # Event.wait doubles as the sleep, so the loop exits promptly once set() is called.
        while not finished.wait(interval_s):
            peak_rss[0] = max(peak_rss[0], proc.memory_info().rss)

    sampler = threading.Thread(target=_track_peak, daemon=True)
    start = time.perf_counter()  # monotonic clock for elapsed-time measurement
    sampler.start()
    try:
        # Dask can accept dtype mapping; assume_missing=True is forwarded as in the pandas-style call
        ddf = dd.read_csv(path, dtype=dtypes, blocksize=blocksize, assume_missing=True, on_bad_lines='skip')
        # Trigger computation (len)
        total = ddf.shape[0].compute()
    finally:
        # Always stop the sampler, including when read/compute raises.
        finished.set()
        sampler.join()
    elapsed = time.perf_counter() - start
    # Fold in one last reading in case the run was shorter than a sampling interval.
    peak_rss[0] = max(peak_rss[0], proc.memory_info().rss)
    return {"method":"dask", "time_s":elapsed, "peak_ram_mb":peak_rss[0]/(1024*1024), "rows":int(total)}

# NOTE(review): dtypes=None lets Dask infer column types itself, whereas the pandas
# chunked run was given an explicit dtype map, so the two runs are not like-for-like.
res_b = method_dask(CSV_PATH, dtypes=None, blocksize="64MB")
print(res_b)

{'method': 'dask', 'time_s': 712.9201920032501, 'peak_ram_mb': 1082.26953125, 'rows': 1000000000}


In [None]:
# -------------------------
# Method c: compression, compress file                                                                                                                                    
import gzip, shutil, os, time, psutil, pandas as pd

# Paths of the original CSV and of its gzip-compressed copy
CSV_PATH = "/kaggle/input/b-tree-dataset/dataset_1000000000.csv"
GZ_PATH = "/kaggle/working/dataset_1000000000.csv.gz"   # writable directory

# Helper: file size in megabytes
def file_size_mb(path):
    """Return the on-disk size of ``path`` in MB (bytes / 1024**2)."""
    n_bytes = os.path.getsize(path)
    return n_bytes / (1 << 20)

# Helper: current memory footprint of this process
def memory_usage_mb():
    """Return the resident set size (RSS) of the current process in MB."""
    this_process = psutil.Process(os.getpid())
    rss_bytes = this_process.memory_info().rss
    return rss_bytes / (1 << 20)

# ---------------------------
#  Compress the file
# ---------------------------
# Streams the CSV into a gzip file via shutil.copyfileobj, so the input is never
# loaded into memory as a whole. gzip.open is called without compresslevel, so
# the gzip module's default level applies.
start = time.time()
with open(CSV_PATH, 'rb') as f_in, gzip.open(GZ_PATH, 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
elapsed = time.time() - start

print(f" Compression completed in {elapsed/60:.2f} min")
print(f"Compressed file size: {file_size_mb(GZ_PATH):.1f} MB")


 Compression completed in 41.08 min
Compressed file size: 8592.6 MB


In [1]:
import gzip, shutil, os, time, psutil, pandas as pd

# Paths of the original CSV and of its gzip-compressed copy
CSV_PATH = "/kaggle/input/b-tree-dataset/dataset_1000000000.csv"
GZ_PATH = "/kaggle/working/dataset_1000000000.csv.gz"   # writable directory
# Helper: file size in megabytes
def file_size_mb(path):
    """Return the on-disk size of ``path`` in MB (bytes / 1024**2)."""
    n_bytes = os.path.getsize(path)
    return n_bytes / (1 << 20)

# Helper: current memory footprint of this process
def memory_usage_mb():
    """Return the resident set size (RSS) of the current process in MB."""
    this_process = psutil.Process(os.getpid())
    rss_bytes = this_process.memory_info().rss
    return rss_bytes / (1 << 20)
# Time how long pandas takes to read the first 1,000,000 rows of the gzip file.
# NOTE(review): this covers a 1M-row sample only, so the timing is not directly
# comparable with runs that read the whole file.
start = time.time()
df = pd.read_csv(GZ_PATH, compression='gzip', nrows=1_000_000)
elapsed = time.time() - start
mem = memory_usage_mb()  # single RSS reading taken after the load, not a peak

print(f"Pandas (gzip): {elapsed:.2f}s | RAM={mem:.1f}MB | Sample=1M rows")



Pandas (gzip): 1.11s | RAM=257.3MB | Sample=1M rows


In [2]:
import pandas as pd

# Benchmark figures entered by hand, one record per method.
results = [
    dict(zip(('method', 'time_s', 'peak_ram_mb', 'rows'), record))
    for record in (
        ('pandas_gzip', 1.11, 257.3, 1_000_000),
        ('pandas_chunks', 1841.61, 353.9, 1_000_000_000),
        ('dask', 712.92, 1082.27, 1_000_000_000),
        ('gzip_compression', 41.08*60, None, 1_000_000_000),
    )
]

# Add the elapsed time in minutes as a derived column, then display the table.
df_results = pd.DataFrame(results).assign(
    time_min=lambda frame: frame['time_s'] / 60
)
df_results


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,method,time_s,peak_ram_mb,rows,time_min
0,pandas_gzip,1.11,257.3,1000000,0.0185
1,pandas_chunks,1841.61,353.9,1000000000,30.6935
2,dask,712.92,1082.27,1000000000,11.882
3,gzip_compression,2464.8,,1000000000,41.08


عند مقارنة تقنيات التحميل والضغط،
تبيّن أن مكتبة Dask هي الأسرع في معالجة ملفات ضخمة (1 مليار صف)،
بينما Pandas مع chunking أكثر استقرارًا وأقل استهلاكًا للذاكرة.
أما الضغط باستخدام gzip فقلل حجم الملف إلى 8.6 GB لكنه استغرق أكثر من 40 دقيقة.