# LLM-driven stance and category inference via prompt engineering

## Using Llama and Falcon:

In [None]:
import os, re, json, time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Config
# Hugging Face access token, taken from the HF_TOKEN environment variable.
# It is left as None when the variable is unset, so `from_pretrained` falls
# back to its default credential handling instead of sending a placeholder
# string that the Hub would reject as an invalid credential.
HF_TOKEN       = os.getenv('HF_TOKEN')
# Model under evaluation. Use "meta-llama/Llama-3.1-8B-Instruct" to run Llama
# (and rename OUTPUT_FILE accordingly, since its name mentions Falcon).
MODEL_NAME     = 'tiiuae/Falcon3-7B-Instruct'
INPUT_FILE     = "/content/drive/MyDrive/ReligiousHateSpeech_DataSet.txt"   # read line by line
OUTPUT_FILE    = "/content/drive/MyDrive/Falcon3-7B-Instruct_Stance and Category.tsv"
BATCH_SIZE     = 2   # texts per generate() call
MAX_RETRIES    = 3   # attempts per batch before writing 'undefined'
RETRY_DELAY    = 2   # seconds slept between attempts
# Closed label sets; anything the model returns outside them becomes 'undefined'.
STANCE_CHOICES = ['anti-Christianity','anti-Islam','anti-Hinduism','anti-Buddhism','anti-Atheism','undefined']
CATEGORY_CHOICES = ['implicit animosity','explicit derogation and dehumanization','threatening language','abusive humor','undefined']

def generate_prompt(text: str) -> str:
    """Build the plain-text classification prompt for a single input text.

    The prompt shows the text, lists the allowed stance and category labels
    (taken from STANCE_CHOICES / CATEGORY_CHOICES) and asks for a JSON reply
    with the keys ``stance`` and ``category``.
    """
    stance_options = ', '.join(STANCE_CHOICES)
    category_options = ', '.join(CATEGORY_CHOICES)
    prompt_lines = [
        f"Text: {text}",
        f"Stance (choose one: {stance_options})",
        f"Category (choose one: {category_options})",
        "Respond in JSON with keys: stance and category.",
    ]
    return "\n".join(prompt_lines)

def parse_response(response: str):
    """Extract a ``(stance, category)`` pair from a model reply.

    The first ``{...}`` span in the reply is parsed as JSON and its ``stance``
    and ``category`` values are mapped onto STANCE_CHOICES / CATEGORY_CHOICES.
    Labels are compared ignoring case and surrounding whitespace, so a reply
    such as ``"Anti-Islam"`` still resolves to the canonical ``'anti-Islam'``
    instead of being discarded.

    Returns:
        A tuple of two strings. Each element is the canonical label, or
        ``'undefined'`` when no JSON object is found, the JSON is invalid, or
        the value is not one of the allowed labels.
    """
    def _canonical(value, choices):
        # Non-string values (numbers, lists, null) can never be a valid label.
        if not isinstance(value, str):
            return 'undefined'
        wanted = value.strip().lower()
        for choice in choices:
            if choice.lower() == wanted:
                return choice
        return 'undefined'

    # Typographic quotes would make the JSON unparseable; normalise them first.
    response = response.replace('“','"').replace('”','"')
    # Non-greedy: take the first flat {...} object in the reply.
    m = re.search(r'\{.*?\}', response, re.DOTALL)
    if not m:
        return 'undefined','undefined'
    try:
        data = json.loads(m.group(0))
    except json.JSONDecodeError:
        return 'undefined','undefined'
    return (_canonical(data.get('stance'), STANCE_CHOICES),
            _canonical(data.get('category'), CATEGORY_CHOICES))

def _process_and_write_batch(batch, model, tokenizer, device, f_out):
    """Classify one batch of texts and append one TSV row per text to ``f_out``.

    Generation is attempted up to MAX_RETRIES times, sleeping RETRY_DELAY
    seconds between attempts. Rows are collected in memory and written only
    after a whole attempt has succeeded, so a failure part-way through a batch
    can no longer leave duplicate rows behind when the batch is retried. If
    no attempt succeeds, every text is written with ``undefined`` labels.

    Args:
        batch: list of input texts.
        model: causal LM exposing ``generate``.
        tokenizer: tokenizer matching ``model``.
        device: device the tokenized inputs are moved to.
        f_out: writable text file receiving ``text<TAB>stance<TAB>category`` rows.
    """
    def _tsv_safe(text):
        # Tabs/newlines inside the text would break the one-row-per-text layout.
        return text.replace('\t',' ').replace('\n',' ')

    rows = None
    for attempt in range(MAX_RETRIES):
        try:
            prompts = [generate_prompt(t) for t in batch]
            inputs = tokenizer(
                prompts,
                return_tensors='pt',
                padding=True,
                truncation=True,
                max_length=512
            ).to(device)

            outputs = model.generate(
                **inputs,
                max_new_tokens=150,
                do_sample=False,  # greedy decoding
                pad_token_id=tokenizer.eos_token_id
            )
            collected = []
            for i, text in enumerate(batch):
                # Drop the prompt tokens so only the newly generated part is decoded.
                out_ids = outputs[i][len(inputs['input_ids'][i]):]
                decoded = tokenizer.decode(out_ids, skip_special_tokens=True).strip()
                stance, category = parse_response(decoded)
                collected.append((_tsv_safe(text), stance, category))
            rows = collected
            break
        except Exception as e:
            print(f"Retry {attempt+1} failed:", e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY)

    if rows is None:
        # Every attempt failed: keep the texts in the output with placeholder labels.
        for t in batch:
            f_out.write(f"{_tsv_safe(t)}\tundefined\tundefined\n")
        f_out.flush()
        print("Final fallback: undefined")
        return

    for clean, stance, category in rows:
        f_out.write(f"{clean}\t{stance}\t{category}\n")
        f_out.flush()
        print(f"[Batch] {clean[:60]} → {stance}, {category}")

def classify_loop(model, tokenizer, device):
    """Stream INPUT_FILE through the model and write the results to OUTPUT_FILE.

    Each non-blank line of the input is treated as one text. Texts are grouped
    into batches of BATCH_SIZE and handed to ``_process_and_write_batch``; a
    final partial batch is processed as well. The output file is overwritten
    and starts with a ``text / stance / category`` header row.
    """
    with open(INPUT_FILE, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE,'w', encoding='utf-8') as f_out:
        f_out.write("text\tstance\tcategory\n")
        non_blank = (stripped for stripped in (line.strip() for line in f_in) if stripped)
        pending = []
        for entry in non_blank:
            pending.append(entry)
            if len(pending) != BATCH_SIZE:
                continue
            _process_and_write_batch(pending, model, tokenizer, device, f_out)
            pending = []
        # Flush whatever is left over after the last full batch.
        if pending:
            _process_and_write_batch(pending, model, tokenizer, device, f_out)

def main():
    """Load the tokenizer and model named by MODEL_NAME and run the classification loop.

    Any exception raised during loading or classification is caught and
    reported with a single printed line.
    """
    try:
        has_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if has_cuda else "cpu")
        print(f"Using device: {device}")

        print("Loading tokenizer...")
        hub_options = {"token": HF_TOKEN, "trust_remote_code": True}
        tok = AutoTokenizer.from_pretrained(MODEL_NAME, **hub_options)
        # Pad on the left so generated tokens directly follow each prompt.
        tok.padding_side = "left"
        if tok.pad_token is None:
            tok.pad_token = tok.eos_token

        print("Loading model...")
        mdl = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.bfloat16,
            device_map="auto",  # spreads across multiple GPUs if available
            **hub_options,
        )

        print("Starting classification loop...")
        classify_loop(mdl, tok, device)
    except Exception as e:
        print(f"Exception occurred: {e}")


if __name__ == '__main__':
    main()

Llama and Falcon can both be run with the cell above; only `MODEL_NAME` (and, to keep results separate, `OUTPUT_FILE`) needs to change.

## Using Mistral:

In [None]:
import os, re, json, time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Config
# Hugging Face access token, taken from the HF_TOKEN environment variable.
# It is left as None when the variable is unset, so `from_pretrained` falls
# back to its default credential handling instead of sending a placeholder
# string that the Hub would reject as an invalid credential.
HF_TOKEN       = os.getenv('HF_TOKEN')
#MODEL_NAME     = "mistralai/Mistral-Small-3.1-24B-Instruct-2503"
MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.1"
INPUT_FILE     = "/content/drive/MyDrive/ReligiousHateSpeech_DataSet.txt"   # read line by line
# NOTE(review): the file name says "24B" while the active MODEL_NAME is the 7B
# model; rename before publishing results if that is not intended.
OUTPUT_FILE    = "/content/drive/MyDrive/Mistral24B_Stance_and_Category.tsv"
BATCH_SIZE     = 2   # texts per generate() call
MAX_RETRIES    = 3   # attempts per batch before writing 'undefined'
RETRY_DELAY    = 2   # seconds slept between attempts
# Closed label sets; anything the model returns outside them becomes 'undefined'.
STANCE_CHOICES = ['anti-Christianity','anti-Islam','anti-Hinduism','anti-Buddhism','anti-Atheism','undefined']
CATEGORY_CHOICES = ['implicit animosity','explicit derogation and dehumanization','threatening language','abusive humor','undefined']

# Prompt format suited for chat-instruct models like Mistral
def generate_prompt(text: str) -> str:
    """Build the ``[INST] ... [/INST]`` classification prompt for one text.

    The instruction block shows the text, lists the allowed stance and
    category labels (from STANCE_CHOICES / CATEGORY_CHOICES) and asks for a
    JSON-only reply with the keys ``stance`` and ``category``.
    """
    stance_options = ', '.join(STANCE_CHOICES)
    category_options = ', '.join(CATEGORY_CHOICES)
    pieces = [
        "[INST] Given the following text, classify the stance and category.\n\n",
        f"Text: {text}\n\n",
        f"Stance (choose one): {stance_options}\n",
        f"Category (choose one): {category_options}\n",
        "Respond only in JSON using keys: stance and category. [/INST]",
    ]
    return "".join(pieces)

def parse_response(response: str):
    """Extract a ``(stance, category)`` pair from a model reply.

    The first ``{...}`` span in the reply is parsed as JSON and its ``stance``
    and ``category`` values are mapped onto STANCE_CHOICES / CATEGORY_CHOICES.
    Labels are compared ignoring case and surrounding whitespace, so a reply
    such as ``"Anti-Islam"`` still resolves to the canonical ``'anti-Islam'``
    instead of being discarded.

    Returns:
        A tuple of two strings. Each element is the canonical label, or
        ``'undefined'`` when no JSON object is found, the JSON is invalid, or
        the value is not one of the allowed labels.
    """
    def _canonical(value, choices):
        # Non-string values (numbers, lists, null) can never be a valid label.
        if not isinstance(value, str):
            return 'undefined'
        wanted = value.strip().lower()
        for choice in choices:
            if choice.lower() == wanted:
                return choice
        return 'undefined'

    # Typographic quotes would make the JSON unparseable; normalise them first.
    response = response.replace('“','"').replace('”','"')
    # Non-greedy: take the first flat {...} object in the reply.
    m = re.search(r'\{.*?\}', response, re.DOTALL)
    if not m:
        return 'undefined','undefined'
    try:
        data = json.loads(m.group(0))
    except json.JSONDecodeError:
        return 'undefined','undefined'
    return (_canonical(data.get('stance'), STANCE_CHOICES),
            _canonical(data.get('category'), CATEGORY_CHOICES))

def _process_and_write_batch(batch, model, tokenizer, device, f_out):
    """Classify one batch of texts and append one TSV row per text to ``f_out``.

    Generation is attempted up to MAX_RETRIES times, sleeping RETRY_DELAY
    seconds between attempts. Rows are collected in memory and written only
    after a whole attempt has succeeded, so a failure part-way through a batch
    can no longer leave duplicate rows behind when the batch is retried. If
    no attempt succeeds, every text is written with ``undefined`` labels.

    Args:
        batch: list of input texts.
        model: causal LM exposing ``generate``.
        tokenizer: tokenizer matching ``model``.
        device: device the tokenized inputs are moved to.
        f_out: writable text file receiving ``text<TAB>stance<TAB>category`` rows.
    """
    def _tsv_safe(text):
        # Tabs/newlines inside the text would break the one-row-per-text layout.
        return text.replace('\t',' ').replace('\n',' ')

    rows = None
    for attempt in range(MAX_RETRIES):
        try:
            prompts = [generate_prompt(t) for t in batch]
            inputs = tokenizer(
                prompts,
                return_tensors='pt',
                padding=True,
                truncation=True,
                max_length=1024
            ).to(device)

            outputs = model.generate(
                **inputs,
                max_new_tokens=200,
                do_sample=False,
                pad_token_id=tokenizer.eos_token_id
            )
            collected = []
            for i, text in enumerate(batch):
                # Drop the prompt tokens so only the newly generated part is decoded.
                out_ids = outputs[i][len(inputs['input_ids'][i]):]
                decoded = tokenizer.decode(out_ids, skip_special_tokens=True).strip()
                stance, category = parse_response(decoded)
                collected.append((_tsv_safe(text), stance, category))
            rows = collected
            break
        except Exception as e:
            print(f"Retry {attempt+1} failed:", e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY)

    if rows is None:
        # Every attempt failed: keep the texts in the output with placeholder labels.
        for t in batch:
            f_out.write(f"{_tsv_safe(t)}\tundefined\tundefined\n")
        f_out.flush()
        print("Final fallback: undefined")
        return

    for clean, stance, category in rows:
        f_out.write(f"{clean}\t{stance}\t{category}\n")
        f_out.flush()
        print(f"[✓] {clean[:60]} → {stance}, {category}")

def classify_loop(model, tokenizer, device):
    """Stream INPUT_FILE through the model and write the results to OUTPUT_FILE.

    Each non-blank line of the input is treated as one text. Texts are grouped
    into batches of BATCH_SIZE and handed to ``_process_and_write_batch``; a
    final partial batch is processed as well. The output file is overwritten
    and starts with a ``text / stance / category`` header row.
    """
    with open(INPUT_FILE, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE,'w', encoding='utf-8') as f_out:
        f_out.write("text\tstance\tcategory\n")
        non_blank = (stripped for stripped in (line.strip() for line in f_in) if stripped)
        pending = []
        for entry in non_blank:
            pending.append(entry)
            if len(pending) != BATCH_SIZE:
                continue
            _process_and_write_batch(pending, model, tokenizer, device, f_out)
            pending = []
        # Flush whatever is left over after the last full batch.
        if pending:
            _process_and_write_batch(pending, model, tokenizer, device, f_out)

def main():
    """Load the tokenizer and model named by MODEL_NAME and run the classification loop.

    Any exception raised during loading or classification is caught and
    reported with a single printed line.
    """
    try:
        has_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if has_cuda else "cpu")
        print(f"Using device: {device}")

        print("Loading tokenizer...")
        hub_options = {"token": HF_TOKEN, "trust_remote_code": True}
        tok = AutoTokenizer.from_pretrained(MODEL_NAME, **hub_options)
        # Pad on the left so generated tokens directly follow each prompt.
        tok.padding_side = "left"
        if tok.pad_token is None:
            tok.pad_token = tok.eos_token

        print("Loading model...")
        mdl = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            **hub_options,
        )

        print("Starting classification loop...")
        classify_loop(mdl, tok, device)
    except Exception as e:
        print(f"Exception occurred: {e}")


if __name__ == '__main__':
    main()

## Using the YI model

In [None]:
import os, re, json, time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Config
# Hugging Face access token, taken from the HF_TOKEN environment variable.
# It is left as None when the variable is unset, so `from_pretrained` falls
# back to its default credential handling instead of sending a placeholder
# string that the Hub would reject as an invalid credential.
HF_TOKEN       = os.getenv('HF_TOKEN')
MODEL_NAME     = "01-ai/Yi-1.5-9B-Chat"
INPUT_FILE     = "/content/drive/MyDrive/ReligiousHateSpeech_DataSet.txt"   # read line by line
OUTPUT_FILE    = "/content/drive/MyDrive/Yi-9B_Stance_and_Category.tsv"
BATCH_SIZE     = 1   # texts per generate() call
MAX_RETRIES    = 3   # attempts per batch before writing 'undefined'
RETRY_DELAY    = 2   # seconds slept between attempts

# Closed label sets; anything the model returns outside them becomes 'undefined'.
STANCE_CHOICES = ['anti-Christianity','anti-Islam','anti-Hinduism','anti-Buddhism','anti-Atheism','undefined']
CATEGORY_CHOICES = ['implicit animosity','explicit derogation and dehumanization','threatening language','abusive humor','undefined']

# Shared instruction text prepended to every prompt. The label lists are
# interpolated as Python list literals (brackets and single quotes included).
SYSTEM_PROMPT = (
    "You are a helpful assistant. Given a text, identify whether it contains religious hate speech, and classify:\n"
    f"- Stance: One of {STANCE_CHOICES}\n"
    f"- Category: One of {CATEGORY_CHOICES}\n"
    "Respond in JSON with the keys 'stance' and 'category'."
)

def generate_prompt(text: str) -> str:
    """Build the System / Human / Assistant transcript prompt for one text.

    The transcript opens with SYSTEM_PROMPT, shows the text as the human turn
    and ends with an open ``Assistant:`` turn for the model to complete.
    """
    system_turn = f"System: {SYSTEM_PROMPT}\n"
    human_turn = f"Human: Text: {text}\nPlease classify.\n"
    return system_turn + human_turn + "Assistant:"

def parse_response(response: str):
    """Extract a ``(stance, category)`` pair from a model reply.

    The first ``{...}`` span in the reply is parsed as JSON and its ``stance``
    and ``category`` values are mapped onto STANCE_CHOICES / CATEGORY_CHOICES.
    Labels are compared ignoring case and surrounding whitespace, so a reply
    such as ``"Anti-Islam"`` still resolves to the canonical ``'anti-Islam'``
    instead of being discarded.

    Returns:
        A tuple of two strings. Each element is the canonical label, or
        ``'undefined'`` when the reply cannot be parsed or the value is not
        one of the allowed labels. This function never raises.
    """
    def _canonical(value, choices):
        # Non-string values (numbers, lists, null) can never be a valid label.
        if not isinstance(value, str):
            return 'undefined'
        wanted = value.strip().lower()
        for choice in choices:
            if choice.lower() == wanted:
                return choice
        return 'undefined'

    try:
        # Typographic quotes would make the JSON unparseable; normalise them first.
        response = response.replace('“','"').replace('”','"')
        # Non-greedy: take the first flat {...} object in the reply.
        m = re.search(r'\{.*?\}', response, re.DOTALL)
        if not m:
            return 'undefined','undefined'
        data = json.loads(m.group(0))
        return (_canonical(data.get('stance'), STANCE_CHOICES),
                _canonical(data.get('category'), CATEGORY_CHOICES))
    except Exception:
        # Deliberate best-effort: a malformed reply is recorded as undefined
        # rather than aborting the run.
        return 'undefined','undefined'

def _process_and_write_batch(batch, model, tokenizer, device, f_out):
    """Classify one batch of texts and append one TSV row per text to ``f_out``.

    Generation is attempted up to MAX_RETRIES times, sleeping RETRY_DELAY
    seconds between attempts. Rows are collected in memory and written only
    after a whole attempt has succeeded, so a failure part-way through a batch
    can no longer leave duplicate rows behind when the batch is retried. If
    no attempt succeeds, every text is written with ``undefined`` labels.

    Args:
        batch: list of input texts.
        model: causal LM exposing ``generate``.
        tokenizer: tokenizer matching ``model``.
        device: device the tokenized inputs are moved to.
        f_out: writable text file receiving ``text<TAB>stance<TAB>category`` rows.
    """
    def _tsv_safe(text):
        # Tabs/newlines inside the text would break the one-row-per-text layout.
        return text.replace('\t',' ').replace('\n',' ')

    rows = None
    for attempt in range(MAX_RETRIES):
        try:
            prompts = [generate_prompt(t) for t in batch]
            inputs = tokenizer(prompts, return_tensors='pt', padding=True, truncation=True, max_length=1024).to(device)

            outputs = model.generate(
                **inputs,
                max_new_tokens=200,
                do_sample=False,
                pad_token_id=tokenizer.eos_token_id
            )

            collected = []
            for i, text in enumerate(batch):
                # Drop the prompt tokens so only the newly generated part is decoded.
                out_ids = outputs[i][len(inputs['input_ids'][i]):]
                decoded = tokenizer.decode(out_ids, skip_special_tokens=True).strip()
                stance, category = parse_response(decoded)
                collected.append((_tsv_safe(text), stance, category))
            rows = collected
            break
        except Exception as e:
            print(f"Retry {attempt+1} failed:", e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY)

    if rows is None:
        # Every attempt failed: keep the texts in the output with placeholder labels.
        for t in batch:
            f_out.write(f"{_tsv_safe(t)}\tundefined\tundefined\n")
        f_out.flush()
        print("Final fallback: undefined")
        return

    for clean, stance, category in rows:
        f_out.write(f"{clean}\t{stance}\t{category}\n")
        f_out.flush()
        print(f"[✓] {clean[:60]} → {stance}, {category}")

def classify_loop(model, tokenizer, device):
    """Stream INPUT_FILE through the model and write the results to OUTPUT_FILE.

    Each non-blank line of the input is treated as one text. Texts are grouped
    into batches of BATCH_SIZE and handed to ``_process_and_write_batch``; a
    final partial batch is processed as well. The output file is overwritten
    and starts with a ``text / stance / category`` header row.
    """
    with open(INPUT_FILE, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:
        f_out.write("text\tstance\tcategory\n")
        non_blank = (stripped for stripped in (line.strip() for line in f_in) if stripped)
        pending = []
        for entry in non_blank:
            pending.append(entry)
            if len(pending) != BATCH_SIZE:
                continue
            _process_and_write_batch(pending, model, tokenizer, device, f_out)
            pending = []
        # Flush whatever is left over after the last full batch.
        if pending:
            _process_and_write_batch(pending, model, tokenizer, device, f_out)

def main():
    """Load the Yi tokenizer and model and run the classification loop.

    Any exception raised during loading or classification is caught and
    reported with a single printed line.
    """
    try:
        has_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if has_cuda else "cpu")
        print(f"Using device: {device}")

        print("Loading tokenizer...")
        tokenizer_options = {"use_fast": False, "token": HF_TOKEN}
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, **tokenizer_options)
        # Pad on the left so generated tokens directly follow each prompt.
        tokenizer.padding_side = "left"
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        print("Loading model...")
        model_options = {
            "token": HF_TOKEN,
            "torch_dtype": torch.float16,
            "device_map": "auto",
        }
        model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **model_options)

        print("Starting classification loop...")
        classify_loop(model, tokenizer, device)
    except Exception as e:
        print(f"Exception occurred: {e}")


if __name__ == '__main__':
    main()

## Using the SEA-LION model:

In [None]:
import os, re, json, time
import torch
from transformers import pipeline

# === Config ===
# Hugging Face access token, taken from the HF_TOKEN environment variable.
# It is left as None when the variable is unset, so the pipeline falls back
# to its default credential handling instead of sending a placeholder string
# that the Hub would reject as an invalid credential.
HF_TOKEN       = os.getenv('HF_TOKEN')
MODEL_NAME     = "aisingapore/Llama-SEA-LION-v3.5-8B-R"
INPUT_FILE     = "/content/drive/MyDrive/ReligiousHateSpeech_DataSet.txt"   # read line by line
OUTPUT_FILE    = "/content/drive/MyDrive/SEA_LION_Stance_and_Category.tsv"
BATCH_SIZE     = 1   # texts handed to the pipeline per batch
MAX_RETRIES    = 3   # attempts per batch before writing 'undefined'
RETRY_DELAY    = 2   # seconds slept between attempts

# Closed label sets. 'explicit derogation and dehumanization' is ONE category:
# it must match the wording used in the prompt and in the parser's regex,
# otherwise that answer is recorded under a truncated label.
STANCE_CHOICES = ['anti-Christianity','anti-Islam','anti-Hinduism','anti-Buddhism','anti-Atheism','undefined']
CATEGORY_CHOICES = ['implicit animosity','explicit derogation and dehumanization','threatening language','abusive humor','undefined']

# === Prompt Generator ===
def generate_prompt(text: str) -> str:
    return (
        "Classify the following text strictly by returning only the two labels in this format:\n"
        "Stance: <one of [anti-Christianity, anti-Islam, anti-Hinduism, anti-Buddhism, anti-Atheism, undefined]>\n"
        "Category: <one of [implicit animosity, explicit derogation and dehumanization, threatening language, abusive humor, undefined]>\n"
        "Do not explain your reasoning. Do not write anything else.\n\n"
        f"Text: {text}\n"
        "Answer:"
    )


# === Response Parser ===
def parse_response(response: str):
    """Extract a ``(stance, category)`` pair from a free-text model reply.

    Two passes are made per label. First a ``Stance: <label>`` /
    ``Category: <label>`` line is searched for and the captured label is
    mapped (case-insensitively) onto STANCE_CHOICES / CATEGORY_CHOICES. If
    that leaves a label at ``'undefined'``, the whole reply is scanned for the
    first known label that appears anywhere in it.

    Non-string input is converted with ``str`` first. The raw reply is printed
    for debugging.
    """
    if not isinstance(response, str):
        response = str(response)

    # Optional: debug output
    print("[RAW RESPONSE]:", repr(response))

    def _match_choice(raw, choices):
        # Map a captured label onto its canonical spelling, ignoring case.
        lowered = raw.strip().lower()
        return next((choice for choice in choices if choice.lower() == lowered), 'undefined')

    # Pass 1: explicit "Stance: ..." / "Category: ..." lines.
    stance_hit = re.search(r"\b[Ss]tance\s*[:\-]?\s*(anti-[\w]+|undefined)", response)
    category_hit = re.search(
        r"\b[Cc]ategory\s*[:\-]?\s*(implicit animosity|explicit derogation and dehumanization|threatening language|abusive humor|undefined)",
        response)

    stance = _match_choice(stance_hit.group(1), STANCE_CHOICES) if stance_hit else 'undefined'
    category = _match_choice(category_hit.group(1), CATEGORY_CHOICES) if category_hit else 'undefined'

    # Pass 2: fall back to the first known label mentioned anywhere in the reply.
    if stance == 'undefined':
        stance = next(
            (s for s in STANCE_CHOICES
             if re.search(rf"\b{s}\b", response, flags=re.IGNORECASE)),
            stance)

    if category == 'undefined':
        category = next(
            (c for c in CATEGORY_CHOICES
             if re.search(rf"\b{re.escape(c)}\b", response, flags=re.IGNORECASE)),
            category)

    return stance, category

# === Batch Processor ===
def _process_and_write_batch(batch, pipeline_model, f_out):
    """Classify one batch of texts with the chat pipeline and write TSV rows.

    Each text is sent as its own single-turn chat. Passing one flat list that
    holds a user message per text makes the pipeline treat the list as a
    single conversation and return one result, which breaks as soon as the
    batch has more than one text.

    Generation is attempted up to MAX_RETRIES times, sleeping RETRY_DELAY
    seconds between attempts. Rows are collected in memory and written only
    after a whole attempt has succeeded, so a failure part-way through a batch
    cannot leave duplicate rows behind. If no attempt succeeds, every text is
    written with ``undefined`` labels.

    Args:
        batch: list of input texts.
        pipeline_model: text-generation pipeline called with a chat and ``max_new_tokens``.
        f_out: writable text file receiving ``text<TAB>stance<TAB>category`` rows.
    """
    def _tsv_safe(text):
        # Tabs/newlines inside the text would break the one-row-per-text layout.
        return text.replace('\t',' ').replace('\n',' ')

    def _assistant_text(response_data):
        # Pull the assistant turn out of a chat-style pipeline result.
        if isinstance(response_data, dict) and "generated_text" in response_data:
            generated_sequence = response_data["generated_text"]
            if isinstance(generated_sequence, list):
                for entry in generated_sequence:
                    if entry.get("role") == "assistant":
                        return entry.get("content", "")
                return ""
            # Plain-text completion instead of a list of chat messages.
            return generated_sequence if isinstance(generated_sequence, str) else ""
        if isinstance(response_data, str):
            return response_data
        return str(response_data)

    rows = None
    for attempt in range(MAX_RETRIES):
        try:
            collected = []
            for text in batch:
                chat = [{"role": "user", "content": generate_prompt(text)}]
                outputs = pipeline_model(chat, max_new_tokens=512)
                content = _assistant_text(outputs[0])

                # Truncate everything to the last "Stance:" to avoid yapping
                last_stance = content.rfind("Stance:")
                last_category = content.rfind("Category:")
                if last_stance != -1 and last_category != -1 and last_category > last_stance:
                    content = content[last_stance:]

                print("[RAW MODEL OUTPUT]:", repr(content))  # Optional debug

                stance, category = parse_response(content)
                collected.append((_tsv_safe(text), stance, category))
            rows = collected
            break  # success
        except Exception as e:
            print(f"Retry {attempt+1} failed:", e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY)

    if rows is None:
        # Every attempt failed: keep the texts in the output with placeholder labels.
        for t in batch:
            f_out.write(f"{_tsv_safe(t)}\tundefined\tundefined\n")
        f_out.flush()
        print("Final fallback: undefined")
        return

    for clean, stance, category in rows:
        f_out.write(f"{clean}\t{stance}\t{category}\n")
        f_out.flush()
        print(f"[✓] {clean[:60]} → {stance}, {category}")

# === Classification Loop ===
def classify_loop(pipeline_model):
    """Stream INPUT_FILE through the pipeline and write the results to OUTPUT_FILE.

    Each non-blank line of the input is treated as one text. Texts are grouped
    into batches of BATCH_SIZE and handed to ``_process_and_write_batch``; a
    final partial batch is processed as well. The output file is overwritten
    and starts with a ``text / stance / category`` header row.
    """
    with open(INPUT_FILE, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:
        f_out.write("text\tstance\tcategory\n")
        non_blank = (stripped for stripped in (line.strip() for line in f_in) if stripped)
        pending = []
        for entry in non_blank:
            pending.append(entry)
            if len(pending) != BATCH_SIZE:
                continue
            _process_and_write_batch(pending, pipeline_model, f_out)
            pending = []
        # Flush whatever is left over after the last full batch.
        if pending:
            _process_and_write_batch(pending, pipeline_model, f_out)

# === Main ===
def main():
    """Create the SEA-LION text-generation pipeline and run the classification loop.

    Any exception raised during loading or classification is caught and
    reported with a single printed line.
    """
    try:
        print("Loading SEA-LION model via pipeline...")

        # GPU 0 with bfloat16 weights when CUDA is available; otherwise CPU
        # (device -1) with the pipeline's default dtype.
        use_gpu = torch.cuda.is_available()
        device, dtype = (0, torch.bfloat16) if use_gpu else (-1, None)

        pipeline_options = {
            "model": MODEL_NAME,
            "device": device,
            "torch_dtype": dtype,
            "token": HF_TOKEN,
        }
        pipeline_model = pipeline("text-generation", **pipeline_options)

        classify_loop(pipeline_model)
    except Exception as e:
        print(f"Exception occurred: {e}")


if __name__ == '__main__':
    main()

# Create topic model and concept map

In [None]:
# Step 1: Install necessary libraries
!pip install -q bertopic[visualization] sentence-transformers umap-learn hdbscan plotly kaleido

In [None]:
# Install required libraries
!pip install -q nltk networkx plotly kaleido

# Imports
import pandas as pd
import nltk
import re
import itertools
import networkx as nx
import plotly.graph_objects as go
from nltk.corpus import stopwords
from collections import Counter
from google.colab import files

# Download stopwords
# Fetch the NLTK English stop-word list used to filter tokens below.
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

# Read the TSV and take its "text" column as a list of strings.
df = pd.read_csv("/content/drive/MyDrive/disagreements.tsv", sep="\t")
texts = df["text"].astype(str).tolist()

def preprocess(text):
    """Lower-case ``text``, strip punctuation and return its non-stop-word tokens."""
    normalized = re.sub(r'[^\w\s]', '', text.lower())
    return [token for token in normalized.split() if token not in stop_words]

tokenized_texts = [preprocess(document) for document in texts]

# Count, per document, every unordered pair of distinct tokens once.
cooccurrence = Counter()
for tokens in tokenized_texts:
    unique_tokens = list(set(tokens))  # a repeated word must not inflate the counts
    cooccurrence.update(
        tuple(sorted(pair)) for pair in itertools.combinations(unique_tokens, 2)
    )

# Keep only pairs seen at least twice and turn them into weighted edges.
G = nx.Graph()
G.add_weighted_edges_from(
    (w1, w2, freq) for (w1, w2), freq in cooccurrence.items() if freq >= 2
)

# Force-directed layout with a fixed seed.
pos = nx.spring_layout(G, k=0.5, seed=42)

# Edge coordinates: each segment is (start, end, None) so Plotly breaks the line.
edge_x = []
edge_y = []
weights = []
for source, target, attrs in G.edges(data=True):
    x0, y0 = pos[source]
    x1, y1 = pos[target]
    edge_x.extend([x0, x1, None])
    edge_y.extend([y0, y1, None])
    weights.append(attrs['weight'])

edge_trace = go.Scatter(
    x=edge_x, y=edge_y,
    line=dict(width=1, color='gray'),
    hoverinfo='none',
    mode='lines')

# Node coordinates and labels, in graph order.
node_text = list(G.nodes())
node_x = [pos[node][0] for node in node_text]
node_y = [pos[node][1] for node in node_text]

node_trace = go.Scatter(
    x=node_x, y=node_y,
    mode='markers+text',
    text=node_text,
    textposition="top center",
    hoverinfo='text',
    marker=dict(
        size=10,
        color='skyblue',
        line_width=2))

layout = go.Layout(
    title='Word Co-occurrence Concept Map',
    showlegend=False,
    hovermode='closest',
    margin=dict(b=20,l=5,r=5,t=40),
    xaxis=dict(showgrid=False, zeroline=False),
    yaxis=dict(showgrid=False, zeroline=False))
fig = go.Figure(data=[edge_trace, node_trace], layout=layout)

# Save as interactive HTML and as a static PNG.
fig.write_html("word_concept_map.html")
fig.write_image("word_concept_map.png", width=1000, height=800)

# Hand both files to the browser (Colab only).
for output_name in ("word_concept_map.html", "word_concept_map.png"):
    files.download(output_name)

In [None]:
# Install needed libraries
!pip install -q nltk plotly networkx kaleido

# Imports
import pandas as pd
import re
import nltk
from nltk.corpus import stopwords
from collections import Counter
import itertools
import networkx as nx
import plotly.graph_objects as go

# Fetch the NLTK English stop-word list used to filter tokens below.
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

# Load and preprocess text
df = pd.read_csv("/content/drive/MyDrive/disagreements.tsv", sep="\t")
# Cast to str so that missing values (read as NaN floats) cannot break
# the .lower() call in preprocess().
texts = df["text"].astype(str).tolist()

def preprocess(text):
    """Lower-case ``text``, strip punctuation and return its non-stop-word tokens."""
    text = re.sub(r'[^\w\s]', '', text.lower())
    return [word for word in text.split() if word not in stop_words]

tokenized_texts = [preprocess(t) for t in texts]

# Count co-occurrences: every unordered pair of distinct tokens, once per document.
co_occurrence = Counter()
for tokens in tokenized_texts:
    for pair in itertools.combinations(set(tokens), 2):  # avoid duplicates in one doc
        co_occurrence[tuple(sorted(pair))] += 1

# Create graph
G = nx.Graph()
for (w1, w2), weight in co_occurrence.items():
    if weight >= 2:  # filter for stronger connections
        G.add_edge(w1, w2, weight=weight)

# Position nodes using spring layout. The seed is fixed so that the layout
# does not depend on the random number generator's state.
pos = nx.spring_layout(G, k=0.5, iterations=50, seed=42)

# Prepare edges for Plotly: each segment is (start, end, None) so the line breaks.
edge_x = []
edge_y = []
edge_weights = []

for edge in G.edges(data=True):
    x0, y0 = pos[edge[0]]
    x1, y1 = pos[edge[1]]
    edge_x += [x0, x1, None]
    edge_y += [y0, y1, None]
    edge_weights.append(edge[2]['weight'])

edge_trace = go.Scatter(
    x=edge_x,
    y=edge_y,
    line=dict(width=1, color='#888'),
    hoverinfo='none',
    mode='lines'
)

# Node trace
node_x = []
node_y = []
node_text = []
for node in G.nodes():
    x, y = pos[node]
    node_x.append(x)
    node_y.append(y)
    node_text.append(node)

node_trace = go.Scatter(
    x=node_x,
    y=node_y,
    mode='markers+text',
    text=node_text,
    textposition="top center",
    hoverinfo='text',
    marker=dict(
        showscale=True,
        colorscale='YlGnBu',
        reversescale=True,
        # Colour each node by its number of neighbours.
        color=[len(list(G.neighbors(n))) for n in G.nodes()],
        size=10,
        colorbar=dict(
            thickness=15,
            # The flat `titleside` attribute is no longer accepted by current
            # Plotly releases; the nested title form replaces it.
            title=dict(text='Node Connections', side='right'),
            xanchor='left'
        )
    )
)

# Create and save figure
fig = go.Figure(data=[edge_trace, node_trace],
                layout=go.Layout(
                    title='Word Co-occurrence Concept Map',
                    showlegend=False,
                    hovermode='closest',
                    margin=dict(b=20,l=5,r=5,t=40)
                ))

fig.write_html("cooccurrence_concept_map.html")
fig.write_image("cooccurrence_concept_map.png", format="png", width=1000, height=800, scale=2)

# Download (Colab only)
from google.colab import files
files.download("cooccurrence_concept_map.html")
files.download("cooccurrence_concept_map.png")