In [3]:
import pyspark
print(pyspark.__version__)

3.5.5


### 1. Loading dataset using pyspark

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by this notebook
spark = (
    SparkSession.builder
    .appName("BigDataProcessing")
    .getOrCreate()
)

# Location of the CSV on the local drive -- change to your local path
dataset_path = "combined.csv"

# Read the CSV with the first line as the header and the schema inferred
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(dataset_path)
)

# Preview the first five rows
df.show(5)


### 2. Loading dataset using chunking method of pandas

In [None]:
import pandas as pd

# Rows handed back per chunk (5 million)
chunk_size = 5_000_000
csv_file = "combined.csv"

# Create the chunked reader. With `chunksize` set, read_csv hands back a
# TextFileReader (see the output of the next cell), not a DataFrame; the
# chunks still have to be iterated to process any data.
df = pd.read_csv(filepath_or_buffer=csv_file, chunksize=chunk_size)

In [None]:
df

<pandas.io.parsers.readers.TextFileReader at 0x1cc20046350>

### 3. Loading dataset using Polars

In [None]:
! pip install polars

Defaulting to user installation because normal site-packages is not writeable
Collecting polars
  Downloading polars-1.23.0-cp39-abi3-win_amd64.whl.metadata (15 kB)
Downloading polars-1.23.0-cp39-abi3-win_amd64.whl (34.5 MB)
   ---------------------------------------- 0.0/34.5 MB ? eta -:--:--
   ---------------------------------------- 0.0/34.5 MB ? eta -:--:--
   ---------------------------------------- 0.0/34.5 MB ? eta -:--:--
   ---------------------------------------- 0.0/34.5 MB ? eta -:--:--
   ---------------------------------------- 0.3/34.5 MB ? eta -:--:--
    --------------------------------------- 0.5/34.5 MB 985.7 kB/s eta 0:00:35
   - -------------------------------------- 1.3/34.5 MB 2.0 MB/s eta 0:00:17
   -- ------------------------------------- 1.8/34.5 MB 2.2 MB/s eta 0:00:15
   -- ------------------------------------- 1.8/34.5 MB 2.2 MB/s eta 0:00:15
   -- ------------------------------------- 1.8/34.5 MB 2.2 MB/s eta 0:00:15
   --- -------------------------------


[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import polars as pl
import time

# Wall-clock timestamp taken immediately before the read
start_time = time.time()

# NOTE(review): absolute, user-specific path -- the notebook will not run on
# another machine without editing it.
# Presumably pl.read_csv loads eagerly, so the interval below would cover the
# full parse of the file -- confirm against the Polars docs.
df = pl.read_csv("C:/Users/anike/Downloads/1-BRC.csv")

# Elapsed seconds between the two timestamps, printed with two decimals
end_time = time.time()
print(f"Polars Load Time: {end_time - start_time:.2f} seconds")

### 4. Loading dataset using Dask

In [8]:
import dask.dataframe as dd
import time
import numpy as np

In [10]:
# Wall-clock timestamp taken immediately before the read_csv call
st = time.time()

# NOTE(review): dd.read_csv presumably only sets up a lazy collection here and
# does not read the whole file, so the interval printed below is likely not
# comparable with an eager loader's load time -- confirm before comparing.
df = dd.read_csv(r"C:/Users/anike/Downloads/1-BRC.csv")

et = time.time()

# Elapsed seconds, rounded to three decimals
print(np.round((et-st),3), "Seconds")

0.274 Seconds


In [3]:
df.head()

Unnamed: 0,timestamp,soil_moisture,soil_water_content,carbon_percent,nitrogen_percent,atmospheric_humidity,temperature,pH
0,0,48.87,65.23,2.28,3.05,4.68,-4.89,4.45
1,1,19.15,61.87,2.97,1.68,40.73,-6.75,6.03
2,2,1.5,59.92,8.34,3.59,53.55,9.82,8.01
3,3,59.39,82.4,2.05,2.71,67.54,39.78,5.03
4,4,36.43,97.22,6.98,1.82,52.52,-4.01,5.83


### Understanding and preprocessing.

In [12]:
from itertools import (takewhile,repeat)

def rawincount(filename):
    """Count the newline bytes in a file without decoding it.

    The file is read in binary mode in 1 MiB chunks through the unbuffered
    raw stream, and the occurrences of ``b'\\n'`` in each chunk are summed.

    Args:
        filename: Path of the file to scan.

    Returns:
        int: Number of ``b'\\n'`` bytes in the file. A final line that is not
        terminated by a newline is therefore not counted.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    chunk_size = 1024 * 1024
    # The context manager guarantees the handle is closed, including when a
    # read fails part-way through (the original left the file open).
    with open(filename, 'rb') as f:
        # Keep reading until the raw stream returns an empty (falsy) chunk.
        bufgen = takewhile(lambda x: x, (f.raw.read(chunk_size) for _ in repeat(None)))
        # Sum inside the `with` block: the generator reads lazily from `f`.
        return sum(buf.count(b'\n') for buf in bufgen)

In [None]:
# NOTE(review): absolute, user-specific path -- not portable to other machines.
filename = "C:/Users/anike/Downloads/1-BRC.csv"
df = dd.read_csv(filename)

st = time.time()
# Shape is derived without computing the Dask frame:
#   rows    = newline count of the raw file minus one
#   columns = number of columns reported by the Dask frame
# Assumes the first line is a header, the file ends with a newline, and no
# field contains an embedded newline -- TODO confirm for this dataset.
df_shape = (rawincount(filename) - 1, len(df.columns))
et = time.time()

print(f"Shape: {df_shape}")
# Elapsed seconds for the shape computation, rounded to three decimals
print(np.round((et-st),3), "Seconds")

In [14]:
st = time.time()

# Keep only rows where both carbon_percent and nitrogen_percent are >= 0.
# This rebinds `df`, so the unfiltered frame is no longer reachable by name.
# NOTE(review): `df` is a Dask DataFrame (see the type() output below), so this
# presumably only builds a lazy expression; the timing printed below would
# then not include the cost of actually filtering the data -- confirm.
df = df[(df["carbon_percent"] >= 0) & (df["nitrogen_percent"] >= 0)]

et = time.time()
# Elapsed seconds, rounded to three decimals
print(np.round((et-st),3), "Seconds")

0.043 Seconds


In [33]:
type(df)

dask.dataframe.dask_expr._collection.DataFrame

In [34]:
df.info()

<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 8 entries, timestamp to pH
dtypes: float64(7), int64(1)

In [36]:
df.columns

Index(['timestamp', 'soil_moisture', 'soil_water_content', 'carbon_percent',
       'nitrogen_percent', 'atmospheric_humidity', 'temperature', 'pH'],
      dtype='object')

In [15]:
import concurrent.futures

In [16]:
# NOTE(review): absolute, user-specific path -- not portable to other machines.
file_path = "C:/Users/anike/Downloads/1-BRC.csv"
# Re-read the CSV, keeping only the two columns used for the statistics.
# NOTE(review): this rebinds `df` to a fresh read, so the non-negative filter
# applied to the previous `df` in an earlier cell no longer applies -- confirm
# the statistics are meant to include negative values.
df = dd.read_csv(file_path, usecols=["carbon_percent", "nitrogen_percent"])

In [17]:
def compute_statistics(column):
    """Compute summary statistics for one column of the notebook-level ``df``.

    The column is taken from the module-level Dask DataFrame ``df``, computed
    into a pandas Series, converted to a Polars Series, and summarized.

    Args:
        column: Name of the column in ``df`` to summarize.

    Returns:
        dict: Keys ``"Mean"``, ``"Median"``, ``"Standard Deviation"``,
        ``"Q1"``, ``"Q3"`` (the 0.25 and 0.75 quantiles) and
        ``"Computation Time"`` (elapsed seconds, rounded to two decimals,
        including the time to compute the Dask column).
    """
    start_time = time.time()

    # Selecting a single column and computing it yields a pandas Series, which
    # pl.from_pandas converts to a Polars *Series* (not a DataFrame).
    series = pl.from_pandas(df[column].compute())

    # Series aggregations already return plain scalars; the previous `[0]`
    # indexing on each result raised TypeError.
    stats = {
        "Mean": series.mean(),
        "Median": series.median(),
        "Standard Deviation": series.std(),
        "Q1": series.quantile(0.25),
        "Q3": series.quantile(0.75)
    }

    end_time = time.time()
    stats["Computation Time"] = round(end_time - start_time, 2)

    return stats

# Parallel execution using ThreadPoolExecutor
def parallel_compute():
    """Run ``compute_statistics`` for both columns on a thread pool.

    Returns:
        tuple: ``(carbon_stats, nitrogen_stats)``, the results of
        ``compute_statistics`` for ``"carbon_percent"`` and
        ``"nitrogen_percent"`` respectively.
    """
    columns = ("carbon_percent", "nitrogen_percent")

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Submit both jobs before waiting on either so they can overlap.
        pending = [pool.submit(compute_statistics, name) for name in columns]
        # Collect in submission order: carbon first, then nitrogen.
        carbon_stats, nitrogen_stats = [job.result() for job in pending]

    return carbon_stats, nitrogen_stats


In [None]:
from dask.diagnostics import ProgressBar

# Register a Dask progress bar; presumably it then reports on later
# .compute() calls in this kernel -- confirm against the Dask diagnostics docs.
ProgressBar().register()

start_time = time.time()

# Same two-column read as the earlier cell that defines `file_path`.
# NOTE(review): this again rebinds `df` to an unfiltered read, so the earlier
# non-negative filter is not applied to the statistics computed afterwards.
# NOTE(review): dd.read_csv is presumably lazy, so "Data Loaded" below likely
# reports setup time only, not the time to read the file -- confirm.
df = dd.read_csv(file_path, usecols=["carbon_percent", "nitrogen_percent"])

load_time = time.time() - start_time
print(f"Data Loaded in {load_time:.2f} seconds")


✅ Data Loaded in 0.29 seconds


In [None]:
# Start the timer; the interval covers the whole describe().compute() call
start_time = time.time()

# Build the describe() summary of `df` and compute it with Dask's threaded
# scheduler. The compute_statistics / parallel_compute helpers defined in an
# earlier cell are not used here.
stats = df.describe().compute(scheduler="threads")  # threaded scheduler requested explicitly

# Stop the timer
compute_time = time.time() - start_time

# Show the computed summary table
print(stats)

print(f"✅ Statistics Computed in {compute_time:.2f} seconds")
