<a href="https://colab.research.google.com/github/dornercr/DSCI511/blob/main/DSCI511_week9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 10.0 Big Data – Conceptual Foundations + Toy Dataset

Big Data is not defined by a strict numerical threshold. Instead, it describes data that exceeds the ability of conventional tools to store, process, or analyze it efficiently. The 3Vs (Volume, Velocity, and Variety) provide the classic framework for understanding these limits; Veracity, the trustworthiness of the data, is often added as a fourth V.

This notebook creates a toy Big Data environment using:

- A synthetic “large” dataset (1M rows)

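- A small simulated event stream (about 5 events per second) simulating velocity

- Multiple data shapes (tabular events, dictionary-style stream records, free text) simulating variety

- Injected missing values and outliers simulating veracity problems

- Map–Reduce examples in pure Python

- PySpark RDD and DataFrame operations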

## 10.0.1 Creating a Toy Big-Data-Like Dataset (Volume, Velocity, Variety)

In [1]:
import numpy as np
import pandas as pd
import time

# -------------------------------
# Create a "large" dataset (~1M rows)
# -------------------------------
N = 1_000_000

np.random.seed(42)

df_large = pd.DataFrame({
    "user_id": np.random.randint(1, 50000, N),
    "event_type": np.random.choice(["click", "view", "purchase"], N),
    "event_value": np.random.rand(N) * 100,
    "timestamp": pd.date_range("2025-01-01", periods=N, freq="s")
})

df_large.head()


|   | user_id | event_type | event_value | timestamp |
|---|---------|------------|-------------|-----------|
| 0 | 15796 | click | 82.494904 | 2025-01-01 00:00:00 |
| 1 | 861 | purchase | 34.71047 | 2025-01-01 00:00:01 |
| 2 | 38159 | click | 46.806673 | 2025-01-01 00:00:02 |
| 3 | 44733 | click | 98.884011 | 2025-01-01 00:00:03 |
| 4 | 11285 | click | 52.1905 | 2025-01-01 00:00:04 |
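Because `freq="s"` spaces the rows exactly one second apart, the last of the `N` rows falls `N - 1` seconds after the start. A small sketch of that arithmetic with `pd.Timedelta`, reusing the row count and start date from the cell above (the names `n_rows`, `start`, `last`, and `span` are illustrative):

```python
import pandas as pd

n_rows = 1_000_000                  # same row count as the cell above
start = pd.Timestamp("2025-01-01")  # same start date as the cell above

# row i sits at start + i seconds, so the final row (index n_rows - 1) is:
last = start + pd.Timedelta(seconds=n_rows - 1)
span = last - start

print(last)  # 2025-01-12 13:46:39
print(span)  # total time covered by the toy dataset
print(n_rows / span.total_seconds())  # average events per second, just over 1
```

The same timestamp reappears in the tail of the CSV read-back in Section 10.1.2, which is a quick sanity check that the file round-trips in order.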


## 10.0.2 Simulating Veracity Problems (Noise, Missingness, Outliers)

In [2]:
df_corrupted = df_large.copy()

# inject missing values into roughly 2% of rows
mask_missing = np.random.choice([True, False], size=N, p=[0.02, 0.98])
df_corrupted.loc[mask_missing, "event_value"] = np.nan

# inject outliers: scale roughly 0.1% of values by 1000x (NaNs stay NaN)
mask_outliers = np.random.choice([True, False], size=N, p=[0.001, 0.999])
df_corrupted.loc[mask_outliers, "event_value"] *= 1000

df_corrupted.head(10)


|   | user_id | event_type | event_value | timestamp |
|---|---------|------------|-------------|-----------|
| 0 | 15796 | click | 82.494904 | 2025-01-01 00:00:00 |
| 1 | 861 | purchase | 34.71047 | 2025-01-01 00:00:01 |
| 2 | 38159 | click | 46.806673 | 2025-01-01 00:00:02 |
| 3 | 44733 | click | 98.884011 | 2025-01-01 00:00:03 |
| 4 | 11285 | click | 52.1905 | 2025-01-01 00:00:04 |
| 5 | 6266 | purchase | 93.340141 | 2025-01-01 00:00:05 |
| 6 | 16851 | purchase | 72.5445 | 2025-01-01 00:00:06 |
| 7 | 37195 | purchase | 36.857333 | 2025-01-01 00:00:07 |
| 8 | 21963 | purchase | 39.883588 | 2025-01-01 00:00:08 |
| 9 | 47192 | view | 58.568382 | 2025-01-01 00:00:09 |
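The first ten rows happen to be untouched, so `head(10)` does not reveal the corruption. A minimal sketch of auditing the column instead, on a smaller synthetic series corrupted the same way. The rule "anything above 100 is an outlier" is an assumption that only works because we know the clean values lie in [0, 100); outliers whose original value was below 0.1 would slip through it.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 10_000

# clean values in [0, 100), like event_value in the toy dataset
values = pd.Series(rng.random(n) * 100)

# corrupt a copy as above: ~2% NaNs, then ~0.1% of rows scaled by 1000
corrupted = values.copy()
corrupted.loc[rng.random(n) < 0.02] = np.nan
is_outlier = rng.random(n) < 0.001
corrupted.loc[is_outlier] *= 1000

# audit: count missing values and values outside the known valid range
n_missing = int(corrupted.isna().sum())
n_out_of_range = int((corrupted > 100).sum())  # comparisons with NaN are False

print(f"missing: {n_missing} ({n_missing / n:.2%})")
print(f"out of range: {n_out_of_range} (injected outliers: {int(is_outlier.sum())})")
```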


## 10.0.3 Simulating Velocity (Streaming 5 events/second)

In [3]:
import itertools
import time

stream = itertools.cycle([
    {"user_id": 1, "event": "click"},
    {"user_id": 2, "event": "view"},
    {"user_id": 3, "event": "purchase"},
])

# Emit 10 events, pausing 0.2 s after each (about 5 events/second)
for i in range(10):
    print(i, next(stream))
    time.sleep(0.2)


0 {'user_id': 1, 'event': 'click'}
1 {'user_id': 2, 'event': 'view'}
2 {'user_id': 3, 'event': 'purchase'}
3 {'user_id': 1, 'event': 'click'}
4 {'user_id': 2, 'event': 'view'}
5 {'user_id': 3, 'event': 'purchase'}
6 {'user_id': 1, 'event': 'click'}
7 {'user_id': 2, 'event': 'view'}
8 {'user_id': 3, 'event': 'purchase'}
9 {'user_id': 1, 'event': 'click'}
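Streaming systems usually summarize an unbounded stream over windows rather than storing every event. A minimal sketch of a tumbling (non-overlapping) window that counts event types per batch; using a count-based window of 4 events instead of a time-based one is a simplifying assumption that keeps the output deterministic, and `toy_stream` and `tumbling_windows` are illustrative names:

```python
import itertools
from collections import Counter

# same repeating toy stream as above, under a new name
toy_stream = itertools.cycle([
    {"user_id": 1, "event": "click"},
    {"user_id": 2, "event": "view"},
    {"user_id": 3, "event": "purchase"},
])

def tumbling_windows(events, size):
    """Yield consecutive, non-overlapping lists of up to `size` events."""
    iterator = iter(events)
    while True:
        window = list(itertools.islice(iterator, size))
        if not window:
            return
        yield window

# take 10 events from the endless stream and summarize them 4 at a time
windows = tumbling_windows(itertools.islice(toy_stream, 10), size=4)
summaries = [Counter(e["event"] for e in w) for w in windows]
for i, summary in enumerate(summaries):
    print(i, dict(summary))
```

Only one window's events are held at a time, which is the property that lets this pattern run on a stream that never ends.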


# 10.1 Hardware-Aware Thinking (Toy Demonstrations)

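This section runs mini-experiments illustrating:

- RAM constraints (the in-memory footprint of the dataset)

- The cost of moving data to and from disk (CSV write and read timing)

- The impact of CPU parallelization (Section 10.2)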

## 10.1.1 Memory Pressure Simulation

In [4]:
# in-memory footprint of the dataset in MiB (deep=True also counts the Python string objects)
df_large.memory_usage(deep=True).sum() / (1024**2)


np.float64(75.0227279663086)
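Much of that footprint typically comes from `event_type`, which stores a separate Python string object per row. A sketch of one common way to shrink it, on a smaller synthetic frame, assuming downstream code accepts a categorical column and 32-bit IDs (safe here because `user_id` stays below 50,000); `mib` is an illustrative helper:

```python
import numpy as np
import pandas as pd

n = 100_000  # smaller than the notebook's N to keep the demo quick
rng = np.random.default_rng(42)
df = pd.DataFrame({
    "user_id": rng.integers(1, 50_000, n),
    "event_type": rng.choice(["click", "view", "purchase"], n),
    "event_value": rng.random(n) * 100,
})

def mib(frame):
    """Total memory in MiB, including string contents (deep=True)."""
    return frame.memory_usage(deep=True).sum() / 1024**2

# category stores each distinct label once plus a small integer code per row
optimized = df.astype({"event_type": "category", "user_id": "int32"})

print(f"original:  {mib(df):.2f} MiB")
print(f"optimized: {mib(optimized):.2f} MiB")
```

The values are unchanged; only their in-memory representation differs, which is often the cheapest way to fit a dataset in RAM before reaching for a distributed tool.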

## 10.1.2 Disk vs RAM Demo (Write + Read Timing)

In [5]:
%%time

df_large.to_csv("large_temp.csv", index=False)


CPU times: user 5.79 s, sys: 60.7 ms, total: 5.85 s
Wall time: 6.33 s


In [9]:
%%time

pd.read_csv("large_temp.csv")


CPU times: user 971 ms, sys: 84.8 ms, total: 1.06 s
Wall time: 1.06 s


|   | user_id | event_type | event_value | timestamp |
|---|---------|------------|-------------|-----------|
| 0 | 15796 | click | 82.494904 | 2025-01-01 00:00:00 |
| 1 | 861 | purchase | 34.710470 | 2025-01-01 00:00:01 |
| 2 | 38159 | click | 46.806673 | 2025-01-01 00:00:02 |
| 3 | 44733 | click | 98.884011 | 2025-01-01 00:00:03 |
| 4 | 11285 | click | 52.190500 | 2025-01-01 00:00:04 |
| ... | ... | ... | ... | ... |
| 999995 | 20377 | purchase | 12.219603 | 2025-01-12 13:46:35 |
| 999996 | 31747 | click | 71.108958 | 2025-01-12 13:46:36 |
| 999997 | 9462 | click | 28.159659 | 2025-01-12 13:46:37 |
| 999998 | 9847 | view | 5.024435 | 2025-01-12 13:46:38 |
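When a file is too large to load at once, `pd.read_csv` can stream it in pieces through its `chunksize` parameter, which returns an iterator of DataFrames. A minimal sketch of an out-of-core aggregation under that approach; the small temporary file written here is a stand-in for `large_temp.csv`, and the chunk size of 10,000 rows is an arbitrary choice:

```python
import os
import tempfile
from collections import Counter

import numpy as np
import pandas as pd

# write a small stand-in CSV with the same kind of columns as the toy dataset
rng = np.random.default_rng(7)
n = 50_000
df = pd.DataFrame({
    "user_id": rng.integers(1, 50_000, n),
    "event_type": rng.choice(["click", "view", "purchase"], n),
    "event_value": rng.random(n) * 100,
})
path = os.path.join(tempfile.mkdtemp(), "events.csv")
df.to_csv(path, index=False)

# out-of-core pass: only one 10,000-row chunk is in memory at a time
counts = Counter()
value_sum = 0.0
for chunk in pd.read_csv(path, chunksize=10_000):
    counts.update(chunk["event_type"].value_counts().to_dict())
    value_sum += chunk["event_value"].sum()

print(dict(counts))
print(value_sum / n)  # mean event_value, computed without loading the whole file
```

This works because counts and sums can be accumulated piece by piece; the same property is what Map–Reduce exploits in Section 10.3.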


# 10.2 Parallelization (Multiprocessing Demo)

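We apply a simple function (squaring an integer) to one million values, first serially and then with a pool of four worker processes. Because each call does almost no work, the overhead of sending values to and from the workers can outweigh the benefit of parallelism.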

## 10.2.1 Multiprocessing Example

In [11]:
from multiprocessing import Pool

data = list(range(1_000_000))

def square(x):
    return x*x

# serial: the %time line magic times only the statement on its own line
%time serial = list(map(square, data))


In [12]:
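%%time
# parallel: the %%time cell magic must be the first line of the cell
# (a notebook-defined function works with Pool under the "fork" start method used on Colab's Linux runtime;
#  under "spawn", as on macOS/Windows, square must live in an importable module)
with Pool(4) as p:
    parallel = p.map(square, data)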
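Parallelism pays off when each worker receives a substantial amount of work per message. A sketch of that idea for a sum of squares: split the range into one contiguous chunk per worker and return a single partial sum from each, so only a few integers cross process boundaries. The helper names are illustrative, and the result is checked against the closed form `(n - 1) * n * (2n - 1) / 6` for `0² + 1² + ... + (n - 1)²`.

```python
from multiprocessing import Pool

def sum_squares_range(bounds):
    """One worker's share: sum of x*x for x in [start, stop)."""
    start, stop = bounds
    return sum(x * x for x in range(start, stop))

def make_chunks(n, n_chunks):
    """Split [0, n) into at most n_chunks contiguous, non-overlapping ranges."""
    step = max(1, -(-n // n_chunks))  # ceiling division, at least 1
    return [(i, min(i + step, n)) for i in range(0, n, step)]

def parallel_sum_squares(n, workers=4):
    """Send each worker one range and add up the partial sums it returns."""
    with Pool(workers) as pool:
        partials = pool.map(sum_squares_range, make_chunks(n, workers))
    return sum(partials)

if __name__ == "__main__":
    n = 1_000_000
    total = parallel_sum_squares(n)
    # closed form for 0^2 + 1^2 + ... + (n-1)^2
    assert total == (n - 1) * n * (2 * n - 1) // 6
    print(total)
```

The `if __name__ == "__main__":` guard is what lets the same file work under the spawn start method, where each worker re-imports the main module.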


# 10.3 Map–Reduce (Local Python Version)

We perform a word count using pure Python, simulating classic MapReduce.

## 10.3.1 Map Step

In [13]:
text = [
    "big data requires scalable systems",
    "data science uses map reduce",
    "reduce steps aggregate intermediate results",
    "big compute big clusters big problems"
]

def map_words(line):
    return [(word, 1) for word in line.split()]

mapped = []
for line in text:
    mapped.extend(map_words(line))

mapped[:10]


[('big', 1),
 ('data', 1),
 ('requires', 1),
 ('scalable', 1),
 ('systems', 1),
 ('data', 1),
 ('science', 1),
 ('uses', 1),
 ('map', 1),
 ('reduce', 1)]

## 10.3.2 Shuffle Step (Group by Key)

In [14]:
from collections import defaultdict

shuffle = defaultdict(list)

for word, count in mapped:
    shuffle[word].append(count)

dict(list(shuffle.items())[:5])


{'big': [1, 1, 1, 1],
 'data': [1, 1],
 'requires': [1],
 'scalable': [1],
 'systems': [1]}
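On a cluster, the shuffle also decides which reducer receives each key, commonly by hashing the key modulo the number of reducers so that every pair for a given word ends up in the same place. A sketch of that routing with an assumed `num_reducers = 2` and a hypothetical list `sample_pairs`; `zlib.crc32` stands in for a cluster's partitioner because Python's built-in `hash` of a string is randomized per process and would route keys inconsistently across workers:

```python
import zlib
from collections import defaultdict

# (word, 1) pairs in the shape produced by the map step above
sample_pairs = [("big", 1), ("data", 1), ("big", 1), ("map", 1), ("data", 1), ("big", 1)]
num_reducers = 2  # illustrative choice

def partition(key, n):
    """Route a key to a reducer index in [0, n) using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % n

# each reducer receives its own group-by-key table
reducers = [defaultdict(list) for _ in range(num_reducers)]
for word, count in sample_pairs:
    reducers[partition(word, num_reducers)][word].append(count)

for i, groups in enumerate(reducers):
    print(i, dict(groups))
```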

## 10.3.3 Reduce Step

In [15]:
reduced = {word: sum(counts) for word, counts in shuffle.items()}
reduced


{'big': 4,
 'data': 2,
 'requires': 1,
 'scalable': 1,
 'systems': 1,
 'science': 1,
 'uses': 1,
 'map': 1,
 'reduce': 2,
 'steps': 1,
 'aggregate': 1,
 'intermediate': 1,
 'results': 1,
 'compute': 1,
 'clusters': 1,
 'problems': 1}
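Real MapReduce jobs often add a combiner: each mapper pre-aggregates its own output before the shuffle, so fewer (word, count) pairs have to move between machines. A sketch that treats each line of the toy text as one mapper's input split (an assumption for illustration), with `collections.Counter` doing both the local and the final summing:

```python
from collections import Counter

text = [
    "big data requires scalable systems",
    "data science uses map reduce",
    "reduce steps aggregate intermediate results",
    "big compute big clusters big problems",
]

# map + combine: each "mapper" emits one (word, local_count) pair per distinct word
combined = [Counter(line.split()) for line in text]

# shuffle + reduce: merge the per-mapper partial counts
totals = Counter()
for partial in combined:
    totals.update(partial)

pairs_without_combiner = sum(len(line.split()) for line in text)
pairs_with_combiner = sum(len(partial) for partial in combined)
print(pairs_without_combiner, pairs_with_combiner)
print(totals.most_common(3))
```

On this tiny input the saving is small (the last line's three `big` pairs collapse into one), but on real text, where common words repeat many times per split, a combiner can shrink shuffle traffic considerably.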

# 10.4 PySpark Demo (True Distributed Engine)

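This part runs entirely inside Colab. There, Spark runs in local mode, with the driver and executors sharing a single machine, but the same RDD and DataFrame code would also run on a cluster.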

## 10.4.1 Install & Initialize PySpark (Colab)

In [16]:
!pip install pyspark




In [17]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigdata_demo").getOrCreate()
sc = spark.sparkContext


## 10.4.2 Create RDD from Text

In [18]:
rdd = sc.parallelize(text)
rdd.collect()


['big data requires scalable systems',
 'data science uses map reduce',
 'reduce steps aggregate intermediate results',
 'big compute big clusters big problems']

## 10.4.3 flatMap → map → reduceByKey (True Word Count)

In [19]:
wordcount = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: (word, 1))
       .reduceByKey(lambda a, b: a + b)
)

wordcount.collect()


[('big', 4),
 ('requires', 1),
 ('science', 1),
 ('uses', 1),
 ('map', 1),
 ('steps', 1),
 ('aggregate', 1),
 ('intermediate', 1),
 ('results', 1),
 ('problems', 1),
 ('data', 2),
 ('scalable', 1),
 ('systems', 1),
 ('reduce', 2),
 ('compute', 1),
 ('clusters', 1)]

## 10.4.4 Sorting Results

In [20]:
wordcount.sortBy(lambda x: -x[1]).collect()


[('big', 4),
 ('data', 2),
 ('reduce', 2),
 ('requires', 1),
 ('science', 1),
 ('uses', 1),
 ('map', 1),
 ('steps', 1),
 ('aggregate', 1),
 ('intermediate', 1),
 ('results', 1),
 ('problems', 1),
 ('scalable', 1),
 ('systems', 1),
 ('compute', 1),
 ('clusters', 1)]
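Sorting by `-x[1]` alone leaves words with equal counts in whatever order the partitions deliver them, which is not guaranteed to be stable across runs. Adding the word as a secondary key makes the order deterministic. The sketch below shows the key on plain Python tuples (`pairs` is a hypothetical sample); the same key function could be passed to `sortBy`, e.g. `wordcount.sortBy(lambda x: (-x[1], x[0]))`, since the key tuples are comparable.

```python
# (word, count) pairs like those returned by wordcount.collect()
pairs = [("big", 4), ("requires", 1), ("data", 2), ("scalable", 1), ("reduce", 2)]

# primary key: count, descending; secondary key: word, ascending
ranked = sorted(pairs, key=lambda x: (-x[1], x[0]))
print(ranked)  # [('big', 4), ('data', 2), ('reduce', 2), ('requires', 1), ('scalable', 1)]
```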

## 10.4.5 Using Spark DataFrames with the “Large” Toy Dataset

In [21]:
df_spark = spark.createDataFrame(df_large)
df_spark.printSchema()
df_spark.show(5)


root
 |-- user_id: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_value: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-------+----------+------------------+-------------------+
|user_id|event_type|       event_value|          timestamp|
+-------+----------+------------------+-------------------+
|  15796|     click|  82.4949037469124|2025-01-01 00:00:00|
|    861|  purchase|34.710469826837155|2025-01-01 00:00:01|
|  38159|     click|46.806673359542394|2025-01-01 00:00:02|
|  44733|     click| 98.88401144759135|2025-01-01 00:00:03|
|  11285|     click|  52.1905002143988|2025-01-01 00:00:04|
+-------+----------+------------------+-------------------+
only showing top 5 rows



## 10.4.6 Distributed Aggregations

In [22]:
df_spark.groupBy("event_type").count().show()


+----------+------+
|event_type| count|
+----------+------+
|  purchase|333645|
|      view|334035|
|     click|332320|
+----------+------+



## 10.4.7 Example: Compute Average Event Value per user_id

In [23]:
df_spark.groupBy("user_id").avg("event_value").show(10)


+-------+------------------+
|user_id|  avg(event_value)|
+-------+------------------+
|  32098|54.399675114399564|
|  28762| 55.48205363650108|
|  15375|45.950635375331316|
|   5409|  53.5815770518254|
|  19979|61.883167025158066|
|  30562| 65.30684668567017|
|  44901| 51.82673977614868|
|   9715|50.507313934558745|
|  26486| 60.17436141249474|
|  11945| 57.34458833332217|
+-------+------------------+
only showing top 10 rows
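An average cannot be merged directly across partitions: the mean of per-partition means is wrong when the partitions hold different numbers of rows. Distributed engines therefore typically carry a partial (sum, count) per key and divide only at the end. A pure-Python sketch of that idea with two hypothetical partitions of `(user_id, event_value)` rows:

```python
from collections import defaultdict

# two hypothetical partitions of (user_id, event_value) rows
partitions = [
    [(1, 10.0), (1, 20.0), (2, 5.0)],
    [(1, 60.0), (2, 15.0), (2, 25.0), (2, 35.0)],
]

def partial_aggregate(rows):
    """Per-partition step: user_id -> [running sum, row count]."""
    acc = defaultdict(lambda: [0.0, 0])
    for user_id, value in rows:
        acc[user_id][0] += value
        acc[user_id][1] += 1
    return acc

# merge step: add up sums and counts from every partition
merged = defaultdict(lambda: [0.0, 0])
for partial in map(partial_aggregate, partitions):
    for user_id, (s, c) in partial.items():
        merged[user_id][0] += s
        merged[user_id][1] += c

# final step: divide once per key
averages = {user_id: s / c for user_id, (s, c) in merged.items()}
print(averages)  # {1: 30.0, 2: 20.0}
```

For user 2, averaging the per-partition means would give (5 + 25) / 2 = 15, while the true mean over all four rows is 80 / 4 = 20.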

