In [None]:
import requests
import pandas as pd
from io import StringIO
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
import subprocess
from datetime import datetime
import psycopg2
import os
import pickle
import base64
import re
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from dotenv import load_dotenv
import logging

# Load environment variables (python-dotenv) before any os.getenv() lookup
# further down in this module runs.
load_dotenv()

# Configure logging: INFO and above, timestamped, sent both to the
# 'pipeline.log' file and to a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)

# ---------------------
# CONFIG FROM ENVIRONMENT
# ---------------------

# Sheet label -> URL read from the environment. refresh_data() issues an HTTP
# GET against each URL and parses the response body as CSV. A missing
# environment variable leaves the value as None.
sheet_sources = {
    "Lagos Midweek": os.getenv("SHEET_LAGOS_MIDWEEK"),
    "Lagos Weekend": os.getenv("SHEET_LAGOS_WEEKEND"),
    "Abuja Midweek": os.getenv("SHEET_ABUJA_MIDWEEK"),
    "Abuja Weekend": os.getenv("SHEET_ABUJA_WEEKEND")
}

# Sheet label -> constant values that refresh_data() writes into the
# "location" and "course" columns of every row loaded from that sheet.
# Keys must match the keys of sheet_sources.
sheet_metadata = {
    "Lagos Midweek": {"location": "Lagos", "course": "Data Analytics"},
    "Lagos Weekend": {"location": "Lagos", "course": "Data Analytics"},
    "Abuja Midweek": {"location": "Abuja", "course": "Data Analytics"},
    "Abuja Weekend": {"location": "Abuja", "course": "Data Analytics"}
}

# PostgreSQL DB connection from environment
db_user = os.getenv('DB_USER')
# quote_plus() raises TypeError when handed None, so an unset DB_PASSWORD used
# to abort the whole module at import time with an unrelated-looking error.
# Fall back to an empty password instead; the password is URL-escaped because
# it is embedded in the connection URL below.
db_password = quote_plus(os.getenv('DB_PASSWORD') or '')
db_host = os.getenv('DB_HOST', 'localhost')
db_port = os.getenv('DB_PORT', '5432')
db_name = os.getenv('DB_NAME')
connection_str = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
# Shared SQLAlchemy engine used by load_to_postgres().
engine = create_engine(connection_str)

# Gmail API config
# OAuth scope requested by gmail_auth(): the Gmail "send" scope only.
SCOPES = ['https://www.googleapis.com/auth/gmail.send']
# Path to the OAuth client-secrets file handed to InstalledAppFlow.
CLIENT_SECRET_FILE = os.getenv('CLIENT_SECRET_FILE')
# Local file where gmail_auth() pickles the credentials between runs.
TOKEN_FILE = 'token.pickle'

# DBT project path
# Used as the working directory for the `dbt run` subprocess in run_dbt().
DBT_PROJECT_PATH = os.getenv('DBT_PROJECT_PATH')

# ---------------------
# DATA CLEANING FUNCTIONS
# ---------------------

def clean_column_names(df):
    """Normalise the column labels of ``df`` in place and return ``df``.

    Each label is stripped of surrounding whitespace, loses every character
    that is neither a word character nor whitespace, has whitespace runs
    turned into a single underscore, has repeated underscores collapsed, and
    is finally lower-cased.
    """
    # Applied in order; each pair is (regex pattern, replacement).
    substitutions = (
        (r'[^\w\s]', ''),
        (r'\s+', '_'),
        (r'__+', '_'),
    )

    labels = df.columns.str.strip()
    for pattern, replacement in substitutions:
        labels = labels.str.replace(pattern, replacement, regex=True)

    df.columns = labels.str.lower()
    return df

def clean_dates(df, date_cols):
    """Convert the listed columns of ``df`` to ``datetime.date`` values in place.

    Columns named in ``date_cols`` that are absent from ``df`` are ignored.
    Values that cannot be parsed become missing (NaT) rather than raising.
    Returns the same ``df`` object.
    """
    present = [name for name in date_cols if name in df.columns]
    for name in present:
        parsed = pd.to_datetime(df[name], errors='coerce')
        # Keep only the calendar date; the time-of-day component is dropped.
        df[name] = parsed.dt.date
    return df

def better_clean_numerics(df, num_cols):
    """Convert the listed columns of ``df`` to numeric values in place.

    Every value is stringified and stripped of all characters other than
    digits, ``.`` and ``-`` (so currency symbols, thousands separators and
    stray text disappear). Whatever remains is parsed as a number; anything
    that does not parse, including values that end up empty, becomes NaN.

    Columns named in ``num_cols`` that are absent from ``df`` are ignored.
    Returns the same ``df`` object.
    """
    for col in num_cols:
        if col not in df.columns:
            continue
        stripped = df[col].astype(str).str.replace(r"[^\d\.\-]", "", regex=True)
        # No explicit ''-to-None replacement here: depending on the pandas
        # version, Series.replace('', None) is treated as "no value given" and
        # forward-fills the previous row's value into blank cells. Empty
        # strings are already coerced to NaN by to_numeric below.
        df[col] = pd.to_numeric(stripped, errors='coerce')
    return df

# ---------------------
# FULL DATA PIPELINE
# ---------------------

def refresh_data():
    """Download every configured sheet and return one cleaned DataFrame.

    Each URL in ``sheet_sources`` is fetched and parsed as CSV with all
    columns read as strings. Column names are normalised and the sheet label,
    location and course are added as columns. A sheet that returns a non-200
    status or raises is logged and skipped; the remaining sheets are still
    combined.

    Returns:
        The concatenated DataFrame with date and numeric columns converted
        and missing values replaced by ``None``, or ``None`` when no sheet
        could be loaded at all.
    """
    logger.info("Starting data refresh...")

    frames = []
    for source, url in sheet_sources.items():
        try:
            response = requests.get(url, timeout=30)
            if response.status_code != 200:
                logger.error(f"‚ùå Failed to load: {source} (Status: {response.status_code})")
                continue

            # NOTE(review): response.text is decoded with whatever encoding
            # requests derives from the response headers - confirm the sheets
            # are served as UTF-8 if non-ASCII text matters.
            sheet = clean_column_names(pd.read_csv(StringIO(response.text), dtype=str))
            sheet["source"] = source
            meta = sheet_metadata[source]
            sheet["location"] = meta["location"]
            sheet["course"] = meta["course"]
            frames.append(sheet)
            logger.info(f"‚úÖ Successfully loaded: {source}")
        except Exception as e:
            logger.error(f"‚ùå Error loading {source}: {e}")

    if not frames:
        logger.error("No data loaded from any source!")
        return None

    # Column names below are the already-normalised (lower-case, underscored) forms.
    date_fields = [
        'timestamp', 'payment_date', 'payment_date_2nd', 'payment_date_3rd',
        'start_date', 'ssdate', 'sub_end',
    ]
    numeric_fields = [
        'amount', '1st_installment', '2nd', '3rd', 'sub_days', 'sub_left',
        'on_a_scale_of_1_10_what_is_your_skill_level_in_data_analytics',
    ]

    full_df = pd.concat(frames, ignore_index=True)
    full_df = clean_dates(full_df, date_fields)
    full_df = better_clean_numerics(full_df, numeric_fields)
    # Replace missing values with None.
    full_df = full_df.where(pd.notnull(full_df), None)

    logger.info(f"Data refresh complete. Total rows: {len(full_df)}")
    return full_df

def load_to_postgres(df):
    """Replace the contents of ``global_table`` with the rows of ``df``.

    The TRUNCATE and the inserts run inside a single transaction on one
    connection, so if the insert fails the truncate is rolled back and the
    previous table contents are kept.

    Args:
        df: DataFrame to load; ``None`` or an empty frame is rejected.

    Returns:
        ``True`` when the table was reloaded, ``False`` when ``df`` was
        unusable or any database error occurred (the error is logged).
    """
    if df is None or df.empty:
        logger.error("Cannot load empty dataframe to PostgreSQL")
        return False

    try:
        # engine.begin() commits on a clean exit and rolls back on exception.
        # Passing the same connection to to_sql keeps the inserts in that
        # transaction instead of opening a second, independent one.
        with engine.begin() as conn:
            conn.execute(text("TRUNCATE TABLE global_table;"))
            df.to_sql('global_table', conn, if_exists='append', index=False, method='multi')
        logger.info("‚úÖ Table truncated and refreshed.")
        return True
    except Exception as e:
        logger.error(f"‚ùå Failed to load to PostgreSQL: {e}")
        return False

def run_dbt():
    """Run ``dbt run --full-refresh`` inside ``DBT_PROJECT_PATH``.

    The subprocess output is captured rather than streamed.

    Returns:
        ``True`` when dbt exits with status 0, ``False`` when it exits
        non-zero or cannot be started (the failure is logged).
    """
    try:
        subprocess.run(
            ["dbt", "run", "--full-refresh"],
            cwd=DBT_PROJECT_PATH,
            check=True,
            capture_output=True,
            text=True
        )
        logger.info("‚úÖ DBT models refreshed.")
        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"‚ùå DBT run failed: {e.stderr}")
        # stdout is captured too and was previously discarded on failure.
        # dbt typically writes its run log (including model errors) there,
        # so stderr alone can be empty.
        if e.stdout:
            logger.error(f"DBT output:\n{e.stdout}")
        return False
    except Exception as e:
        # e.g. the dbt executable or the working directory does not exist.
        logger.error(f"‚ùå Error running DBT: {e}")
        return False

def get_milestone_candidates():
    """Fetch the subscribers who are at a notification milestone today.

    Queries ``subscription_status`` for rows whose computed days-left value
    is one of 90, 60, 30, 10 or 0, or whose ``start_date`` is today.

    Returns:
        A list of ``(candid, start_date, adjusted_end_date, days_left, email)``
        tuples, or an empty list when the query fails (the error is logged).
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=db_name,
            user=db_user,
            password=os.getenv('DB_PASSWORD'),
            host=db_host,
            port=db_port
        )
        query = """
        SELECT 
            candid, 
            start_date, 
            adjusted_end_date, 
            (
                (sub_days + COALESCE(paused_days, 0) + COALESCE(extra_days, 0)) 
                - (CURRENT_DATE - start_date)
            ) AS days_left, 
            email
        FROM subscription_status
        WHERE (
                (sub_days + COALESCE(paused_days, 0) + COALESCE(extra_days, 0)) 
                - (CURRENT_DATE - start_date)
              ) IN (90, 60, 30, 10, 0)
           OR start_date = CURRENT_DATE
        """
        # The cursor is closed by the with-block even if execute/fetchall raises.
        with conn.cursor() as cur:
            cur.execute(query)
            data = cur.fetchall()
        logger.info(f"Found {len(data)} milestone candidates")
        return data
    except Exception as e:
        logger.error(f"‚ùå Error fetching milestone candidates: {e}")
        return []
    finally:
        # Always release the connection; previously it leaked whenever the
        # query raised before the explicit close() calls were reached.
        if conn is not None:
            conn.close()

# ---------------------
# GMAIL API AUTH
# ---------------------

def gmail_auth():
    """Return an authorised Gmail API client, or ``None`` if anything fails.

    Credentials cached in ``TOKEN_FILE`` are reused when still valid. Expired
    credentials that carry a refresh token are refreshed; otherwise the
    installed-app OAuth flow is run via a local server. New or refreshed
    credentials are written back to ``TOKEN_FILE``.
    """
    try:
        creds = None
        if os.path.exists(TOKEN_FILE):
            # NOTE: the cache is a pickle; loading it is only safe while the
            # token file is written exclusively by this script.
            with open(TOKEN_FILE, 'rb') as cache:
                creds = pickle.load(cache)

        if not (creds and creds.valid):
            can_refresh = creds and creds.expired and creds.refresh_token
            if can_refresh:
                creds.refresh(Request())
            else:
                oauth_flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
                creds = oauth_flow.run_local_server(port=0)

            # Persist whichever credentials were just obtained.
            with open(TOKEN_FILE, 'wb') as cache:
                pickle.dump(creds, cache)

        logger.info("‚úÖ Gmail authentication successful")
        return build('gmail', 'v1', credentials=creds)
    except Exception as e:
        logger.error(f"‚ùå Gmail authentication failed: {e}")
        return None

# ---------------------
# SEND EMAIL
# ---------------------

def _milestone_body(name, days_left):
    """Return the email text for ``days_left``, or ``None`` if it is not a milestone."""
    if days_left == 90:
        return f"""Hi {name},\n\nWelcome to Datapluga! Your 90-day subscription begins today!\n"""
    if days_left == 60:
        return f"Hi {name}, 30 days in! Keep pushing üí™."
    if days_left == 30:
        return f"Hi {name}, 60 days down, just 30 more to go."
    if days_left == 10:
        return f"Hi {name}, only 10 days left. Wrap up strong!"
    if days_left == 0:
        return f"Hi {name}, your subscription has expired. Please consider renewing."
    return None


def send_email_gmail(name, days_left, email, service):
    """Send the milestone email matching ``days_left`` through the Gmail API.

    Args:
        name: Value used in the greeting and in log lines.
        days_left: Remaining days; only 90, 60, 30, 10 and 0 have a message.
        email: Recipient address.
        service: Gmail API client; ``None`` means authentication failed.

    Returns:
        ``True`` when the message was sent. ``False`` when ``service`` is
        ``None``, when ``days_left`` has no message, or when the send call
        raised (logged).
    """
    if service is None:
        logger.error("Gmail service not initialized")
        return False

    body = _milestone_body(name, days_left)
    if body is None:
        return False

    mime = MIMEText(body)
    mime['to'] = email
    mime['from'] = 'me'
    mime['subject'] = 'üì¨ Datapluga Subscription Update'

    # The Gmail API takes the whole message as URL-safe base64 text.
    encoded = base64.urlsafe_b64encode(mime.as_bytes()).decode()
    payload = {'raw': encoded}

    try:
        service.users().messages().send(userId='me', body=payload).execute()
        logger.info(f"üì© Email sent to {name} ({email})")
        return True
    except Exception as e:
        logger.error(f"‚ùå Failed to send email to {name}: {e}")
        return False

# ---------------------
# MAIN WORKFLOW
# ---------------------

def main():
    """Run the pipeline end to end: refresh, load, dbt, then milestone emails.

    The refresh, PostgreSQL load and dbt stages each abort the run when they
    fail. The email stage is skipped when Gmail authentication yields no
    client, and individual send failures do not abort the run.

    Returns:
        ``True`` when the run reaches the end, ``False`` when a stage aborted
        it or an unexpected exception was raised (logged).
    """
    logger.info(f"\n‚è∞ Pipeline started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    try:
        # Step 1: pull the sheets.
        frame = refresh_data()
        if frame is None:
            logger.error("Data refresh failed. Aborting pipeline.")
            return False

        # Steps 2 and 3: load into PostgreSQL, then rebuild the dbt models.
        # Each stage is run in order and stops the pipeline on a falsy result.
        stages = (
            (lambda: load_to_postgres(frame), "PostgreSQL load failed. Aborting pipeline."),
            (run_dbt, "DBT run failed. Aborting pipeline."),
        )
        for stage, failure_message in stages:
            if not stage():
                logger.error(failure_message)
                return False

        # Step 4: notify subscribers; candidates are only queried when a
        # Gmail client is available.
        gmail = gmail_auth()
        recipients = get_milestone_candidates() if gmail else []
        for candid, _start_date, _end_date, days_left, email in recipients:
            send_email_gmail(candid, days_left, email, gmail)

        logger.info(f"‚úÖ Pipeline completed successfully at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        return True

    except Exception as e:
        logger.error(f"‚ùå Pipeline failed with error: {e}")
        return False

# Run the pipeline only when this file is executed directly, not when it is
# imported as a module. The boolean result of main() is not used here.
if __name__ == "__main__":
    main()


⏰ Run started at 2025-06-19 22:54:29


  df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
  df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
  df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
  df[col] = pd.to_datetime(df[col], errors='coerce').dt.date


✅ Table truncated and refreshed.
✅ DBT models refreshed.
📩 Email sent to DMCABJ22001599 (emmanueljjustice@gmail.com)
📩 Email sent to DMCABJ22001600 (emmanueljjustice@gmail.com)
📩 Email sent to DMCABJ22001601 (emmanueljjustice@gmail.com)
📩 Email sent to DMCABJ22001603 (emmanueljjustice@gmail.com)
