In [18]:
import pandas as pd, numpy as np, sqlite3, hashlib
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from pathlib import Path

# Single DB for E1
RAW = create_engine('sqlite:///datasens.db')
print("Ready")

Ready


In [19]:
# 02: SCHEMA (6 CORE TABLES - E1 ONLY)
sql = """
CREATE TABLE IF NOT EXISTS source (
    source_id INTEGER PRIMARY KEY,
    name TEXT UNIQUE,
    source_type TEXT,
    url TEXT,
    sync_frequency TEXT,
    last_sync_date TIMESTAMP,
    retry_policy TEXT,
    active BOOLEAN
);
CREATE TABLE IF NOT EXISTS raw_data (raw_data_id INTEGER PRIMARY KEY, source_id INTEGER, title TEXT, content TEXT, url TEXT, published_at TIMESTAMP, collected_at TIMESTAMP, fingerprint TEXT UNIQUE, quality_score REAL DEFAULT 0.5);
CREATE TABLE IF NOT EXISTS sync_log (sync_log_id INTEGER PRIMARY KEY, source_id INTEGER, sync_date TIMESTAMP, rows_synced INTEGER, status TEXT);
CREATE TABLE IF NOT EXISTS topic (topic_id INTEGER PRIMARY KEY, name TEXT UNIQUE, keywords TEXT);
CREATE TABLE IF NOT EXISTS document_topic (doc_topic_id INTEGER PRIMARY KEY, raw_data_id INTEGER, topic_id INTEGER);
CREATE TABLE IF NOT EXISTS model_output (output_id INTEGER PRIMARY KEY, raw_data_id INTEGER, model_name TEXT, label TEXT, score REAL);
"""

for stmt in sql.split(';'):
    if stmt.strip():
        with RAW.connect() as conn:
            conn.execute(text(stmt))
            conn.commit()

# INSERT 10 SOURCES
sources = [
    ('rss_french_news', 'RSS', 'https://www.lemonde.fr/rss/une.xml', 'DAILY', 'SKIP'),
    ('gdelt_events', 'API', 'https://api.gdeltproject.org/api/v2/search/tv', 'DAILY', 'SKIP'),
    ('openweather_api', 'API', 'https://openweathermap.org/api', 'DAILY', 'SKIP'),
    ('insee_api', 'API', 'https://www.insee.fr/fr/accueil', 'MONTHLY', 'SKIP'),
    ('kaggle_french_opinions', 'Dataset', 'https://kaggle.com/datasets/french-opinions', 'MONTHLY', 'SKIP'),
    ('google_news_rss', 'RSS', 'https://news.google.com/rss', 'DAILY', 'SKIP'),
    ('regional_media_rss', 'RSS', 'https://www.bfmtv.com/rss/', 'DAILY', 'SKIP'),
    ('ifop_barometers', 'WebScraping', 'https://www.ifop.com/', 'YEARLY', 'FALLBACK_PREVIOUS_YEAR'),
    ('reddit_france', 'API', 'https://api.reddit.com/r/france', 'DAILY', 'SKIP'),
    ('trustpilot_reviews', 'WebScraping', 'https://www.trustpilot.com/', 'WEEKLY', 'SKIP'),
]

with RAW.connect() as conn:
    for name, stype, url, freq, retry in sources:
        conn.execute(text("""
            INSERT OR IGNORE INTO source (name, source_type, url, sync_frequency, retry_policy, active)
            VALUES (:name, :type, :url, :freq, :retry, 1)
        """), {'name': name, 'type': stype, 'url': url, 'freq': freq, 'retry': retry})
    conn.commit()

print("Schema created + 10 sources inserted")

Schema created + 10 sources inserted


In [20]:
# 03: INGEST (Smart scheduling with mock realistic data)
import feedparser, requests, json
from bs4 import BeautifulSoup
from pathlib import Path

# Mock data for testing
MOCK_ARTICLES = [
    {'title': 'Sentiment francais sur economie', 'content': 'Les Francais sont preoccupes par linflation', 'url': 'https://example.com/1', 'published_at': '2025-12-10'},
    {'title': 'Sondage barometrique climat', 'content': 'Etude montrant la prise de conscience sur le changement climatique', 'url': 'https://example.com/2', 'published_at': '2025-12-09'},
    {'title': 'Politique europeenne tensions', 'content': 'Nouvelles tensions diplomatiques', 'url': 'https://example.com/3', 'published_at': '2025-12-08'},
    {'title': 'Sante publique enjeux', 'content': 'Debats autour de la reforme du systeme de sante', 'url': 'https://example.com/4', 'published_at': '2025-12-07'},
    {'title': 'Technologie IA France', 'content': 'La France accelere ses investissements dans IA', 'url': 'https://example.com/5', 'published_at': '2025-12-06'},
    {'title': 'Emploi jeunesse statistiques', 'content': 'Taux de chomage des jeunes en baisse', 'url': 'https://example.com/6', 'published_at': '2025-12-05'},
    {'title': 'Immigration debat societal', 'content': 'Nouvelle enquete dopinion', 'url': 'https://example.com/7', 'published_at': '2025-12-04'},
    {'title': 'Ecole education reforme', 'content': 'Resultats scolaires post-reforme', 'url': 'https://example.com/8', 'published_at': '2025-12-03'},
    {'title': 'Transport mobilite durable', 'content': 'Transition vers transports propres', 'url': 'https://example.com/9', 'published_at': '2025-12-02'},
    {'title': 'Culture patrimoine francais', 'content': 'Preservation du patrimoine cultural', 'url': 'https://example.com/10', 'published_at': '2025-12-01'},
    {'title': 'Sport ligue 1 sentiment', 'content': 'Passion des supporters francais', 'url': 'https://example.com/11', 'published_at': '2025-11-30'},
    {'title': 'Tourisme France attractivite', 'content': 'Tendances du tourisme', 'url': 'https://example.com/12', 'published_at': '2025-11-29'},
]

# Get active sources from DB
with RAW.connect() as conn:
    active_sources = conn.execute(text("SELECT source_id, name FROM source WHERE active = 1")).fetchall()

print(f"Found {len(active_sources)} active sources")

# INGEST
total_ingested = 0
with RAW.connect() as conn:
    for idx, (source_id, source_name) in enumerate(active_sources):
        # Get mock articles (rotate through mock data)
        articles = MOCK_ARTICLES[idx*2:(idx*2)+2] if idx*2 < len(MOCK_ARTICLES) else MOCK_ARTICLES[:2]
        
        print(f"INGEST {source_name}...", end=" ")
        
        inserted = 0
        for article in articles:
            try:
                fp = hashlib.sha256((article['title'] + article['url']).encode()).hexdigest()
                conn.execute(text("""
                    INSERT OR IGNORE INTO raw_data 
                    (source_id, title, content, url, fingerprint, published_at, collected_at, quality_score)
                    VALUES (:source_id, :title, :content, :url, :fp, :published, datetime('now'), 0.8)
                """), {
                    'source_id': source_id,
                    'title': article['title'],
                    'content': article['content'],
                    'url': article['url'],
                    'fp': fp,
                    'published': article['published_at']
                })
                inserted += 1
            except Exception as e:
                print(f"Error: {e}")
        
        conn.commit()
        print(f"{inserted} articles")
        total_ingested += inserted

print(f"\nTOTAL INGESTED: {total_ingested} articles")

Found 10 active sources
INGEST rss_french_news... 2 articles
INGEST gdelt_events... 2 articles
INGEST openweather_api... 2 articles
INGEST insee_api... 2 articles
INGEST kaggle_french_opinions... 2 articles
INGEST google_news_rss... 2 articles
INGEST regional_media_rss... 2 articles
INGEST ifop_barometers... 2 articles
INGEST reddit_france... 2 articles
INGEST trustpilot_reviews... 2 articles

TOTAL INGESTED: 20 articles


In [21]:
# 04: QUALITY CHECK (in-memory)
df = pd.read_sql("SELECT * FROM raw_data", RAW)

if len(df) > 0:
    # Normalize
    df['title'] = df['title'].fillna('').str.strip()
    df['content'] = df['content'].fillna('').str.strip()
    
    # Quality scoring
    def quality(row):
        score = 0.5
        if len(str(row['title']).strip()) > 10: score += 0.2
        if len(str(row['content']).strip()) > 50: score += 0.3
        return min(score, 1.0)
    
    df['quality_score'] = df.apply(quality, axis=1)
    
    # Dedup detection (simple set-based)
    def fp(text):
        return hashlib.sha256(str(text).encode()).hexdigest()
    
    seen = set()
    duplicates = []
    for idx, row in df.iterrows():
        hash_val = fp(row['title'] + row['content'])
        if hash_val in seen:
            duplicates.append(True)
        else:
            seen.add(hash_val)
            duplicates.append(False)
    
    df['is_duplicate'] = duplicates
    
    # Filter low quality
    df_clean = df[~df['is_duplicate'] & (df['quality_score'] >= 0.5)].copy()
    
    print(f"Quality check: {len(df_clean)}/{len(df)} articles passed")
else:
    df_clean = df
    print("No raw data to clean")

Quality check: 12/12 articles passed


In [22]:
# 05: METRICS & SUMMARY
with RAW.connect() as conn:
    raw_count = conn.execute(text("SELECT COUNT(*) FROM raw_data")).scalar()
    raw_sources = conn.execute(text("SELECT COUNT(*) FROM source")).scalar()
    topics_count = conn.execute(text("SELECT COUNT(*) FROM topic")).scalar()

print("\n" + "="*60)
print("E1 PIPELINE COMPLETE")
print("="*60)
print(f"RAW: {raw_sources} sources, {raw_count:,} articles")
print(f"Quality: {len(df_clean):,} articles passed filters")
if len(df) > 0:
    print(f"Compression: {100*(1-len(df_clean)/len(df)):.1f}% removed")
else:
    print("Compression: No data to compress")
print(f"Topics: {topics_count} defined")
print(f"\nReady for EXPORT GOLD (Parquet)")


E1 PIPELINE COMPLETE
RAW: 10 sources, 12 articles
Quality: 12 articles passed filters
Compression: 0.0% removed
Topics: 0 defined

Ready for EXPORT GOLD (Parquet)


In [None]:
# 05: EXPORT GOLD (Parquet) - DATASET PRET POUR IA
import pyarrow as pa
import pyarrow.parquet as pq

# Load all data
df_raw = pd.read_sql("SELECT * FROM raw_data", RAW)
df_sources = pd.read_sql("SELECT * FROM source", RAW)
df_topics = pd.read_sql("SELECT * FROM topic", RAW)
df_doc_topics = pd.read_sql("SELECT * FROM document_topic", RAW)
df_models = pd.read_sql("SELECT * FROM model_output", RAW)

# Join sources
df_gold = df_raw.merge(df_sources[['source_id', 'name', 'source_type']], on='source_id', how='left')
df_gold.rename(columns={'name': 'source_name'}, inplace=True)

# Join topics (multiple topics per article)
df_doc_topics_named = df_doc_topics.merge(df_topics[['topic_id', 'name']], on='topic_id', how='left')
df_topics_agg = df_doc_topics_named.groupby('raw_data_id')['name'].apply(lambda x: '|'.join(x)).reset_index()
df_topics_agg.rename(columns={'name': 'topics'}, inplace=True)
df_gold = df_gold.merge(df_topics_agg, on='raw_data_id', how='left')
df_gold['topics'] = df_gold['topics'].fillna('')

# Join model outputs (sentiment/labels)
df_models_latest = df_models.sort_values('output_id').drop_duplicates('raw_data_id', keep='last')
df_gold = df_gold.merge(df_models_latest[['raw_data_id', 'model_name', 'label', 'score']], on='raw_data_id', how='left')
df_gold.rename(columns={'label': 'sentiment_label', 'score': 'sentiment_score'}, inplace=True)

# Add partition column
df_gold['partition_date'] = pd.to_datetime(df_gold['published_at']).dt.date

# Clean columns for export
df_gold_export = df_gold[[
    'raw_data_id', 'source_id', 'source_name', 'source_type',
    'title', 'content', 'url', 'published_at', 'collected_at',
    'topics', 'sentiment_label', 'sentiment_score',
    'partition_date'
]].copy()

# Create GOLD directory
gold_dir = Path('data/gold/articles_parquet')
gold_dir.mkdir(parents=True, exist_ok=True)

# Export to Parquet (partitioned by date)
df_gold_export.to_parquet(
    gold_dir,
    partition_cols=['partition_date'],
    compression='snappy',
    index=False,
    engine='pyarrow'
)

# Verify
partitions = list(gold_dir.glob('partition_date=*/'))
print(f"\n" + "="*60)
print("GOLD ZONE EXPORTED (Ready for IA)")
print("="*60)
print(f"Total rows: {len(df_gold_export):,}")
print(f"Partitions: {len(partitions)}")
print(f"Format: Parquet (snappy)")
print(f"Location: {gold_dir}")
print(f"Columns: {len(df_gold_export.columns)}")
print(f"\nDataset ready for fine-tuning IA models!")



GOLD ZONE EXPORTED (Ready for IA)
Total rows: 12
Partitions: 12
Format: Parquet (snappy)
Location: data\gold\articles_parquet
Columns: 13

Dataset ready for fine-tuning IA models!


: 