In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime
import json
from azure.cosmos import CosmosClient, exceptions  # Added


# Reached only if every import in this cell succeeded.
print("✓ All libraries imported successfully")



In [0]:
# Sanity-check that the Cosmos DB secret scope and key are available.
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("cosmosdb"))

secret_value = dbutils.secrets.get(scope="cosmosdb", key="connection-string")
# Never print the secret itself: notebook output redaction is best-effort and is
# not a security boundary, so only report whether a value was found.
if secret_value:
    print("Retrieved secret 'cosmosdb/connection-string' (value hidden)")
else:
    print("⚠ Secret 'cosmosdb/connection-string' is empty")


In [0]:
from pyspark.sql.functions import lit

# Source files and the family label stamped onto every row of each one
file_details = [
    {"path": "dbfs:/dbfs/tmp/parquet_files/bushings.parquet", "name": "bushings"},
    {"path": "dbfs:/dbfs/tmp/parquet_files/gaskets.parquet", "name": "gaskets"},
    {"path": "dbfs:/dbfs/tmp/parquet_files/screws___bolts.parquet", "name": "screws__bolts"}
]

# Rows with a null in any of these columns are dropped
REQUIRED_COLUMNS = ["part_number", "category", "subcategory"]

# Cleaned copies are written as "<OUTPUT_DIR>/<name><CLEANED_SUFFIX>.parquet",
# e.g. ".../bushings1.parquet" -- the paths listed in the configuration cell's FILE_PATHS.
OUTPUT_DIR = "dbfs:/dbfs/tmp/parquet_files"
CLEANED_SUFFIX = "1"

# Process each file independently
for file_detail in file_details:
    file_path = file_detail["path"]
    file_name = file_detail["name"]

    df = spark.read.parquet(file_path)

    # Tag every row with the family (file) it came from
    df_with_file_name = df.withColumn("family", lit(file_name))

    # Drop rows with a null in any required column (successive filters are AND-ed)
    cleaned_df = df_with_file_name
    for required_column in REQUIRED_COLUMNS:
        cleaned_df = cleaned_df.filter(F.col(required_column).isNotNull())

    print(f"Processing file: {file_name}")
    display(cleaned_df)
    cleaned_df.printSchema()

    output_path = f"{OUTPUT_DIR}/{file_name}{CLEANED_SUFFIX}.parquet"
    cleaned_df.write.mode('overwrite').parquet(output_path)

In [0]:
# Cell: Configuration
from pyspark.sql import functions as F
from datetime import datetime, timezone
import json

# File paths in DBFS (the "<name>1.parquet" outputs of the cleaning cell)
FILE_PATHS = {
    "Bushings": "dbfs:/dbfs/tmp/parquet_files/bushings1.parquet",
    "Gaskets": "dbfs:/dbfs/tmp/parquet_files/gaskets1.parquet",
    "Screws__Bolts": "dbfs:/dbfs/tmp/parquet_files/screws__bolts1.parquet"
}

# Databricks secret holding the Cosmos DB connection string
COSMOS_SECRET_SCOPE = "cosmosdb"
COSMOS_SECRET_KEY = "connection-string"


def _parse_cosmos_connection_string(connection_string):
    """Split a Cosmos DB connection string into ``(endpoint, key)``.

    Expects the ``AccountEndpoint=...;AccountKey=...;`` form. Each segment is
    split on its FIRST ``=`` only, because base64 account keys can end in ``=``
    padding. Missing parts come back as empty strings.
    """
    parts = {}
    for segment in connection_string.split(";"):
        name, sep, value = segment.strip().partition("=")
        if sep:
            parts[name.strip()] = value.strip()
    return parts.get("AccountEndpoint", ""), parts.get("AccountKey", "")


# CosmosDB NoSQL API Configuration (NOT MongoDB)
try:
    COSMOS_ENDPOINT, COSMOS_KEY = _parse_cosmos_connection_string(
        dbutils.secrets.get(scope=COSMOS_SECRET_SCOPE, key=COSMOS_SECRET_KEY)
    )
    if not (COSMOS_ENDPOINT and COSMOS_KEY):
        raise ValueError("secret has no AccountEndpoint/AccountKey pair")
except Exception as e:
    # Fallback: Direct credentials (NOT recommended for production)
    print(f"⚠ Could not load Cosmos credentials from secrets: {type(e).__name__}: {e}")
    COSMOS_ENDPOINT = ""
    COSMOS_KEY = ""
    print("⚠ Using direct credentials - consider using Databricks Secrets")

DB_NAME = "business"
CONTAINER_NAME = "Feature_Statistics"

# Configure Spark Cosmos DB Connector
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", COSMOS_ENDPOINT)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", COSMOS_KEY)

# Classification columns
CLASSIFICATION_COLUMNS = ["family"]

# Generate batch ID (UTC; datetime.utcnow() is deprecated since Python 3.12)
BATCH_ID = f"BATCH-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"

print("\nConfiguration:")
print(f"  Cosmos Endpoint: {COSMOS_ENDPOINT}")
print(f"  Database: {DB_NAME}")
print(f"  Container: {CONTAINER_NAME}")
print(f"  Batch ID: {BATCH_ID}")
print(f"  Classification columns: {CLASSIFICATION_COLUMNS}")
print(f"  Files to process: {len(FILE_PATHS)}")
print("✓ Spark Cosmos DB Connector configured")

In [0]:
# Top-level Spark SQL type names (as reported by ``DataFrame.dtypes``) treated as
# numeric / continuous.
_NUMERIC_TYPE_NAMES = frozenset({
    'int', 'long', 'float', 'double',
    'decimal', 'bigint', 'smallint', 'tinyint',
})
# Top-level Spark SQL type names treated as categorical.
_CATEGORICAL_TYPE_NAMES = frozenset({'string', 'boolean'})


def _base_type_name(dtype):
    """Return the lower-cased top-level type name, e.g. 'decimal(10,2)' -> 'decimal'.

    Accepts a ``DataFrame.dtypes`` string or any object exposing
    ``simpleString()`` (Spark ``DataType`` instances). Complex types such as
    'array<int>' or 'map<string,int>' are returned whole, so they never match a
    scalar type name; substring matching used to misclassify them (and
    'interval ...', which contains 'int').
    """
    simple_string = getattr(dtype, "simpleString", None)
    name = simple_string() if callable(simple_string) else str(dtype)
    return name.strip().lower().split("(", 1)[0].strip()


def is_numeric_column(dtype):
    """Check if column is numeric (continuous data)"""
    return _base_type_name(dtype) in _NUMERIC_TYPE_NAMES


def is_categorical_column(dtype):
    """Check if column is categorical (string, boolean)"""
    return _base_type_name(dtype) in _CATEGORICAL_TYPE_NAMES


print("✓ Data type detection functions defined")


In [0]:
def _as_float(value):
    """Return ``value`` as a float, or ``None`` when Spark produced no value."""
    return None if value is None else float(value)


def calculate_continuous_statistics(df, column_name):
    """Calculate statistics for continuous/numeric columns.

    Args:
        df: Spark DataFrame containing ``column_name``.
        column_name: Name of a numeric column.

    Returns:
        dict with average, median, mode, standard_deviation, min, max, q1, q3,
        skewness, variance and kurtosis (floats, or None when Spark returns
        null, e.g. every value is null), plus missing_count and
        missing_percentage (share of null rows, rounded to 3 decimals; 0 for an
        empty DataFrame). Median and quartiles come from ``percentile_approx``
        and are therefore approximate.
    """
    # Backtick-quote the name for SQL expressions; embedded backticks are doubled.
    quoted = "`" + column_name.replace("`", "``") + "`"

    stats = df.select(
        F.avg(column_name).alias("average"),
        F.expr(f"percentile_approx({quoted}, 0.5)").alias("median"),
        F.stddev(column_name).alias("standard_deviation"),
        F.min(column_name).alias("min"),
        F.max(column_name).alias("max"),
        F.expr(f"percentile_approx({quoted}, 0.25)").alias("q1"),
        F.expr(f"percentile_approx({quoted}, 0.75)").alias("q3"),
        F.variance(column_name).alias("variance"),
        F.skewness(column_name).alias("skewness"),
        F.kurtosis(column_name).alias("kurtosis"),
        F.count(F.when(F.col(column_name).isNull(), 1)).alias("missing_count"),
        F.count("*").alias("total_count"),
    ).collect()[0]

    # Mode = most frequent NON-null value. Nulls are excluded so a mostly-empty
    # column still reports a real mode; ties go to the smallest value so the
    # result is deterministic across runs.
    mode_rows = (
        df.filter(F.col(column_name).isNotNull())
        .groupBy(column_name)
        .count()
        .orderBy(F.desc("count"), F.asc(column_name))
        .limit(1)
        .collect()
    )
    mode_value = mode_rows[0][column_name] if mode_rows else None

    total_count = stats["total_count"]
    missing_count = stats["missing_count"]
    missing_percentage = (missing_count / total_count * 100) if total_count > 0 else 0

    return {
        "average": _as_float(stats["average"]),
        "median": _as_float(stats["median"]),
        "mode": _as_float(mode_value),
        "standard_deviation": _as_float(stats["standard_deviation"]),
        "min": _as_float(stats["min"]),
        "max": _as_float(stats["max"]),
        "q1": _as_float(stats["q1"]),
        "q3": _as_float(stats["q3"]),
        "skewness": _as_float(stats["skewness"]),
        "variance": _as_float(stats["variance"]),
        "kurtosis": _as_float(stats["kurtosis"]),
        "missing_count": int(missing_count),
        "missing_percentage": round(missing_percentage, 3)
    }

print("✓ Continuous statistics function defined")


In [0]:
def calculate_categorical_statistics(df, column_name, max_categories=100, max_frequency_entries=50):
    """Calculate statistics for categorical columns.

    Args:
        df: Spark DataFrame containing ``column_name``.
        column_name: Name of the column to summarise.
        max_categories: Maximum number of values listed in
            ``distinct_categories`` (most frequent first).
        max_frequency_entries: Maximum number of entries in
            ``frequency_distribution`` (most frequent first).

    Returns:
        dict with distinct_categories, unique_category_count (over ALL non-null
        values, not just the listed ones), frequency_distribution (category,
        count, percentage of non-null rows), mode (most frequent value as str),
        missing_count and missing_percentage.
    """
    # Total and null counts in a single pass
    counts = df.select(
        F.count("*").alias("total_count"),
        F.count(F.when(F.col(column_name).isNull(), 1)).alias("missing_count"),
    ).collect()[0]
    total_count = counts["total_count"]
    missing_count = counts["missing_count"]
    valid_count = total_count - missing_count

    category_counts = (
        df.filter(F.col(column_name).isNotNull())
        .groupBy(column_name)
        .count()
    )
    # Count distinct values on the executors and bring only the top rows to the
    # driver, so high-cardinality columns (e.g. identifiers) are not collected whole.
    unique_category_count = category_counts.count()
    top_rows = (
        category_counts
        .orderBy(F.desc("count"), F.asc(column_name))  # tie-break keeps output deterministic
        .limit(max(1, max_categories, max_frequency_entries))
        .collect()
    )

    frequency_distribution = []
    for row in top_rows[:max_frequency_entries]:
        count = row["count"]
        percentage = (count / valid_count * 100) if valid_count > 0 else 0
        frequency_distribution.append({
            "category": str(row[column_name]),
            "count": int(count),
            "percentage": round(percentage, 2)
        })

    # Mode is the most frequent category
    mode = top_rows[0][column_name] if top_rows else None

    missing_percentage = (missing_count / total_count * 100) if total_count > 0 else 0

    return {
        "distinct_categories": [str(row[column_name]) for row in top_rows[:max_categories]],
        "unique_category_count": unique_category_count,
        "frequency_distribution": frequency_distribution,
        "mode": str(mode) if mode is not None else None,
        "missing_count": int(missing_count),
        "missing_percentage": round(missing_percentage, 3)
    }

print("✓ Categorical statistics function defined")


In [0]:
def _utc_timestamp():
    """Current UTC time as ISO-8601 with a trailing 'Z' (same format as utcnow().isoformat() + 'Z')."""
    from datetime import timezone
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _compute_feature_statistics(family_df, feature_columns, column_types):
    """Compute statistics for every supported feature column of one family.

    Numeric columns get continuous statistics and string/boolean columns get
    categorical statistics; any other type is skipped. A failure in one column is
    reported and counted, and the remaining columns are still processed.

    Returns:
        (features, continuous_count, categorical_count, error_count)
    """
    features = {}
    continuous_count = 0
    categorical_count = 0
    error_count = 0

    for col_name in feature_columns:
        col_type = column_types[col_name]
        if is_numeric_column(col_type):
            data_type, calculator = "continuous", calculate_continuous_statistics
        elif is_categorical_column(col_type):
            data_type, calculator = "categorical", calculate_categorical_statistics
        else:
            continue

        try:
            stats = calculator(family_df, col_name)
        except Exception as e:
            error_count += 1
            print(f"\n    Warning: Error calculating for {col_name}: {e}")
            continue

        features[col_name] = {
            "data_type": data_type,
            "statistics": stats
        }
        if data_type == "continuous":
            continuous_count += 1
        else:
            categorical_count += 1

    return features, continuous_count, categorical_count, error_count


def process_parquet_file_grouped_by_family(file_path):
    """
    Process a single parquet file and generate statistics for all columns grouped by the 'family' column.

    Args:
        file_path: DBFS path of a parquet file that contains a 'family' column.

    Returns:
        A list with one Cosmos DB document (dict) per distinct family value.

    Raises:
        KeyError: if the file has no 'family' column.
    """
    print(f"\n{'=' * 80}")
    print(f"Processing file: {file_path}")
    print(f"{'=' * 80}\n")

    # Read parquet file
    df = spark.read.parquet(file_path)
    total_rows = df.count()

    print(f"Total rows: {total_rows:,}")
    print(f"Total columns: {len(df.columns)}")

    # Check if 'family' column exists
    if "family" not in df.columns:
        raise KeyError("'family' column does not exist in the dataset!")

    # Get unique family classifications
    family_groups = df.select("family").distinct().collect()
    total_families = len(family_groups)
    print(f"Unique families: {total_families}")

    # The schema is the same for every family, so resolve it once
    column_types = dict(df.dtypes)
    feature_columns = [col for col in df.columns if col != "family"]
    total_features = len(feature_columns)

    documents = []

    for idx, family_row in enumerate(family_groups, 1):
        family_name = family_row["family"]
        print(f"\n[{idx}/{total_families}] Processing family: {family_name}")

        # eqNullSafe so a NULL family still selects its rows ("== None" matches nothing)
        filtered_df = df.filter(F.col("family").eqNullSafe(family_name))
        row_count = filtered_df.count()

        print(f"  Processing {total_features} feature columns... ", end="")
        features, continuous_count, categorical_count, total_errors = _compute_feature_statistics(
            filtered_df, feature_columns, column_types
        )
        print(f"Done!")
        print(f"  ✓ Continuous: {continuous_count}, Categorical: {categorical_count}, Errors: {total_errors}")

        # One timestamp per document so processed/created/updated agree exactly
        now = _utc_timestamp()
        document = {
            "id": f"FAMILY-STATS-{family_name}-{BATCH_ID}",
            "pk": family_name,
            "type": "feature_statistics",
            "family": family_name,
            "batch_info": {
                "current_batch_id": BATCH_ID,
                "processed_date": now,
                "total_rows_processed": row_count,
                "total_columns_processed": total_features,
                "source_file": file_path.replace("dbfs:", "")
            },
            "features": features,
            "summary": {
                "total_features": total_features,
                "continuous_features": continuous_count,
                "categorical_features": categorical_count,
                "total_records_in_family": row_count,
                "total_errors": total_errors
            },
            "version": "1.0",
            "schema_version": "1.0",
            "created_at": now,
            "updated_at": now,
            "source_system": "databricks"
        }

        documents.append(document)

    print(f"\n{'=' * 80}")
    print(f"Completed processing: {file_path}")
    print(f"  Total families processed: {total_families}")
    print(f"  Total documents created: {len(documents)}")
    print(f"{'=' * 80}\n")

    return documents

print("✓ Optimized processing function defined")


In [0]:
def insert_to_cosmosdb_spark(documents, batch_size=20):
    """Write ``documents`` to the configured Cosmos DB container via the Spark connector.

    Each batch of ``batch_size`` documents is serialized to JSON (``default=str``
    stringifies non-JSON values), read back into a DataFrame, and written with
    the ``ItemOverwrite`` strategy in bulk mode.

    NOTE(review): the DataFrame schema is inferred per batch, so documents with
    different ``features`` keys share a merged schema. Fields a document lacks
    may then be written as null; confirm this is acceptable for the container.

    Args:
        documents: list of JSON-serializable dicts.
        batch_size: positive number of documents per Spark write.

    Returns:
        The number of documents written.

    Raises:
        ValueError: if ``batch_size`` is not positive (a negative step would
            otherwise silently skip every document).
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        docs_json = [json.dumps(doc, default=str) for doc in batch]
        df = spark.read.json(spark.sparkContext.parallelize(docs_json))

        (
            df.write.format("cosmos.oltp")
            .option("spark.cosmos.accountEndpoint", COSMOS_ENDPOINT)
            .option("spark.cosmos.accountKey", COSMOS_KEY)
            .option("spark.cosmos.database", DB_NAME)
            .option("spark.cosmos.container", CONTAINER_NAME)
            .option("spark.cosmos.write.strategy", "ItemOverwrite")
            .option("spark.cosmos.write.bulk.enabled", "true")
            .option("spark.cosmos.write.maxRetryCount", "3")
            .mode("append")
            .save()
        )

    return len(documents)


In [0]:
# Process all parquet files. A failure in one file is reported and the
# remaining files are still processed; failures are summarized at the end.
import traceback

all_documents = []
failed_files = {}

# Keys of FILE_PATHS are file labels; the family values come from each file's
# 'family' column.
for file_label, file_path in FILE_PATHS.items():
    try:
        documents = process_parquet_file_grouped_by_family(file_path)
        all_documents.extend(documents)
    except Exception as e:
        failed_files[file_label] = str(e)
        print(f"Error processing {file_label}: {e}")
        traceback.print_exc()

print(f"\n{'='*80}")
print(f"Total documents generated: {len(all_documents)}")
if failed_files:
    print(f"⚠ Failed files ({len(failed_files)}): {', '.join(failed_files)}")
print(f"{'='*80}\n")


In [0]:
# CELL 11: Execute Insert
# Uses a distinctly named writer (larger batches, repartitioned before the write)
# instead of redefining insert_to_cosmosdb_spark inside an `if`, which silently
# shadowed the earlier definition.


def insert_to_cosmosdb_spark_partitioned(documents, batch_size=50, num_partitions=12):
    """Write ``documents`` to Cosmos DB in batches, repartitioning each batch.

    Same write options as ``insert_to_cosmosdb_spark`` (ItemOverwrite, bulk
    mode, 3 retries). Each batch DataFrame is repartitioned to spread the write
    over up to ``num_partitions`` tasks, never more partitions than rows.

    Args:
        documents: list of JSON-serializable dicts.
        batch_size: positive number of documents per Spark write.
        num_partitions: positive upper bound on partitions per batch.

    Returns:
        The number of documents written.

    Raises:
        ValueError: if ``batch_size`` or ``num_partitions`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if num_partitions <= 0:
        raise ValueError(f"num_partitions must be positive, got {num_partitions}")

    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        docs_json = [json.dumps(doc, default=str) for doc in batch]
        df = spark.read.json(spark.sparkContext.parallelize(docs_json))
        df = df.repartition(min(num_partitions, len(batch)))

        df.write.format("cosmos.oltp") \
            .option("spark.cosmos.accountEndpoint", COSMOS_ENDPOINT) \
            .option("spark.cosmos.accountKey", COSMOS_KEY) \
            .option("spark.cosmos.database", DB_NAME) \
            .option("spark.cosmos.container", CONTAINER_NAME) \
            .option("spark.cosmos.write.strategy", "ItemOverwrite") \
            .option("spark.cosmos.write.bulk.enabled", "true") \
            .option("spark.cosmos.write.maxRetryCount", "3") \
            .mode("append") \
            .save()

    return len(documents)


if all_documents:
    written = insert_to_cosmosdb_spark_partitioned(all_documents)
    print(f"✓ Wrote {written} documents to {DB_NAME}.{CONTAINER_NAME}")
else:
    print("⚠ No documents to insert!")
