In [5]:
uri="mongodb+srv://admin:admin@cluster0.3og2uv4.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

In [6]:
db_name='fraud_db'
collection_name='transactions'

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask_ml.preprocessing import DummyEncoder, StandardScaler
from dask_ml.impute import SimpleImputer
from dask_ml.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from dask_ml.wrappers import ParallelPostFit
from sklearn.metrics import classification_report

# Define the columns to load and their data types
selected_columns = ['TransactionID', 'TransactionDT', 'TransactionAmt', 'ProductCD', 'card1', 'card2', 
                    'card3', 'card4', 'card5', 'card6', 'dist1', 'dist2']

dtypes = {
    'TransactionID': 'int64',
    'TransactionDT': 'int64',
    'TransactionAmt': 'float64',
    'ProductCD': 'object',
    'card1': 'int64',
    'card2': 'float64',
    'card3': 'float64',
    'card4': 'object',
    'card5': 'float64',
    'card6': 'object',
    'dist1': 'float64',
    'dist2': 'float64'
}


# Load dataset
df = dd.read_csv('train_transaction.csv', usecols=selected_columns + ['isFraud'], dtype=dtypes).set_index('TransactionID')

# Identify numeric and categorical columns
numeric_columns = ['TransactionDT', 'TransactionAmt', 'card1', 'card2', 'card3', 'card5', 'dist1', 'dist2']
categorical_columns = ['ProductCD', 'card4', 'card6']

# Take a small sample to fit the imputer and scaler
sample = df.sample(frac=0.1, random_state=42).compute()

# Fit the imputers
imputer_numeric = SimpleImputer(strategy='mean').fit(sample[numeric_columns])
imputer_categorical = SimpleImputer(strategy='most_frequent').fit(sample[categorical_columns])

# Fit the scaler
scaler = StandardScaler().fit(sample[numeric_columns])

# Handle missing values for numeric columns in parallel
df[numeric_columns] = df[numeric_columns].map_partitions(lambda df: pd.DataFrame(imputer_numeric.transform(df), columns=df.columns), meta=df[numeric_columns]._meta)

# Handle missing values for categorical columns in parallel
df[categorical_columns] = df[categorical_columns].map_partitions(lambda df: pd.DataFrame(imputer_categorical.transform(df), columns=df.columns), meta=df[categorical_columns]._meta)

# Convert categorical columns to categorical dtype
df[categorical_columns] = df[categorical_columns].categorize()

# Encode categorical variables
encoder = DummyEncoder()
df = encoder.fit_transform(df)

# Scale numeric features in parallel
df[numeric_columns] = df[numeric_columns].map_partitions(lambda df: pd.DataFrame(scaler.transform(df), columns=df.columns), meta=df[numeric_columns]._meta)


In [2]:
from dask.distributed import Client

client = Client()  # Starts a local scheduler and worker if no arguments are provided
print(client)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 51658 instead


<Client: 'tcp://127.0.0.1:51659' processes=4 threads=4, memory=7.92 GiB>


In [3]:
from dask.distributed import get_worker
from pymongo import MongoClient

def get_mongo_client():
    worker = get_worker()  # This raises ValueError if not running within a worker context
    if not hasattr(worker, 'mongo_client'):
        connection_string = "mongodb+srv://admin:admin@cluster0.3og2uv4.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
        worker.mongo_client = MongoClient(connection_string)
    return worker.mongo_client

def insert_into_mongo(df_part):
    try:
        client = get_mongo_client()
        db = client.fraud_db
        transactions_collection = db.transactions
        records = df_part.to_dict(orient='records')
        if records:
            transactions_collection.insert_many(records)
    except ValueError:
        print("Not running on a worker. Proper MongoDB operations can't be performed.")


In [4]:
df.map_partitions(insert_into_mongo, meta=int).compute()




0    None
1    None
2    None
3    None
4    None
5    None
6    None
7    None
8    None
9    None
dtype: object

In [8]:
def load_data_from_mongo(uri, db_name, collection_name):
    client = MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]
    cursor = collection.find({})

    # Convert cursor to DataFrame directly if memory allows
    return pd.DataFrame(list(cursor))


In [9]:
from dask.delayed import delayed
chunks = load_data_from_mongo(uri, db_name, collection_name, chunk_size=10000)

# Create Dask DataFrame from delayed chunks
ddf = dd.from_delayed([delayed(pd.DataFrame)(chunk) for chunk in chunks])

In [10]:
ddf.head()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Unnamed: 0,_id,isFraud,TransactionDT,TransactionAmt,card1,card2,card3,card5,dist1,dist2,...,ProductCD_S,ProductCD_W,card4_american express,card4_discover,card4_mastercard,card4_visa,card6_charge card,card6_credit,card6_debit,card6_debit or credit
0,6679cb8bc99f35ede77fcb1a,1,1.440697,0.098095,1.26736,-1.600764,-0.281335,0.651552,0.0,0.0,...,False,False,False,False,False,True,False,False,True,False
1,6679cb8bc99f35ede77fcb1b,0,1.440729,-0.331915,-0.414349,0.830516,-0.281335,0.651552,0.0,0.0,...,False,True,False,False,False,True,False,True,False,False
2,6679cb8bc99f35ede77fcb1c,1,1.440739,0.098095,1.191113,-0.261338,-0.281335,-0.098031,0.0,0.0,...,False,False,False,False,False,True,False,False,True,False
3,6679cb8bc99f35ede77fcb1d,1,1.440831,0.098095,-1.513528,-0.267686,-0.281335,0.651552,0.0,0.0,...,False,False,False,False,False,True,False,True,False,False
4,6679cb8bc99f35ede77fcb1e,1,1.440907,0.098095,1.138248,0.805124,-0.281335,0.651552,0.0,0.0,...,False,False,False,False,False,True,False,False,True,False
