<a href="https://colab.research.google.com/github/groda/big_data/blob/master/Ngrams.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Ngrams with PySpark



# Spark Job Nr. 1 (Common Crawl)

## Retrieve the data

Let us download some text data from the [Common Crawl archive](https://data.commoncrawl.org/) (to learn why I chose this dataset, see my [conversation with AI](https://gemini.google.com/share/e94f879fa56a)).

In [1]:
import requests
import gzip

# The URL for the index of the January 2026 crawl archive (https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/index.html)
index_url = "https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/wet.paths.gz"

local_filename = "wet.paths.gz"

print("Downloading index...")
with requests.get(index_url, stream=True) as r:
    r.raise_for_status()  # Raise an exception for bad status codes
    with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
print("Download done! Now you can open it normally with gzip.open().")

# Read the content of the gzipped file
with gzip.open('wet.paths.gz', 'rb') as f:
    # Read all content and decode it
    content = f.read().decode('utf-8')

# Split the content into lines, removing any empty strings from splitting
paths = [line for line in content.strip().split('\n') if line]

# Define the output filename
output_filename = "wet_paths_list.txt"

# Save the paths to a new text file, one path per line
with open(output_filename, 'w') as out_f:
    for path in paths:
        out_f.write(path + '\n')

print(f"Paths saved to {output_filename}")
print("\nFirst 10 paths:")
for path in paths[:10]:
    print(path)

Downloading index...
Download done! Now you can open it normally with gzip.open().
Paths saved to wet_paths_list.txt

First 10 paths:
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00000.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00001.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00002.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00003.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00004.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00005.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00006.warc.wet.gz
crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-2026011216

In [2]:
import os

n = 5
print(f"Download data from the first {n} paths:")
for path in paths[:n]:
    full_url = "https://data.commoncrawl.org/" + path
    local_filename = os.path.basename(path) # Extract filename from the path
    print(f"Downloading {full_url} to {local_filename}")
    try:
        with requests.get(full_url, stream=True) as r:
            r.raise_for_status() # Raise an exception for bad status codes
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"Successfully downloaded {local_filename}")
    except requests.exceptions.RequestException as e:
        print(f"Error downloading {full_url}: {e}")

Download data from the first 5 paths:
Downloading https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00000.warc.wet.gz to CC-MAIN-20260112161239-20260112191239-00000.warc.wet.gz
Successfully downloaded CC-MAIN-20260112161239-20260112191239-00000.warc.wet.gz
Downloading https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00001.warc.wet.gz to CC-MAIN-20260112161239-20260112191239-00001.warc.wet.gz
Successfully downloaded CC-MAIN-20260112161239-20260112191239-00001.warc.wet.gz
Downloading https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/segments/1768220467618.22/wet/CC-MAIN-20260112161239-20260112191239-00002.warc.wet.gz to CC-MAIN-20260112161239-20260112191239-00002.warc.wet.gz
Successfully downloaded CC-MAIN-20260112161239-20260112191239-00002.warc.wet.gz
Downloading https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-04/segments/176822

## Preprocess the data

Extract some English text from the archives. For that we are going to use the `warcio` library.

In [3]:
!pip install warcio



We need filters to:


*  Select records in the English language (`WARC-Identified-Content-Language` = `eng`)
*  Pick records made up mostly of long lines (function `is_high_quality_record`: by default, at least 70% of the characters must sit on lines of 80 or more characters), in the hope that such records contain narrative text and not junk
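
To get a feeling for the long-line criterion, here is a minimal sketch that computes the same ratio on two made-up records. The helper name `narrative_ratio` and both sample texts are hypothetical and only serve as an illustration; the arithmetic mirrors `is_high_quality_record`.

```python
def narrative_ratio(record_text, min_line_len=80):
    """Share of characters that sit on lines of at least min_line_len characters."""
    lines = [l.strip() for l in record_text.split('\n') if l.strip()]
    if not lines:
        return 0.0
    total_chars = sum(len(l) for l in lines)
    long_line_chars = sum(len(l) for l in lines if len(l) >= min_line_len)
    return long_line_chars / total_chars

# Hypothetical toy records, made up for illustration
menu_like = "Home\nAbout\nContact\nLogin"
article_like = "Home\n" + "This sentence is repeated to build a long paragraph line. " * 3

print(narrative_ratio(menu_like))     # only short lines, so the ratio is 0.0
print(narrative_ratio(article_like))  # dominated by one long line, close to 1
```

With the default threshold of 0.7, the first record would be rejected and the second one kept.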



In [4]:
import gzip
from pathlib import Path
from warcio.archiveiterator import ArchiveIterator

def is_high_quality_record(record_text, min_line_len=80, threshold=0.7):
    # Split and clean lines for calculation
    lines = [l.strip() for l in record_text.split('\n') if l.strip()]
    if not lines:
        return False

    total_chars = sum(len(l) for l in lines)
    long_line_chars = sum(len(l) for l in lines if len(l) >= min_line_len)

    narrative_ratio = long_line_chars / total_chars
    return narrative_ratio >= threshold

def process_all_archives(directory_path, output_path):
    # Open in 'w' mode to ensure the file is empty or created before writing.
    # Subsequent writes will then append to this cleared/new file.
    with open(output_path, 'w', encoding='utf-8') as out_f:
        # Looking for the compressed WET files
        for file_path in Path(directory_path).rglob('*.wet.gz'):
            print(f"Processing: {file_path}")

            with open(file_path, 'rb') as stream:
                for record in ArchiveIterator(stream):
                    if record.rec_type == 'conversion':

                        # 1. Language Check
                        lang = record.rec_headers.get_header('WARC-Identified-Content-Language')
                        if lang != 'eng':
                            continue

                        # 2. Extract Body
                        try:
                            text_content = record.content_stream().read().decode('utf-8', 'ignore')
                        except Exception:
                            continue

                        # 3. Quality Filter
                        if is_high_quality_record(text_content):
                            # Write the raw text block exactly as it is
                            out_f.write(text_content.strip())

                            # Add a separator so the next record doesn't start on the same line
                            out_f.write("\n\n" + "-"*40 + "\n\n")

# Run the process
process_all_archives('.', 'narrative_corpus.txt')

Processing: CC-MAIN-20260112161239-20260112191239-00001.warc.wet.gz
Processing: CC-MAIN-20260112161239-20260112191239-00004.warc.wet.gz
Processing: CC-MAIN-20260112161239-20260112191239-00003.warc.wet.gz
Processing: CC-MAIN-20260112161239-20260112191239-00002.warc.wet.gz
Processing: CC-MAIN-20260112161239-20260112191239-00000.warc.wet.gz
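
The language check above uses strict equality with `eng`. Assuming the `WARC-Identified-Content-Language` header can carry several comma-separated codes (for example `eng,fra`; this is worth verifying against your own files), strict equality keeps only records identified as English alone. The hypothetical helper `is_english` below is a sketch of a more permissive check, not the filter used in this notebook.

```python
def is_english(lang_header, primary_only=False):
    """Return True if 'eng' is among the comma-separated language codes.

    Assumption: the header value looks like 'eng' or 'eng,fra'.
    """
    if not lang_header:
        return False
    codes = [c.strip() for c in lang_header.split(',') if c.strip()]
    if primary_only:
        # Accept the record only when English is listed first
        return codes[:1] == ['eng']
    return 'eng' in codes

print(is_english('eng'))                         # True
print(is_english('eng,fra'))                     # True
print(is_english('fra,eng', primary_only=True))  # False
print(is_english(None))                          # False
```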


Check the size of `narrative_corpus.txt`.

In [5]:
!ls -lh narrative_corpus.txt

-rw-r--r-- 1 root root 84M Feb 22 10:46 narrative_corpus.txt


## Create a Spark context

A Spark context (or a session, which encapsulates a context) is the entry point to Spark.
It represents the Spark engine (whether on the local machine or on a cluster) and provides an API for creating and running data pipelines.

In this example, we're going to load a text file into an RDD, split the text into ngrams, and count the frequency of each ngram.

### A note on memory configuration in Google Colab

In a typical distributed Spark cluster, `spark.driver.memory` and `spark.executor.memory` refer to distinct JVMs on different machines, so they would be additive. However, in our current setup (the default for PySpark is `local[*]`), both the driver and executor essentially run within the same JVM process on your single Colab instance.

Because they share the same JVM, it makes sense to size the memory for the 12GB Colab environment by setting `spark.driver.memory` as well as `spark.executor.memory` to 10g. This lets Spark use up to $10$GB of the available RAM while leaving a buffer for the operating system and the Python interpreter.

In [8]:
from pyspark import SparkContext, SparkConf
from operator import add

# Configure Spark to use more memory
conf = (SparkConf()
    .setAppName("Ngrams with PySpark")
    .set("spark.driver.memory", "10g")  # Allocate 10GB for the driver (and implicitly, the single local executor)
    .set("spark.executor.memory", "10g") # Keep executor memory aligned with driver for local mode
)
sc = SparkContext(
    conf=conf
)
print("SparkContext initialized with increased memory settings.")

SparkContext initialized with increased memory settings.


In [9]:
sc

We are going to use the file `narrative_corpus.txt`.

### Create RDD from file

The second parameter ($8$) is `minPartitions`, the suggested minimum number of partitions.

In [10]:
textFile = sc.textFile("narrative_corpus.txt", 8)
print(f"textFile is of type: {type(textFile)}\nNumber of partitions: {textFile.getNumPartitions()}")

textFile is of type: <class 'pyspark.core.rdd.RDD'>
Number of partitions: 8


## Extract trigrams

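The next pipeline only contains transformations (`flatMap`, `map`, `reduceByKey`, `sortBy`). Thanks to Spark's _laziness_, transformations only describe the computation, and the result is produced when an action is called. One caveat: in PySpark, `sortBy` samples the data to determine partition boundaries when the result has more than one partition, so defining a pipeline that ends in `sortBy` can already launch Spark jobs.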

In [11]:
n = 3
ngrams = (textFile
    .map(lambda x: x.split())  # Each line is split into words, creating an RDD of lists of words (one list per line)
    .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])])  # Generates word ngrams from each list of words
    .map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)  # Global sort by count (computationally expensive)
)

# Note: Spark transformations are lazy, so the code above mostly defines the computation plan.
# The results are produced when an action (e.g., .take(), .count(), .saveAsTextFile()) is invoked on this 'ngrams' RDD.
# Caveat: PySpark's sortBy samples the data to find partition boundaries, so it may already run jobs here.
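
The sliding-window idiom used in the pipeline above, `zip(*[x[i:] for i in range(n)])`, can be tried in plain Python without Spark. The following is a minimal sketch; the helper name `word_ngrams` and the sample sentence are made up for illustration.

```python
def word_ngrams(words, n):
    """Return the list of n-grams (as tuples) over a list of words."""
    # zip stops at the shortest shifted copy, so max(len(words) - n + 1, 0) tuples come out
    return list(zip(*[words[i:] for i in range(n)]))

words = "one of the best".split()
print(word_ngrams(words, 3))
# [('one', 'of', 'the'), ('of', 'the', 'best')]

print(word_ngrams(["too", "short"], 3))
# []
```

Lines with fewer than `n` words simply contribute no ngrams, which is why the pipeline needs no explicit length check.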

`ngrams` is an object of type `RDD`

In [12]:
type(ngrams)

Up to now we've only defined a series of _transformations_, so Spark has not yet produced any result. Applying the _action_ `take` triggers the computation and returns the first elements.

In [13]:
for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))

1 1 1               34998
one of the          2801
as well as          2412
Skip to content     1671
be able to          1594
★ ★ ★               1453
If you are          1431
a lot of            1429
Replicas Bags Replicas1409
Bags Replicas Bags  1409
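
In the output above, the count is glued to the trigram whenever the label is longer than the fixed 20-character column. One possible way around this, sketched here in plain Python with two rows copied from that output, is to derive the column width from the longest label. The variable names are illustrative only.

```python
# Two sample rows copied from the output above
rows = [(("one", "of", "the"), 2801), (("Replicas", "Bags", "Replicas"), 1409)]

labels = [" ".join(ngram) for ngram, _ in rows]
width = max(len(label) for label in labels) + 2  # widest label plus a two-space gap

lines = [f"{label:<{width}}{count:>d}" for label, (_, count) in zip(labels, rows)]
print("\n".join(lines))
```

With a Spark RDD, the rows would first be collected into a list (for example with `take(10)`) so that the width can be computed before printing.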


Our small sample of the Common Crawl already contains some spammy content, apparently ("_Replicas Bags Replicas_" trigram), despite the basic filtering.

Well, if you explore the Common Crawl expecting a pristine snapshot of human knowledge you're going to be disappointed! You are going to need quite a bit of data engineering work to filter the "raw sewage". Common Crawl is in fact designed to be a neutral, unfiltered crawl of the open web, and unfortunately, a massive percentage of the open web is composed of SEO-optimized adult content, spam, and "junk" domains 😕.

The good news is that you won't run out of work with your Spark tools any time soon 🙂.


### Transformations and actions seen so far

**Transformations**
- `map`
- `flatMap`
- `reduceByKey`
- `sortBy`

**Actions**
- `take`
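
The difference between transformations and actions can be illustrated with a loose analogy in plain Python, where a generator plays the role of a lazy pipeline. This is only an analogy under simplifying assumptions: Spark builds a distributed lineage graph rather than a Python generator, and the names `traced_upper` and `log` are made up for the sketch.

```python
from itertools import islice

log = []

def traced_upper(word):
    log.append(word)  # record that real work happened
    return word.upper()

words = ["a", "b", "c"]

# "Transformation": building the generator does not call traced_upper yet
pipeline = (traced_upper(w) for w in words)
print(log)        # []

# "Action": consuming two elements forces only as much work as needed
first_two = list(islice(pipeline, 2))
print(first_two)  # ['A', 'B']
print(log)        # ['a', 'b']
```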

# Spark Job Nr. 2 (genomic data)

## Download genomic file

You can find the `GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna` file on the NCBI FTP server. I will download the gzipped version and then decompress it.

In [14]:
import requests
import gzip
import os

file_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/711/455/GCA_003711455.1_HG02106_EEE_SV-Pop.1/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna.gz"
local_gzipped_filename = "GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna.gz"
local_uncompressed_filename = "GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna"

print(f"Downloading {file_url}...")

try:
    with requests.get(file_url, stream=True) as r:
        r.raise_for_status()  # Raise an exception for bad status codes
        with open(local_gzipped_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    print(f"Successfully downloaded {local_gzipped_filename}")

    print(f"Decompressing {local_gzipped_filename}...")
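    with gzip.open(local_gzipped_filename, 'rb') as f_in:
        with open(local_uncompressed_filename, 'wb') as f_out:
            # Copy in 1 MiB chunks to avoid loading the whole decompressed file into memory
            for chunk in iter(lambda: f_in.read(1024 * 1024), b''):
                f_out.write(chunk)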
    print(f"Successfully decompressed to {local_uncompressed_filename}")

except requests.exceptions.RequestException as e:
    print(f"Error downloading {file_url}: {e}")
except Exception as e:
    print(f"Error during decompression: {e}")


Downloading https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/711/455/GCA_003711455.1_HG02106_EEE_SV-Pop.1/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna.gz...
Successfully downloaded GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna.gz
Decompressing GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna.gz...
Successfully decompressed to GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna


Now the file `GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna` should be available in your Colab environment. It is a pretty large file ($1.4$GB).

In [15]:
!ls -lh GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna

-rw-r--r-- 1 root root 1.4G Feb 22 10:50 GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna


In [16]:
genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna", minPartitions=12)
# Show the first two lines, truncated to 140 characters each
[line[:140] for line in genomeFile.take(2)]

['>QVRK01000602.1 Homo sapiens isolate HG02106 chromosome 1 1-100500000:0, whole genome shotgun sequence',
 'CCCCAGCCACCCTTgcttccctgccccagccttccatcTCATCTCTCTTGCTTCCATCTCTGGCTTTTCCACTCCAGCCA']

### What's a shotgun sequence (in the context of genomic files)?

**A "shotgun sequence"** in a genomic file almost always refers to a **DNA sequencing read** (or a short DNA sequence fragment) that comes from **shotgun sequencing**, a widely used method for determining the sequence of genomes.

#### What is shotgun sequencing?
Shotgun sequencing is a technique for reading the DNA of an entire genome (or large genomic regions). The name comes from the idea of a "shotgun blast": random and widespread fragmentation.

The basic process works like this:

1. Take the long DNA molecule (e.g., a whole chromosome or entire genome)  
2. Randomly break it into many small overlapping fragments (mechanically or enzymatically)  
3. Sequence each of these small fragments individually; each resulting short sequence is called a **read**  
4. Use powerful computers to find overlapping regions between all these reads  
5. Piece the overlapping reads back together computationally to reconstruct the original long sequence (this is called **assembly**)
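
Steps 4 and 5 can be sketched with a toy greedy assembler in plain Python. This is a deliberately simplified illustration on a made-up sequence with error-free reads; real assemblers rely on graph-based methods and must cope with sequencing errors, repeats and reverse complements. The function names `overlap` and `greedy_assemble` are hypothetical.

```python
def overlap(a, b, min_len=3):
    """Length of the longest suffix of a that equals a prefix of b (0 if below min_len)."""
    for k in range(min(len(a), len(b)), min_len - 1, -1):
        if a.endswith(b[:k]):
            return k
    return 0

def greedy_assemble(reads, min_len=3):
    """Repeatedly merge the pair of reads with the largest overlap."""
    reads = list(reads)
    while len(reads) > 1:
        best = (0, None, None)
        for i, a in enumerate(reads):
            for j, b in enumerate(reads):
                if i != j:
                    k = overlap(a, b, min_len)
                    if k > best[0]:
                        best = (k, i, j)
        k, i, j = best
        if k == 0:
            break  # no overlaps left; remaining reads stay separate
        merged = reads[i] + reads[j][k:]
        reads = [r for idx, r in enumerate(reads) if idx not in (i, j)] + [merged]
    return reads

# Made-up toy genome and three overlapping, error-free reads
genome = "ATGCGTACGTTAGC"
reads = ["ATGCGTAC", "GTACGTTA", "GTTAGC"]

print(greedy_assemble(reads))  # ['ATGCGTACGTTAGC']
```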

This random, parallel approach made it possible to sequence very large genomes much faster and cheaper than earlier ordered methods.

#### What you'll actually see in a genomic file
In FASTA, FASTQ, or other sequence files from a shotgun sequencing project, each entry is typically one of these short pieces, that is, one "shotgun sequence":

- Header line (e.g. `@read_name` or `>contig_001`)  
- Followed by the actual DNA sequence (usually 50–300 bp for short-read technologies like Illumina, or thousands of bp for long-read tech like PacBio/Oxford Nanopore)

Example of what one might look like in a file:

```
>SRR123456.1 length=151
GATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATC
```

That string of A's, C's, G's and T's **is** a shotgun sequence, a small random piece of the genome that came from the fragmentation step.
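
To make the header/sequence structure concrete, here is a minimal FASTA reader in plain Python. It is a sketch that assumes well-formed input where sequences may be wrapped over several lines; the function name `read_fasta` and the sample records are made up for illustration.

```python
import io

def read_fasta(handle):
    """Yield (header, sequence) pairs from a FASTA-formatted text stream."""
    header, chunks = None, []
    for line in handle:
        line = line.strip()
        if not line:
            continue
        if line.startswith(">"):
            if header is not None:
                yield header, "".join(chunks)
            header, chunks = line[1:], []
        else:
            chunks.append(line)  # sequence lines of one record are concatenated
    if header is not None:
        yield header, "".join(chunks)

# Made-up sample with one wrapped record and one single-line record
sample = io.StringIO(">read_1 toy example\nGATC\nGATC\n>read_2\nACGT\n")
records = list(read_fasta(sample))
print(records)
# [('read_1 toy example', 'GATCGATC'), ('read_2', 'ACGT')]
```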

#### Common variations/contexts
- **Whole-genome shotgun sequencing (WGS)**: shotgun applied to the entire genome at once (most common today)  
- **Hierarchical shotgun**: older approach where the genome was first broken into large mapped clones (BACs), then shotgun-sequenced individually  
- **Shotgun metagenomics**: same principle, but applied to mixed microbial communities (e.g. gut, soil, ocean samples) instead of one organism

In modern genomics files (especially from next-generation sequencing), almost everything labeled as a sequence in raw data files is a shotgun sequence/read, unless the file is already a finished assembly (contigs/chromosomes).

So when someone says "shotgun sequence" in the context of a genomic file, they're usually referring to one of those raw, short, randomly sampled DNA reads that are the building blocks used to reconstruct the full genome. The file used here is an assembly (note the `GCA_` accession prefix), so its records are sequences assembled from shotgun reads rather than raw reads.

## Preprocess

The next step replaces each header line (beginning with ">") with a `---` delimiter and converts all bases to upper case. `saveAsTextFile` writes the result as a directory of part files named `GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN`.

In [20]:
!rm -rf GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN

In [21]:
import re
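(genomeFile
    .map(lambda x: re.sub('^>.*', '---', x))  # replace FASTA header lines with a '---' delimiter
    .map(lambda x: x.upper())  # convert lower-case bases to upper case
    .map(lambda x: re.sub('^$', '\n', x))  # replace empty lines with a newline character
    .saveAsTextFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN"))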

In [22]:
genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN", minPartitions=12)

## Extract n-grams

In [23]:
%%time
n = 3
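ngrams = (genomeFile
          .map(lambda x: re.sub('\n', '', x))  # strip any embedded newline characters
          .flatMap(lambda x: x.split())  # one token per non-empty line
          .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])])  # character ngrams within each line
          .map(lambda x: (x, 1))
          .reduceByKey(add)
          # In PySpark, sortBy samples the RDD to compute partition boundaries, which runs the
          # upstream stages; this is the likely reason why defining the pipeline takes minutes here.
          .sortBy(lambda x: x[1], ascending=False)
)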

CPU times: user 186 ms, sys: 50.4 ms, total: 237 ms
Wall time: 22min 37s


In [24]:
%%time
for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))

T T T               46984458
A A A               46913491
A A T               30131208
A T T               30115609
C A G               29844683
C T G               29717935
A G A               29179399
T C T               29140174
A C A               26962506
T G T               26932345
CPU times: user 20.8 ms, sys: 3.63 ms, total: 24.4 ms
Wall time: 9.57 s
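
One thing to keep in mind when reading these counts: as far as the code above shows, `textFile` yields one record per line and the ngrams are built inside each record, so trigrams that straddle two consecutive lines of the same sequence are not counted. The plain-Python sketch below, on a made-up sequence wrapped over two lines, shows the effect; the helper name `char_ngrams` is hypothetical.

```python
from collections import Counter

def char_ngrams(line, n):
    """All overlapping character n-grams within a single line."""
    return [line[i:i + n] for i in range(len(line) - n + 1)]

# Made-up toy sequence, wrapped over two lines as FASTA files usually are
lines = ["ACGTA", "CGTAC"]

per_line = Counter(g for line in lines for g in char_ngrams(line, 3))
joined = Counter(char_ngrams("".join(lines), 3))

print(sum(per_line.values()))  # 6
print(sum(joined.values()))    # 8
print(joined - per_line)       # the two trigrams that straddle the line break
```

Each line break costs `n - 1` ngrams, so with lines of 80 bases the per-line trigram counts are slightly lower than the counts over the unwrapped sequence.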
