In [0]:

# Cell 1: Install XML parsing library
# NOTE(review): none of the visible cells import lxml -- article extraction
# below uses `re` on the raw dump text -- so this install may be unnecessary.
# If it stays, pin a version (`%pip install lxml==<version>`) so reruns
# resolve the same package.
%pip install lxml
# Restart the Python process so the newly installed package can be imported.
# This discards all Python state, so it must remain the first cell.
dbutils.library.restartPython()


In [0]:
# Cell 2: Configure Azure Data Lake Storage Gen2 authentication
# Registers the storage account key with Spark so later cells can read/write
# abfss:// paths on this account directly.
import os

storage_account = "sradatalake"

# Never hardcode the account key in the notebook: it leaks into version control
# and exported copies. Read it from the environment instead (on Databricks,
# `dbutils.secrets.get(scope=..., key=...)` is the usual alternative).
storage_key = os.environ.get("AZURE_STORAGE_KEY", "")
if not storage_key:
    # Fail here with a clear message rather than configuring a bogus key that
    # only surfaces later as an opaque authentication error.
    raise RuntimeError(
        "AZURE_STORAGE_KEY is not set; provide the storage account key "
        "(e.g. from a secret scope) before running this cell."
    )

# Configure Spark to access Azure Storage directly
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key,
)

print(" Storage configured! You can now access files directly.")

In [0]:
# Cell 3: Verify Azure Storage connectivity and list source files
# Lists every entry under the dumps directory with its size, as a sanity check
# that the account key works and the source files are in place.

container, directory = "pile-raw", "dumps"
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{directory}/"

files = dbutils.fs.ls(path)

BYTES_PER_GB = 1024 ** 3  # binary gigabyte, same divisor as before
for f in files:
    size_gb = f.size / BYTES_PER_GB
    print(f"{f.name}: {size_gb:.2f} GB")

In [0]:
# Cell 4: Initial data profiling - Extract sample articles for quality assessment
# Processes 100K lines (~245 articles) to validate XML structure and content quality

import bz2
import xml.etree.ElementTree as ET
from pyspark.sql import Row
import io
import html
import re

# `storage_account` comes from Cell 2; reusing it keeps this path on the same
# account whose key was configured there.
file_path = f"abfss://pile-raw@{storage_account}.dfs.core.windows.net/dumps/enwiki-latest-pages-articles.xml.bz2"

print(" Reading file using Spark's native bz2 support...")

# spark.read.text decompresses the .bz2 file and yields one row per line.
text_lines = spark.read.text(file_path)

print(" File loaded into Spark DataFrame")
print(" Processing lines to extract articles...")

# Profiling sample: only the leading lines of the dump are pulled to the driver.
sample_lines = text_lines.limit(100000).collect()

print(f"📝 Collected {len(sample_lines)} lines for parsing")

# Rebuild the sampled XML so multi-line <page> elements can be matched as a unit.
full_text = '\n'.join(row.value for row in sample_lines)

print(" Parsing XML...")

articles = []
max_articles = 1000

try:
    # Regex extraction over the in-memory sample (not a real XML parse). A <page>
    # cut off at the end of the sample has no closing tag and is skipped.
    pages = re.findall(r'<page>(.*?)</page>', full_text, re.DOTALL)

    print(f"Found {len(pages)} pages in sample")

    for page_text in pages:
        if len(articles) >= max_articles:
            break

        title_match = re.search(r'<title>(.*?)</title>', page_text)
        text_match = re.search(r'<text[^>]*>(.*?)</text>', page_text, re.DOTALL)
        if not (title_match and text_match):
            continue

        # XML element content is entity-escaped (&lt; &gt; &amp; &quot; ...).
        # Decode it so titles read correctly and the markup-stripping regex in
        # the cleaning step sees real '<' / '>' characters.
        title = html.unescape(title_match.group(1))
        text = html.unescape(text_match.group(1))

        if len(text) > 100:
            articles.append(Row(title=title, text=text, text_length=len(text)))
            if len(articles) % 100 == 0:
                print(f"Extracted {len(articles)} articles...")

    print(f"\n✅ Extracted {len(articles)} articles")

    if not articles:
        # createDataFrame needs at least one Row to infer a schema.
        raise ValueError("No articles found in the sampled lines; increase the sample size.")

    df = spark.createDataFrame(articles)
    df.show(5, truncate=50)
    print(f"\n📊 Total articles: {df.count()}")

except Exception as e:
    # Report, then re-raise so "Run All" stops here instead of continuing with
    # an undefined (or stale, from an earlier run) `df`.
    print(f"❌ Parsing error: {e}")
    raise

In [0]:
# Cell 5: Full-scale extraction - Process target volume of articles
# Increased to 500K lines based on profiling results (100K→245 articles; 500K→1000+ articles)

import bz2
import xml.etree.ElementTree as ET
from pyspark.sql import Row
import io
import re
import html

# `storage_account` comes from Cell 2, so this path always targets the account
# whose key was configured there.
file_path = f"abfss://pile-raw@{storage_account}.dfs.core.windows.net/dumps/enwiki-latest-pages-articles.xml.bz2"

print(" Reading file using Spark's native bz2 support...")

# Read as text lines (Spark handles bz2 decompression automatically)
text_lines = spark.read.text(file_path)

print(" File loaded into Spark DataFrame")
print(" Processing lines to extract articles...")

# Number of leading dump lines pulled to the driver; sized from Cell 4's profiling.
sample_size = 500000
sample_lines = text_lines.limit(sample_size).collect()

print(f" Collected {len(sample_lines)} lines for parsing")

# Rebuild the sampled XML so multi-line <page> elements can be matched as a unit.
full_text = '\n'.join(row.value for row in sample_lines)

print(" Parsing XML...")

articles = []
max_articles = 1000  # stop once this many pages have been kept

# A <page> cut off at the end of the sample has no closing tag, so the
# non-greedy match simply skips it.
pages = re.findall(r'<page>(.*?)</page>', full_text, re.DOTALL)

print(f"Found {len(pages)} pages in sample")

# NOTE(review): pages are not filtered on <ns> or <redirect>, so any such page
# whose text exceeds 100 characters is kept -- confirm that is intended.
for page_text in pages:
    if len(articles) >= max_articles:
        break

    title_match = re.search(r'<title>(.*?)</title>', page_text)
    text_match = re.search(r'<text[^>]*>(.*?)</text>', page_text, re.DOTALL)
    if not (title_match and text_match):
        continue

    # XML element content is entity-escaped (&lt; &gt; &amp; &quot; ...).
    # Decode it so titles read correctly and the markup-stripping regex in the
    # cleaning step sees real '<' / '>' characters instead of entity text.
    title = html.unescape(title_match.group(1))
    text = html.unescape(text_match.group(1))

    if len(text) > 100:
        articles.append(Row(title=title, text=text, text_length=len(text)))
        if len(articles) % 100 == 0:
            print(f"Extracted {len(articles)} articles...")

print(f"\n✅ Extracted {len(articles)} articles")

if not articles:
    # createDataFrame needs at least one Row to infer a schema; fail with an
    # actionable message instead of an opaque schema-inference error.
    raise ValueError(
        f"No articles found in the first {sample_size} lines; increase sample_size."
    )

df = spark.createDataFrame(articles)
df.show(5, truncate=50)

print(f"\n📊 DataFrame Statistics:")
print(f"Total articles: {df.count()}")
df.describe(['text_length']).show()

# Display sample article for quality verification
print("\n📖 Sample article:")
sample = df.filter(df.text_length > 1000).first()
if sample is not None:
    print(f"\nTitle: {sample.title}")
    print(f"Text preview: {sample.text[:500]}...")
    print(f"Total length: {sample.text_length} characters")

In [0]:
# Cell 6: Text cleaning and normalization
# Replaces common wiki markup tokens with a space and keeps only articles whose
# text_length (length of the extracted, pre-cleaning text) is within bounds.

from pyspark.sql.functions import col, length, regexp_replace

# Tokens replaced by a single space: [[ ]] {{ }} ''' '' == and anything <...>.
WIKI_MARKUP_PATTERN = r'\[\[|\]\]|\{\{|\}\}|\'\'\'|\'\'|==|<.*?>'
MIN_CHARS, MAX_CHARS = 500, 50000  # inclusive bounds on text_length

cleaned_text = regexp_replace(col("text"), WIKI_MARKUP_PATTERN, ' ').alias("text_clean")
length_in_range = col("text_length").between(MIN_CHARS, MAX_CHARS)

df_clean = df.select(col("title"), cleaned_text, col("text_length")).filter(length_in_range)

print(f"Original: {df.count()} articles")
print(f"After filtering: {df_clean.count()} articles")

df_clean.show(5, truncate=50)

In [0]:
# Cell 7: Persist cleaned data to Azure Data Lake Storage
# Saves the articles cleaned in Cell 6 in Parquet format for efficient
# downstream consumption, then reloads them to verify the write.

from pyspark.sql.functions import col, length, regexp_replace

# `df_clean` is produced by Cell 6. It is reused here rather than re-running a
# second copy of the cleaning regex/filters, which could silently drift apart.
# NOTE(review): "wikipedia_1000" reflects the 1000-article extraction target;
# the saved row count can be lower after Cell 6's length filter.
output_path = f"abfss://processed-data@{storage_account}.dfs.core.windows.net/wikipedia_1000/"

print(f"\n💾 Saving cleaned articles to: {output_path}")

df_clean.write.format("parquet") \
    .mode("overwrite") \
    .save(output_path)

article_count = df_clean.count()

print(f"✅ Saved {article_count} cleaned articles!")
print(f"📍 Location: {output_path}")

# Reload what was written and check it round-tripped completely.
print("\n🔍 Verifying saved data...")
df_verify = spark.read.parquet(output_path)
verified_count = df_verify.count()
assert verified_count == article_count, (
    f"Verification failed: wrote {article_count} articles but read back {verified_count}"
)
print(f"✅ Verification successful! Loaded {verified_count} articles")
df_verify.show(3, truncate=50)

In [0]:
# Cell 8: Data quality metrics and summary statistics
# Generate descriptive analytics for processed article corpus.
#
# Spark functions are used through the `F` alias: importing `min`/`max` by name
# from pyspark.sql.functions would shadow Python's built-in min()/max() for
# every later cell in the notebook.
from pyspark.sql import functions as F

# NOTE: text_length was computed on the extracted text before markup cleaning,
# so these figures describe raw article length, not len(text_clean).
stats = df_clean.agg(
    F.avg("text_length").alias("avg_length"),
    F.min("text_length").alias("min_length"),
    F.max("text_length").alias("max_length"),
).first()

if stats.avg_length is None:
    # Aggregates over an empty DataFrame are null; formatting None with :.0f
    # would raise, so report the empty corpus explicitly.
    print("📊 Statistics: no articles in df_clean")
else:
    print("📊 Statistics:")
    print(f"  Average length: {stats.avg_length:.0f} characters")
    print(f"  Shortest: {stats.min_length} characters")
    print(f"  Longest: {stats.max_length} characters")

# Show longest articles
print("\n📚 Longest articles:")
df_clean.orderBy(F.col("text_length").desc()).select("title", "text_length").show(10)
