In [None]:
import pandas as pd
import io
import requests

# URL that serves the parquet extract. It is empty here; the next cell loads `df` from the
# local file 'data_dec_2024.parquet' and overwrites anything downloaded in this cell.
url = ""
# Seconds to wait for the server; requests never times out unless a timeout is given.
REQUEST_TIMEOUT = 60

if url:
    # Make the GET request
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    # Raise an HTTPError for 4xx/5xx responses instead of trying to parse an error page.
    response.raise_for_status()
    # Read the parquet content into a pandas DataFrame using the pyarrow engine.
    df = pd.read_parquet(io.BytesIO(response.content), engine='pyarrow')
else:
    # Without this guard, requests.get("") raises and breaks Restart & Run All.
    print("No download URL configured; skipping remote parquet download.")

In [None]:
import joblib
import ast
import os
from sklearn.preprocessing import StandardScaler
from minisom import MiniSom
from math import sqrt
from collections import Counter
import numpy as np
import pandas as pd

# === SETUP ===
# Every artifact below is dumped under models2/, so make sure the folder exists first.
os.makedirs('models2', exist_ok=True)

import pandas as pd

# Load the December 2024 extract and discard rows whose DpiPolicy is the literal 'Unknown'.
raw_path = 'data_dec_2024.parquet'
df = pd.read_parquet(raw_path)
df = df.loc[df['DpiPolicy'] != 'Unknown']

# Convert the traffic counters to float when they were stored as (possibly quoted) strings.
numeric_columns = ['bytesFromClient', 'bytesFromServer', 'sessions_count', 'transationDuration']
for col in numeric_columns:
    # Nothing to do for absent columns or columns that already have a numeric dtype.
    if col not in df.columns or df[col].dtype != 'object':
        continue
    raw_values = df[col]
    # Stringify every cell before stripping quotes. The .str accessor would turn non-string
    # entries (e.g. Python numbers mixed in with strings) into NaN.
    cleaned = raw_values.astype(str).str.replace('"', '', regex=False)
    # errors='coerce' maps unparseable text to NaN instead of raising ValueError.
    converted = pd.to_numeric(cleaned, errors='coerce')
    # Report values that were present but could not be parsed, so bad data stays visible.
    n_bad = int((converted.isna() & raw_values.notna()).sum())
    if n_bad:
        print(f"Warning: Could not convert {n_bad} value(s) in column {col} to numeric; set to NaN")
    df[col] = converted.astype(float)

# Per-subscriber summary: the distinct categorical values seen for each subscriber and the
# totals of the traffic counters.
# NOTE(review): `result` is not referenced again in this notebook; the steps below encode
# and train on the row-level `df`. Confirm which granularity the SOM is meant to use.
set_columns = ['DpiPolicy', 'appName', 'contentType', 'IpProtocol']
sum_columns = ['bytesFromClient', 'bytesFromServer', 'sessions_count', 'transationDuration']
agg_spec = {name: (lambda x: list(set(x))) for name in set_columns}
agg_spec.update(dict.fromkeys(sum_columns, 'sum'))
result = df.groupby('SubscriberID').agg(agg_spec)

# Store the summed counters as ints so they print without a trailing ".0".
result = result.astype(dict.fromkeys(sum_columns, int))

# Row-level copy of the raw DpiPolicy values, taken before STEP 1 overwrites the column with
# bitmaps. It is used later to build the cluster -> offers mapping.
original_offers = df['DpiPolicy'].copy()

# === Shared helpers for STEPS 1-4 ===
def _cell_to_list(val):
    """Normalise one raw cell into a list of items.

    - list / tuple / set / NumPy array -> list of its elements (pyarrow typically
      materialises list-typed parquet cells as NumPy arrays, not Python lists)
    - string that looks like a list literal, e.g. "['A', 'B']" -> parsed with
      ast.literal_eval, falling back to [val] if it is not a valid literal
    - missing scalar (None / NaN) -> []
    - any other scalar -> [val]
    """
    if isinstance(val, (list, tuple, set, np.ndarray)):
        # Must come before pd.isna(). On a list-like, pd.isna returns an array, and
        # `if <array>` raises "truth value ... is ambiguous" for 2+ elements.
        return list(val)
    if pd.isna(val):
        return []
    if isinstance(val, str) and val.startswith('[') and val.endswith(']'):
        try:
            parsed = ast.literal_eval(val)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return [val]  # not a valid literal: treat the raw string as a single item
        return list(parsed) if isinstance(parsed, (list, tuple, set)) else [parsed]
    return [val]


def _encode_bitmap(values, item_to_bit, label):
    """Encode each cell as an int with bit item_to_bit[item] set for every known item.

    Unknown items are ignored. Cells that raise while being processed are reported
    (using `label` as the column name) and encoded as 0.
    """
    codes = []
    for val in values:
        try:
            code = 0
            for item in _cell_to_list(val):
                if item in item_to_bit:
                    # Bitwise OR, not addition: a repeated item must not carry into the next bit.
                    code |= 1 << item_to_bit[item]
            codes.append(code)
        except Exception as e:
            print(f"Error processing {label} value: {val}, Error: {e}")
            codes.append(0)
    return codes


# === STEP 1: Encode DpiPolicy to bitmap ===
policy_values = [
    'BLOCKALL', 'BlockNonProvSub', 'ClientIPWhitelist', 'ExpireBlock',
    'F1200G50M', 'F3000G100M', 'F3000G200M', 'F4000G300M', 'F4000G400M',
    'F4000G500M', 'F4000G550M', 'F4000G600M', 'F6000G1000M',
    'NoCRBNBlock', 'SuspendBlock', 'ZeroRated'
]
policy_to_bit = {policy: i for i, policy in enumerate(policy_values)}
joblib.dump(policy_to_bit, 'models2/policy_to_bit.joblib')
df['DpiPolicy'] = _encode_bitmap(df['DpiPolicy'], policy_to_bit, 'DpiPolicy')

# === STEP 2: Encode contentType to bitmap ===
content_types = [
    'Ads & Trackers', 'Browsing', 'Domain Name Service', 'Email', 'FTP',
    'File Sharing', 'Gaming', 'Instant Messaging', 'Internet Privacy',
    'Location Based Services', 'Music Streaming', 'Net Admin', 'One Click Hosting',
    'Other', 'Other File Sharing', 'Social Media', 'Streaming', 'Trading',
    'Unknown', 'Voice & Video Calls', 'Webmail'
]
content_to_bit = {c: i for i, c in enumerate(content_types)}
joblib.dump(content_to_bit, 'models2/content_to_bit.joblib')
df['contentType'] = _encode_bitmap(df['contentType'], content_to_bit, 'contentType')

# === STEP 3: Encode IpProtocol to bitmap ===
ip_protocols = ['ESP', 'GRE', 'ICMP', 'TCP', 'UDP']
proto_to_bit = {p: i for i, p in enumerate(ip_protocols)}
joblib.dump(proto_to_bit, 'models2/proto_to_bit.joblib')
df['IpProtocol'] = _encode_bitmap(df['IpProtocol'], proto_to_bit, 'IpProtocol')

# === STEP 4: Convert appName to app_count ===
# Number of app entries per cell (missing -> 0). A cell that errors is counted as 1.
app_counts = []
for val in df['appName']:
    try:
        app_counts.append(len(_cell_to_list(val)))
    except Exception as e:
        print(f"Error processing appName value: {val}, Error: {e}")
        app_counts.append(1)

df['app_count'] = app_counts
df.drop(columns=['appName'], inplace=True)

# Save columns affected
joblib.dump(['appName'], 'models2/dropped_columns.joblib')
joblib.dump(['app_count'], 'models2/added_columns.joblib')

# === STEP 5: Check for and handle any remaining non-numeric columns ===
# Every column that still has object dtype is replaced by its pandas category codes
# (missing values become -1). A column pd.Categorical cannot handle is dropped instead.
object_cols = list(df.select_dtypes(include=['object']).columns)
if len(object_cols) > 0:
    print(f"Found object columns that need conversion: {object_cols}")

for col in object_cols:
    print(f"Converting column: {col}")
    try:
        df[col] = pd.Categorical(df[col]).codes
    except Exception as e:
        # e.g. unhashable cell values; the column cannot be encoded, so remove it.
        print(f"Error converting {col}: {e}")
        print(f"Dropping column {col}")
        df.drop(columns=[col], inplace=True)

# === STEP 6: Fix zeros/negatives ===
# Missing or non-positive values are replaced by the median of the strictly positive
# values in the same column (1 when the column has none). The medians are saved so the
# same imputation can be reproduced later.
medians = {}
fix_cols = ['bytesFromClient', 'bytesFromServer', 'transationDuration']
for col in (name for name in fix_cols if name in df.columns):
    positive_median = df.loc[df[col] > 0, col].median()
    median = 1 if np.isnan(positive_median) else positive_median
    medians[col] = median
    # `fill=median` binds the current column's median into the callback explicitly.
    df[col] = df[col].map(lambda x, fill=median: fill if pd.isna(x) or x <= 0 else x)
joblib.dump(medians, 'models2/medians.joblib')

# === STEP 7: Standardize numeric columns ===
# SubscriberID is an identifier, not a behavioural feature. Drop it BEFORE fitting the scaler
# so it is neither standardised nor recorded in numeric_cols. STEP 5 may have turned it into
# integer category codes, which select_dtypes below would otherwise keep.
if 'SubscriberID' in df.columns:
    df = df.drop(columns=['SubscriberID'])
    print("Dropped 'SubscriberID' column before scaling.")

print("Columns before selecting numeric types:")
print(df.columns.tolist())
print("Data types before selecting numeric types:")
print(df.dtypes)

# Coerce any remaining object columns to numbers. Unparseable entries are filled with the
# column median, or 0 when nothing parsed.
for col in list(df.columns):
    if not pd.api.types.is_object_dtype(df[col]):
        continue
    try:
        converted = pd.to_numeric(df[col], errors='coerce')
    except Exception as e:
        print(f"Could not convert column {col} to numeric: {e}")
        print(f"Dropping column {col}")
        df = df.drop(columns=[col])
        continue
    fill_value = converted.median() if converted.notna().any() else 0
    # Assign back explicitly. `df[col].fillna(..., inplace=True)` is chained assignment,
    # which does not update `df` under pandas Copy-on-Write.
    df[col] = converted.fillna(fill_value)
    print(f"Converted column {col} to numeric")

# Keep only numeric columns. .copy() yields an independent frame, so the column assignment
# below does not raise SettingWithCopyWarning.
df = df.select_dtypes(include=[np.number]).copy()
print("Final data types before scaling:")
print(df.dtypes)

numeric_cols = df.columns.tolist()
scaler = StandardScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
joblib.dump(scaler, 'models2/scaler.joblib')
joblib.dump(numeric_cols, 'models2/numeric_cols.joblib')

# Print sample of processed data to verify
print("\nSample of processed data:")
print(df.head())

# === NOW START SOM MODEL TRAINING ===
# The identifier column is not a feature; remove it if it is still present.
if 'SubscriberID' in df.columns:
    df = df.drop(columns=['SubscriberID'])
    print("Dropped 'SubscriberID' column before training SOM.")

# Processed, scaled feature matrix used for SOM training.
X = df.to_numpy()
print(f"Data shape for SOM: {X.shape}")
print(f"Data type: {X.dtype}")

# Defensive check: after select_dtypes X should already be numeric. If it is not, cast to
# float and replace NaN/inf with finite numbers.
if not np.issubdtype(X.dtype, np.number):
    print("WARNING: Non-numeric values detected in the data")
    X = np.nan_to_num(X.astype(float))

# === SOM Setup ===
# Seed NumPy's global RNG. The custom weight initialisation and the random sample order
# during training both draw from np.random, so a run can then be reproduced exactly.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

num_rows, num_cols = 10, 10                     # SOM grid dimensions
grid_diagonal = sqrt(num_rows**2 + num_cols**2)
sigma = 0.8 * grid_diagonal                     # passed to MiniSom as sigma: 80% of the grid diagonal
learning_rate = 0.8                             # passed to MiniSom as learning_rate
epochs = 1
total_iterations = epochs * X.shape[0]          # one single-sample update per iteration

# === Custom Weight Initialization Function ===
def custom_weight_init(som, interval=0.3):
    """Overwrite ``som._weights`` with i.i.d. draws from U(-interval, interval).

    The new array keeps the first three dimensions of the existing weight array and is
    drawn from NumPy's global RNG. The SOM is modified in place and also returned.
    """
    rows, cols, dim = som._weights.shape[0], som._weights.shape[1], som._weights.shape[2]
    low, high = -interval, interval
    som._weights = np.random.uniform(low, high, (rows, cols, dim))
    return som

# Initialize SOM
som = MiniSom(x=num_rows, y=num_cols, input_len=X.shape[1],
              sigma=sigma, learning_rate=learning_rate)

# Overwrite MiniSom's initial weights with values drawn uniformly from [-0.3, 0.3]. The
# inputs were standardised, so these start near the per-feature means.
custom_weight_init(som, interval=0.3)

# === Train SOM ===
# Each iteration applies one online update using a row sampled uniformly with replacement.
# weights_evolution holds the initial weights plus a snapshot roughly every 10% of training.
weights_evolution = [som.get_weights().copy()]
# max(1, ...) avoids a modulo-by-zero when there are fewer than 10 iterations.
snapshot_every = max(1, total_iterations // 10)

# Defined up front so the error report below can never hit a NameError.
idx, sample = None, None
try:
    for iteration in range(total_iterations):
        idx = np.random.randint(0, X.shape[0])
        sample = X[idx]

        # Debug info for first few iterations
        if iteration < 5:
            print(f"Iteration {iteration}, Sample shape: {sample.shape}, Sample type: {sample.dtype}")
            print(f"Sample contains NaN: {np.isnan(sample).any()}")

        # Ensure no NaN values in sample
        if np.isnan(sample).any():
            print(f"NaN values found in sample at iteration {iteration}, replacing with zeros")
            sample = np.nan_to_num(sample)

        som.update(sample, som.winner(sample), iteration, total_iterations)

        if iteration % snapshot_every == 0:
            weights_evolution.append(som.get_weights().copy())
            print(f"Completed {iteration}/{total_iterations} iterations")
except Exception as e:
    print(f"Error during training: {e}")
    if sample is not None:
        print(f"Problem with sample at index {idx}:")
        print(f"Sample: {sample}")
        print(f"Sample dtype: {sample.dtype}")
        # True when the sample contains NaN/inf. The old message printed the inverse.
        print(f"Contains non-finite values: {not np.isfinite(sample).all()}")
    raise

# === Save the trained SOM model to a file ===
joblib.dump(som, 'models2/som_model.joblib')
print("SOM model trained and saved successfully!")

# === NEW CODE: AUTOMATICALLY GENERATE CLUSTER_OFFERS MAPPING ===
# Function to parse offer values from the original (pre-encoding) DpiPolicy data
def parse_offers(offer_val):
    """Normalise one raw DpiPolicy cell into a list of offer names.

    - list / tuple / set / NumPy array -> list of its elements
    - string that looks like a list literal, e.g. "['A', 'B']" -> parsed with
      ast.literal_eval, falling back to [offer_val] if it is not a valid literal
    - missing scalar (None / NaN) -> []
    - any other scalar -> [offer_val]

    Any unexpected error is printed and [] is returned.
    """
    try:
        # List-likes must be handled before pd.isna(). On a list/array, pd.isna returns an
        # array, and `if <array>` raises "truth value ... is ambiguous".
        if isinstance(offer_val, (list, tuple, set, np.ndarray)):
            return list(offer_val)
        if pd.isna(offer_val):
            return []
        if isinstance(offer_val, str) and offer_val.startswith('[') and offer_val.endswith(']'):
            try:
                parsed = ast.literal_eval(offer_val)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                return [offer_val]  # not a valid literal: keep the raw string as one offer
            return list(parsed) if isinstance(parsed, (list, tuple, set)) else [parsed]
        return [offer_val]
    except Exception as e:
        print(f"Error parsing offer: {offer_val}, Error: {e}")
        return []

# Parse every row's original DpiPolicy into a list of offers; row order matches X.
print("Generating cluster_offers mapping from SOM...")
original_offers_parsed = list(map(parse_offers, original_offers))

# Best-matching unit (grid row, grid col) for every data point. NaNs are zeroed first,
# as they are during training.
bmu_indices = []
for x in X:
    clean_x = np.nan_to_num(x) if np.isnan(x).any() else x
    winner = som.winner(clean_x)
    bmu_indices.append((winner[0], winner[1]))

# Group data-point indices by the flat ID (row * num_cols + col) of their best-matching unit.
cluster_mapping = {}
for point_idx, (grid_row, grid_col) in enumerate(bmu_indices):
    cluster_mapping.setdefault(grid_row * num_cols + grid_col, []).append(point_idx)

# Offers never recommended for any cluster, whatever their frequency (loop-invariant).
excluded_offers = {'F3000G200M', 'F3000G100M', 'F1200G50M'}

# Map every cluster ID to the offers recommended for it.
cluster_offers = {}
for cluster_id, indices in cluster_mapping.items():
    # Flatten the offers of every data point in the cluster.
    all_offers = [offer for idx in indices for offer in original_offers_parsed[idx]]
    if all_offers:
        kept = Counter({offer: n for offer, n in Counter(all_offers).items()
                        if offer not in excluded_offers})
        # Keep offers seen in at least 20% of the cluster's data points (minimum 1).
        min_threshold = max(1, len(indices) * 0.2)
        top_offers = [offer for offer, n in kept.items() if n >= min_threshold]
        # Fewer than two qualify: fall back to the (up to) two most common kept offers.
        if len(top_offers) < 2:
            top_offers = [offer for offer, _ in kept.most_common(min(2, len(kept)))]
    else:
        top_offers = []
    cluster_offers[cluster_id] = top_offers

# Persist the SOM weights (to the working directory) and the cluster -> offers mapping.
np.save('som_weights1.npy', som.get_weights())
joblib.dump(cluster_offers, 'models2/cluster_offers.joblib')

print("Cluster offers mapping generated and saved successfully!")
print(f"Generated {len(cluster_offers)} cluster mappings.")

# Show up to ten clusters, in the order they were first encountered.
print("\nSample of cluster_offers mappings:")
sample_size = min(10, len(cluster_offers))
sample_clusters = list(cluster_offers)[:sample_size]
for cluster in sample_clusters:
    offers_for_cluster = cluster_offers[cluster]
    print(f"Cluster {cluster}: {offers_for_cluster}")