<a href="https://colab.research.google.com/github/Saha3902/Big-data/blob/main/tp_Big_data02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:


import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
from time import perf_counter
import glob


# 1. Generate a reasonably large CSV file (for a realistic experiment).
#
# The file is only generated when it is missing, so re-running the cell
# reuses the data produced by an earlier run.

csv_path = "large_sample.csv"
rows = 3_000_000  # number of rows (the recorded run reports a 146.58 MB CSV)
cols = 5

if os.path.exists(csv_path):
    print(f" File already exists: {csv_path}")
else:
    print("🔹 Generating sample CSV file...")
    # Fixed seed so every fresh generation yields the same data.
    np.random.seed(42)
    # NOTE: the draws below must stay in this order; reordering them would
    # change which random values land in which column.
    ids = np.arange(rows)
    fares = np.random.rand(rows) * 100
    distances = np.random.rand(rows) * 50
    passenger_counts = np.random.randint(1, 5, size=rows)
    payment_types = np.random.choice(['cash', 'card'], size=rows)
    df = pd.DataFrame({
        'id': ids,
        'fare_amount': fares,
        'distance_km': distances,
        'passengers': passenger_counts,
        'payment_type': payment_types,
    })
    df.to_csv(csv_path, index=False)
    print(f" File created: {csv_path} ({os.path.getsize(csv_path)/(1024**2):.1f} MB)")


# 2. Method 1: pandas + chunksize
#
# Stream the CSV in fixed-size chunks so that only one chunk is held in
# memory at a time, accumulating a running sum and a non-null count from
# which the overall mean of `fare_amount` is derived.

print("\n=== Method 1: pandas.read_csv(chunksize) ===")

start = perf_counter()
total_sum = 0
count = 0
chunksize = 200_000

# The chunked reader holds an open file handle; the context manager
# guarantees it is closed even if parsing a chunk raises.
with pd.read_csv(csv_path, chunksize=chunksize, usecols=['fare_amount']) as reader:
    for chunk in reader:
        s = chunk['fare_amount'].dropna()
        total_sum += s.sum()
        count += s.count()

# With no rows (or only missing values) there is nothing to average:
# report NaN instead of raising ZeroDivisionError / emitting a warning.
pandas_mean = total_sum / count if count else float('nan')
end = perf_counter()
pandas_time = end - start
print(f" pandas mean(fare_amount): {pandas_mean:.4f}")
print(f" Time: {pandas_time:.2f} seconds")


# 3. Method 2: Dask DataFrame
#
# The mean is first expressed as a lazy Dask computation and only
# evaluated by the explicit .compute() call; both steps are timed together.

print("\n=== Method 2: Dask DataFrame ===")

start = perf_counter()
ddf = dd.read_csv(csv_path, usecols=['fare_amount'], assume_missing=True)
lazy_mean = ddf['fare_amount'].mean()
dask_mean = lazy_mean.compute()
end = perf_counter()
dask_time = end - start

print(f" Dask mean(fare_amount): {dask_mean:.4f}")
print(f" Time: {dask_time:.2f} seconds")

# ------------------------------------------------
# 4. Method 3: convert to Parquet (compressed, faster to read)
#
# The CSV is streamed in chunks and each chunk is written as its own
# Parquet part file inside `out_dir`.
# ------------------------------------------------
print("\n=== Method 3: Convert CSV -> Parquet ===")

out_dir = "parquet_dataset"
os.makedirs(out_dir, exist_ok=True)

# The directory is reused between runs. Drop part files left over from a
# previous run so that a run producing fewer parts does not leave stale
# files behind to inflate the reported file count and total size.
# Done before the timer starts so cleanup does not skew the measurement.
for stale_part in glob.glob(os.path.join(out_dir, "part_*.parquet")):
    os.remove(stale_part)

start = perf_counter()
i = 0
# The context manager closes the CSV handle even if a write fails.
with pd.read_csv(csv_path, chunksize=500_000) as reader:
    for chunk in reader:
        fn = os.path.join(out_dir, f"part_{i}.parquet")
        chunk.to_parquet(fn, engine='pyarrow', index=False)
        i += 1
end = perf_counter()
parquet_time = end - start

# Measure what is actually on disk, converted from bytes to MiB.
parquet_files = glob.glob(os.path.join(out_dir, "*.parquet"))
total_parquet_size = sum(os.path.getsize(f) for f in parquet_files) / (1024**2)

print(f" Parquet dataset created with {len(parquet_files)} files")
print(f"Total Parquet size: {total_parquet_size:.2f} MB")
print(f" Conversion time: {parquet_time:.2f} seconds")

# ------------------------------------------------
# 5. Final comparison
#
# Prints the results and timings gathered by the three methods above,
# next to the on-disk size of the original CSV.
# ------------------------------------------------
print("\n=== 📊 Summary Comparison ===")
csv_size = os.path.getsize(csv_path) / (1024 * 1024)  # bytes -> MiB
summary_lines = (
    f"Original CSV size: {csv_size:.2f} MB",
    f"Pandas chunks mean: {pandas_mean:.4f}, time: {pandas_time:.2f}s",
    f"Dask mean: {dask_mean:.4f}, time: {dask_time:.2f}s",
    f"Parquet total size: {total_parquet_size:.2f} MB, conversion time: {parquet_time:.2f}s",
    "\n Done.",
)
for line in summary_lines:
    print(line)


 File already exists: large_sample.csv

=== Method 1: pandas.read_csv(chunksize) ===
 pandas mean(fare_amount): 49.9978
 Time: 0.88 seconds

=== Method 2: Dask DataFrame ===
 Dask mean(fare_amount): 49.9978
 Time: 1.17 seconds

=== Method 3: Convert CSV -> Parquet ===
 Parquet dataset created with 6 files
Total Parquet size: 63.13 MB
 Conversion time: 1.71 seconds

=== 📊 Summary Comparison ===
Original CSV size: 146.58 MB
Pandas chunks mean: 49.9978, time: 0.88s
Dask mean: 49.9978, time: 1.17s
Parquet total size: 63.13 MB, conversion time: 1.71s

 Done.
