In [1]:
import os
import sqlite3
from multiprocessing import freeze_support
from modin.db_conn import ModinDatabaseConnection
import modin.pandas as mpd
%load_ext autoreload
%autoreload 2
# Select Ray as Modin's execution engine through the MODIN_ENGINE environment variable.
# NOTE(review): this assignment runs after `import modin.pandas` above; confirm the
# installed Modin version reads the variable lazily (at first use) rather than at import time.
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
def load_data_from_db(con):
    """Read every row of the ``data`` table into a Modin DataFrame.

    Args:
        con: Connection object handed unchanged to ``mpd.read_sql``.

    Returns:
        Whatever ``mpd.read_sql`` returns for ``SELECT * FROM data``.

    Raises:
        Exception: Any failure during the read is reported on stdout and then
            re-raised unchanged, so callers still see the original error.
    """
    try:
        return mpd.read_sql("SELECT * FROM data", con)
    except Exception as err:
        # Log for the notebook reader, but never swallow the failure.
        print(f"Error loading data: {err}")
        raise


# multiprocessing bootstrap hook; per the Python docs it only has an effect in a
# frozen Windows executable and is a no-op otherwise.
freeze_support()
# NOTE(review): machine-specific absolute path — the notebook only runs where this file exists.
dbfile = '/home/tompouce/workspaces/mafat-challenge/train_data_for_competition/mini_training_set.db'

# Describe the SQLite file as a SQLAlchemy-backed Modin connection for mpd.read_sql.
conn = ModinDatabaseConnection('sqlalchemy', f'sqlite:///{dbfile}')

# get_connection() can be used to obtain the underlying SQLAlchemy connection.
# NOTE(review): the return value is discarded here, so this call only exercises the connection.
conn.get_connection()
# Load the whole `data` table and show the first rows as plain text.
db_df = load_data_from_db(conn)
print(db_df.head())

   Device_ID                   Datetime      URL  Domain_Name  Domain_cls1  \
0        124  2023-04-23 03:04:30+03:00     6466      2368671          755   
1        124  2023-04-23 03:04:30+03:00  2245864      1792903            0   
2        124  2023-04-23 03:04:30+03:00  1839478       107342          332   
3        124  2023-04-23 03:14:50+03:00  1172090       107342          332   
4        124  2023-04-23 03:14:50+03:00  1839478       107342          332   

   Domain_cls2  Domain_cls3  Domain_cls4  Target  
0          799            0            0       0  
1            0            0            0       0  
2            0            0            0       0  
3            0            0            0       0  
4            0            0            0       0  


In [3]:
# "23-04 to 18-05"
# Parse the Datetime column into timestamps. This overwrites the column on db_df
# in place, so every later cell sees the parsed values.
db_df["Datetime"] = mpd.to_datetime(db_df["Datetime"])
# Last expression of the cell: display the parsed column.
db_df["Datetime"]
# Unused draft (kept for reference): per-device day offset from the first event.
# db_df.groupby("Device_ID").apply(lambda x: (x-x["Datetime"].min()).dt.days)

0          2023-04-23 03:04:30+03:00
1          2023-04-23 03:04:30+03:00
2          2023-04-23 03:04:30+03:00
3          2023-04-23 03:14:50+03:00
4          2023-04-23 03:14:50+03:00
                      ...           
32248978   2023-05-13 21:26:02+03:00
32248979   2023-05-13 21:26:03+03:00
32248980   2023-05-13 21:32:33+03:00
32248981   2023-05-13 21:32:38+03:00
32248982   2023-05-13 21:32:43+03:00
Name: Datetime, Length: 32248983, dtype: datetime64[ns, UTC+03:00]

In [4]:
def get_domain_counts(db_df, pivot=False):
    """Count events per (device, domain) pair.

    Rows are grouped by ``Device_ID``, ``Domain_Name`` and ``Target``; the
    count for a group is the number of non-null ``Datetime`` values in it.

    Args:
        db_df: Frame with at least the columns ``Device_ID``, ``Domain_Name``,
            ``Target`` and ``Datetime``.
        pivot: When true, return a device-by-domain matrix instead of the
            long-format table.

    Returns:
        Long format (default): a frame with the columns ``Device_ID``,
        ``Domain_Name``, ``Target`` and ``count``.
        Pivoted: a frame indexed by ``Device_ID`` with one column per
        ``Domain_Name``; pairs that never occur are filled with 0.

    Raises:
        ValueError: In pivot mode, if a (``Device_ID``, ``Domain_Name``) pair
            appears with more than one ``Target`` value, because ``pivot``
            rejects duplicate index/column entries.
    """
    group_keys = ["Device_ID", "Domain_Name", "Target"]
    # Count only the Datetime column instead of counting every column and
    # discarding all but one afterwards.
    domain_counts = (
        db_df.groupby(group_keys)["Datetime"]
        .count()
        .reset_index()
        .rename(columns={"Datetime": "count"})
    )
    if pivot:
        # Pivot the counts computed above, using the column names this function
        # actually produces, so the result does not depend on notebook globals.
        return domain_counts.pivot(
            index="Device_ID", columns="Domain_Name", values="count"
        ).fillna(0)
    return domain_counts


Unnamed: 0,source,target,Target,weight
0,124,3930,0,4
1,124,4136,0,3
2,124,6450,0,3
3,124,12837,0,1
4,124,17665,0,2
...,...,...,...,...
288003,69967,2361028,1,4
288004,69967,2387835,1,34
288005,69967,2389761,1,72
288006,69967,2390487,1,1


In [14]:
def get_train_test_masks(domain_counts, test_size=0.2, random_state=42):
    """Split rows into train/test by device, stratified on the device target.

    Every device is assigned wholly to one side, so rows of the same device
    never appear in both masks. Summary statistics are printed as a side effect.

    Args:
        domain_counts: Frame with at least ``Device_ID`` and ``Target`` columns.
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split``.

    Returns:
        ``(train_mask, test_mask)``: two boolean Series aligned with
        ``domain_counts``.

    Note:
        ``train_test_split`` must already be in scope; it is not imported in
        the visible part of this notebook (presumably
        ``sklearn.model_selection.train_test_split`` — confirm).
    """
    # One row per device, labelled with the first Target seen for that device.
    per_device = domain_counts.groupby('Device_ID')['Target'].first().reset_index()

    # Stratify on the per-device label so both sides keep the class balance.
    train_ids, test_ids = train_test_split(
        per_device['Device_ID'],
        test_size=test_size,
        random_state=random_state,
        stratify=per_device['Target'],
    )

    # Translate the device-level split into row-level boolean masks.
    device_column = domain_counts['Device_ID']
    train_mask = device_column.isin(train_ids)
    test_mask = device_column.isin(test_ids)

    # Materialise each side once; the statistics below reuse these frames.
    train_rows = domain_counts[train_mask]
    test_rows = domain_counts[test_mask]

    print(f"Total devices: {len(per_device)}")
    print(f"Train devices: {len(train_ids)}")
    print(f"Test devices: {len(test_ids)}")
    print(f"\nTrain samples: {len(train_rows)}")
    print(f"Test samples: {len(test_rows)}")

    # Share of rows per Target value on each side.
    print("\nTarget distribution in train set:")
    print(train_rows.groupby('Target').size() / len(train_rows))
    print("\nTarget distribution in test set:")
    print(test_rows.groupby('Target').size() / len(test_rows))

    return train_mask, test_mask


Total devices: 615
Train devices: 492
Test devices: 123

Train samples: 228881
Test samples: 59127


In [None]:
# Working notes on the planned URL-walk clustering, emitted one per line.
# NOTE(review): the third note announces 3 categories while the fourth lists
# four labels; the wording is left as written.
print(
    "keep in mind that the best resolution will be achieved with a resolution of urls/chain of urls, not domains",
    "Cluster url walks",
    "I want to cluster urls/url walks from a given domain, to 3 categories",
    "positive correlation, zero correlation, negative correlation, to_label",
    sep="\n",
)

In [None]:
import numpy as np
import ray

def compute_chunked_covariance(pivot_matrix, batch_size=2000):
    """Build a column-by-column matrix from Ray tasks, one task per column batch.

    The input is converted to a dense array and each column is centered on its
    mean. Columns are then processed in batches of ``batch_size`` by the remote
    function ``calculate_chunk_covariance`` (defined elsewhere), and the
    per-batch results are stacked row-wise.

    Args:
        pivot_matrix: Frame whose columns are the variables to compare. It must
            provide ``to_numpy()``, ``shape`` and ``columns``.
        batch_size: Number of columns handled by each remote task. Must be
            positive.

    Returns:
        A Modin DataFrame whose index and columns are both
        ``pivot_matrix.columns``. For an input with no columns, an empty frame
        is returned without submitting any task.

    Raises:
        ValueError: If ``batch_size`` is not positive.

    Note:
        Each task is expected to return ``(start_idx, end_idx, chunk_cov)``
        with ``chunk_cov`` shaped ``(end_idx - start_idx, n_cols)``. What
        normalisation ``calculate_chunk_covariance`` applies is not visible
        here — confirm against its definition.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    columns = pivot_matrix.columns
    n_cols = pivot_matrix.shape[1]
    if n_cols == 0:
        # No batches would be submitted and concat() rejects an empty list.
        return mpd.DataFrame(index=columns, columns=columns)

    # Dense, mean-centered copy of the data.
    matrix_dense = pivot_matrix.to_numpy()
    matrix_centered = matrix_dense - np.mean(matrix_dense, axis=0)

    # Store the matrix in Ray's object store once. Passing the array itself to
    # every .remote() call would serialise it again for each batch; Ray resolves
    # a top-level ObjectRef argument to its value before the task runs.
    matrix_ref = ray.put(matrix_centered)

    futures = [
        calculate_chunk_covariance.remote(
            matrix_ref, start, min(start + batch_size, n_cols), columns
        )
        for start in range(0, n_cols, batch_size)
    ]

    # ray.get preserves submission order, so the chunks stack in column order.
    cov_chunks = [
        mpd.DataFrame(chunk_cov, index=columns[start_idx:end_idx], columns=columns)
        for start_idx, end_idx, chunk_cov in ray.get(futures)
    ]
    return mpd.concat(cov_chunks)

In [None]:
# Split the columns of cov_mat at the midpoint; every row is kept in both parts.
_mid_col = cov_mat.shape[1] // 2
first_half = cov_mat.iloc[:, :_mid_col]
second_half = cov_mat.iloc[:, _mid_col:]

In [84]:
# Top-left square corner of first_half, 1/100 of its column count on each side.
# NOTE(review): despite the name this is not a quarter, and the row bound is
# derived from the column count (shape[1]) — confirm that is intended.
_corner = first_half.shape[1] // 100
first_quarter = first_half.iloc[:_corner, :_corner]

In [None]:
import ray.data as rd
import ray
# Export the Modin covariance matrix to Parquet through a Ray Dataset.
# Start a Ray runtime only if one is not already running.
if not ray.is_initialized():
    ray.init()

# Wrap the Modin DataFrame as a Ray Dataset.
ds = rd.from_modin(cov_mat)

# Write the dataset as Parquet under the relative directory 'url_cov_mat_rays'.
# NOTE(review): keyword arguments that write_parquet does not define itself are
# forwarded to the pyarrow writer; confirm the installed versions accept
# `row_group_size_bytes` (pyarrow documents `row_group_size`, counted in rows).
ds.write_parquet(
    'url_cov_mat_rays',
    filesystem=None,  # no explicit filesystem given; presumably resolved from the path (local) — confirm
    row_group_size_bytes=100_000_000,  # intended ~100 MB row groups; adjust to available memory
    compression='snappy'
)


In [None]:
# Persist cov_mat as a series of Parquet files, each holding `chunk_size`
# consecutive columns (all rows); the column range is encoded in the file name.
chunk_size = 100
n_cols = cov_mat.shape[1]
# NOTE(review): machine-specific absolute path.
cov_path = "/home/tompouce/workspaces/mafat-challenge/eda/cov_matrices"
# Create the output directory up front: writing a file into a missing directory
# is expected to fail, and exist_ok keeps the cell re-runnable.
os.makedirs(cov_path, exist_ok=True)
for i in range(0, n_cols, chunk_size):
    end_idx = min(i + chunk_size, n_cols)  # the last chunk may be narrower
    chunk = cov_mat.iloc[:, i:end_idx]
    chunk_name = f'url_cov_mat_chunk_{i}_{end_idx}.parquet'
    chunk.to_parquet(os.path.join(cov_path, chunk_name), engine='pyarrow', compression='snappy')

In [None]:
# (cov_mat*100).astype("uint8").to_parquet('url_cov_mat.parquet', engine='pyarrow', compression='snappy')

In [None]:
# N2V = Node2Vec(G, dimensions=64, walk_length=30, num_walks=200, workers=8)
# model = N2V.fit(window=10, min_count=1, batch_words=8)


In [None]:
# model.wv.save_word2vec_format("user2url_node2vec_embeddings.emb")

In [None]:
# Builds a GroupBy object keyed on Domain_cls2. No aggregation is applied, so
# the cell only displays the object's repr.
db_df.groupby("Domain_cls2")


In [None]:
# NOTE(review): head() is not the last expression of the cell, so its result is
# discarded and nothing is displayed.
db_df.head()
# Distinct device ids, in the order unique() returns them.
divide_ids = list(db_df["Device_ID"].unique())
print(len(divide_ids))
# Working subset: only the rows belonging to the first 20 device ids.
# NOTE(review): 20 is a magic number; consider a named constant.
regular_df = db_df[db_df["Device_ID"].isin(divide_ids[:20])]
# Leftover debugging magic, disabled.
# %debug

In [11]:
# Materialise the Modin subset as a plain pandas DataFrame.
# The hasattr guard keeps the cell re-runnable: after the first run regular_df is
# already a pandas frame, which has no _to_pandas(), so a second run would
# otherwise raise AttributeError.
if hasattr(regular_df, "_to_pandas"):
    regular_df = regular_df._to_pandas()

In [None]:
# Display the column labels of the working subset.
regular_df.columns

In [5]:
# db_df["count_weighted_by_days"] = db_df.groupby("Device_ID").apply(lambda x: (x["Datetime"]-x["Datetime"].min()).dt.days).astype("float32")
# db_df["count_weighted_by_days"] = db_df["count_weighted_by_days"] *0.1+1
# db_df.groupby("Device_ID").apply(lambda x: (x["count_weighted_by_days"]/x["count_weighted_by_days"].sum())).astype("float64")
# db_df.groupby("Device_ID").apply(lambda x: (x["count_weighted_by_days"]/x["count_weighted_by_days"].sum())).astype("float64")

# db_df.groupby("Device_ID")["count_weighted_by_days"].sum()