In [54]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=2g pyspark-shell"

from pyspark.sql import SparkSession
import findspark
from pyspark.sql.functions import input_file_name
import PyPDF2
import pdfplumber
from docx import Document
from pptx import Presentation
import re
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')

# Locate the Spark installation, then create (or reuse) the session that the
# rest of the notebook shares; `local[*]` is the master URL handed to Spark.
findspark.init()
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("doc_processor")
    .getOrCreate()
)
sc = spark.sparkContext

_ENGLISH_STOP_WORDS = None  # filled on first use by _english_stop_words()


def _english_stop_words():
    """Return the NLTK English stop-word set, loading the corpus only once."""
    global _ENGLISH_STOP_WORDS
    if _ENGLISH_STOP_WORDS is None:
        _ENGLISH_STOP_WORDS = frozenset(stopwords.words('english'))
    return _ENGLISH_STOP_WORDS


def preprocess_text(text):
    """Clean and normalize text.

    Strips every character that is neither a word character nor whitespace,
    collapses whitespace runs to single spaces, lower-cases the result and
    drops English stop words.

    Args:
        text: Raw text to normalize. ``None`` or an empty string is accepted.

    Returns:
        The remaining words joined by single spaces; ``""`` when the input is
        empty/``None`` or nothing survives the cleaning.
    """
    if not text:
        # Extractors may hand back nothing at all; treat that as empty text
        # instead of failing inside re.sub.
        return ""
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip().lower()
    tokens = text.split()
    if not tokens:
        return ""
    # The stop-word set is cached so the corpus is not re-read per document.
    stop_words = _english_stop_words()
    return ' '.join(word for word in tokens if word not in stop_words)

def process_pdf(file_path):
    """Extract and normalize the text of a PDF file.

    pdfplumber is tried first. If it raises for any reason, the failure is
    logged and the whole file is re-read with PyPDF2 instead; text gathered
    before the failure is discarded.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The extracted text after ``preprocess_text`` has been applied.

    Raises:
        Exception: Whatever the PyPDF2 fallback raises (including errors from
            opening the file) propagates to the caller.
    """
    try:
        with pdfplumber.open(file_path) as pdf:
            # Pages without extractable text yield None; treat them as empty.
            text = "".join((page.extract_text() or "") + " " for page in pdf.pages)
    except Exception as exc:
        # Report why the primary extractor failed instead of hiding it.
        import logging
        logging.getLogger(__name__).warning(
            "pdfplumber failed on %s (%s); falling back to PyPDF2", file_path, exc
        )
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            text = ' '.join(page.extract_text() or "" for page in reader.pages)
    return preprocess_text(text)

def process_document(file_path):
    """Process document based on type.

    The file extension (case-insensitive) selects the extractor:

    * ``.pdf``            -> ``process_pdf``
    * ``.doc`` / ``.docx`` -> paragraph text read through ``Document``
    * ``.ppt`` / ``.pptx`` -> text of every slide shape exposing ``.text``

    Args:
        file_path: Path of the document to read.

    Returns:
        The normalized text, or ``""`` when the extension is not recognised
        or a Word/PowerPoint file cannot be read.

    Raises:
        Exception: Errors from ``process_pdf`` are not caught here.
    """
    lowered = file_path.lower()

    if lowered.endswith('.pdf'):
        return process_pdf(file_path)

    if lowered.endswith(('.doc', '.docx')):
        # NOTE(review): python-docx targets .docx; legacy .doc files will
        # presumably fail to open and end up as "" -- confirm if they matter.
        try:
            paragraphs = Document(file_path).paragraphs
            return preprocess_text(' '.join(p.text for p in paragraphs))
        except Exception:
            # Best effort: an unreadable file contributes no text. Unlike a
            # bare `except:`, this lets KeyboardInterrupt/SystemExit through.
            return ""

    if lowered.endswith(('.ppt', '.pptx')):
        # NOTE(review): same caveat as above for legacy .ppt files.
        try:
            ppt = Presentation(file_path)
            return preprocess_text(' '.join(
                shape.text for slide in ppt.slides
                for shape in slide.shapes
                if hasattr(shape, "text")
            ))
        except Exception:
            return ""

    return ""

def chunk_text(text, chunk_size=300):
    """Lazily split ``text`` into consecutive chunks of ``chunk_size`` words.

    Words are whitespace-delimited; each chunk is re-joined with single
    spaces and the final chunk may hold fewer words than ``chunk_size``.
    """
    tokens = text.split()
    starts = range(0, len(tokens), chunk_size)
    yield from (' '.join(tokens[start:start + chunk_size]) for start in starts)


# Folder scanned for course documents and the extensions that are picked up.
folder_path = "INFOH515/"
valid_extensions = (".pdf", ".docx", ".pptx")

# os.listdir returns entries in arbitrary order, so the listing is sorted to
# keep the row order of the chunk table and the exported files reproducible.
# Only regular files whose (case-insensitive) extension matches are kept.
file_paths = [
    path
    for path in (os.path.join(folder_path, name) for name in sorted(os.listdir(folder_path)))
    if path.lower().endswith(valid_extensions) and os.path.isfile(path)
]


# Text extraction runs here on the driver (inside the list comprehension);
# only the resulting (path, cleaned_text) pairs are handed to Spark.
texts_rdd = sc.parallelize([
    (fp, process_document(fp))
    for fp in file_paths
    if os.path.exists(fp)
])


def _to_chunk_rows(path_and_text):
    """Turn one (path, text) pair into (file_name, chunk_id, chunk) rows."""
    source_path, cleaned = path_and_text
    base_name = os.path.basename(source_path)
    rows = []
    for position, piece in enumerate(chunk_text(cleaned)):
        # Blank chunks are dropped; chunk ids keep their original positions.
        if piece.strip():
            rows.append((base_name, position, piece))
    return rows


chunks_rdd = texts_rdd.flatMap(_to_chunk_rows)


chunks_df = chunks_rdd.toDF(["file_name", "chunk_id", "text"])
chunks_df.show(5)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\cedri\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


+------------+--------+--------------------+
|   file_name|chunk_id|                text|
+------------+--------+--------------------+
|0-prelim.pdf|       0|infoh515 big data...|
| 1-intro.pdf|       0|infoh515 big data...|
| 1-intro.pdf|       1|computer luiz and...|
| 1-intro.pdf|       2|collecting cleani...|
| 1-intro.pdf|       3|viewing habits pu...|
+------------+--------+--------------------+
only showing top 5 rows



In [55]:

# Dump the cleaned text of every discovered document into one UTF-8 file,
# with a blank line between documents.
output_file = "all_cleaned_texts.txt"
with open(output_file, mode='w', encoding='utf-8') as f:
    for file_path in file_paths:
        try:
            cleaned_text = process_document(file_path)
            f.write(f"{cleaned_text}\n\n")
        except Exception as e:
            # A failing document (extraction or write) is reported and skipped.
            print(f"Erreur avec {file_path}: {e}")


In [56]:
from pyspark.sql import SparkSession
import os
import shutil
# Export the chunk table to output/all_chunks.csv.
# `pdf` is the pandas copy of the Spark DataFrame (not a PDF handle);
# toPandas() brings every row of chunks_df into this process.
pdf = chunks_df.toPandas()
output_dir = "output"
output_file = os.path.join(output_dir, "all_chunks.csv")
# CAUTION: an existing output directory is deleted recursively, so anything
# else stored under "output" is lost each time this cell runs.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
# index=False keeps the pandas row index out of the CSV.
pdf.to_csv(output_file, index=False, encoding='utf-8')
