In [0]:
# Databricks notebook source
import time
from pyspark.sql.functions import col
start = time.time()
import datetime
import os
import re
import shutil

In [0]:
def _widget_value(name, *label):
    """Register a blank text widget called ``name`` and return its current value.

    Any extra positional argument is forwarded to ``dbutils.widgets.text`` as
    the widget's display label.
    """
    dbutils.widgets.text(name, "", *label)
    return dbutils.widgets.get(name)


# Reporting window (both arrive as plain text).
from_date = _widget_value("from_date")
to_date = _widget_value("to_date")

# Row filters for the extract.
country = _widget_value("country")
dealer_status = _widget_value("dealer_status")
# NOTE(review): this value is read here but no visible cell references it
# afterwards - confirm whether the extract is meant to use it.
enrollment_status = _widget_value("enrollment_status")
product_name = _widget_value("product_name")

# Email routing.
email_from = _widget_value("email_from")
email_to = _widget_value("email_to")
email_cc = _widget_value("email_cc")

# Working directory and environment name.
dir_tmp = _widget_value("dir_tmp")
env = _widget_value("env")

# SendGrid API key, supplied as a widget value instead of being read from a secret store.
# NOTE(review): widget values are presumably visible to anyone who can see the
# notebook or its job parameters - confirm this is acceptable for a credential.
sendgrid_api_key = _widget_value("sendgrid_api_key", "SendGrid API Key")

In [0]:
# Run timestamp; its date part is reused as the execution id.
current_date = datetime.datetime.now()
execution_id = current_date.strftime('%Y-%m-%d')

_DATE_FMT = '%Y-%m-%d'

# Treat whitespace-only widget values as "not provided" so that a stray space
# neither breaks date parsing nor counts as an explicit date.
_requested_from = from_date.strip()
_requested_to = to_date.strip()

# Date range logic
if _requested_from and _requested_to:
    # Use provided dates
    final_from_date = _requested_from
    final_to_date = _requested_to
    print(f"Using provided date range: {final_from_date} to {final_to_date}")
elif _requested_from or _requested_to:
    # A half-specified range used to fall through to the default month without
    # any warning, producing a report for a period nobody asked for.
    raise ValueError(
        "from_date and to_date must be provided together "
        f"(got from_date={from_date!r}, to_date={to_date!r}); "
        "leave both empty to default to the previous calendar month."
    )
else:
    # Default to the previous calendar month.
    first_day_current_month = current_date.replace(day=1)  # e.g. 16 Oct -> 01 Oct
    last_day_prev_month = first_day_current_month - datetime.timedelta(days=1)  # 30 Sept
    first_day_prev_month = last_day_prev_month.replace(day=1)  # 01 Sept

    final_from_date = first_day_prev_month.strftime(_DATE_FMT)
    final_to_date = last_day_prev_month.strftime(_DATE_FMT)
    print(f"No dates provided. Defaulting to previous month: {final_from_date} to {final_to_date}")

# Parse the bounds; strptime raises ValueError on anything that is not YYYY-MM-DD.
report_start = datetime.datetime.strptime(final_from_date, _DATE_FMT)
report_end = datetime.datetime.strptime(final_to_date, _DATE_FMT)

if report_start > report_end:
    raise ValueError(
        f"from_date ({final_from_date}) must not be later than to_date ({final_to_date})."
    )

# Re-format so that inputs such as '2024-1-5' are zero-padded everywhere they are shown or used.
final_from_date = report_start.strftime(_DATE_FMT)
final_to_date = report_end.strftime(_DATE_FMT)

# Report labels reflect the actual date range.
if report_start.month == report_end.month and report_start.year == report_end.year:
    # Same month
    report_month = report_start.strftime('%B %Y')
    report_year_month = report_start.strftime('%Y-%m')
else:
    # Date range spans multiple months
    report_month = f"{report_start.strftime('%B %Y')} to {report_end.strftime('%B %Y')}"
    report_year_month = f"{report_start.strftime('%Y-%m')}_to_{report_end.strftime('%Y-%m')}"

csv_filename = f"Dealer_Enrollment_Activation_Report_{report_year_month}.csv"

# Define DBFS base path
base_dir = dir_tmp.strip()
if not base_dir:
    # With an empty base the paths below would resolve to root-level locations
    # ("/<execution_id>", "/archive") and the recursive delete would target them.
    raise ValueError("dir_tmp is empty; provide the base working directory for this report.")

temp_dir = f"{base_dir}/{execution_id}"      # per-run working folder
archive_dir = f"{base_dir}/archive"          # destination for the archived copy of the report
temp_output = f"{temp_dir}/output"           # folder Spark writes its part files into
temp_csv = f"{temp_dir}/{csv_filename}"      # final, renamed CSV

# Delete leftovers from an earlier run that used the same execution id (recursive).
print(f"Cleaning up old files in temp directory: {temp_dir}")
dbutils.fs.rm(temp_dir, True)

# Create directory in DBFS
dbutils.fs.mkdirs(temp_dir)
dbutils.fs.mkdirs(archive_dir)

In [0]:
print("Extracting dealer enrollment data...")

# The two window bounds are the only values spliced into the SQL text below,
# so reject anything that is not a plain YYYY-MM-DD date before building it.
for _bound in (final_from_date, final_to_date):
    datetime.datetime.strptime(_bound, '%Y-%m-%d')

# Apply the widget-driven filters through the DataFrame API: the values are
# bound as literals, so quotes or SQL fragments in a widget cannot alter the query.
_FILTERED_VIEW = "enrollment_status_log_filtered"
(
    spark.table("uc_sp.sales_reporting.enrollment_status_log")
    .filter(col("country") == country)
    .filter(col("dealer_status") == dealer_status)
    .filter(col("product_name") == product_name)
    .createOrReplaceTempView(_FILTERED_VIEW)
)

# Two branches over the filtered rows:
#   * 'Activation' - enrollment types other than 'New Enrollment'/'Termination',
#     windowed and bucketed by activation_ts
#   * 'New'        - 'New Enrollment' rows, windowed and bucketed by enrollment_ts
# NOTE(review): enrollment_status is fixed to 'Enrolled' here; the enrollment_status
# widget value is not used - confirm that is intended.
query = f"""
SELECT 
    pacode, dealer_name, brand_type, brand_name, product_name, sku_name, 
    enrollment_type, termination_reason, 
    to_date(enrollment_ts) as enrollment_date, 
    to_date(activation_ts) as activation_date, 
    DATE_FORMAT(activation_ts, 'yyyy-MM') as year_month,
    'Activation' as category, 
    COUNT(*) as count
FROM {_FILTERED_VIEW}
WHERE enrollment_status = 'Enrolled'
    AND enrollment_type NOT IN ('New Enrollment', 'Termination')
    AND to_date(activation_ts) >= '{final_from_date}'
    AND to_date(activation_ts) <= '{final_to_date}'
    AND activation_ts IS NOT NULL
GROUP BY pacode, dealer_name, brand_type, brand_name, product_name, sku_name, 
         enrollment_type, termination_reason, to_date(enrollment_ts), 
         to_date(activation_ts), DATE_FORMAT(activation_ts, 'yyyy-MM')

UNION

SELECT 
    pacode, dealer_name, brand_type, brand_name, product_name, sku_name, 
    enrollment_type, termination_reason, 
    to_date(enrollment_ts) as enrollment_date, 
    to_date(activation_ts) as activation_date, 
    DATE_FORMAT(enrollment_ts, 'yyyy-MM') as year_month,
    'New' as category, 
    COUNT(*) as count
FROM {_FILTERED_VIEW}
WHERE enrollment_status = 'Enrolled'
    AND enrollment_type = 'New Enrollment'
    AND to_date(enrollment_ts) >= '{final_from_date}'
    AND to_date(enrollment_ts) <= '{final_to_date}'
    AND enrollment_ts IS NOT NULL
GROUP BY pacode, dealer_name, brand_type, brand_name, product_name, sku_name, 
         enrollment_type, termination_reason, to_date(enrollment_ts), 
         to_date(activation_ts), DATE_FORMAT(enrollment_ts, 'yyyy-MM')

ORDER BY year_month DESC, pacode, category
"""

# Execute query
result_df = spark.sql(query)

# Row counts per category, gathered in a single Spark action instead of three
# separate count() calls that would each re-run the whole query.
_rows_per_category = {
    row["category"]: row["count"]
    for row in result_df.groupBy("category").count().collect()
}
activation_count = _rows_per_category.get("Activation", 0)
new_enrollment_count = _rows_per_category.get("New", 0)
# Every row carries one of the two category labels, so the total is their sum.
total_records = sum(_rows_per_category.values())
print(f"Extracted {total_records:,} total records (New: {new_enrollment_count:,}, Activation: {activation_count:,})")
print(f"Date range: {final_from_date} to {final_to_date}")

display(result_df)

In [0]:
print("Writing result to CSV...")

# Collapse to a single partition and write it, with a header row, into the output folder.
csv_writer = result_df.coalesce(1).write.mode("overwrite").option("header", "true")
csv_writer.csv(temp_output)

# Tidy the output folder: keep only 'part-*' files and the '_SUCCESS' marker.
print(f" Removing non-essential files from {temp_output}")
for entry in dbutils.fs.ls(temp_output):
    if entry.name.startswith("part-") or entry.name == "_SUCCESS":
        continue
    print(f"  Removing non-essential file: {entry.path}")
    dbutils.fs.rm(entry.path)

# Re-list the folder and take the first 'part-*.csv' entry.
part_file = None
for entry in dbutils.fs.ls(temp_output):
    if entry.name.startswith("part-") and entry.name.endswith(".csv"):
        part_file = entry.path
        break

if not part_file:
    raise Exception("No part CSV file found after writing.")

# Give the part file its final report name.
dbutils.fs.mv(part_file, temp_csv)
print(f"CSV saved as: {temp_csv}")

In [0]:
# The SendGrid API key is passed in by the caller via 'api_key'.
def send_email(from_email, to_email, subject, message, env, cc_email='', body_type='', attachment_path=None, attachment_name=None, api_key=None):
    """Send an email, optionally with one attachment, through smtp.sendgrid.net over SSL.

    Args:
        from_email: Sender address (envelope sender and ``From`` header).
        to_email: One or more recipient addresses separated by ``,`` or ``;``.
        subject: Subject line.
        message: Body text; sent as HTML when ``body_type`` is ``'html'``.
        env: Accepted for call compatibility; not used by this function.
        cc_email: Optional CC addresses, same separators as ``to_email``.
        body_type: ``'html'`` (case-insensitive) for an HTML body, anything else for plain text.
        attachment_path: Path of the file to attach; ``dbfs:/`` is rewritten to
            ``/dbfs/`` before the file is opened.
        attachment_name: File name shown for the attachment. The attachment is
            only added when both path and name are given.
        api_key: SendGrid API key used as the SMTP password.

    Raises:
        ValueError: If ``api_key`` is missing or ``to_email`` contains no address.
    """
    import re
    import ssl
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email import encoders

    if not api_key:
        raise ValueError("API Key is missing. Please provide the SendGrid API Key.")

    def _split_addresses(raw):
        # Split on ',' or ';', trim each entry and drop blanks (e.g. from a trailing separator).
        return [addr.strip() for addr in re.split(r'[,;]', raw or '') if addr.strip()]

    to_addresses = _split_addresses(to_email)
    cc_addresses = _split_addresses(cc_email)
    if not to_addresses:
        raise ValueError("No recipient address provided in to_email.")

    is_html = (body_type or '').lower() == 'html'

    # Client-side TLS context; refuse anything older than TLS 1.2.
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2

    # Create a multipart message and set headers
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = ",".join(to_addresses)
    if cc_addresses:
        # Only emit a Cc header when there is something to put in it.
        msg['Cc'] = ",".join(cc_addresses)
    msg['Subject'] = subject

    # Attach body
    msg.attach(MIMEText(message, 'html' if is_html else 'plain'))

    # Attach file if provided
    if attachment_path and attachment_name:
        with open(attachment_path.replace("dbfs:/", "/dbfs/"), "rb") as attachment:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment.read())

        # Encode file in ASCII characters to send by email
        encoders.encode_base64(part)

        # Let the email package format (and quote, when needed) the filename parameter.
        part.add_header('Content-Disposition', 'attachment', filename=attachment_name)

        msg.attach(part)
        print(f"  Attached file: {attachment_name}")

    # The envelope recipient list covers both To and Cc addresses.
    with smtplib.SMTP_SSL("smtp.sendgrid.net", 465, context=context) as server:
        server.login(user='apikey', password=api_key)
        server.sendmail(from_email, to_addresses + cc_addresses, msg.as_string())

In [0]:
print("Preparing to send email...")

# Size of the CSV on DBFS, in MB (this raw figure is also what the email body reports).
file_info = dbutils.fs.ls(temp_csv)[0]
file_size_mb = file_info.size / (1024 * 1024)

# Check file size against SendGrid limits
MAX_ATTACHMENT_SIZE_MB = 25  # Stay under 30 MB limit with buffer

# The attachment is base64-encoded before it is sent, which grows it by at
# least a third, so the check is made on the estimated encoded size rather
# than the size on disk.
encoded_size_mb = file_size_mb * 4 / 3

if encoded_size_mb > MAX_ATTACHMENT_SIZE_MB:
    print(
        f"WARNING: File size ({file_size_mb:.2f} MB, about {encoded_size_mb:.2f} MB once "
        "base64-encoded) exceeds recommended limit!"
    )
    print("Consider uploading to a shared location and sending a link instead.")

email_subject = f"Dealer Extract Report - {report_month}"

# --- EMAIL BODY SECTIONS ---
# The body is sent as HTML, so emphasis uses <strong> tags; Markdown-style
# asterisks would show up literally in the message.

# Runs in "dev" or "qa" are flagged visibly in the email.
is_non_prod = env.lower() in ["dev", "qa"]

# 1. Build environment banner if non-production
env_banner = ""
if is_non_prod:
    env_banner = f"""
    <div style="font-family: 'Ford Antenna', Arial, sans-serif; background-color: #fff8e1; border: 1px solid #ffc107; padding: 15px; margin-bottom: 25px; border-radius: 6px; font-size: 14px;">
        <strong style="color: #856404; display: block; margin-bottom: 5px;">⚠️ {env.upper()} ENVIRONMENT WARNING</strong>
        <p style="color: #856404; margin: 0; font-size: 14px;">
            This report was generated in the <strong>{env.upper()}</strong> environment and should be treated as a test/development output.
        </p>
    </div>
    """

# Inline environment tag appended to the introduction for non-production runs only.
env_tag = (
    '<span style="color: #00095B; font-weight: bold;">(Environment: ' + env.upper() + ')</span>'
    if is_non_prod
    else ''
)

# 2. Introduction
intro_section = f"""
<div style="font-family: 'Ford Antenna', Arial, sans-serif; color: #333; line-height: 1.6; font-size: 15px;">
    <p style="margin-bottom: 15px;">Hello,</p>
    <p style="margin-bottom: 25px;">
        The <strong>Dealer Enrollment Report</strong> for the period <strong>{final_from_date} to {final_to_date}</strong> has been successfully generated. 
        {env_tag}
    </p>
</div>
"""

# 3. HTML summary table
# A two-column (Metric / Value) table with inline styles. Rows, in order:
# report period label, date range, total record count, new-enrollment count,
# activation count, CSV file name, and file size in MB (two decimals).
# Counts are rendered with thousands separators.
html_table = f"""
<div style="font-family: 'Ford Antenna', Arial, sans-serif; max-width: 600px; margin: 20px auto; padding: 0 10px;">
    <h2 style="color: #00095B; margin-bottom: 20px; font-size: 24px; font-weight: 700; border-bottom: 3px solid #00095B; padding-bottom: 5px;">📊 Report Summary</h2>
    <table role="presentation" width='100%' border='0' cellpadding='0' cellspacing='0' style='border-collapse: collapse; box-shadow: 0 6px 15px rgba(0,0,0,0.1); border-radius: 8px; overflow: hidden; background-color: #ffffff;'>
        <thead>
            <tr style='background-color: #00095B;'>
                <th style='color: white; text-align: left; font-size: 16px; font-weight: 700; padding: 15px 20px; width: 50%;'>Metric</th>
                <th style='color: white; text-align: left; font-size: 16px; font-weight: 700; padding: 15px 20px; width: 50%;'>Value</th>
            </tr>
        </thead>
        <tbody>
            <tr style='background-color: #f7f7f9;'>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #555; font-weight: 500; font-size: 14px;'>📅 Report Period</td>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #00095B; font-weight: bold; font-size: 15px;'>{report_month}</td>
            </tr>
            <tr style='background-color: #ffffff;'>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #555; font-weight: 500; font-size: 14px;'>📆 Date Range</td>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #00095B; font-weight: bold; font-size: 15px;'>{final_from_date} to {final_to_date}</td>
            </tr>
            
            <tr style='background-color: #fff8e1; border-left: 4px solid #ffc107;'>
                <td style='padding: 15px 20px; border-bottom: 1px solid #ffe0b2; color: #333; font-weight: 600; font-size: 16px;'>📋 Total Records</td>
                <td style='padding: 15px 20px; border-bottom: 1px solid #ffe0b2; color: #e65100; font-weight: bold; font-size: 21px;'>{total_records:,}</td>
            </tr>
            <tr style='background-color: #e8f5e9; border-left: 4px solid #4caf50;'>
                <td style='padding: 15px 20px; border-bottom: 1px solid #c8e6c9; color: #333; font-weight: 600; font-size: 16px;'>✅ New Enrollments</td>
                <td style='padding: 15px 20px; border-bottom: 1px solid #c8e6c9; color: #2e7d32; font-weight: bold; font-size: 21px;'>{new_enrollment_count:,}</td>
            </tr>
            <tr style='background-color: #e3f2fd; border-left: 4px solid #2196f3;'>
                <td style='padding: 15px 20px; border-bottom: 1px solid #bbdefb; color: #333; font-weight: 600; font-size: 16px;'>🚀 Activations</td>
                <td style='padding: 15px 20px; border-bottom: 1px solid #bbdefb; color: #1976d2; font-weight: bold; font-size: 21px;'>{activation_count:,}</td>
            </tr>

            <tr style='background-color: #f7f7f9;'>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #555; font-weight: 500; font-size: 14px;'>📄 File Name</td>
                <td style='padding: 12px 20px; border-bottom: 1px solid #eee; color: #777; font-family: monospace; font-size: 13px;'>{csv_filename}</td>
            </tr>
            <tr style='background-color: #ffffff;'>
                <td style='padding: 12px 20px; color: #555; font-weight: 500; font-size: 14px;'>💾 File Size</td>
                <td style='padding: 12px 20px; color: #00095B; font-weight: bold; font-size: 15px;'>{file_size_mb:.2f} MB</td>
            </tr>
        </tbody>
    </table>
</div>
"""

# 4. Attachment notice
# The body is HTML, so the file name is emphasised with <strong>; Markdown
# asterisks would be displayed literally.
attachment_notice = f"""
<div style="font-family: 'Ford Antenna', Arial, sans-serif; margin-top: 30px; padding: 15px; background-color: #f0f4f7; border-left: 4px solid #00095B; border-radius: 4px;">
    <p style="font-size: 15px; color: #333; margin: 0;">
        <strong style="color: #00095B;">📎 Attachment Note:</strong> Please find the detailed report attached in the CSV file <strong>{csv_filename}</strong>.
    </p>
</div>
"""

# 5. Signature (static text, so a plain string rather than an f-string)
signature = """
<div style="font-family: 'Ford Antenna', Arial, sans-serif; margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd; color: #333;">
    <p style="font-size: 15px; margin: 0;">Thank you,</p>
    <p style="font-size: 16px; color: #00095B; font-weight: bold; margin: 8px 0 0 0;">CRM Report Automation</p>
    <p style="font-size: 13px; color: #555; margin: 2px 0 0 0;">(This is an automated email. Please do not reply.)</p>
</div>
"""

# Assemble the body in display order: banner, introduction, summary table,
# attachment note, sign-off.
email_body = "".join([env_banner, intro_section, html_table, attachment_notice, signature])

# Deliver the report as an HTML email with the CSV attached.
email_kwargs = dict(
    from_email=email_from,
    to_email=email_to,
    subject=email_subject,
    message=email_body,
    env=env,
    cc_email=email_cc,
    body_type='html',
    attachment_path=temp_csv,
    attachment_name=csv_filename,
    api_key=sendgrid_api_key,
)
send_email(**email_kwargs)

# Keep a copy of the delivered file in the archive folder.
print("Archiving the report file...")
archive_path = f"{archive_dir}/{csv_filename}"
dbutils.fs.cp(temp_csv, archive_path)

print(f"Report archived to: {archive_path}")
elapsed_seconds = time.time() - start
print(f"Total execution time: {elapsed_seconds:.2f} seconds")

In [0]:
# print(f"Cleaning up temp_dir: {temp_dir}")
# dbutils.fs.rm(temp_dir, recurse=True)
# print("Cleanup complete.")

# Optional: signal success to parent notebook
# dbutils.notebook.exit('{"status": "SUCCESS"}')