In [None]:
import feedparser
import json
from datetime import datetime
import email.utils

# RSS feed URLs (all on .se domains) that fetch_and_parse_feeds() iterates over,
# in this order. Add or remove entries here to change which feeds are fetched.
RSS_FEED_URL = [
    'http://www.dn.se/nyheter/m/rss/',
    'https://rss.aftonbladet.se/rss2/small/pages/sections/senastenytt/',
    'https://feeds.expressen.se/nyheter/',
    'http://www.svd.se/?service=rss',
    'http://api.sr.se/api/rss/program/83?format=145',
    'http://www.svt.se/nyheter/rss.xml'
]

# Output layout used for every successfully parsed publication date.
_PUBLISHED_OUTPUT_FORMAT = "%Y-%m-%d %H:%M:%S"

# strptime layouts tried, in order, when the RFC 2822 parser cannot handle a date.
_PUBLISHED_FALLBACK_FORMATS = (
    "%a, %d %b %Y %H:%M:%S %z",  # RFC 2822 style with numeric UTC offset
    "%a, %d %b %Y %H:%M:%S %Z",  # RFC 2822 style with timezone name
    "%Y-%m-%dT%H:%M:%SZ",        # ISO 8601, second precision
    "%Y-%m-%dT%H:%M:%S.%fZ",     # ISO 8601, fractional seconds
    "%Y-%m-%d %H:%M:%S",
)


def _format_published(published_raw):
    """Normalize a feed entry's publication date to "YYYY-MM-DD HH:MM:SS".

    Args:
        published_raw: The raw ``published`` value of an entry. A tuple is
            reduced to its first element; anything that is not a non-blank
            string yields "".

    Returns:
        The formatted date string, or "" when no parser recognizes the input.
        Dates handled by the RFC 2822 path are expressed in the machine's
        local time (``datetime.fromtimestamp``); dates handled by the strptime
        fallback keep the wall-clock time written in the feed.
    """
    if isinstance(published_raw, tuple):
        published_raw = published_raw[0] if published_raw else ""
    if not isinstance(published_raw, str):
        return ""
    published_raw = published_raw.strip()
    if not published_raw:
        return ""

    # RFC 2822 first. parsedate_tz() returns None (it does not raise) when it
    # cannot parse the text, so the fallback below must run in that case too.
    try:
        parsed_time_tuple = email.utils.parsedate_tz(published_raw)
        if parsed_time_tuple:
            timestamp = email.utils.mktime_tz(parsed_time_tuple)
            return datetime.fromtimestamp(timestamp).strftime(_PUBLISHED_OUTPUT_FORMAT)
    except (TypeError, ValueError, OverflowError, OSError):
        # Out-of-range or malformed fields: fall through to the strptime formats.
        pass

    for fmt in _PUBLISHED_FALLBACK_FORMATS:
        try:
            return datetime.strptime(published_raw, fmt).strftime(_PUBLISHED_OUTPUT_FORMAT)
        except ValueError:
            continue
    return ""


def fetch_and_parse_feeds():
    """Fetch every URL in RSS_FEED_URL and flatten the entries into dicts.

    Returns:
        A list with one dict per feed entry, holding the keys ``title``,
        ``link``, ``summary`` (each with a "No ..." placeholder when the entry
        lacks the field) and ``published`` (normalized date string, or "" when
        the date is missing or unrecognized).

    An exception raised while handling a feed is printed and processing moves
    on to the next URL; entries collected before the exception are kept.
    """
    posts = []

    for url in RSS_FEED_URL:
        try:
            feed = feedparser.parse(url)
            for entry in feed.entries:
                posts.append({
                    'title': entry.get('title', 'No title'),
                    'link': entry.get('link', 'No link'),
                    'summary': entry.get('summary', 'No summary'),
                    "published": _format_published(entry.get('published', '')),
                })
        except Exception as e:
            print(f"Failed to parse feed from {url}: {e}")
    return posts



if __name__ == "__main__":
    feeds = fetch_and_parse_feeds()
    # Print to verify
    print(json.dumps(feeds, indent=2))
    print(len(feeds))

# Names exported by `from <module> import *`.
# `feeds` is only bound when this code runs as a script (inside the guard
# above), so listing it here made a star-import fail with AttributeError.
# Export the names that always exist; importers call fetch_and_parse_feeds().
__all__ = ['RSS_FEED_URL', 'fetch_and_parse_feeds']

In [17]:
# rss_feed_parser.py
import feedparser
import json
from datetime import datetime
import email.utils

class RSSFeedParser:
    """Fetches a list of RSS feeds and flattens their entries into dicts.

    Attributes are documented inline; the only public method is
    fetch_and_parse_feeds().
    """

    # Output layout used for every successfully parsed publication date.
    _OUTPUT_FORMAT = "%Y-%m-%d %H:%M:%S"

    # strptime layouts tried, in order, when the RFC 2822 parser gives up.
    _FALLBACK_FORMATS = (
        "%a, %d %b %Y %H:%M:%S %z",  # RFC 2822 style with numeric UTC offset
        "%a, %d %b %Y %H:%M:%S %Z",  # RFC 2822 style with timezone name
        "%Y-%m-%dT%H:%M:%SZ",        # ISO 8601, second precision
        "%Y-%m-%dT%H:%M:%S.%fZ",     # ISO 8601, fractional seconds
        "%Y-%m-%d %H:%M:%S",
    )

    def __init__(self, feed_urls):
        """Store the iterable of feed URLs to fetch."""
        # URLs are fetched in iteration order by fetch_and_parse_feeds().
        self.feed_urls = feed_urls

    @classmethod
    def _format_published(cls, published_raw):
        """Normalize an entry's publication date to "YYYY-MM-DD HH:MM:SS".

        Args:
            published_raw: The raw ``published`` value of an entry. A tuple is
                reduced to its first element; anything that is not a
                non-blank string yields "".

        Returns:
            The formatted date string, or "" when no parser recognizes the
            input. Dates handled by the RFC 2822 path are expressed in the
            machine's local time (``datetime.fromtimestamp``); dates handled
            by the strptime fallback keep the wall-clock time from the feed.
        """
        if isinstance(published_raw, tuple):
            published_raw = published_raw[0] if published_raw else ""
        if not isinstance(published_raw, str):
            return ""
        published_raw = published_raw.strip()
        if not published_raw:
            return ""

        # parsedate_tz() returns None (it does not raise) for text it cannot
        # parse, so the strptime fallback must also run in that case.
        try:
            parsed_time_tuple = email.utils.parsedate_tz(published_raw)
            if parsed_time_tuple:
                timestamp = email.utils.mktime_tz(parsed_time_tuple)
                return datetime.fromtimestamp(timestamp).strftime(cls._OUTPUT_FORMAT)
        except (TypeError, ValueError, OverflowError, OSError):
            # Out-of-range or malformed fields: try the explicit formats below.
            pass

        for fmt in cls._FALLBACK_FORMATS:
            try:
                return datetime.strptime(published_raw, fmt).strftime(cls._OUTPUT_FORMAT)
            except ValueError:
                continue
        return ""

    def fetch_and_parse_feeds(self):
        """Fetch every configured feed and return its entries as dicts.

        Returns:
            A list with one dict per feed entry, holding the keys ``title``,
            ``link``, ``summary`` (each with a "No ..." placeholder when the
            entry lacks the field) and ``published`` (normalized date string,
            or "" when the date is missing or unrecognized).

        An exception raised while handling a feed is printed and processing
        moves on to the next URL; entries collected before it are kept.
        """
        posts = []

        for url in self.feed_urls:
            try:
                feed = feedparser.parse(url)
                for entry in feed.entries:
                    posts.append({
                        'title': entry.get('title', 'No title'),
                        'link': entry.get('link', 'No link'),
                        'summary': entry.get('summary', 'No summary'),
                        "published": self._format_published(entry.get('published', '')),
                    })
            except Exception as e:
                print(f"Failed to parse feed from {url}: {e}")
        return posts

In [25]:
# rss_feed_saver.py
import json
from datetime import datetime
import os

class RSSFeedSaver:
    """Writes the entries produced by a feed parser to a timestamped JSON file."""

    def __init__(self, parser, output_dir='feeds'):
        """Configure the saver.

        Args:
            parser: Object exposing ``fetch_and_parse_feeds()`` that returns a
                JSON-serializable list of entries.
            output_dir: Directory the JSON files are written to. Defaults to
                ``'feeds'`` relative to the current working directory.
        """
        self.parser = parser
        self.output_dir = output_dir

    def save_feeds(self):
        """Fetch the feeds and dump them to ``<output_dir>/feeds_<timestamp>.json``.

        The timestamp is the local time formatted as ``%y%m%d_%H%M%S``. The
        output directory is created when missing. A one-line summary is
        printed after writing.

        Returns:
            A ``(filepath, entry_count)`` tuple.
        """
        # Get feeds from the parser
        feeds = self.parser.fetch_and_parse_feeds()

        # Generate filename with current date and time
        current_time = datetime.now()
        filename = f"feeds_{current_time.strftime('%y%m%d_%H%M%S')}.json"

        # Create directory if it doesn't exist
        os.makedirs(self.output_dir, exist_ok=True)
        filepath = os.path.join(self.output_dir, filename)

        # ensure_ascii=False keeps non-ASCII text readable in the UTF-8 file
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(feeds, f, indent=2, ensure_ascii=False)

        print(f"Saved {len(feeds)} feed entries to {filepath}")
        return filepath, len(feeds)

In [19]:
# main.py (example usage)
# Example wiring: build a parser for a fixed list of feeds, hand it to a saver,
# and write one JSON snapshot. Relies on RSSFeedParser and RSSFeedSaver having
# been defined (earlier cells) before this cell runs.
if __name__ == "__main__":
    # Define your RSS feed URLs
    RSS_FEED_URLS = [
        'http://www.dn.se/nyheter/m/rss/',
        'https://rss.aftonbladet.se/rss2/small/pages/sections/senastenytt/',
        'https://feeds.expressen.se/nyheter/',
        'http://www.svd.se/?service=rss',
        'http://api.sr.se/api/rss/program/83?format=145',
        'http://www.svt.se/nyheter/rss.xml'
    ]
    
    # Create parser and saver
    parser = RSSFeedParser(RSS_FEED_URLS)
    saver = RSSFeedSaver(parser)
    
    # Fetch all feeds and write them to a timestamped JSON file;
    # save_feeds() returns the file path and the number of entries written.
    filepath, count = saver.save_feeds()
    
    # Print summary (save_feeds() prints a similar line itself, so the
    # information appears twice in the cell output)
    print(f"Successfully saved {count} feed entries to {filepath}")

Saved 243 feed entries to feeds\feeds_250216_160327.json
Successfully saved 243 feed entries to feeds\feeds_250216_160327.json


In [26]:
import re
import sys
import warnings
import nltk
import pandas as pd
import numpy as np
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

class TextPreprocessor:
    """Cleans Swedish headline text and turns it into TF-IDF features.

    The expected CSV layout is inferred from the code: a ``Heading`` text
    column, an ``Id`` column, and label columns from the third column onward
    (see prepare_data / create_train_test_split).
    """

    def __init__(self, nltk_data_path='/c:/DIAD/ML/Tasks/T2/nltk_data'):
        """Set up NLTK resources, the Swedish stop-word set and the stemmer.

        Args:
            nltk_data_path: Extra directory appended to ``nltk.data.path``.
                NOTE(review): the default is a machine-specific absolute
                path; pass your own location when running elsewhere.

        Side effects:
            May download the ``stopwords`` and ``punkt`` NLTK resources, and
            (unless the interpreter was started with -W options) silences all
            warnings process-wide via ``warnings.simplefilter("ignore")``.
        """
        # Initialize NLTK
        nltk.data.path.append(nltk_data_path)

        # Download resources only if needed
        try:
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('stopwords', quiet=True)

        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt', quiet=True)

        self.stop_words = set(stopwords.words('swedish'))
        self.stemmer = SnowballStemmer("swedish")
        # Filled by prepare_data() with the label column names.
        self.categories = None

        # Suppress warnings
        if not sys.warnoptions:
            warnings.simplefilter("ignore")

    def clean_text(self, text):
        """Lower-case and strip markup, URLs, punctuation and digits.

        Args:
            text: A pandas Series of strings.

        Returns:
            A new Series with, in order: HTML tags replaced by a space, URLs
            removed, punctuation removed, digit runs removed, whitespace
            collapsed to single spaces and trimmed.

        HTML tags must be removed before punctuation: once ``<`` and ``>``
        have been stripped the tag pattern can no longer match and the tag
        names would remain glued to the neighbouring words.
        """
        return (text
                .str.lower()
                .str.replace(r'<[^<>]+>', ' ', regex=True)        # remove HTML tags (first!)
                .str.replace(r'http\S+|www\S+', '', regex=True)  # remove URLs
                .str.replace(r'[^\w\s]', '', regex=True)         # remove punctuation
                .str.replace(r'\d+', '', regex=True)             # remove digits
                .str.replace(r'\s+', ' ', regex=True)            # normalize whitespace
                .str.strip())                                    # strip leading/trailing whitespace

    def remove_stop_words(self, sentence):
        """Return ``sentence`` with tokens found in ``self.stop_words`` dropped.

        Tokens come from ``nltk.word_tokenize`` and are re-joined with single
        spaces.
        """
        return " ".join([word for word in nltk.word_tokenize(sentence)
                        if word not in self.stop_words])

    def stem_text(self, sentence):
        """Stem every whitespace-separated word with ``self.stemmer``."""
        return " ".join(self.stemmer.stem(word) for word in sentence.split())

    def prepare_data(self, data_path, apply_stemming=False):
        """Load the CSV, shuffle it and clean the ``Heading`` column.

        Args:
            data_path: Path of the CSV file to read.
            apply_stemming: When True, stem the cleaned headings as well.

        Returns:
            The shuffled DataFrame with a cleaned ``Heading`` column. Rows
            whose ``Heading`` is null are dropped (a warning is printed).
            ``self.categories`` is set to the column names from the third
            column onward.

        Raises:
            FileNotFoundError: ``data_path`` does not exist.
            ValueError: The file is empty.
        """
        try:
            # Load and shuffle data (fixed seed keeps the order reproducible)
            data_raw = pd.read_csv(data_path).sample(frac=1, random_state=42)
        except FileNotFoundError:
            raise FileNotFoundError(f"Could not find data file at {data_path}")
        except pd.errors.EmptyDataError:
            raise ValueError(f"The file at {data_path} is empty")

        # Handle missing values
        if data_raw['Heading'].isnull().any():
            print(f"Warning: Found {data_raw['Heading'].isnull().sum()} null values in 'Heading' column")
            data_raw = data_raw.dropna(subset=['Heading'])

        # Get category columns (everything after the first two columns)
        self.categories = list(data_raw.columns.values)[2:]

        # Clean and process text
        data_raw['Heading'] = self.clean_text(data_raw['Heading'])
        data_raw['Heading'] = data_raw['Heading'].apply(self.remove_stop_words)

        if apply_stemming:
            data_raw['Heading'] = data_raw['Heading'].apply(self.stem_text)

        return data_raw

    def create_train_test_split(self, data, test_size=0.20):
        """Split ``data`` and build TF-IDF features from the headings.

        Args:
            data: DataFrame with ``Id``, ``Heading`` and label columns.
            test_size: Fraction of rows assigned to the test split.

        Returns:
            ``(x_train, y_train, x_test, y_test, vectorizer)``. The x values
            are TF-IDF matrices over word 1- to 3-grams; the y values are the
            input frames without the ``Id`` and ``Heading`` columns. The
            vectorizer is fitted on the training headings only.

        Raises:
            ValueError: ``Heading`` is missing or there are fewer than 10 rows.
        """
        # Validation
        if 'Heading' not in data.columns:
            raise ValueError("Data must contain 'Heading' column")

        if len(data) < 10:  # arbitrary minimum
            raise ValueError(f"Not enough data: {len(data)} rows")

        # Split data
        train, test = train_test_split(
            data, random_state=42, test_size=test_size, shuffle=True)

        # Create TF-IDF features
        vectorizer = TfidfVectorizer(
            strip_accents='unicode',
            analyzer='word',
            ngram_range=(1, 3),
            norm='l2'
        )

        # Fit on the training headings only so no test vocabulary leaks in
        x_train = vectorizer.fit_transform(train['Heading'])
        y_train = train.drop(labels=['Id', 'Heading'], axis=1)

        # Transform test data with the already-fitted vocabulary
        x_test = vectorizer.transform(test['Heading'])
        y_test = test.drop(labels=['Id', 'Heading'], axis=1)

        return x_train, y_train, x_test, y_test, vectorizer

In [29]:
import os
import logging
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.multiclass import OneVsRestClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
import pandas as pd

class TextClassifier:
   """Grid-searches several multi-label text classifiers and compares them.

   Attributes:
      models: name -> {"classifier", "param_grid"} as built by _initialize_models.
      results: name -> metric dict, filled by train_and_evaluate.
      best_models: name -> best estimator found by the grid search.
   """

   def __init__(self):
       """Build the model catalogue and configure file logging."""
       self.models = self._initialize_models()
       self.results = {}
       self.best_models = {}
       self._setup_logging()

   def _setup_logging(self):
       """Send root-logger INFO output to logs/mltraining.log.

       logging.basicConfig does nothing when the root logger already has
       handlers, so an earlier logging configuration takes precedence.
       """
       # Create directory if it doesn't exist
       os.makedirs('logs', exist_ok=True)
       filepath = os.path.join('logs', 'mltraining.log')
       logging.basicConfig(filename=filepath, level=logging.INFO, format='%(asctime)s - %(message)s')

   def _initialize_models(self):
       """Return the candidate models with their hyper-parameter grids."""
       return {
           "Logistic Regression": self._create_model_info(LogisticRegression(max_iter=1000), {
               "C": [0.1, 1.0, 10.0],
               "penalty": ["l1", "l2"],
               "solver": ["liblinear"],
           }),
           "Linear SVC": self._create_model_info(LinearSVC(max_iter=1000), {
               "C": [0.1, 1.0, 10.0],
               "loss": ["squared_hinge"],
               "penalty": ["l1", "l2"],
               "dual": [False]
           }),
           "SVM": self._create_model_info(SVC(probability=True), {
               "C": [0.1, 1.0, 10.0],
               "kernel": ["linear", "rbf"],
               "class_weight": [None, "balanced"]
           }),
           "Random Forest": self._create_model_info(RandomForestClassifier(random_state=42), {
               "n_estimators": [100, 200],
               "max_depth": [None, 10, 20],
               "min_samples_split": [2, 5],
               "class_weight": [None, "balanced"]
           }),
           "Naive Bayes": self._create_model_info(MultinomialNB(), {
               "alpha": [0.1, 0.5, 1.0],
               "fit_prior": [True, False]
           })
       }

   def _create_model_info(self, estimator, param_grid):
       """Wrap ``estimator`` for multi-label use and prefix its grid keys.

       Random forests are wrapped in MultiOutputClassifier, everything else
       in OneVsRestClassifier. Both wrappers expose the inner model as
       ``estimator``, hence the ``estimator__`` prefix on every grid key.
       """
       # isinstance is required here: comparing the instance to the class
       # with != is always True and made the MultiOutputClassifier branch
       # unreachable.
       if isinstance(estimator, RandomForestClassifier):
           classifier = MultiOutputClassifier(estimator)
       else:
           classifier = OneVsRestClassifier(estimator)
       return {
           "classifier": classifier,
           "param_grid": {f"estimator__{k}": v for k, v in param_grid.items()}
       }

   def train_and_evaluate(self, x_train, y_train, x_test, y_test):
       """Grid-search, fit and score every model in ``self.models``."""
       logging.info("Training and evaluating models...")
       for model_name, model_info in self.models.items():
           self._train_model(model_name, model_info, x_train, y_train, x_test, y_test)

   def _train_model(self, model_name, model_info, x_train, y_train, x_test, y_test):
       """Run a 5-fold accuracy grid search for one model and record results.

       Stores the best estimator in ``self.best_models`` and the test-set
       metrics in ``self.results``, then logs both.
       """
       logging.info(f"\n=== {model_name} ===")
       grid = GridSearchCV(model_info["classifier"], model_info["param_grid"], cv=5, scoring="accuracy", n_jobs=-1, return_train_score=True)
       grid.fit(x_train, y_train)

       self.best_models[model_name] = grid.best_estimator_
       y_pred = grid.predict(x_test)
       self.results[model_name] = self._calculate_metrics(y_test, y_pred)

       self._log_grid_results(grid)
       self._log_results(model_name, y_test, y_pred)

   def _calculate_metrics(self, y_test, y_pred):
       """Return accuracy plus micro/macro precision, recall and F1."""
       return {
           'accuracy': accuracy_score(y_test, y_pred),
           'precision_micro': precision_score(y_test, y_pred, average='micro'),
           'precision_macro': precision_score(y_test, y_pred, average='macro'),
           'recall_micro': recall_score(y_test, y_pred, average='micro'),
           'recall_macro': recall_score(y_test, y_pred, average='macro'),
           'f1_micro': f1_score(y_test, y_pred, average='micro'),
           'f1_macro': f1_score(y_test, y_pred, average='macro'),
       }

   def _log_grid_results(self, grid):
       """Log parameters, mean test score and rank of every grid candidate."""
       logging.info("\nGrid Search Results:")
       for i in range(len(grid.cv_results_['params'])):
           logging.info(f"Parameters: {grid.cv_results_['params'][i]}")
           logging.info(f"Mean Test Score: {grid.cv_results_['mean_test_score'][i]}")
           logging.info(f"Rank: {grid.cv_results_['rank_test_score'][i]}")

   def _log_results(self, model_name, y_test, y_pred):
       """Log the stored metrics for ``model_name`` (y_test/y_pred are unused)."""
       metrics = self.results[model_name]
       logging.info(f"Results for {model_name}: {metrics}")

   def get_results_summary(self):
       """Return the metrics as a DataFrame sorted by accuracy, best first."""
       df = pd.DataFrame.from_dict(self.results, orient='index')
       return df.sort_values(by='accuracy', ascending=False)

   def get_best_model(self):
       """Return ``(name, estimator)`` of the model with the highest accuracy."""
       results_df = self.get_results_summary()
       best_model_name = results_df.index[0]
       return best_model_name, self.best_models[best_model_name]

def main(data_path="./Book1.csv"):
   """Run the full pipeline: preprocess, split, train, and log a comparison.

   Args:
      data_path: CSV file handed to TextPreprocessor.prepare_data. Defaults
         to "./Book1.csv" relative to the current working directory.

   Results are written through the ``logging`` module (TextClassifier
   configures the log file); nothing is returned.
   """
   preprocessor = TextPreprocessor()
   processed_data = preprocessor.prepare_data(data_path, apply_stemming=False)
   # The fitted vectorizer is returned too but is not needed for the comparison.
   x_train, y_train, x_test, y_test, vectorizer = preprocessor.create_train_test_split(processed_data)

   classifier = TextClassifier()
   classifier.train_and_evaluate(x_train, y_train, x_test, y_test)

   logging.info("\nFinal Model Comparison:")
   logging.info(classifier.get_results_summary())

   best_model_name, best_model = classifier.get_best_model()
   logging.info(f"\nBest performing model: {best_model_name}")

if __name__ == "__main__":
   main()
