<a href="https://colab.research.google.com/github/Jajar26/SentimentAnalysis_CentraleMRS25/blob/main/Code_Interm%C3%A9diaire_Eq12.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
import requests
import time
import re
from typing import List, Dict
from bs4 import BeautifulSoup

## EDA

In [None]:
# Load the raw VC investments dataset (the file is read as Latin-1)
df = pd.read_csv(
    "investments_VC.csv",
    encoding="latin1",
)

In [None]:
# Preview the first rows of the raw dataset
df.head()

In [None]:
# List the raw column names (two of them carry surrounding spaces and are renamed next)
df.columns

In [None]:
# Column-name cleanup: drop the surrounding spaces from two raw headers
column_fixes = {
    ' market ': "market",
    ' funding_total_usd ': "funding_total_usd",
}
df = df.rename(columns=column_fixes)

In [None]:
# Keywords used to isolate the Health/Pharma sector.
# 'biotechnology' replaces the original typo 'abiotechnology'; every value it
# could match is already matched by the 'biotech' substring, so the selection
# is unchanged.
pharma_keywords = ['pharma', 'biotech', 'biotechnology', 'biopharmaceutical',
                   'drug', 'therapeutics', 'medicine', 'clinical']

# One alternation pattern instead of one pass over the column per keyword
pharma_pattern = '|'.join(re.escape(keyword) for keyword in pharma_keywords)

# Build the mask on df's own index so boolean indexing stays aligned even if
# the frame was filtered or re-indexed before this cell runs
mask = pd.Series(False, index=df.index, dtype=bool)

# Search the category list and the market columns (each only when present)
for column in ('category_list', 'market'):
    if column in df.columns:
        # astype(str) turns missing values into the text 'nan', which matches
        # none of the keywords; na=False is kept as a safety net
        mask |= (df[column].astype(str)
                 .str.lower()
                 .str.contains(pharma_pattern, na=False))

# Dedicated subset for the Pharma sector
pharma_df = df[mask].copy()
pharma_df.head()

In [None]:
# Number of distinct company names for each status (operating, acquired, closed)
names_by_status = pharma_df.groupby('status')['name']
names_by_status.nunique()

In [None]:
sns.set_theme(style="whitegrid")
fig, ax = plt.subplots(figsize=(10, 6))

# Bar chart of the number of rows per status in the pharma subset
sns.countplot(
    data=pharma_df,
    x='status',
    hue='status',
    palette='viridis',
    legend=False,
    ax=ax,
)
ax.set_title('Répartition des Statuts (Secteur Pharma)', fontsize=14)
ax.set(xlabel='Statut', ylabel="Nombre d'entreprises")

fig.tight_layout()
plt.show()


In [None]:
# Normalise the raw funding strings before converting them to numbers:
# trim whitespace, drop thousands separators, and map the '-' placeholder and
# empty strings to NaN. Anything still unparsable is coerced to NaN.
funding_text = pharma_df['funding_total_usd'].astype(str).str.strip()
funding_text = funding_text.str.replace(',', '', regex=False)
funding_text = funding_text.replace(['-', ''], np.nan)

pharma_df['funding_total_usd'] = pd.to_numeric(funding_text, errors='coerce')

## Feature engineering

In [None]:
# Cutoff date: filters training startups by founding date and serves as the
# reference date when features are computed
CUTOFF = '2012-01-01'
# Funding-type columns turned into binary `has_<type>` indicators
FUNDING_TYPES = ['seed', 'venture', 'grant', 'round_A', 'round_B']
# Numeric features (raw and derived) fed to the models
COLS_NUMERIC = [
    'age', 'funding_total_usd', 'funding_rounds', 'days_to_first_funding',
    'funding_duration', 'funding_velocity', 'round_intensity',
]
# Full model input: numeric features, funding-type flags, then the country code
FEATURES = [*COLS_NUMERIC, *(f'has_{kind}' for kind in FUNDING_TYPES), 'country_code']

In [None]:

def build_features(df, reference_date, fit_country_encoder=False, rare_countries=None):
    """Derive the model features for a set of startups as of ``reference_date``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``funding_total_usd``, ``funding_rounds``, ``founded_at``,
        ``first_funding_at``, ``last_funding_at``, ``market``, ``country_code``
        and one column per entry of ``FUNDING_TYPES``. The input is copied and
        never modified.
    reference_date : str or datetime-like
        Date used to compute ``age`` and to cap ``last_funding_at``.
    fit_country_encoder : bool, default False
        When True, the set of rare countries (fewer than 10 rows in ``df``) is
        learned from ``df`` and returned alongside the features.
    rare_countries : list-like of str, optional
        Country codes to collapse into ``'OTHER'``. Ignored (overwritten) when
        ``fit_country_encoder`` is True. ``None`` means no country is collapsed.

    Returns
    -------
    pandas.DataFrame
        The enriched copy of ``df`` when ``fit_country_encoder`` is False.
    tuple of (pandas.DataFrame, pandas.Index)
        The enriched copy and the learned rare countries when
        ``fit_country_encoder`` is True.
    """
    d = df.copy()
    ref = pd.to_datetime(reference_date)

    # Total funding: coerce to numeric, then fill gaps with the global median.
    # NOTE(review): because of this early fill, the per-market imputation below
    # never has anything left to fill for this column.
    d['funding_total_usd'] = pd.to_numeric(d['funding_total_usd'], errors='coerce')
    d['funding_total_usd'] = d['funding_total_usd'].fillna(d['funding_total_usd'].median())

    # Dates: unparsable values become NaT
    for col in ['founded_at', 'first_funding_at', 'last_funding_at']:
        d[col] = pd.to_datetime(d[col], errors='coerce')

    # Age in years at the reference date, floored at 0.1 so the ratios below
    # never divide by zero or by a negative number
    d['age'] = ((ref - d['founded_at']).dt.days / 365.25).clip(lower=0.1)

    # Funding durations (in days); the last funding date is capped at the
    # reference date
    d['days_to_first_funding'] = (d['first_funding_at'] - d['founded_at']).dt.days
    last_funding_clipped = d['last_funding_at'].clip(upper=ref)
    d['funding_duration'] = (last_funding_clipped - d['first_funding_at']).dt.days

    # Ratios per year of age
    d['funding_velocity'] = d['funding_total_usd'] / d['age']
    d['round_intensity']  = d['funding_rounds'] / d['age']

    # Binary flag per funding type: 1 when a strictly positive amount is recorded
    for col in FUNDING_TYPES:
        d[f'has_{col}'] = (d[col].fillna(0) > 0).astype(int)

    # Imputation: median within the same market first, global median as fallback
    for col in COLS_NUMERIC:
        d[col] = d.groupby('market')[col].transform(lambda x: x.fillna(x.median()))
        d[col] = d[col].fillna(d[col].median())

    if fit_country_encoder:
        country_counts = d['country_code'].value_counts()
        rare_countries = country_counts[country_counts < 10].index
    elif rare_countries is None:
        # Default argument: nothing to collapse. Passing None straight to
        # Series.replace would raise instead of leaving the codes untouched.
        rare_countries = []

    # Collapse rare countries into 'OTHER'; missing codes become 'UNKNOWN'
    is_rare = d['country_code'].isin(rare_countries)
    d['country_code'] = d['country_code'].mask(is_rare, 'OTHER').fillna('UNKNOWN')

    if fit_country_encoder:
        return d, rare_countries
    return d


In [None]:

# Training population: startups founded before the cutoff whose outcome is
# known. Rows with a missing status are excluded explicitly: `NaN != 'operating'`
# evaluates to True, so they would otherwise slip in and be labelled as failures.
# Unparsable founding dates are coerced to NaT (as build_features does) and
# therefore never satisfy the date filter.
founded_at = pd.to_datetime(pharma_df['founded_at'], errors='coerce')
train_raw = pharma_df[
    (founded_at < pd.Timestamp(CUTOFF)) &
    pharma_df['status'].notna() &
    (pharma_df['status'] != 'operating')
].copy()

# Fit the rare-country grouping on the training rows; it is reused for scoring
train_data, rare_countries = build_features(train_raw, CUTOFF, fit_country_encoder=True)

# Target: 1 when the status is 'acquired' or 'ipo', 0 otherwise
train_data['Profitable'] = (train_data['status'].isin(['acquired', 'ipo'])).astype(int)

print(f"Startups d'entraînement : {len(train_data)}")
print("\nÉquilibre des classes :")
print(train_data['Profitable'].value_counts())
print(f"Taux de succès : {train_data['Profitable'].mean():.1%}")
print(f"\nPays après regroupement : {train_data['country_code'].nunique()}")

# One-hot encode the country code (first level dropped)
X = pd.get_dummies(train_data[FEATURES], drop_first=True)
y = train_data['Profitable']

print(f"\nFeatures : {X.shape[1]}  |  NaN dans X : {X.isna().sum().sum()}")

# Stratified hold-out split so both sets keep the same class ratio
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Median imputation then standardisation, fitted on the training split only
scaler_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
X_train_s = scaler_pipeline.fit_transform(X_train)
X_test_s  = scaler_pipeline.transform(X_test)

# Modèle ML

## Entraînement

In [None]:
# Candidate classifiers; all use balanced class weights and a fixed seed
models = {
    "Logistic Regression": LogisticRegression(class_weight='balanced', max_iter=1000, random_state=42),
    "Random Forest": RandomForestClassifier(class_weight='balanced', n_estimators=100, random_state=42),
    "Gradient Boosting": HistGradientBoostingClassifier(class_weight='balanced', max_iter=100, random_state=42),
    "SVM": SVC(class_weight='balanced', probability=True, random_state=42)
}


results = []

# The loop variable is deliberately not called `model`: that global name is
# later bound to the ESG-BERT network, and re-running this cell would
# otherwise overwrite it with a scikit-learn estimator.
for name, estimator in models.items():
    # 5-fold cross-validated F1 on the training split. It used to be computed
    # and then discarded; it is now reported next to the hold-out metrics.
    # NOTE(review): the folds reuse a scaler fitted on the whole training
    # split, so this estimate is slightly optimistic.
    cv_scores = cross_val_score(estimator, X_train_s, y_train, cv=5, scoring='f1')

    estimator.fit(X_train_s, y_train)
    y_pred = estimator.predict(X_test_s)

    results.append({
        "Modele": name,
        "CV F1 (mean)": cv_scores.mean(),
        "CV F1 (std)": cv_scores.std(),
        "Accuracy": accuracy_score(y_test, y_pred),
        # zero_division=0 returns the same value as the default but without a
        # warning when a model predicts no positive at all
        "Precision": precision_score(y_test, y_pred, zero_division=0),
        "Recall": recall_score(y_test, y_pred, zero_division=0),
        "F1-Score": f1_score(y_test, y_pred, zero_division=0),
    })

# Rank the models by hold-out F1
df_results = pd.DataFrame(results).sort_values(by="F1-Score", ascending=False)
print("RÉSULTATS DES MODÈLES")
print(df_results.to_string(index=False))

## Prédiction

In [None]:

# Candidates to score: startups whose status is still 'operating'
scoring_raw = pharma_df.loc[pharma_df['status'] == 'operating'].copy()
df_scoring = build_features(
    scoring_raw,
    CUTOFF,
    fit_country_encoder=False,
    rare_countries=rare_countries,
)

# One-hot encode, then force the exact training column layout
# (dummy columns unseen at training time are dropped, missing ones are set to 0)
X_scoring_raw = pd.get_dummies(df_scoring[FEATURES], drop_first=True)
X_scoring_aligned = X_scoring_raw.reindex(columns=X.columns, fill_value=0).fillna(0)

X_scoring_scaled = scaler_pipeline.transform(X_scoring_aligned)

# Positive-class probability from each fitted model
score_cols = []
for name, model in models.items():
    score_col = f'Score_{name}'
    df_scoring[score_col] = model.predict_proba(X_scoring_scaled)[:, 1]
    score_cols.append(score_col)

# Consensus score: plain average of the per-model probabilities
df_scoring['Score_Consensus'] = df_scoring[score_cols].mean(axis=1)

# Rank once, then reuse the ranking for both the printout and the export
cols_display = ['name', 'market', 'Score_Consensus', *score_cols]
ranked = df_scoring[cols_display].sort_values('Score_Consensus', ascending=False)
top_10 = ranked.head(10)

# Print the top 10
print("TOP 10 DES STARTUPS 'OPERATING' LES PLUS PROMETTEUSES")
# NOTE: this changes the pandas float display format globally, so every
# DataFrame shown after this cell also renders its floats as percentages
pd.options.display.float_format = '{:.1%}'.format
print(top_10.to_string(index=False))

output_path = "startups_scoring.csv"
ranked.to_csv(output_path, index=False, float_format='%.4f')

In [None]:
# Reload the exported ranking. DataFrame.to_csv writes UTF-8 by default, so
# the file is read back as UTF-8; decoding it as Latin-1 garbles every
# non-ASCII character in company or market names.
startups_scoring = pd.read_csv("startups_scoring.csv", encoding='utf-8')

In [None]:
# Display the reloaded scoring table
startups_scoring

# Modèle NLP

In [None]:
class MultiSourceEthicalExtractor:
    """
        Sources : ClinicalTrials.gov, PubMed et sites.
    """

    CLINICAL_TRIALS_BASE = "https://clinicaltrials.gov/api/v2/studies"
    PUBMED_BASE          = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    HEADERS              = {'User-Agent': 'Mozilla/5.0'}

    def search_clinical_trials(self, startup_name: str, max_studies: int = 50) -> List[Dict]:
        """Recherche les essais cliniques terminés liés à la startup (Sponsor ou Collaborateur)."""
        queries = [
            f'AREA[OverallStatus]COMPLETED AND AREA[LeadSponsorName]"{startup_name}"',
            f'AREA[OverallStatus]COMPLETED AND AREA[CollaboratorName]"{startup_name}"',
        ]

        # Collecte des IDs uniques
        nct_ids = list({
            nct_id
            for q in queries
            for nct_id in self._search_nct_ids(q, max_studies)
        })
        print(f"[{startup_name}] ClinicalTrials : {len(nct_ids)} essais identifiés")

        return [d for nct_id in nct_ids if (d := self._extract_study_data(nct_id))]

    def _search_nct_ids(self, query: str, max_studies: int) -> List[str]:
        """Méthode interne pour récupérer les identifiants NCT depuis l'API ClinicalTrials."""
        params = {"query.term": query, "pageSize": min(max_studies, 1000), "format": "json"}
        try:
            r = requests.get(self.CLINICAL_TRIALS_BASE, params=params, timeout=30)
            r.raise_for_status()
            return [
                s['protocolSection']['identificationModule']['nctId']
                for s in r.json().get('studies', [])
            ]
        except Exception:
            return []

    def _extract_study_data(self, nct_id: str) -> Dict | None:
        """Récupère les détails textuels (titre, résumé, critères) d'un essai spécifique."""
        try:
            r = requests.get(f"{self.CLINICAL_TRIALS_BASE}/{nct_id}",
                             params={"format": "json"}, timeout=30)
            s = r.json().get('protocolSection', {})
            return {
                'source':      'clinicaltrials',
                'title':       s.get('identificationModule', {}).get('officialTitle', ''),
                'description': s.get('descriptionModule', {}).get('briefSummary', ''),
                'eligibility': s.get('eligibilityModule', {}).get('eligibilityCriteria', ''),
            }
        except Exception:
            return None

    def search_pubmed(self, startup_name: str, max_articles: int = 20) -> List[Dict]:
        """Recherche les publications scientifiques sur PubMed."""
        try:
            # Étape 1 : Recherche des PMIDs
            search_r = requests.get(
                f"{self.PUBMED_BASE}/esearch.fcgi",
                params={'db': 'pubmed', 'term': f'"{startup_name}"[Affiliation]',
                        'retmax': max_articles, 'retmode': 'json'},
                timeout=30
            )
            search_r.raise_for_status()
            pmids = search_r.json().get('esearchresult', {}).get('idlist', [])
            print(f"[{startup_name}] PubMed : {len(pmids)} publications trouvées")

            if not pmids:
                return []

            # Étape 2 : Récupération des résumés (Abstracts)
            fetch_r = requests.get(
                f"{self.PUBMED_BASE}/efetch.fcgi",
                params={'db': 'pubmed', 'id': ','.join(pmids), 'retmode': 'xml'},
                timeout=30
            )
            soup = BeautifulSoup(fetch_r.content, 'xml')
            articles = []
            for article in soup.find_all('PubmedArticle'):
                title_tag    = article.find('ArticleTitle')
                abstract_tag = article.find('AbstractText')
                articles.append({
                    'source':      'pubmed',
                    'title':       title_tag.text if title_tag else '',
                    'description': abstract_tag.text if abstract_tag else '',
                    'eligibility': '',
                })
            time.sleep(0.5)
            return articles[:max_articles]
        except Exception as e:
            print(f"[{startup_name}] PubMed erreur : {e}")
            return []

    def scrape_company_website(self, startup_name: str, website_url: str = None) -> Dict | None:
        """Scrape les sections relatives à la mission et aux valeurs sur le site web de l'entreprise."""
        url = website_url or f"https://{startup_name.lower().replace(' ', '')}.com/about"
        try:
            r = requests.get(url, headers=self.HEADERS, timeout=10)
            r.raise_for_status()
            page_text = BeautifulSoup(r.content, 'html.parser').get_text(separator=' ', strip=True)

            # Recherche d'extraits autour des mots-clés de culture d'entreprise
            keywords = ['mission', 'values', 'diversity', 'inclusion', 'ethics', 'team', 'culture']
            excerpts = [
                m
                for kw in keywords
                for m in re.findall(rf'.{{0,200}}{kw}.{{0,200}}', page_text, re.IGNORECASE)
            ]
            time.sleep(1)
            return {
                'source':      'website',
                'title':       f"{startup_name} - Mission & Values",
                'description': ' | '.join(excerpts[:5])[:1000],
                'eligibility': '',
            }
        except Exception as e:
            print(f"[{startup_name}] Site web inaccessible : {e}")
            return None


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# Load the pretrained "nbroad/ESG-BERT" tokenizer and sequence-classification model
ESG_MODEL_ID = "nbroad/ESG-BERT"
tokenizer = AutoTokenizer.from_pretrained(ESG_MODEL_ID)
model = AutoModelForSequenceClassification.from_pretrained(ESG_MODEL_ID)

In [None]:
class HeuristicESGScorer:
    """Score collected texts with ESG-BERT (nbroad/ESG-BERT).

    Relies on the module-level ``tokenizer`` and ``model`` objects, which must
    be loaded before ``compute_ethical_score`` is called with non-empty text.
    """

    # Number of sources at which data_quality_score saturates at 1.0
    SOURCES_FOR_FULL_QUALITY = 30

    def compute_ethical_score(self, all_sources_data):
        """Compute the score dictionary for one company.

        Args:
            all_sources_data: List of records (dicts) with optional ``title``
                and ``description`` entries. May be empty or ``None``.

        Returns:
            dict with:
              - ``ethical_investment_index``: highest softmax probability
                returned by the model for the concatenated text, rounded to
                4 decimals; 0.0 when there is no text to classify.
              - ``num_sources``: number of records received.
              - ``data_quality_score``: ``num_sources / 30`` capped at 1.0.
        """
        if not all_sources_data:
            return {
                'ethical_investment_index': 0.0,
                'num_sources': 0,
                'data_quality_score': 0.0
            }

        num_sources = len(all_sources_data)
        data_quality = min(num_sources / self.SOURCES_FOR_FULL_QUALITY, 1.0)

        # `or ''` guards against fields that are present but None, which
        # would make plain string concatenation raise a TypeError
        full_text = " ".join(
            f"{d.get('title') or ''} {d.get('description') or ''}"
            for d in all_sources_data
        )

        # Records without any text carry no evidence: skip the model instead
        # of returning its confidence on an empty input
        if not full_text.strip():
            return {
                'ethical_investment_index': 0.0,
                'num_sources': num_sources,
                'data_quality_score': data_quality
            }

        # Only the first 512 tokens of the concatenated text are classified
        inputs = tokenizer(full_text, return_tensors="pt", truncation=True, max_length=512)

        with torch.no_grad():
            outputs = model(**inputs)

        # squeeze(0) removes only the batch axis, so a list is always produced
        probs = torch.softmax(outputs.logits, dim=1).squeeze(0).tolist()

        # NOTE(review): max(probs) is the probability of the most likely class,
        # i.e. a classifier confidence; confirm this is the intended
        # definition of an "ethical" index.
        return {
            'ethical_investment_index': round(max(probs), 4),
            'num_sources': num_sources,
            'data_quality_score': data_quality
        }

In [None]:
def run_ethical_scoring(startup_list: List[Dict], output_file: str = "ethical_scores.csv") -> pd.DataFrame:
    """Collect texts and compute the ethical scores for a list of startups.

    Args:
        startup_list: Dicts with a required ``name`` key and an optional
            ``website`` key (URL or None). May be empty.
        output_file: Path of the CSV file the results are written to.

    Returns:
        DataFrame with the columns ``startup_name``, ``total_sources``,
        ``ethical_investment_index`` and ``data_quality_score``, sorted by
        decreasing ``ethical_investment_index``. The columns are present even
        when ``startup_list`` is empty.
    """
    extractor = MultiSourceEthicalExtractor()
    scorer    = HeuristicESGScorer()
    results   = []
    total     = len(startup_list)

    print(f"Lancement du pipeline pour {total} entreprises")

    for i, startup in enumerate(startup_list, 1):
        name    = startup['name']
        website = startup.get('website')
        print(f"--- [{i}/{total}] Traitement : {name} ---")

        # Multi-source extraction
        all_data = []
        all_data.extend(extractor.search_clinical_trials(name))
        all_data.extend(extractor.search_pubmed(name))
        if web_data := extractor.scrape_company_website(name, website):
            all_data.append(web_data)

        # Score computation
        scores = scorer.compute_ethical_score(all_data)

        results.append({
          'startup_name':             name,
          'total_sources':            scores['num_sources'],
          'ethical_investment_index': scores['ethical_investment_index'],
          'data_quality_score':       scores['data_quality_score'],
        })

        # Pause between companies to respect API rate limits; nothing follows
        # the last one, so no pause is needed there
        if i < total:
            time.sleep(2)

    # Explicit columns keep sort_values from raising KeyError on an empty result list
    columns = ['startup_name', 'total_sources',
               'ethical_investment_index', 'data_quality_score']
    scores_df = (pd.DataFrame(results, columns=columns)
                 .sort_values('ethical_investment_index', ascending=False))
    scores_df.to_csv(output_file, index=False)
    print(f"Opération terminée. Fichier généré : {output_file}")
    return scores_df

if __name__ == "__main__":
    # (name, website) pairs of the companies to score; None means no known URL
    demo_companies = [
        ('Zag Bio', 'https://zagbio.com'),
        ('Azalea Therapeutics', None),
        ('Braveheart Bio', None),
        ('Pfizer', 'https://pfizer.com'),
        ('Moderna', 'https://modernatx.com'),
    ]
    startups_to_score = [
        {'name': company, 'website': site}
        for company, site in demo_companies
    ]

    df_final = run_ethical_scoring(startups_to_score)

In [None]:
# Reload the exported ethical scores. DataFrame.to_csv writes UTF-8 by
# default, so the file is read back as UTF-8; decoding it as Latin-1 garbles
# every non-ASCII character in startup names.
ethicals_score = pd.read_csv("ethical_scores.csv", encoding='utf-8')

In [None]:
# Display the reloaded ethical scores table
ethicals_score