In [None]:
!pip install fitz PyMuPDF python-docx openpyxl google-cloud-storage --quiet

In [None]:
!pip install apache-beam[gcp]
!pip install tools

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
import functools
import io
import json
import logging
import os
import re
import reprlib
import uuid
import xml.etree.ElementTree as ET  # For XML
from collections import Counter
from datetime import datetime
from typing import Optional

import fitz  # PyMuPDF for PDF
import nltk
import requests
from bs4 import BeautifulSoup
from docx import Document  # python-docx for DOCX
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from google.cloud import storage
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# Service-account key for google-cloud-storage. Export it only when the file is
# actually present: if GOOGLE_APPLICATION_CREDENTIALS points at a missing file,
# Application Default Credentials fail outright instead of falling back to other
# sources (e.g. the Colab user credentials from auth.authenticate_user()).
_SERVICE_ACCOUNT_KEY = "/content/text-parsing-pipeline-c1d668180e4f.json"
if os.path.exists(_SERVICE_ACCOUNT_KEY):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _SERVICE_ACCOUNT_KEY

# Download NLTK data (tokenizer models and stop-word lists).
# NOTE(review): no code visible in this notebook uses nltk yet — confirm it is needed.
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)

# Configure logging: every record goes both to `log_file` and to the console.
log_file = "pipeline_report.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(),
    ],
    # basicConfig() does nothing if the root logger already has handlers, which
    # notebook kernels (Colab/Jupyter) commonly install before user code runs.
    # force=True (Python 3.8+) removes and closes those handlers first, so the
    # handlers above are actually attached and the log file is written.
    force=True,
)
logger = logging.getLogger(__name__)

# Bounded repr used when logging call arguments. Several decorated functions
# receive whole file payloads (bytes); an unbounded repr would dump each file
# into the log at INFO level.
_ARG_REPR = reprlib.Repr()
_ARG_REPR.maxstring = 200
_ARG_REPR.maxother = 200


def _format_call_args(args, kwargs):
    """Return ``"args: <repr>, kwargs: <repr>"`` with each repr size-bounded."""
    return f"args: {_ARG_REPR.repr(args)}, kwargs: {_ARG_REPR.repr(kwargs)}"


# Utility decorator for logging
def logged(func):
    """Decorate ``func`` so each call and any exception it raises are logged.

    Every call logs an INFO "Started ..." line with size-bounded argument
    reprs. Any ``Exception`` is logged with its traceback via
    ``logger.exception`` and then re-raised unchanged.
    ``functools.wraps`` keeps the wrapped function's ``__name__``,
    ``__doc__`` and ``__wrapped__``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            logger.info(f"Started '{func.__name__}' with {_format_call_args(args, kwargs)}")
            return func(*args, **kwargs)
        except Exception as e:
            logger.exception(f"Error in '{func.__name__}': {e}")
            raise
    return wrapper

@logged
def authenticate_gcs():
    """Create a Cloud Storage client from the environment's default credentials.

    A new ``storage.Client`` is built on every call; nothing is cached.

    Returns:
        The freshly constructed ``storage.Client``.

    Raises:
        Whatever ``storage.Client()`` raises; the error is logged first.
    """
    try:
        gcs_client = storage.Client()
        logger.info("GCS authentication successful")
    except Exception as err:
        logger.error(f"Error authenticating with GCS: {err}")
        raise
    return gcs_client

@logged
def read_file_from_gcs(bucket_name, file_name):
    """Download one object from GCS and return its contents as bytes.

    Args:
        bucket_name: name of the bucket holding the object.
        file_name: object name inside the bucket (not a ``gs://`` URI).

    Returns:
        The object's raw contents.

    Raises:
        FileNotFoundError: if ``get_blob`` finds no such object.
    """
    client = authenticate_gcs()
    # bucket() only builds a local handle. Unlike get_bucket(), it does not
    # issue a bucket-metadata request, so the caller needs no
    # storage.buckets.get permission and saves a round trip.
    bucket = client.bucket(bucket_name)
    blob = bucket.get_blob(file_name)
    if blob is None:
        logger.error(f"File {file_name} not found in bucket {bucket_name}")
        raise FileNotFoundError(f"File {file_name} not found in bucket {bucket_name}")
    content = blob.download_as_bytes()
    logger.info(f"Successfully read {file_name} from GCS")
    return content

@logged
def extract_text_from_html(content):
    """Return the visible text of an HTML document supplied as bytes.

    The bytes are decoded as UTF-8, silently dropping undecodable bytes.
    ``<script>`` and ``<style>`` elements are removed. The remaining text
    fragments are stripped and joined with single spaces.
    """
    markup = content.decode('utf-8', errors='ignore')
    soup = BeautifulSoup(markup, 'html.parser')
    # find_all() returns a list, so decomposing while looping is safe.
    for node in soup.find_all(['script', 'style']):
        node.decompose()
    visible_text = soup.get_text(separator=' ', strip=True)
    logger.info(f"Extracted HTML text: {visible_text[:50]}...")
    return visible_text

@logged
def extract_text_from_pdf(file_content):
    """Extract the text of every page of a PDF supplied as bytes.

    Args:
        file_content: raw PDF bytes.

    Returns:
        Page texts in document order, joined with newlines.
    """
    # PyMuPDF accepts raw bytes for ``stream``, so no BytesIO wrapper is needed.
    # Using the Document as a context manager closes it even if text extraction
    # raises; previously it was never closed.
    with fitz.open(stream=file_content, filetype="pdf") as doc:
        text = "\n".join(page.get_text() for page in doc)
    logger.info(f"Extracted PDF text: {text[:50]}...")
    return text

@logged
def extract_text_from_docx(file_content):
    """Return the text of a .docx document supplied as bytes.

    The output contains the non-blank body paragraphs, one per line.
    They are followed by every table row, rendered as its stripped cell
    texts joined with " | ".
    """
    with io.BytesIO(file_content) as buffer:
        document = Document(buffer)
        lines = []
        for paragraph in document.paragraphs:
            if paragraph.text.strip():
                lines.append(paragraph.text)
        lines.extend(
            " | ".join(cell.text.strip() for cell in row.cells)
            for table in document.tables
            for row in table.rows
        )
        joined = "\n".join(lines)
        logger.info(f"Extracted DOCX text: {joined[:50]}...")
        return joined

@logged
def extract_text_from_xml(file_content):
    """Return all text content of an XML document, or "" if it cannot be parsed.

    The raw bytes are parsed first, so the parser honours the document's own
    encoding declaration or byte-order mark (e.g. ISO-8859-1 or UTF-16 files).
    Decoding as UTF-8 beforehand would silently drop or garble such
    characters. If that parse fails, the bytes are re-parsed after a lenient
    UTF-8 decode that drops undecodable bytes, so slightly malformed UTF-8
    files still yield text.

    Args:
        file_content: raw XML document as bytes.

    Returns:
        The text of every element (``itertext``) joined with single spaces and
        stripped at both ends, or ``""`` when neither parse succeeds.
    """
    # NOTE(review): uploads may be untrusted. The stdlib docs state
    # xml.etree.ElementTree is not secure against maliciously constructed data;
    # consider defusedxml if hostile input is expected.
    try:
        root = ET.fromstring(file_content)
    except ET.ParseError:
        try:
            root = ET.fromstring(file_content.decode('utf-8', errors='ignore'))
        except ET.ParseError as e:
            logger.error(f"XML parsing error: {e}")
            return ""
    text = ' '.join(root.itertext()).strip()
    logger.info(f"Extracted XML text: {text[:50]}...")
    return text

@logged
def extract_text(file_content, file_name):
    """Dispatch ``file_content`` to the extractor matching ``file_name``'s extension.

    Args:
        file_content: raw file bytes.
        file_name: file or object name; only its case-insensitive extension
            is used.

    Returns:
        The text produced by the matching ``extract_text_from_*`` function.

    Raises:
        ValueError: for an extension with no extractor. This includes legacy
            ``.doc`` files: python-docx reads only the zip-based ``.docx``
            format, so routing ``.doc`` to it failed with a confusing
            package error.
    """
    extractors = {
        '.html': extract_text_from_html,
        '.htm': extract_text_from_html,
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.xml': extract_text_from_xml,
    }
    extension = os.path.splitext(file_name)[1].lower()
    logger.info(f"Processing file with extension: {extension}")
    extractor = extractors.get(extension)
    if extractor is None:
        logger.error(f"Unsupported file type: {extension}")
        raise ValueError(f"Unsupported file type: {extension}")
    return extractor(file_content)


@logged
def save_results_to_gcs(bucket_name, output_file_name, results):
    """Serialize ``results`` as indented JSON and upload it to GCS.

    Args:
        bucket_name: destination bucket.
        output_file_name: destination object name (not a ``gs://`` URI).
        results: any ``json.dumps``-serializable value. The pipeline passes
            the extracted text (a str), which is stored as a JSON string
            literal.
    """
    client = authenticate_gcs()
    # bucket() builds a local handle without the bucket-metadata request that
    # get_bucket() performs, and without needing storage.buckets.get
    # permission. This matches how upload_to_gcs addresses the bucket.
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(output_file_name)
    blob.upload_from_string(json.dumps(results, indent=2), content_type='application/json')
    logger.info(f"Results saved to gs://{bucket_name}/{output_file_name}")

@logged
def text_parsing_pipeline(bucket_name, input_file_name, output_file_name):
    """Run read -> extract -> save for a single object in ``bucket_name``.

    The input is downloaded and its text is extracted according to the file
    extension. The text is then written as JSON to ``output_file_name`` in the
    same bucket.

    Returns:
        The extracted text.

    Raises:
        Any error from the three stages; it is logged before being re-raised.
    """
    try:
        raw_bytes = read_file_from_gcs(bucket_name, input_file_name)
        extracted = extract_text(raw_bytes, input_file_name)
        save_results_to_gcs(bucket_name, output_file_name, extracted)
        logger.info(f"Pipeline completed for {input_file_name}")
    except Exception as e:
        logger.error(f"Pipeline error: {e}")
        raise
    return extracted

# FastAPI app
# NOTE(review): no routes are registered on `app` in this notebook — confirm
# whether it is still needed.
app = FastAPI(title='app')
GCS_BUCKET = "text-pipeline-bucket-rankush"
GCS_FOLDER = "input"
# Extensions (lowercase, no dot) that upload_to_gcs accepts for local files.
# Keep this in sync with extract_text: every entry needs an extractor there.
# 'xlsx' has none (such uploads used to succeed and then fail in the pipeline),
# while 'xml' is supported by extract_text_from_xml.
SUPPORTED_TYPES = ['pdf', 'docx', 'html', 'xml']
def upload_to_gcs(file_path: Optional[str] = None, url: Optional[str] = None) -> tuple:
    """Upload a local file or a fetched web page to GCS_BUCKET for the pipeline.

    ``file_path`` takes precedence when both arguments are given. The object is
    stored as ``<GCS_FOLDER>/<uuid4>.<ext>``.

    Args:
        file_path: local file; its extension must be in ``SUPPORTED_TYPES``.
        url: page fetched with ``requests.get`` and stored as ``.html``.

    Returns:
        ``(input_path, output_path)``: object names inside ``GCS_BUCKET`` (not
        ``gs://`` URIs). The first is the uploaded input; the second is
        ``output/<uuid4>.json``, where the pipeline should write its result.

    Raises:
        ValueError: if neither source is given, or the file type is unsupported.
        requests.HTTPError: if fetching ``url`` returns an error status.
    """
    if not file_path and not url:
        # Without this check the function returned an empty input path and the
        # failure only surfaced later, when the pipeline tried to read it.
        raise ValueError("Either file_path or url must be provided")

    storage_client = storage.Client()
    bucket = storage_client.bucket(GCS_BUCKET)
    file_id = str(uuid.uuid4())
    blob_path = f"{GCS_FOLDER}/{file_id}"
    try:
        if file_path:
            ext = os.path.splitext(file_path)[1].lower().lstrip('.')
            if ext not in SUPPORTED_TYPES:
                raise ValueError(f"Unsupported file type: {ext}")

            with open(file_path, "rb") as f:
                content = f.read()

            input_path = f"{blob_path}.{ext}"
            bucket.blob(input_path).upload_from_string(content)
            logger.info(f"Uploaded file to gs://{GCS_BUCKET}/{input_path}")
        else:
            ext = "html"
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            # When Content-Type carries no charset, requests assumes ISO-8859-1
            # for text/* responses. UTF-8 pages then decode to mojibake, which
            # is baked in when the text is re-encoded for upload. Use the
            # content-detected encoding in that case instead.
            if 'charset' not in response.headers.get('Content-Type', '').lower():
                response.encoding = response.apparent_encoding

            input_path = f"{blob_path}.{ext}"
            # upload_from_string encodes a str as UTF-8; label it accordingly.
            bucket.blob(input_path).upload_from_string(
                response.text, content_type="text/html; charset=utf-8"
            )
            logger.info(f"Uploaded URL content to gs://{GCS_BUCKET}/{input_path}")

        output_path = f"output/{file_id}.json"
        return input_path, output_path

    except Exception as e:
        logger.error(f"Upload failed: {str(e)}")
        raise

def run_pipeline_externally(input_path: str, output_path: str):
    """Run ``text_parsing_pipeline`` against objects in ``GCS_BUCKET``.

    Args:
        input_path: object name of the uploaded input inside ``GCS_BUCKET``,
            e.g. the first value returned by ``upload_to_gcs``.
        output_path: object name the JSON result is written to.

    Returns:
        The extracted text returned by ``text_parsing_pipeline``.

    Raises:
        Any pipeline error; it is logged before being re-raised.
    """
    logger.info(f"Starting pipeline for {input_path}")
    try:
        extracted = text_parsing_pipeline(GCS_BUCKET, input_path, output_path)
        logger.info("Pipeline completed successfully")
    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        raise
    return extracted

# Example run: fetch a public web page, upload it as HTML, then extract its text.
demo_url = 'https://www.geeksforgeeks.org/python/convert-mp3-to-wav-using-python/'
input_path, output_path = upload_to_gcs(url=demo_url)
run_pipeline_externally(input_path, output_path)