<a href="https://colab.research.google.com/github/SinnottKayleigh/B2B-Sales-Algos/blob/main/API_(attempt).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Baaaasic Structure ** v v basic **

Lead Scoring Microservices Ecosystem:

1. API Gateway Service
   - Request routing
   - Authentication
   - Rate limiting
   - Request/Response transformation

2. Lead Management Service
   - Lead creation
   - Lead profile management
   - Data validation
   - Basic lead information storage

3. Scoring Engine Service
   - Core scoring algorithm
   - Machine learning model integration
   - Real-time scoring calculation
   - Model version management

4. Data Persistence Service
   - Database interactions
   - Data storage
   - Caching mechanism
   - Data retrieval optimizations

5. Authentication & Authorization Service
   - User management
   - API key generation
   - Role-based access control
   - Token management

6. Analytics & Reporting Service
   - Performance tracking
   - Scoring trend analysis
   - Predictive insights generation
   - Reporting dashboard

7. Notification Service
   - Email notifications
   - Webhook triggers
   - Internal event broadcasting

Lead Scoring System:

1. Authentication Service
   - User management
   - API key generation
   - Role-based access control

2. Lead Scoring Engine
   - Core scoring algorithm
   - Machine learning integration
   - Real-time scoring calculation

3. Data Persistence Service
   - Database interactions
   - Lead storage
   - Historical data management

4. Analytics & Reporting Service
   - Performance tracking
   - Scoring trend analysis
   - Predictive insights

5. Webhook/Integration Service
   - External system connections
   - CRM synchronization
   - Marketing automation

```
# This is formatted as code
```



POST /api/v1/leads/score
- Input: Lead details
- Output: Scored lead information

GET /api/v1/leads/prospects
- Input: Filter parameters
- Output: Filtered, scored leads

POST /api/v1/leads/create
- Input: New lead details

class LeadScoringAPI:
    def validate_lead_input(lead_data):
        # Validate input fields
        # Check mandatory fields
        # Ensure data integrity

    def calculate_lead_score(lead_data):
        # Apply scoring algorithm
        # Return numerical score
        # Log scoring process

    def store_lead(lead_data, score):
        # Save lead to database
        # Create unique identifier
        # Associate score with lead

    def authenticate_request(api_key):
        # Validate API key
        # Check access permissions
        # Rate limit protection

    def generate_api_response(lead_data, score):
        # Format standardized response
        # Include metadata
        # Ensure consistent structure

Technical Architecture Diagram:

[External Clients]
        │
        ▼
[API Gateway]
    │   │   │
    ▼   ▼   ▼
[Auth Service] [Lead Management] [Scoring Engine]
    │           │               │
    ▼           ▼               ▼
[Database]  [Caching Layer]  [ML Model Registry]
    │           │               │
    ▼           ▼               ▼
[Data Warehouse] [Analytics Service]

In [8]:
pip install fastapi uvicorn pydantic sqlalchemy psycopg2-binary

Collecting uvicorn
  Downloading uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading uvicorn-0.34.0-py3-none-any.whl (62 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.3/62.3 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: uvicorn, psycopg2-binary
Successfully installed psycopg2-binary-2.9.10 uvicorn-0.34.0


In [10]:
mkdir lead_scoring_api
cd lead_scoring_api

python3 -m venv venv
source venv/bin/activate

SyntaxError: invalid syntax (<ipython-input-10-ce4c34fbcf61>, line 1)

lead_scoring_api/
│
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models/
│   │   └── __init__.py
│   ├── schemas/
│   │   └── __init__.py
│   └── services/
│       └── __init__.py
│
├── requirements.txt
├── README.md
└── .env

In [12]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI(
    title="Lead Scoring API",
    description="API for lead scoring and management",
    version="0.1.0"
)

class LeadCreate(BaseModel):
    company_name: str
    industry: str
    annual_revenue: float
    email: str

class LeadResponse(LeadCreate):
    id: int
    lead_score: float

leads_db = []

@app.post("/leads/", response_model=LeadResponse)
async def create_lead(lead: LeadCreate):
    # Basic lead scoring logic
    def calculate_lead_score(lead: LeadCreate) -> float:
        score = 0

        industry_scores = {
            'Technology': 30,
            'Manufacturing': 25,
            'Engineering': 25
        }
        score += industry_scores.get(lead.industry, 10)

        if lead.annual_revenue > 1000000:
            score += 20
        elif lead.annual_revenue > 500000:
            score += 10

        return min(max(score, 0), 100)

    lead_dict = lead.dict()
    lead_dict['id'] = len(leads_db) + 1
    lead_dict['lead_score'] = calculate_lead_score(lead)

    leads_db.append(lead_dict)

    return LeadResponse(**lead_dict)

@app.get("/leads/", response_model=list[LeadResponse])
async def list_leads():
    return leads_db

@app.get("/leads/{lead_id}", response_model=LeadResponse)
async def get_lead(lead_id: int):
    lead = next((lead for lead in leads_db if lead['id'] == lead_id), None)
    if not lead:
        raise HTTPException(status_code=404, detail="Lead not found")
    return lead

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

RuntimeError: asyncio.run() cannot be called from a running event loop

In [13]:
fastapi==0.68.0
uvicorn==0.15.0
pydantic==1.8.2
sqlalchemy==1.4.22
psycopg2-binary==2.9.1
python-dotenv==0.19.0

SyntaxError: invalid syntax (<ipython-input-13-f8d423b1caf1>, line 1)

In [14]:
# Lead Scoring API

## Overview
A microservices-based API for lead scoring and management.

## Setup Instructions
1. Clone the repository
2. Create virtual environment
3. Install dependencies: `pip install -r requirements.txt`
4. Run the application: `uvicorn app.main:app --reload`

## API Endpoints
- POST /leads/: Create a new lead
- GET /leads/: List all leads
- GET /leads/{lead_id}: Get specific lead
- GET /health: Health check endpoint

## Development Roadmap
- [ ] Implement database integration
- [ ] Add authentication
- [ ] Develop advanced scoring algorithm
- [ ] Create comprehensive testing

SyntaxError: invalid syntax (<ipython-input-14-7ac00df5225a>, line 4)

In [16]:
pip install -r requirements.txt
uvicorn app.main:app --reload

SyntaxError: invalid syntax (<ipython-input-16-7cf464bb5487>, line 1)

In [17]:
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://user:password@localhost/lead_scoring_db"
)

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class LeadModel(Base):
    __tablename__ = "leads"

    id = Column(Integer, primary_key=True, index=True)
    company_name = Column(String, index=True)
    industry = Column(String)
    annual_revenue = Column(Float)
    email = Column(String, unique=True)
    lead_score = Column(Float)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        return {
            "id": self.id,
            "company_name": self.company_name,
            "industry": self.industry,
            "annual_revenue": self.annual_revenue,
            "email": self.email,
            "lead_score": self.lead_score,
            "created_at": self.created_at
        }

def create_tables():
    Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

  from .base import _DECIMAL_TYPES
  Base = declarative_base()


In [18]:
from typing import Dict, Any

class LeadScoringService:
    @staticmethod
    def calculate_lead_score(lead_data: Dict[str, Any]) -> float:
        score = 0

        industry_scores = {
            'Technology': 30,
            'Manufacturing': 25,
            'Engineering': 25,
            'Finance': 20,
            'Healthcare': 15
        }
        score += industry_scores.get(lead_data.get('industry', ''), 10)

        revenue = lead_data.get('annual_revenue', 0)
        if revenue > 5000000:
            score += 30
        elif revenue > 1000000:
            score += 20
        elif revenue > 500000:
            score += 10

        if lead_data.get('email'):
            email_domain = lead_data['email'].split('@')[-1]
            corporate_domains = [
                'gmail.com', 'yahoo.com', 'hotmail.com'
            ]
            score += 10 if email_domain not in corporate_domains else 0

        additional_factors = {
            'has_website': 5,
            'linkedin_profile': 5,
            'previous_interaction': 10
        }

        for factor, points in additional_factors.items():
            if lead_data.get(factor):
                score += points

        return min(max(score, 0), 100)

In [21]:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
import os

SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class AuthService:
    @staticmethod
    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password):
        return pwd_context.hash(password)

    @staticmethod
    def create_access_token(data: dict):
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    @staticmethod
    def verify_token(token: str):
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials"
            )

SyntaxError: Missing parentheses in call to 'print'. Did you mean print(...)? (jose.py, line 546)

In [23]:
from fastapi import HTTPException, status

class LeadScoringException:
    @staticmethod
    def lead_not_found(lead_id: int):
        return HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Lead with ID {lead_id} not found"
        )

    @staticmethod
    def invalid_lead_data():
        return HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Invalid lead data provided"
        )

    @staticmethod
    def unauthorized():
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"}
        )

In [24]:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List

from .database import create_tables, get_db
from .models import LeadModel
from .services.scoring_service import LeadScoringService
from .auth import AuthService
from .exceptions import LeadScoringException

app = FastAPI(title="Advanced Lead Scoring API")

@app.on_event("startup")
def startup():
    create_tables()

@app.post("/leads/")
def create_lead(
    lead_data: dict,
    db: Session = Depends(get_db)
):
    try:
        lead_score = LeadScoringService.calculate_lead_score(lead_data)

        lead = LeadModel(
            **lead_data,
            lead_score=lead_score
        )

        db.add(lead)
        db.commit()
        db.refresh(lead)

        return lead.to_dict()

    except Exception as e:
        raise LeadScoringException.invalid_lead_data()

@app.get("/leads/", response_model=List[dict])
def list_leads(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100
):
    leads = db.query(LeadModel).offset(skip).limit(limit).all()
    return [lead.to_dict() for lead in leads]

@app.get("/leads/{lead_id}")
def get_lead(
    lead_id: int,
    db: Session = Depends(get_db)
):
    lead = db.query(LeadModel).filter(LeadModel.id == lead_id).first()
    if not lead:
        raise LeadScoringException.lead_not_found(lead_id)
    return lead.to_dict()

ImportError: attempted relative import with no known parent package

In [25]:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List

from .database import create_tables, get_db
from .models import LeadModel
from .services.scoring_service import LeadScoringService
from .auth import AuthService
from .exceptions import LeadScoringException

app = FastAPI(title="Advanced Lead Scoring API")

@app.on_event("startup")
def startup():
    create_tables()

@app.post("/leads/")
def create_lead(
    lead_data: dict,
    db: Session = Depends(get_db)
):
    try:
        lead_score = LeadScoringService.calculate_lead_score(lead_data)

        lead = LeadModel(
            **lead_data,
            lead_score=lead_score
        )

        db.add(lead)
        db.commit()
        db.refresh(lead)

        return lead.to_dict()

    except Exception as e:
        raise LeadScoringException.invalid_lead_data()

@app.get("/leads/", response_model=List[dict])
def list_leads(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100
):
    leads = db.query(LeadModel).offset(skip).limit(limit).all()
    return [lead.to_dict() for lead in leads]

@app.get("/leads/{lead_id}")
def get_lead(
    lead_id: int,
    db: Session = Depends(get_db)
):
    lead = db.query(LeadModel).filter(LeadModel.id == lead_id).first()
    if not lead:
        raise LeadScoringException.lead_not_found(lead_id)
    return lead.to_dict()

ImportError: attempted relative import with no known parent package

In [26]:
# tests/test_lead_scoring.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.main import app
from app.database import Base
from app.models import LeadModel
from app.services.scoring_service import LeadScoringService

# Test Database Setup
TEST_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
    TEST_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool
)
TestingSessionLocal = sessionmaker(bind=engine)

@pytest.fixture
def test_db():
    Base.metadata.create_all(bind=engine)
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()
        Base.metadata.drop_all(bind=engine)

@pytest.fixture
def client():
    return TestClient(app)

def test_create_lead(client, test_db):
    lead_data = {
        "company_name": "Test Company",
        "industry": "Technology",
        "annual_revenue": 2000000,
        "email": "test@example.com"
    }

    response = client.post("/leads/", json=lead_data)
    assert response.status_code == 200

    lead = response.json()
    assert lead['company_name'] == "Test Company"
    assert 'lead_score' in lead

def test_scoring_algorithm():
    lead_data = {
        "industry": "Technology",
        "annual_revenue": 5000000,
        "email": "corporate@techcompany.com"
    }

    score = LeadScoringService.calculate_lead_score(lead_data)
    assert 0 <= score <= 100
    assert score > 50  # High-quality lead

ModuleNotFoundError: No module named 'app'

In [27]:
pip install app

Collecting app
  Downloading app-0.0.1.zip (2.2 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: app
  Building wheel for app (setup.py) ... [?25l[?25hdone
  Created wheel for app: filename=app-0.0.1-py3-none-any.whl size=1508 sha256=080cb350d3c7705d6eb4160065fdd40cc13b88d86688e78e44b53322bacb2d2e
  Stored in directory: /root/.cache/pip/wheels/d4/41/8a/a765a5070346662ff4ad1df79a2c7fe777ccf8fe1dff9c2dcd
Successfully built app
Installing collected packages: app
Successfully installed app-0.0.1


In [40]:
import logging
import sys
from loguru import logger
import uuid

class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find caller from where originated the logged message
        frame, depth = logging.currentframe(), 2
        while frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())

def setup_logging():
    logging.getLogger().handlers.clear()

    logging.basicConfig(handlers=[InterceptHandler()], level=logging.INFO)

    logger.remove()
    logger.add(
        sys.stderr,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        level="INFO"
    )
    logger.add(
        "logs/app_{time}.log",
        rotation="10 MB",
        level="INFO"
    )

    return logger

from fastapi import Request
import time

async def logging_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    logger.info(f"Request ID: {request_id} | {request.method} {request.url}")

    start_time = time.time()
    response = await call_next(request)

    process_time = time.time() - start_time
    logger.info(
        f"Request ID: {request_id} | "
        f"Status Code: {response.status_code} | "
        f"Process Time: {process_time:.4f} seconds"
    )

    return response

In [29]:
pip install loguru

Collecting loguru
  Downloading loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Downloading loguru-0.7.3-py3-none-any.whl (61 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.6/61.6 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.7.3


In [34]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import mlflow
import mlflow.sklearn

class LeadPredictionModel:
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()

    def prepare_training_data(self, leads_df):
        features = [
            'annual_revenue',
            'lead_score',
            # Add more features
        ]

        X = leads_df[features]
        y = leads_df['converted']  # Assuming you have a conversion flag

        return X, y

    def train_model(self, leads_df):
        # MLflow tracking
        mlflow.set_experiment("lead_scoring_prediction")

        with mlflow.start_run():
            X, y = self.prepare_training_data(leads_df)

            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

            X_train_scaled = self.scaler.fit_transform(X_train)
            X_test_scaled = self.scaler.transform(X_test)

            self.model = RandomForestClassifier(n_estimators=100)
            self.model.fit(X_train_scaled, y_train)

            mlflow.log_metric("accuracy", self.model.score(X_test_scaled, y_test))
            mlflow.sklearn.log_model(self.model, "lead_prediction_model")

    def predict_conversion_probability(self, lead_data):
        if not self.model:
            raise ValueError("Model not trained")

        features = np.array([
            lead_data['annual_revenue'],
            lead_data['lead_score']
        ]).reshape(1, -1)

        scaled_features = self.scaler.transform(features)

        return self.model.predict_proba(scaled_features)[0][1]

In [32]:
pip install mlflow

Collecting mlflow
  Downloading mlflow-2.20.1-py3-none-any.whl.metadata (30 kB)
Collecting mlflow-skinny==2.20.1 (from mlflow)
  Downloading mlflow_skinny-2.20.1-py3-none-any.whl.metadata (31 kB)
Collecting alembic!=1.10.0,<2 (from mlflow)
  Downloading alembic-1.14.1-py3-none-any.whl.metadata (7.4 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==2.20.1->mlflow)
  Downloading databricks_sdk-0.43.0-py3-none-any.whl.metadata (38 kB)
Collecting Mako (from alembic!=1.10.0,<2->mlflow)
  Downloading Mako-1.3.8-py3-none-any.whl.metadata (2.9 kB)
Collecting graphql-core<3.3,>=3.1 (from graphene<4->mlflow)
  Downloading graphql_core-3.2.6-py3-none-any.whl.metadata (11 kB)
Colle

In [38]:
FROM python:3.9-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    build-essential \
    postgresql-client

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

SyntaxError: invalid syntax (<ipython-input-38-e54a7008367c>, line 1)

In [39]:
name: Lead Scoring API CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: 3.9

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: Run tests
      run: pytest tests/

  deploy:
    needs: test
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2

    - name: Build and push Docker image
      uses: docker/build-push-action@v2
      with:
        context: .
        push: true
        tags: your-dockerhub-username/lead-scoring-api:latest

SyntaxError: invalid syntax (<ipython-input-39-90d75fd43b39>, line 1)

In [37]:
# app/config.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    # Database Configuration
    DATABASE_URL: str
    SECRET_KEY: str

    # ML Model Configuration
    MODEL_PATH: str

    # Logging Configuration
    LOG_LEVEL: str = "INFO"

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

settings = Settings()

PydanticImportError: `BaseSettings` has been moved to the `pydantic-settings` package. See https://docs.pydantic.dev/2.10/migration/#basesettings-has-moved-to-pydantic-settings for more details.

For further information visit https://errors.pydantic.dev/2.10/u/import-error