In [2]:
!pip install newspaper3k lxml_html_clean fastapi uvicorn nest-asyncio pyngrok transformers torch newspaper3k nltk &> /dev/null

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark pyspark

In [3]:
!ngrok config add-authtoken 2xmOnusF7Uhy7I9ChbKLH47TOzb_635DbfWmkg2yRmCY1Vmk

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [9]:
global results_df
results_df = None

In [None]:
import os
import requests
import time
import re
from datetime import datetime
from urllib.parse import urlparse
import torch
from transformers import pipeline
from newspaper import Article
from fastapi import FastAPI, Request
import nest_asyncio
from pyngrok import ngrok
import uvicorn

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructType, StructField, ArrayType
import json

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SparkSentimentAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"Spark initialized with {spark.sparkContext.defaultParallelism} cores")

API_KEY = "03797cb80667beed8ea1bc74341941d5"
GNEWS_API_URL = "https://gnews.io/api/v4/search"

device = 0 if torch.cuda.is_available() else -1
print(f"Using device: {'GPU' if device == 0 else 'CPU'}")

try:
    summarizer = pipeline("summarization",
                         model="facebook/bart-large-cnn",
                         device=device)
    sentiment_analyzer = pipeline("text-classification",
                         model="tabularisai/multilingual-sentiment-analysis",
                         device=device)
    print("✓ ML models loaded successfully")
except Exception as e:
    print(f"Error loading models: {e}")
    summarizer = pipeline("summarization", device=device)
    sentiment_analyzer = pipeline("sentiment-analysis", device=device)

def clean_text_spark(text):
    """Clean text for Spark processing"""
    if not text:
        return ""

    text = re.sub(r'[^\x20-\x7E\n\r\t]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()

    lines = text.split('\n')
    cleaned = []
    for line in lines:
        line = line.strip()
        if len(line) > 5 and not line.startswith('ADVERTISEMENT'):
            cleaned.append(line)

    return ' '.join(cleaned)

def extract_article_content(url):
    """Extract article content using newspaper3k - Spark compatible"""
    try:
        article = Article(url)
        article.download()
        article.parse()

        if not article.text:
            return None

        return {
            'url': url,
            'title': article.title or "No Title",
            'date': article.publish_date.strftime("%Y-%m-%d") if article.publish_date else "Unknown",
            'source': urlparse(url).netloc,
            'text': clean_text_spark(article.text),
            'status': 'success'
        }
    except Exception as e:
        return {
            'url': url,
            'title': "Extraction Failed",
            'date': "Unknown",
            'source': urlparse(url).netloc if url else "Unknown",
            'text': "",
            'status': f'error: {str(e)[:100]}'
        }

clean_text_udf = udf(clean_text_spark, StringType())
extract_article_udf = udf(extract_article_content, StringType())

def get_gnews_articles(topic, max_articles=20):
    """Fetch articles from GNews API with increased limit for Spark processing"""
    params = {
        "q": topic,
        "lang": "en",
        "max": max_articles,
        "token": API_KEY
    }

    try:
        response = requests.get(GNEWS_API_URL, params=params)
        response.raise_for_status()
        articles = response.json().get("articles", [])

        results = []
        for item in articles:
            results.append({
                'title': item.get("title", ""),
                'url': item.get("url", ""),
                'publishedAt': item.get("publishedAt", ""),
                'source': item.get("source", {}).get("name", "")
            })

        print(f"✓ Found {len(results)} articles for topic '{topic}'")
        return results
    except Exception as e:
        print(f"✗ GNews API search failed: {e}")
        return []

def process_articles_with_spark(articles_data):
    """Process articles using Spark for distributed extraction"""
    if not articles_data:
        return []

    print(f"🚀 Processing {len(articles_data)} articles with Spark...")

    articles_df = spark.createDataFrame(articles_data)
    print(f"✓ Created Spark DataFrame with {articles_df.count()} articles")

    def extract_single_article(row):
        return extract_article_content(row['url'])

    articles_rdd = articles_df.rdd
    extracted_rdd = articles_rdd.map(extract_single_article)

    successful_articles = extracted_rdd.filter(lambda x: x and x['status'] == 'success' and len(x['text']) > 50)

    processed_articles = successful_articles.collect()
    print(f"✓ Successfully extracted {len(processed_articles)} articles using Spark")

    return processed_articles

def safe_summarize_batch(articles):
    """Batch summarization for better performance"""
    results = []

    for article in articles:
        try:
            text = article['text']
            if len(text) < 20:
                summary = "Content too short to summarize."
            else:
                max_length = min(10000, len(text))
                truncated_text = text[:max_length]

                summary_result = summarizer(
                    truncated_text,
                    max_length=120,
                    min_length=30,
                    do_sample=False,
                    truncation=True
                )

                summary = summary_result[0]['summary_text']

        except Exception as e:
            print(f"Summarization failed for article: {e}")
            summary = "Summarization failed."

        results.append({
            **article,
            'summary': summary
        })

    return results

def safe_sentiment_batch(articles):
    """Batch sentiment analysis for better performance"""
    results = []

    for article in articles:
        try:
            text = article['text']

            if len(text) < 10:
                sentiment = {'label': 'NEUTRAL', 'score': 0.5}
            else:
                sentiment_result = sentiment_analyzer(text, truncation=True)
                sentiment = sentiment_result[0]

        except Exception as e:
            print(f"Sentiment analysis failed for article: {e}")
            sentiment = {'label': 'NEUTRAL', 'score': 0.5}

        results.append({
            **article,
            'sentiment': sentiment
        })

    return results

def process_complete_pipeline(topic, max_articles=20):
    """Complete processing pipeline using Spark + ML models"""
    print(f"🔍 Starting analysis for topic: '{topic}'")

    articles_data = get_gnews_articles(topic, max_articles)
    if not articles_data:
        return []

    extracted_articles = process_articles_with_spark(articles_data)
    if not extracted_articles:
        print("No articles successfully extracted")
        return []

    print(f"🤖 Running ML inference on {len(extracted_articles)} articles...")

    summarized_articles = safe_summarize_batch(extracted_articles)
    final_results = safe_sentiment_batch(summarized_articles)

    sentiment_data = []
    for article in final_results:
        sentiment_data.append({
            'title': article['title'],
            'url': article['url'],
            'sentiment_label': article['sentiment']['label'],
            'sentiment_score': float(article['sentiment']['score'])
        })

    if sentiment_data:
        sentiment_df = spark.createDataFrame(sentiment_data)

        total_articles = sentiment_df.count()
        avg_sentiment_score = sentiment_df.agg({'sentiment_score': 'avg'}).collect()[0][0]

        sentiment_distribution = sentiment_df.groupBy('sentiment_label').count().collect()
        sentiment_stats = {row['sentiment_label']: row['count'] for row in sentiment_distribution}
    else:
        total_articles = 0
        avg_sentiment_score = 0.0
        sentiment_stats = {}

    print(f"✅ Analysis complete!")
    print(f"   📊 Total articles processed: {total_articles}")
    print(f"   🎭 Average sentiment score: {avg_sentiment_score:.3f}")
    print(f"   📈 Sentiment distribution: {sentiment_stats}")

    return {
        'articles': final_results,
        'analytics': {
            'total_articles': total_articles,
            'avg_sentiment_score': float(avg_sentiment_score),
            'sentiment_distribution': sentiment_stats,
            'processing_method': 'Spark + Transformers Hybrid'
        }
    }

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-21' coro=<Server.serve() done, defined at /usr/local/lib/python3.11/dist-packages/uvicorn/server.py:69> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/main.py", line 580, in run
    server.run()
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/server.py", line 67, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/usr/lib/python3.11/asyncio/events.py", line 84, in _run
    s

Spark initialized with 2 cores
Using device: GPU


Device set to use cuda:0


config.json:   0%|          | 0.00/902 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/541M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

Device set to use cuda:0


✓ ML models loaded successfully


In [41]:

app = FastAPI(title="Spark-Powered Sentiment Analysis", version="2.0")

@app.post("/analyze/")
async def analyze_topic(request: Request):
    """Analyze sentiment for a given topic using Spark + ML"""
    try:
        data = await request.json()
        topic = data.get("topic", "")
        max_articles = data.get("max_articles", 20)

        if not topic:
            return {"error": "No topic provided"}

        # Process using Spark pipeline
        results = process_complete_pipeline(topic, max_articles)

        if not results:
            print("NO ARTICLES PROCESSED")
            return {"error": "No articles could be processed for this topic"}

        return {
            "topic": topic,
            "timestamp": datetime.now().isoformat(),
            "results": results['articles'],
            "analytics": results['analytics']
        }

    except Exception as e:
        print("PROCESSING FAILED", e)
        return {"error": f"Processing failed: {str(e)}"}

@app.get("/health/")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "spark_status": "running" if spark else "not available",
        "ml_models": "loaded",
        "timestamp": datetime.now().isoformat()
    }

@app.get("/")
async def root():
    """Root endpoint with API info"""
    return {
        "message": "Spark-Powered Sentiment Analysis API",
        "version": "2.0",
        "endpoints": {
            "analyze": "POST /analyze/ - Analyze sentiment for a topic",
            "health": "GET /health/ - Health check"
        },
        "features": [
            "Apache Spark for distributed processing",
            "Hugging Face transformers for ML",
            "Parallel article extraction",
            "Batch ML inference",
            "Real-time analytics"
        ]
    }

# Launch server
if __name__ == "__main__":
    print("🚀 Starting Spark-powered Sentiment Analysis Server...")

    # Launch ngrok tunnel
    public_url = ngrok.connect(8000, url="mudfish-glorious-jackal.ngrok-free.app")
    print(f"🌐 Public URL: {public_url}")
    print(f"📡 API endpoint: {public_url}/analyze/")
    print(f"❤️  Health check: {public_url}/health/")

    # Apply nest_asyncio for Colab compatibility
    nest_asyncio.apply()

    # Run the server
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")



🚀 Starting Spark-powered Sentiment Analysis Server...
🌐 Public URL: NgrokTunnel: "https://mudfish-glorious-jackal.ngrok-free.app" -> "http://localhost:8000"
📡 API endpoint: NgrokTunnel: "https://mudfish-glorious-jackal.ngrok-free.app" -> "http://localhost:8000"/analyze/
❤️  Health check: NgrokTunnel: "https://mudfish-glorious-jackal.ngrok-free.app" -> "http://localhost:8000"/health/


INFO:     Started server process [720]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


🔍 Starting analysis for topic: 'Air India'
✓ Found 10 articles for topic 'Air India'
🚀 Processing 10 articles with Spark...
✓ Created Spark DataFrame with 10 articles


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


✓ Successfully extracted 6 articles using Spark
🤖 Running ML inference on 6 articles...
✅ Analysis complete!
   📊 Total articles processed: 6
   🎭 Average sentiment score: 0.458
   📈 Sentiment distribution: {'Very Positive': 1, 'Neutral': 4, 'Negative': 1}
INFO:     122.172.86.173:0 - "POST /analyze/ HTTP/1.1" 200 OK
🔍 Starting analysis for topic: 'Air India'
✓ Found 10 articles for topic 'Air India'
🚀 Processing 10 articles with Spark...
✓ Created Spark DataFrame with 10 articles
✓ Successfully extracted 9 articles using Spark
🤖 Running ML inference on 9 articles...


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset
Your max_length is set to 120, but your input_length is only 114. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=57)


✅ Analysis complete!
   📊 Total articles processed: 9
   🎭 Average sentiment score: 0.443
   📈 Sentiment distribution: {'Very Positive': 1, 'Neutral': 6, 'Positive': 1, 'Negative': 1}
INFO:     122.172.86.173:0 - "POST /analyze/ HTTP/1.1" 200 OK
🔍 Starting analysis for topic: 'Air India'
✓ Found 10 articles for topic 'Air India'
🚀 Processing 10 articles with Spark...
✓ Created Spark DataFrame with 10 articles
✓ Successfully extracted 9 articles using Spark
🤖 Running ML inference on 9 articles...


Your max_length is set to 120, but your input_length is only 108. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=54)


✅ Analysis complete!
   📊 Total articles processed: 9
   🎭 Average sentiment score: 0.442
   📈 Sentiment distribution: {'Very Positive': 1, 'Neutral': 6, 'Positive': 1, 'Negative': 1}
INFO:     122.172.86.173:0 - "POST /analyze/ HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [720]
