Install dependencies

In [26]:
!pip install -q flask flask-cors streamlit transformers torch pandas python-dateutil watchdog requests pyngrok


**Imports and Processor**

In [27]:
from transformers import pipeline
from dateutil import parser as dateparser
import re, json, os

# Initialize the NER pipeline once, at cell execution time, so that every call
# to extract_entities() reuses the same loaded model.
# `aggregation_strategy="simple"` merges word-piece tokens into whole entities;
# it is the supported replacement for the deprecated `grouped_entities=True`.
ner = pipeline(
    "ner",
    model="dbmdz/bert-large-cased-finetuned-conll03-english",
    aggregation_strategy="simple",
)

# Fallback hint used when dateutil cannot parse a string: a standalone
# four-digit run (a year) or a month name/abbreviation appearing as a whole
# word. Word boundaries keep "Marketing" or "Decathlon" from counting as dates.
_DATE_HINT_RE = re.compile(
    r"(?<!\d)\d{4}(?!\d)"
    r"|\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?"
    r"|Aug(?:ust)?|Sep(?:t(?:ember)?)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b",
    re.IGNORECASE,
)


def looks_like_date(s: str) -> bool:
    """Return True when *s* plausibly denotes a date.

    The string is first handed to dateutil's strict (non-fuzzy) parser. If
    that fails, a regex fallback accepts strings containing a four-digit year
    or a month name.

    Args:
        s: Candidate text. Empty values and non-strings are rejected.

    Returns:
        True if the text parses as a date or matches the fallback hint.
    """
    if not s or not isinstance(s, str):
        return False
    try:
        dateparser.parse(s, fuzzy=False)
    except (ValueError, OverflowError, TypeError):
        # Only parse failures fall through to the heuristic; interrupts and
        # programming errors are no longer swallowed by a bare except.
        return bool(_DATE_HINT_RE.search(s))
    return True

# Date patterns ordered from most to least specific. The bare-year pattern is
# last so it only reports years that are not part of a fuller date match.
_DATE_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
        r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2},?\s+\d{4}\b",
        r"\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b",
        r"\b\d{4}\b",
    )
)

# Lower-cased NER label -> result key that collects entities with that label.
_LABEL_TO_KEY = {
    "per": "persons",
    "person": "persons",
    "persons": "persons",
    "loc": "locations",
    "location": "locations",
    "org": "organizations",
    "organization": "organizations",
}


def _find_regex_dates(text):
    """Return the distinct date-like substrings of *text*, in discovery order."""
    claimed_spans = []
    seen = set()
    found = []
    for pattern in _DATE_PATTERNS:
        for match in pattern.finditer(text):
            start, end = match.span()
            # Skip text already covered by a more specific pattern, e.g. the
            # "1950" inside "12-06-1950".
            if any(start < c_end and c_start < end for c_start, c_end in claimed_spans):
                continue
            claimed_spans.append((start, end))
            value = match.group(0)
            if value not in seen:
                seen.add(value)
                found.append(value)
    return found


def _normalize_date(raw):
    """Return ``{"text": raw, "iso": ...}``, or just ``{"text": raw}`` if unparseable."""
    try:
        parsed = dateparser.parse(raw, fuzzy=True)
    except (ValueError, OverflowError, TypeError):
        return {"text": raw}
    # NOTE: dateutil fills fields missing from the input (e.g. month and day
    # of a bare year) from the current date.
    return {"text": raw, "iso": parsed.date().isoformat()}


def extract_entities(text: str):
    """Extract named entities and dates from *text*.

    Entities come from the module-level ``ner`` pipeline and are bucketed by
    label. Entities with any other label are classified as dates when
    ``looks_like_date`` accepts them and as "other" otherwise. Dates are
    additionally collected with regular expressions and normalised to ISO
    format where dateutil can parse them.

    Args:
        text: Input document. Empty or None yields an all-empty result.

    Returns:
        A dict with the keys ``persons``, ``locations``, ``organizations``,
        ``ner_other_entities``, ``ner_dates`` (lists of
        ``{"text", "label", "score"}`` dicts) and ``regex_dates`` (list of
        ``{"text"[, "iso"]}`` dicts).
    """
    result = {
        "persons": [],
        "locations": [],
        "organizations": [],
        "ner_other_entities": [],
        "ner_dates": [],
        "regex_dates": [],
    }
    if not text:
        return result

    for ent in ner(text):
        # Grouped pipelines report "entity_group"; ungrouped ones "entity".
        label = ent.get("entity_group") or ent.get("entity")
        word = ent.get("word")
        score = ent.get("score")
        entry = {
            "text": word,
            "label": label,
            # float() makes the score JSON-serialisable.
            "score": float(score) if score is not None else None,
        }
        key = _LABEL_TO_KEY.get(label.lower()) if label else None
        if key is None:
            key = "ner_dates" if looks_like_date(word) else "ner_other_entities"
        result[key].append(entry)

    result["regex_dates"] = [_normalize_date(raw) for raw in _find_regex_dates(text)]
    return result


Some weights of the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu


**Database and CSV handling**

In [28]:
import sqlite3
import threading

import pandas as pd

# Output locations; both are relative paths, so they resolve against the
# working directory of the running process.
DB_PATH = "results.db"
CSV_PATH = "results.csv"

# Module-level connection and cursor, created once when this cell runs.
# check_same_thread=False turns off sqlite3's check that a connection is only
# used from the thread that created it.
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
c = conn.cursor()
# IF NOT EXISTS makes re-running this cell against an existing database a no-op.
c.execute("""
CREATE TABLE IF NOT EXISTS results (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    input_text TEXT,
    extracted_json TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()

# Serialises writers. The SQLite connection is opened with
# check_same_thread=False and the CSV append is a check-then-write sequence,
# so concurrent requests must not interleave inside save_result().
_SAVE_LOCK = threading.Lock()


def save_result(text: str, extracted: dict):
    """Persist one processed input to the SQLite table and the results CSV.

    Args:
        text: The raw input text that was processed.
        extracted: The extraction result; stored as a JSON string.

    Raises:
        sqlite3.Error: If the INSERT fails (the transaction is rolled back).
        OSError: If the CSV file cannot be written.
    """
    # Serialise once so the database and the CSV store the same string.
    payload = json.dumps(extracted, ensure_ascii=False)
    row_df = pd.DataFrame([[text, payload]], columns=["input_text", "extracted_json"])

    with _SAVE_LOCK:
        # `with conn` commits on success and rolls back on error;
        # conn.execute() uses its own cursor rather than a shared one.
        with conn:
            conn.execute(
                "INSERT INTO results (input_text, extracted_json) VALUES (?, ?)",
                (text, payload),
            )

        # Write the header row only when this call creates the file.
        write_header = not os.path.exists(CSV_PATH)
        row_df.to_csv(CSV_PATH, mode="a", header=write_header, index=False, encoding="utf-8")


**Flask backend**

In [38]:
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import logging
import os
import traceback

# Flask application object; the route decorators in this cell register on it.
app = Flask("genai_pipeline")
# Apply flask-cors to the whole app with its default settings.
CORS(app)

# Configure the root logger (used by the endpoints via logging.info/warning/error)
# to emit INFO and above.
logging.basicConfig(level=logging.INFO)

@app.route("/process_text", methods=["POST"])
def process_text_endpoint():
    """Extract entities from the ``text`` form field of a POST request.

    Responds with ``{"extracted": ...}`` on success, a 400 error when the text
    is missing or blank, and a 500 error if extraction or saving fails.
    """
    try:
        submitted = request.form.get("text", "")
        if submitted.strip():
            entities = extract_entities(submitted)
            save_result(submitted, entities)
            logging.info("Text processed successfully")
            return jsonify({"extracted": entities})

        # Nothing but whitespace (or no field at all) was sent.
        logging.warning("Empty text received")
        return jsonify({"error": "No text provided"}), 400
    except Exception as exc:
        logging.error(f"Error processing text: {str(exc)}\n{traceback.format_exc()}")
        return jsonify({"error": "Internal server error"}), 500

@app.route("/process_file", methods=["POST"])
def process_file_endpoint():
    """Extract entities from an uploaded ``.txt`` file (multipart field ``file``).

    Responds with ``{"filename": ..., "extracted": ...}`` on success. Returns
    400 when the file is missing, is not a ``.txt`` file, exceeds 2 MB or is
    empty, and 500 if extraction or saving fails.
    """
    max_bytes = 2 * 1024 * 1024
    try:
        file = request.files.get("file")
        if file is None:
            logging.warning("No file provided in /process_file")
            return jsonify({"error": "No file provided"}), 400

        # A part uploaded without a filename has filename None; treat it as an
        # invalid type instead of crashing on None.lower().
        filename = file.filename or ""
        if not filename.lower().endswith(".txt"):
            # %r keeps control characters in a client-supplied name out of the log.
            logging.warning("Invalid file type: %r", filename)
            return jsonify({"error": "Invalid file type, only .txt files allowed"}), 400

        # Measure the upload by seeking to its end, then rewind for reading.
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(0)
        if size > max_bytes:
            logging.warning(f"File too large: {size} bytes")
            return jsonify({"error": "File too large (max 2MB)"}), 400

        raw = file.read()
        try:
            # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which
            # would otherwise end up inside the analysed text.
            text = raw.decode("utf-8-sig")
        except UnicodeDecodeError:
            # latin-1 maps every byte value, so this fallback cannot fail.
            text = raw.decode("latin-1")

        if not text.strip():
            logging.warning("Empty file content")
            return jsonify({"error": "File is empty or unreadable"}), 400

        extracted = extract_entities(text)
        save_result(text, extracted)
        logging.info("File processed successfully: %r", filename)
        return jsonify({"filename": filename, "extracted": extracted})
    except Exception as e:
        # logging.exception appends the active traceback to the record.
        logging.exception("Error in /process_file: %s", e)
        return jsonify({"error": "Internal server error"}), 500

@app.route("/download_csv", methods=["GET"])
def download_csv_endpoint():
    """Send the results CSV as an attachment.

    Responds 404 when the file does not exist yet and 500 on any other error.
    """
    try:
        if os.path.exists(CSV_PATH):
            return send_file(CSV_PATH, as_attachment=True)

        logging.warning("CSV not found")
        return jsonify({"error": "CSV not found"}), 404
    except Exception as exc:
        logging.error(f"CSV download error: {str(exc)}\n{traceback.format_exc()}")
        return jsonify({"error": "Internal server error"}), 500

@app.route("/download_db", methods=["GET"])
def download_db_endpoint():
    """Send the SQLite results database as an attachment.

    Responds 404 when the file does not exist yet and 500 on any other error.
    """
    try:
        if os.path.exists(DB_PATH):
            return send_file(DB_PATH, as_attachment=True)

        logging.warning("DB not found")
        return jsonify({"error": "DB not found"}), 404
    except Exception as exc:
        logging.error(f"DB download error: {str(exc)}\n{traceback.format_exc()}")
        return jsonify({"error": "Internal server error"}), 500

@app.route("/health", methods=["GET"])
def health_endpoint():
    """Liveness probe: always answers with ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return jsonify(payload)


**Start Flask in background thread via ngrok**

In [39]:
import threading
from pyngrok import ngrok
import time

# The ngrok authtoken is a credential: read it from the environment instead of
# committing it to the notebook. Any token that was ever pasted into this cell
# should be revoked in the ngrok dashboard and replaced with a new one.
_ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN", "").strip()
if not _ngrok_authtoken:
    raise RuntimeError(
        "Set the NGROK_AUTHTOKEN environment variable to your ngrok authtoken "
        "before running this cell."
    )
ngrok.set_auth_token(_ngrok_authtoken)

def run_flask():
    """Serve the Flask app on 0.0.0.0:8000; blocks until the server stops.

    Debug mode and the auto-reloader are explicitly disabled.
    """
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "debug": False,
        "use_reloader": False,
    }
    app.run(**server_options)

# Run the server in a daemon thread so this cell returns while Flask keeps
# serving in the background.
thread = threading.Thread(target=run_flask, daemon=True)
thread.start()

# Fixed pause before opening the tunnel, presumably to give the server time to
# bind its port. Nothing here verifies that the server actually started.
time.sleep(3)

# Open an ngrok tunnel to local port 8000 and print its public URL.
backend_tunnel = ngrok.connect(8000)
print("Flask backend public URL:", backend_tunnel.public_url)


 * Serving Flask app 'genai_pipeline'
 * Debug mode: off


Address already in use
Port 8000 is in use by another program. Either identify and stop that program, or start the server with a different port.


Flask backend public URL: https://192d002157ed.ngrok-free.app


**Streamlit frontend file**

In [41]:
%%writefile frontend.py
import streamlit as st
import requests
import os
import json

# Page chrome. Streamlit renders widgets in the order they are called, so the
# statement order in this script is the page layout.
st.set_page_config(page_title="GenAI Text Pipeline", layout="centered")
st.title("GenAI Document Entity Extractor")

# Backend address: taken from the API_URL environment variable when set.
# While it still points at localhost, the user is offered a text box to
# override it (e.g. with the backend's ngrok URL).
default_api_url = os.environ.get("API_URL", "http://localhost:8000")
if default_api_url.startswith("http://localhost"):
    API_URL = st.text_input(
        "Enter backend API URL (e.g. ngrok URL):",
        default_api_url,
        help="Enter the public URL of your backend server."
    )
else:
    API_URL = default_api_url

st.write(f"Using backend API URL: {API_URL}")

st.markdown("""
This UI lets you paste text or upload a text file (.txt).
It sends the input to the backend, shows extracted JSON, and saves results to CSV + SQLite on the server.
""")

# Selects which of the two input forms below is shown.
choice = st.radio("Input type", ["Text", "File"])

# Per-session list of processed inputs and their results; created on first run.
if "history" not in st.session_state:
    st.session_state.history = []

# Upper bound, in seconds, for one backend call, so that a dead or stalled
# backend cannot leave the page hanging indefinitely.
REQUEST_TIMEOUT = 120


def _submit(endpoint, **post_kwargs):
    """POST to ``{API_URL}/{endpoint}`` and return the ``extracted`` payload.

    Every failure (unreachable backend, non-2xx status, malformed body) is
    reported in the UI with ``st.error`` and yields ``None``.
    """
    try:
        resp = requests.post(
            f"{API_URL.rstrip('/')}/{endpoint}",
            timeout=REQUEST_TIMEOUT,
            **post_kwargs,
        )
    except requests.RequestException as exc:
        st.error(f"Could not reach backend: {exc}")
        return None
    if not resp.ok:
        st.error(f"Error: {resp.status_code} {resp.text}")
        return None
    try:
        # Parse the body once and reuse it for display and history.
        return resp.json()["extracted"]
    except (ValueError, KeyError, TypeError):
        st.error("Backend returned an unexpected response.")
        return None


if choice == "Text":
    user_text = st.text_area("Paste your text here", height=200)
    if st.button("Process Text"):
        if not user_text.strip():
            st.warning("Please enter some text.")
        else:
            with st.spinner("Processing..."):
                extracted = _submit("process_text", data={"text": user_text})
            if extracted is not None:
                st.success("Processed successfully")
                st.json(extracted)
                st.session_state.history.append({
                    "input": user_text,
                    "result": extracted,
                    "input_type": "Text"
                })
else:
    uploaded = st.file_uploader("Choose a .txt file to upload", type=["txt"])
    if uploaded is not None:
        if st.button("Upload and Process File"):
            files = {"file": (uploaded.name, uploaded.getvalue())}
            with st.spinner("Uploading & processing..."):
                extracted = _submit("process_file", files=files)
            if extracted is not None:
                st.success("File processed successfully")
                st.json(extracted)
                st.session_state.history.append({
                    "input": f"[FILE] {uploaded.name}",
                    "result": extracted,
                    "input_type": "File"
                })

st.write("---")
st.subheader("Processing History")
if st.session_state.history:
    # Show at most the ten most recent entries, newest first. `i` counts from
    # 1 for the newest entry, so len(history) - i + 1 is that entry's 1-based
    # position in the full history.
    for i, entry in enumerate(reversed(st.session_state.history[-10:]), 1):
        st.markdown(f"**Entry #{len(st.session_state.history)-i+1}**")
        label = entry.get("input_type", "Text")
        # Each widget gets a distinct key derived from its position in this list.
        st.text_area(f"{label} Input", value=entry['input'], height=100, key=f"hist_input_{i}", disabled=True)
        st.json(entry['result'], expanded=False)
        st.markdown("---")
else:
    st.write("No history yet.")

st.write("---")
st.subheader("Download per-entry result as file")

if st.session_state.history:
    # One label per history entry: 1-based index, the first 40 characters of
    # the input (with an ellipsis if truncated) and the input type.
    history_labels = [
        f"{i+1}: {entry['input'][:40]}{'...' if len(entry['input'])>40 else ''} ({entry['input_type']})"
        for i, entry in enumerate(st.session_state.history)
    ]
    # The select box works on list indices and shows the labels via format_func.
    selected_idx = st.selectbox("Select processed input to download:", options=list(range(len(history_labels))), format_func=lambda x: history_labels[x])
    entry = st.session_state.history[selected_idx]
    file_name = f"result_entry_{selected_idx+1}.json"
    st.download_button(
        label=f"Download result {selected_idx+1} as JSON",
        data=json.dumps(entry["result"], indent=2),
        file_name=file_name,
        mime="application/json"
    )

st.write("---")
st.subheader("Download stored results (All)")

def url_exists(url):
    """Return True when a HEAD request to *url* answers with HTTP 200.

    Any request failure (connection error, timeout, malformed URL) is treated
    as "not available" and yields False.
    """
    try:
        # The timeout keeps an unreachable backend from stalling page rendering.
        resp = requests.head(url, timeout=10)
    except requests.RequestException:
        return False
    return resp.status_code == 200

def _offer_download(url, label, file_name, mime):
    """Render a download button for *url*, or an info note when it is unavailable.

    The file is only offered when the probe succeeds AND the GET returns a
    successful status, so an error page is never handed out as the results file.
    """
    if url_exists(url):
        try:
            resp = requests.get(url, timeout=60)
        except requests.RequestException:
            resp = None
        if resp is not None and resp.ok:
            st.download_button(label=label, data=resp.content, file_name=file_name, mime=mime)
            return
    st.info(f"{file_name} not available yet. Please process some text or files first.")


if API_URL:
    base_url = API_URL.rstrip('/')

    col1, col2 = st.columns(2)
    with col1:
        _offer_download(
            f"{base_url}/download_csv",
            "Download results.csv",
            "results.csv",
            "text/csv",
        )
    with col2:
        _offer_download(
            f"{base_url}/download_db",
            "Download results.db",
            "results.db",
            "application/octet-stream",
        )

st.write("---")
st.markdown("**Notes:** The backend saves results to `results.csv` and `results.db`. Use the download buttons above.")


Overwriting frontend.py


**Run Streamlit frontend & expose with ngrok**

In [44]:
# Launch Streamlit on port 8501 through the notebook's shell; the trailing `&`
# puts the process in the background so the cell can continue.
get_ipython().system_raw("streamlit run frontend.py --server.port 8501 &")

# Open an ngrok tunnel to the Streamlit port and print its public URL.
frontend_tunnel = ngrok.connect(8501)
print("Streamlit frontend public URL:", frontend_tunnel.public_url)


Streamlit frontend public URL: https://11421c77c622.ngrok-free.app


Example n8n workflow JSON snippet (save as n8n_workflow.json)

In [23]:
import json

# Public base URL of the Flask backend. Use the live tunnel when this notebook
# session has one; otherwise fall back to a placeholder that must be replaced
# with the "Flask backend public URL" printed when the tunnel is opened.
BACKEND_BASE_URL = (
    backend_tunnel.public_url
    if "backend_tunnel" in globals()
    else "https://5e6f26158a28.ngrok-free.app"
)

# Three-node workflow: watch a folder -> POST each file to the backend ->
# append the response to a CSV file.
workflow = {
    "name": "GenAI Text Processing Workflow",
    "nodes": [
        {
            "parameters": {"path": "/watch-folder", "options": {}},
            "id": "1",
            "name": "Watch Files",
            "type": "n8n-nodes-base.watch",
            "typeVersion": 1,
            "position": [250, 300]
        },
        {
            "parameters": {
                # The backend only accepts uploads on /process_file; posting to
                # the bare tunnel URL hits no route.
                "url": f"{BACKEND_BASE_URL.rstrip('/')}/process_file",
                "method": "POST",
                "jsonParameters": False,
                "options": {},
                "bodyParametersUi": {
                    "parameter": [
                        {
                            "name": "file",
                            "value": '={{$node["Watch Files"].json["path"]}}',
                            "type": "file"
                        }
                    ]
                },
                "responseFormat": "json"
            },
            "id": "2",
            "name": "Call GenAI API",
            "type": "n8n-nodes-base.httpRequest",
            "typeVersion": 2,
            "position": [450, 300]
        },
        {
            "parameters": {"fileName": "results.csv", "mode": "append", "options": {}},
            "id": "3",
            "name": "Save to CSV",
            "type": "n8n-nodes-base.csvFile",
            "typeVersion": 1,
            "position": [650, 300]
        }
    ],
    "connections": {
        "Watch Files": {
            "main": [[{"node": "Call GenAI API", "type": "main", "index": 0}]]
        },
        "Call GenAI API": {
            "main": [[{"node": "Save to CSV", "type": "main", "index": 0}]]
        }
    }
}

# Save to JSON file (explicit encoding so the output does not depend on the
# platform default).
with open("n8n_workflow.json", "w", encoding="utf-8") as f:
    json.dump(workflow, f, indent=2)

print("Exported n8n workflow to n8n_workflow.json")


Exported n8n workflow to n8n_workflow.json


**Displaying Stored Results**

In [24]:
import sqlite3
import pandas as pd

DB_PATH = "results.db"
CSV_PATH = "results.csv"


def show_latest_db_results(limit=10):
    """Print the newest rows of the ``results`` table, most recent first.

    Args:
        limit: Maximum number of rows to show.

    Only the first 200 characters of each stored JSON document are shown
    (column ``extracted_preview``).
    """
    # LIMIT is bound as a parameter instead of being formatted into the SQL.
    query = (
        "SELECT id, input_text, substr(extracted_json, 1, 200) as extracted_preview, created_at "
        "FROM results ORDER BY id DESC LIMIT ?"
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        df = pd.read_sql_query(query, conn, params=(int(limit),))
    finally:
        # Close the connection even when the query fails (e.g. missing table).
        conn.close()
    print("Latest results from SQLite DB:")
    print(df)

def show_csv_results():
    """Print the first ten rows of the results CSV, or a notice if it is missing."""
    try:
        frame = pd.read_csv(CSV_PATH)
    except FileNotFoundError:
        print(f"{CSV_PATH} not found. No results saved yet.")
        return

    print(f"Showing first 10 records from {CSV_PATH}:")
    print(frame.head(10))

# Print both reports when this code runs as the main module: first the SQLite
# rows, then a blank separator, then the CSV preview.
if __name__ == "__main__":
    show_latest_db_results()
    print("\n")
    show_csv_results()


Latest results from SQLite DB:
   id                                         input_text  \
0  12  Meeting with Sam Carter on April 10, 2033. Fol...   
1  11  Meeting with Paige Taylor on January 14, 2033....   
2  10  Meeting with Paige Taylor on January 14, 2033....   
3   9  Meeting with Sam Carter on April 10, 2033. Fol...   
4   8  Meeting with Sam Carter on April 10, 2033. Fol...   
5   7  Meeting with Paige Taylor on January 14, 2033....   
6   6  Team catch-up with Mary Johnson on 2025-11-15....   
7   5  Ravi Kumar will join the meeting at Infosys ca...   
8   4  Project discussion with Elon Musk and Sundar P...   
9   3  Dr. Alan Turing gave a lecture on Artificial I...   

                                   extracted_preview           created_at  
0  {"persons": [{"text": "Sam Carter", "label": "...  2025-09-17 05:39:04  
1  {"persons": [{"text": "Paige Taylor", "label":...  2025-09-17 05:38:37  
2  {"persons": [{"text": "Paige Taylor", "label":...  2025-09-17 05:34:46  
3  {

**Create sample input files (5 example files)**

In [14]:
# Five short example documents, keyed by the file name each one is written to.
samples = {
    "sample1.txt": """Meeting with John Doe on 23/09/2025.
Follow up with Jane Smith on September 30, 2025 in Bengaluru.""",

    "sample2.txt": """Dr. Alan Turing gave a lecture on Artificial Intelligence at Cambridge on 12-06-1950.
Next review scheduled for July 20, 1951.""",

    "sample3.txt": """Project discussion with Elon Musk and Sundar Pichai is scheduled on 15 March 2024 at California HQ.""",

    "sample4.txt": """Ravi Kumar will join the meeting at Infosys campus on 01/01/2026.
He will later travel to New Delhi for another event on January 5, 2026.""",

    "sample5.txt": """Team catch-up with Mary Johnson on 2025-11-15.
CEO Satya Nadella will join remotely from Seattle."""
}

# Write every sample to the working directory as UTF-8 text.
for sample_name, sample_text in samples.items():
    with open(sample_name, "w", encoding="utf-8") as handle:
        handle.write(sample_text)
print("Sample files created:", list(samples))


Sample files created: ['sample1.txt', 'sample2.txt', 'sample3.txt', 'sample4.txt', 'sample5.txt']


**Test the pipeline on sample files**

In [25]:
import requests


# Prefer the live tunnel from this session; otherwise talk to the local server.
backend_url = backend_tunnel.public_url if 'backend_tunnel' in globals() else "http://localhost:8000"
print("Using backend:", backend_url)

# Upload each sample file and print what the backend extracted from it.
for name in samples:
    with open(name, "rb") as f:
        files = {"file": (name, f)}
        try:
            # The timeout keeps a dead tunnel or stalled server from hanging the cell.
            resp = requests.post(f"{backend_url.rstrip('/')}/process_file", files=files, timeout=120)
        except requests.RequestException as exc:
            print(f"{name} -> request failed: {exc}")
            continue
    print(f"{name} -> status {resp.status_code}")
    if resp.ok:
        print(json.dumps(resp.json()["extracted"], indent=2))
    else:
        # Error bodies can be entire HTML pages (e.g. from a stale tunnel URL);
        # show only the beginning.
        print(resp.text[:500])


import pandas as pd
# Show the ten most recent rows stored by the backend, using the notebook's
# module-level SQLite connection.
df_db = pd.read_sql_query("SELECT id, input_text, substr(extracted_json,1,200) as extracted_preview, created_at FROM results ORDER BY id DESC LIMIT 10;", conn)
df_db


Using backend: https://c196ecdcb5ce.ngrok-free.app
sample1.txt -> status 404
<!DOCTYPE html>
<html class="h-full" lang="en-US" dir="ltr">
  <head>
    <link rel="preload" href="https://cdn.ngrok.com/static/fonts/euclid-square/EuclidSquare-Regular-WebS.woff" as="font" type="font/woff" crossorigin="anonymous" />
    <link rel="preload" href="https://cdn.ngrok.com/static/fonts/euclid-square/EuclidSquare-RegularItalic-WebS.woff" as="font" type="font/woff" crossorigin="anonymous" />
    <link rel="preload" href="https://cdn.ngrok.com/static/fonts/euclid-square/EuclidSquare-Medium-WebS.woff" as="font" type="font/woff" crossorigin="anonymous" />
    <link rel="preload" href="https://cdn.ngrok.com/static/fonts/euclid-square/EuclidSquare-Semibold-WebS.woff" as="font" type="font/woff" crossorigin="anonymous" />
    <link rel="preload" href="https://cdn.ngrok.com/static/fonts/euclid-square/EuclidSquare-MediumItalic-WebS.woff" as="font" type="font/woff" crossorigin="anonymous" />
    <link rel="pr

Unnamed: 0,id,input_text,extracted_preview,created_at
0,12,"Meeting with Sam Carter on April 10, 2033. Fol...","{""persons"": [{""text"": ""Sam Carter"", ""label"": ""...",2025-09-17 05:39:04
1,11,"Meeting with Paige Taylor on January 14, 2033....","{""persons"": [{""text"": ""Paige Taylor"", ""label"":...",2025-09-17 05:38:37
2,10,"Meeting with Paige Taylor on January 14, 2033....","{""persons"": [{""text"": ""Paige Taylor"", ""label"":...",2025-09-17 05:34:46
3,9,"Meeting with Sam Carter on April 10, 2033. Fol...","{""persons"": [{""text"": ""Sam Carter"", ""label"": ""...",2025-09-17 05:34:14
4,8,"Meeting with Sam Carter on April 10, 2033. Fol...","{""persons"": [{""text"": ""Sam Carter"", ""label"": ""...",2025-09-17 05:23:13
5,7,"Meeting with Paige Taylor on January 14, 2033....","{""persons"": [{""text"": ""Paige Taylor"", ""label"":...",2025-09-17 05:21:57
6,6,Team catch-up with Mary Johnson on 2025-11-15....,"{""persons"": [{""text"": ""Mary Johnson"", ""label"":...",2025-09-17 05:18:22
7,5,Ravi Kumar will join the meeting at Infosys ca...,"{""persons"": [{""text"": ""Ravi Kumar"", ""label"": ""...",2025-09-17 05:18:21
8,4,Project discussion with Elon Musk and Sundar P...,"{""persons"": [{""text"": ""Elon Musk"", ""label"": ""P...",2025-09-17 05:18:20
9,3,Dr. Alan Turing gave a lecture on Artificial I...,"{""persons"": [{""text"": ""Alan Turing"", ""label"": ...",2025-09-17 05:18:20


**Download results from Colab**

In [None]:

from google.colab import files
# Offer each results file that exists as a browser download, CSV first.
for result_path in (CSV_PATH, DB_PATH):
    if os.path.exists(result_path):
        files.download(result_path)


**Cleanup / Stop tunnels**

In [43]:

# Shut down ngrok; report the failure instead of raising.
try:
    ngrok.kill()
except Exception as err:
    print("ngrok cleanup error:", err)
else:
    print("Ngrok tunnels stopped.")

Ngrok tunnels stopped.
