In [0]:
## TODO: We will need access to the input vendor set used by normalization


In [0]:
import json
from datetime import datetime
import uuid
import requests
import pandas as pd
from io import StringIO

Customer_Name = "tmobile"
Env = "prod"
# The API token is read from the "suplari-secrets" scope under a
# "<customer>-<env>" key, so no credential is stored in the notebook.
api_token = dbutils.secrets.get(scope="suplari-secrets", key=f"{Customer_Name}-{Env}")

# NOTE(review): Env is "prod" while this Suplari environment name looks like a
# test environment — confirm the combination is intentional.
suplari_env = f"{Customer_Name}/initial40_databricks_test"

# Generate a unique X-Suplari-Request-ID with Databricks identifier
request_id = f"Databricks-GetNormalizationReport-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:8]}"

# Seconds to wait for the report endpoint before giving up; without a timeout
# `requests` can block indefinitely on an unresponsive server.
REQUEST_TIMEOUT_SECONDS = 60

url = f'https://{Customer_Name}.suplari.com/v1/normalization/report'
headers = {
    'Authorization': f'Bearer {api_token}',
    'Content-Type': 'application/json',
    'X-Suplari-Environment': suplari_env,
    'X-Suplari-Request-ID': request_id,
}
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
# Fail here on 4xx/5xx instead of letting an error body be parsed as CSV later.
response.raise_for_status()

# Report body as an in-memory file for pandas. A StringIO is single-use: once
# it has been read, reading it again yields no data.
data = StringIO(response.text)
# To load into Spark instead: df = spark.createDataFrame(pd.read_csv(data))

In [0]:
# Parse a fresh buffer from the response text so this cell can be re-run:
# the shared `data` StringIO is exhausted after its first read.
normalization_report = pd.read_csv(StringIO(response.text))
normalization_report

In [0]:
# Peek at the raw report payload (truncated so a large CSV does not flood the
# notebook output) — useful when the parsed frame above looks wrong.
RAW_PREVIEW_CHARS = 2_000
response.text[:RAW_PREVIEW_CHARS]

Example Vendor Normalization Plan

Testing strategy:
- Unit tests for each component
- Integration tests for the full pipeline
- Validation tests to ensure data quality

Implementation Timeline

1. Environment setup and configuration - 1 day
2. Data access implementations - 2 days
3. Normalization logic implementation - 3 days
4. Pipeline orchestration - 2 days
5. Testing and validation - 3 days
6. Documentation and deployment - 1 day

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

# Initialize Spark Session
# getOrCreate() returns the already-active session when there is one (as on
# Databricks); otherwise it builds a new session named for this pipeline.
spark = SparkSession.builder.appName("VendorNormalization").getOrCreate()

In [0]:
def extract_data(input_path, reference_table_path):
    """Load the raw vendor CSV and the vendor reference table.

    Args:
        input_path: Location of the input vendor file(s). Read as CSV with a
            header row; no schema or inferSchema is set, so every column is
            read as a string.
        reference_table_path: Name of a catalog table, passed to
            ``spark.read.table`` (a table name rather than a file path).

    Returns:
        ``(input_vendors, reference_table)`` as Spark DataFrames.
    """
    csv_reader = spark.read.format("csv").option("header", "true")
    input_vendors = csv_reader.load(input_path)
    reference_table = spark.read.table(reference_table_path)
    return input_vendors, reference_table

In [0]:
def normalize_vendors(input_vendors, reference_table):
    """Map each input vendor to a normalized name using the reference table.

    Input rows are left-joined to the reference table on ``original_vendor``.
    A row takes the reference ``normalized_vendor`` only when ``is_normalized``
    is true and a normalized name is present. Otherwise (no match, not
    normalized, or a null name) it falls back to ``original_vendor``.

    Args:
        input_vendors: DataFrame with an ``original_vendor`` column.
        reference_table: DataFrame keyed by ``original_vendor``; presumably it
            supplies ``normalized_vendor`` and ``is_normalized`` — confirm.

    Returns:
        DataFrame with columns ``original_vendor``, ``normalized_vendor`` and
        ``website``.
    """
    # Join on the column *name*, not an equality expression. With
    # on="original_vendor" Spark emits a single key column. The expression
    # form keeps both sides' original_vendor, so every later reference to it
    # fails as ambiguous.
    # NOTE(review): if reference_table has several rows per original_vendor,
    # input rows are duplicated — confirm the reference key is unique.
    # NOTE(review): `website`, `normalized_vendor` and `is_normalized` must each
    # exist on only one side of the join, or references below are ambiguous.
    joined = input_vendors.join(reference_table, on="original_vendor", how="left_outer")

    # `== True` (not `is True`) builds a Spark Column comparison. For unmatched
    # rows is_normalized is null, the condition is null, and `otherwise` applies.
    use_reference_name = (col("is_normalized") == True) & col("normalized_vendor").isNotNull()  # noqa: E712
    normalized_vendors = joined.withColumn(
        "normalized_vendor",
        when(use_reference_name, col("normalized_vendor")).otherwise(col("original_vendor")),
    )

    return normalized_vendors.select("original_vendor", "normalized_vendor", "website")

In [0]:
def load_data(normalized_data, output_path):
    """Persist normalized vendors as header-bearing CSV, replacing any old output.

    Args:
        normalized_data: Spark DataFrame to write.
        output_path: Destination directory. Spark writes a directory of part
            files here rather than a single CSV file.
    """
    csv_writer = normalized_data.write.mode("overwrite").format("csv")
    csv_writer.option("header", "true").save(output_path)

In [0]:
def run_normalization_pipeline(input_path, reference_table_path, output_path):
    """Run extract -> normalize -> load end to end.

    Args:
        input_path: Input vendor CSV location (see ``extract_data``).
        reference_table_path: Reference table name (see ``extract_data``).
        output_path: Destination for the normalized CSV (see ``load_data``).

    Returns:
        ``(True, "Normalization completed successfully")`` on success, or
        ``(False, "Error in normalization process: <error>")`` if any stage
        raises. In the failure case the full traceback is logged.

    Note:
        Failures are returned rather than raised, so wrapping this function in
        a retry helper that retries on exceptions will not retry it.
    """
    import logging

    try:
        input_vendors, reference_table = extract_data(input_path, reference_table_path)
        normalized_vendors = normalize_vendors(input_vendors, reference_table)
        load_data(normalized_vendors, output_path)
    except Exception as e:
        # Keep the (ok, message) contract for callers, but record the traceback:
        # str(e) alone hides which stage failed and where.
        logging.getLogger(__name__).exception("Normalization pipeline failed")
        return False, f"Error in normalization process: {str(e)}"
    return True, "Normalization completed successfully"

In [0]:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Retry helper: re-invokes a callable when it raises.
def execute_with_retry(func, max_retries=3, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying whenever it raises.

    Args:
        func: Callable to invoke.
        max_retries: Total number of attempts; must be an int >= 1. It precedes
            ``*args``, so the first positional argument after ``func`` is bound
            here. Pass ``func``'s positional arguments only after an explicit
            count, e.g. ``execute_with_retry(f, 3, a, b)``.
        *args: Positional arguments forwarded to ``func`` on every attempt.
        **kwargs: Keyword arguments forwarded to ``func`` on every attempt.

    Returns:
        The return value of the first attempt that does not raise.

    Raises:
        TypeError: If ``max_retries`` is not an int (for example, when a
            positional argument meant for ``func`` landed in it).
        ValueError: If ``max_retries`` is less than 1.
        Exception: The exception from the final failed attempt is re-raised.
    """
    if not isinstance(max_retries, int):
        raise TypeError(
            f"max_retries must be an int, got {type(max_retries).__name__}; "
            "pass func's positional arguments after max_retries"
        )
    if max_retries < 1:
        # Without this guard the loop never runs and None is returned silently.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(1, max_retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.warning("Attempt %d failed: %s", attempt, e)
            if attempt == max_retries:
                logger.error("Max retries reached. Operation failed.")
                raise