In [1]:
!pip install pyspark



DEPRECATION: mermaid 0.3.2 has a non-standard dependency specifier torch>=1.7torchvision. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of mermaid or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063


In [2]:
#load_ext nb_black
from pyspark.sql import SparkSession
#session all related to df not rdd
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
df = spark.read.csv("bank_details.csv", inferSchema=True, header=True)

In [5]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [6]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



Drop columns from the data :
* In any machine learning project, we always have a few columns that are not required for solving the problem.

for instance, we can use the drop function to remove the column from the data. Use the asterisk (*) sign before the list to drop multiple columns from the dataset:
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns

For our instance ,we don't really need " nameOrig " , " nameDest " , " oldbalanceDest " , " oldbalanceDest " ," newbalanceDest " ,we really don't need these as these are not the factors which affects the frud data or not.

In [7]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

we are only slecting some colum which is an alternative for droping a colum rather ,efficent.

In [8]:
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



In [9]:
df.printSchema()

root
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- isFraud: integer (nullable = true)



## Data Dimensions 

In [10]:
(df.count() , len(df.columns))

(6362620, 5)

In [11]:
df.select('amount','oldbalanceOrg','newbalanceOrig','isFraud').describe().show()

+-------+------------------+------------------+-----------------+--------------------+
|summary|            amount|     oldbalanceOrg|   newbalanceOrig|             isFraud|
+-------+------------------+------------------+-----------------+--------------------+
|  count|           6362620|           6362620|          6362620|             6362620|
|   mean|179861.90354913156| 833883.1040744876|855113.6685785913|0.001290820448180152|
| stddev| 603858.2314629381|2888242.6730375625|2924048.502954259|  0.0359047968016041|
|    min|               0.0|               0.0|              0.0|                   0|
|    max|     9.244551664E7|     5.958504037E7|    4.958504037E7|                   1|
+-------+------------------+------------------+-----------------+--------------------+



In [12]:
# null values in each column
data_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])
data_agg.show()

+----+------+-------------+--------------+-------+
|type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+----+------+-------------+--------------+-------+
|   0|     0|            0|             0|      0|
+----+------+-------------+--------------+-------+



In [13]:
# value counts of Type column
df.groupBy('type').count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



### Train/test split

In [14]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

so here is thr things we need to select data for train and test 


random selection of data makes the good distribution of record ,leads to good model

data weight is in fraction so 
0.7 ie 70% of the data from is being randomly selected and loaded but with a random factor seed ie 7

randomSplit:
Seed is random number generator seed. This is important because you might want to be able to hard code the same seed for your tests so that you always get the same results in test, but in prod code replace it with current time in milliseconds or a random number from a good entropy source.

Basically the splited in random way. 

In [15]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4454328 records
Test set length: 1908292 records


In [16]:
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|
|CASH_IN| 13.86|   6868100.18|    6868114.04|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



### Dtypes
In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features we want treated as categorical or vice versa. We’ll need to carefully identify which columns are numeric and which are categorical.

In [17]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

To use ML we need to conver string into values ,check the Data Type for that!

In [18]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [ x for (x, dataType) in train.dtypes if (dataType == "double") ]
#numCols = [ x for (x, dataType) in train.dtypes if ((dataType == "double") & (x != "isFraud")) ]
#skip the "isFraud" but 

Here we seperated the numerical col and string col;

these fancy is [ ] is just way to get the data in list format!! ie the output will be list!


In [19]:
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


### One hot encoding

StringIndexer:
Converts a single feature to an index feature.
http://spark.apache.org/docs/latest/ml-features#stringindexer


OneHotEncoder:
http://spark.apache.org/docs/latest/ml-features#onehotencoder

For more info: http://spark.apache.org/docs/latest/ml-features

In [20]:
train.agg(F.countDistinct("type")).show()

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



use agg when sum is indeed.
F is sql function ie countDistinct
DF goes along with sql as DF is in tabel format.
SQL function maily applies in the record of the colum or the whole record itself.


In [21]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372938|
| CASH_IN| 979390|
|CASH_OUT|1565416|
| PAYMENT|1507530|
|   DEBIT|  29054|
+--------+-------+



## Encode Categorical Variables using PySpark
Most machine learning algorithms accept the data only in numerical form. So, it is essential to convert any categorical variables present in our dataset into numbers.
* String Indexing is similar to Label Encoding.

## One-Hot Encoding
One-hot encoding is a concept every data scientist should know. I’ve relied on it multiple times when dealing with missing values. It’s a lifesaver!

Here’s the caveat – Spark’s OneHotEncoder does not directly encode the categorical variable.
> First, we need to use the String Indexer to convert the variable into numerical form and then use OneHotEncoderEstimator to encode multiple columns of the dataset.

It creates a Sparse Vector for each row.

In [22]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

df = df.select("type","isFraud")
#categorical value 

In [23]:
#catCols are the cols with string
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

here fun takes 2 parameters and  _StringIndexer will give the numeric value of of x as output when put like this : 
x + "_StringIndexer"

## So we use it to conver string into numerical values.

# fromat for [] is var =[ output function/var for(loop)   ] then the output will stored as a list of item.


In [24]:
string_indexe=string_indexer[0].fit(df).transform(df)
string_indexe.show()

+--------+---------+-------------+--------------+-------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|
+--------+---------+-------------+--------------+-------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|
| PAYMENT|  7861.64|    176087.23|     168225.59|      0|               1.0|
| PAYMENT|  4024.36|       2671.0|           0.0|      0|               1.0|
|   DEBIT|  5337.77|      41720.0|      36382.23|      0|               4.0|

In [25]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [26]:
one_hot_encoder_df=one_hot_encoder[0].fit(string_indexe).transform(string_indexe)
one_hot_encoder_df.show()


+--------+---------+-------------+--------------+-------+------------------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|
+--------+---------+-------------+--------------+-------+------------------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|     (4,[1],[1.0])|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|     (4,[3],[1.0])|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|     (4,[0],[1.0])|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7861.64|    176087.23|     1

## one-hot numeric array of the features

### Vector assembling

VectorAssembler:
Combines the values of input columns into a single vector.
http://spark.apache.org/docs/latest/ml-features#vectorassembler

It accepts numeric, boolean and vector type columns


In [27]:
from pyspark.ml.feature import VectorAssembler

In [28]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [29]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [30]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

## Transformers and Estimators
As the name suggests, Transformers convert one dataframe into another either by updating the current values of a particular column (like converting categorical columns to numeric) or mapping it to some other values by using a defined logic.

An Estimator implements the fit() method on a dataframe and produces a model. For example, LogisticRegression is an Estimator that trains a classification model when we call the fit() method.




## Now all the feature of record will be represted by the a single vector

In [31]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]


We need stages and pipeline so that we don't need to apply the indexer and encoder to to

In [32]:
stages

[StringIndexer_9f2895271ac7,
 OneHotEncoder_0623035e272e,
 VectorAssembler_a751e6df69ca]

> A pipeline allows us to maintain the data flow of all the relevant transformations that are required to reach the end result.

In [33]:
#%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(train)

In [34]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features",
).show(truncate=False)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|5.66  |5061561.06   |5061566.72    |[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]       |
|CASH_IN|13.86 |6868100.18   |6868114.04    |[13.86,6868100.18,6868114.04,0.0,0.0,1.0,0.0]      |
|CASH_IN|14.54 |3347286.5    |3347301.03    |[14.54,3347286.5,3347301.03,0.0,0.0,1.0,0.0]       |
|CASH_IN|15.52 |4368030.06   |4368045.59    |[15.52,4368030.06,4368045.59,0.0,0.0,1.0,0.0]      |
|CASH_IN|17.33 |8964056.72   |8964074.05    |[17.33,8964056.72,8964074.05,0.0,0.0,1.0,0.0]      |
|CASH_IN|25.01 |3077402.04   |3077427.04    |[25.01,3077402.04,3077427.04,0.0,0.0,1.0,0.0]      |
|CASH_IN|38.77 |6210013.72   |6210052.49    |[38.77,6210013.72,6210052.49,0.0,0.0,1.0,0.0]      |
|CASH_IN|52.64 |3468

In [35]:
pp_df.show()

+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|               2.0|     (4,[2],[1.0])|    [5.66,5061561.06,...|
|CASH_IN| 13.86|   6868100.18|    6868114.04|      0|               2.0|     (4,[2],[1.0])|    [13.86,6868100.18...|
|CASH_IN| 14.54|    3347286.5|    3347301.03|      0|               2.0|     (4,[2],[1.0])|    [14.54,3347286.5,...|
|CASH_IN| 15.52|   4368030.06|    4368045.59|      0|               2.0|     (4,[2],[1.0])|    [15.52,4368030.06...|
|CASH_IN| 17.33|   8964056.72|    8964074.05|      0|               2.0|     (4,[2],[1.0])|    [17.33,8964056.72...|
|CASH_IN| 25.01|   3077402.04|    3077427.04|      0|           

In [36]:
test.count()

1908292

In [37]:
df_test=test.where(test.isFraud == 1)

In [38]:
df_test.show()

+--------+---------+-------------+--------------+-------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+---------+-------------+--------------+-------+
|CASH_OUT| 13707.11|     13707.11|           0.0|      1|
|CASH_OUT| 14949.84|     14949.84|           0.0|      1|
|CASH_OUT|  22877.0|      22877.0|           0.0|      1|
|CASH_OUT| 29707.86|     29707.86|           0.0|      1|
|CASH_OUT| 33332.86|     33332.86|           0.0|      1|
|CASH_OUT| 39713.28|          0.0|           0.0|      1|
|CASH_OUT| 48375.02|     48375.02|           0.0|      1|
|CASH_OUT|  59835.0|      59835.0|           0.0|      1|
|CASH_OUT| 65488.05|     65488.05|           0.0|      1|
|CASH_OUT| 85354.69|     85354.69|           0.0|      1|
|CASH_OUT| 86070.17|     86070.17|           0.0|      1|
|CASH_OUT| 89631.24|     89631.24|           0.0|      1|
|CASH_OUT|132842.64|      4499.08|           0.0|      1|
|CASH_OUT|169941.73|    169941.73|           0.0|      1|
|CASH_OUT|1817

### Logistic Regression

In [39]:
from pyspark.ml.classification import LogisticRegression

In [40]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [41]:
data.show(5, truncate=False)

+---------------------------------------------+-----+
|features                                     |label|
+---------------------------------------------+-----+
|[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0] |0    |
|[13.86,6868100.18,6868114.04,0.0,0.0,1.0,0.0]|0    |
|[14.54,3347286.5,3347301.03,0.0,0.0,1.0,0.0] |0    |
|[15.52,4368030.06,4368045.59,0.0,0.0,1.0,0.0]|0    |
|[17.33,8964056.72,8964074.05,0.0,0.0,1.0,0.0]|0    |
+---------------------------------------------+-----+
only showing top 5 rows



In [42]:
%%time
model = LogisticRegression().fit(data)
data=model.transform(data)


CPU times: total: 0 ns
Wall time: 24.3 s


In [43]:
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[5.66,5061561.06,...|    0|[297.631026522620...|  [1.0,0.0]|       0.0|
|[13.86,6868100.18...|    0|[301.242388859440...|  [1.0,0.0]|       0.0|
|[14.54,3347286.5,...|    0|[294.205312913211...|  [1.0,0.0]|       0.0|
|[15.52,4368030.06...|    0|[296.245566519871...|  [1.0,0.0]|       0.0|
|[17.33,8964056.72...|    0|[305.431873076313...|  [1.0,0.0]|       0.0|
|[25.01,3077402.04...|    0|[293.666648024759...|  [1.0,0.0]|       0.0|
|[38.77,6210013.72...|    0|[299.928863067428...|  [1.0,0.0]|       0.0|
|[52.64,3468172.49...|    0|[294.449692300089...|  [1.0,0.0]|       0.0|
|[57.98,9021204.76...|    0|[305.549042497369...|  [1.0,0.0]|       0.0|
|[66.78,61853.0,61...|    0|[287.642434339782...|  [1.0,0.0]|       0.0|
|[71.85,1.68834513...|    0|[321.264486533158...|  

## Model Testing

**Preparing the test data by passing it to the pipeline**

In [44]:
model = pipeline.fit(df_test)

pp_df_test = model.transform(df_test)

In [45]:
data_test = pp_df_test.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [46]:
data_test.show(5, truncate=False)

+---------------------------+-----+
|features                   |label|
+---------------------------+-----+
|[13707.11,13707.11,0.0,0.0]|1    |
|[14949.84,14949.84,0.0,0.0]|1    |
|[22877.0,22877.0,0.0,0.0]  |1    |
|[29707.86,29707.86,0.0,0.0]|1    |
|[33332.86,33332.86,0.0,0.0]|1    |
+---------------------------+-----+
only showing top 5 rows



In [47]:
model = LogisticRegression().fit(data_test)
data=model.transform(data_test)
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[13707.11,13707.1...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[14949.84,14949.8...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[22877.0,22877.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[29707.86,29707.8...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[33332.86,33332.8...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|  (4,[0],[39713.28])|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[48375.02,48375.0...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[59835.0,59835.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[65488.05,65488.0...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[85354.69,85354.6...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[86070.17,86070.1...|    1|[-Infinity,Infinity]|  

In [48]:
df.limit

<bound method DataFrame.limit of DataFrame[type: string, amount: double, oldbalanceOrg: double, newbalanceOrig: double, isFraud: int]>

In [49]:
model.summary.areaUnderROC

1.0

In [50]:
model.summary.pr.show()

+------+---------+
|recall|precision|
+------+---------+
|   0.0|      1.0|
|   1.0|      1.0|
+------+---------+



In [51]:
pip install "pymongo[srv]"

Note: you may need to restart the kernel to use updated packages.


DEPRECATION: mermaid 0.3.2 has a non-standard dependency specifier torch>=1.7torchvision. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of mermaid or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063


In [52]:
from pymongo import MongoClient

# Connect to MongoDB (same as your previous code)
client = MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False)
db = client["test"]  # Access the "bdt" database
collection = db["bdt"]  # Access the "bdt" collection within "bdt" database

In [53]:
target_name_orig = "C1231006815"

query = {"nameOrig": target_name_orig}
item_details = collection.find_one(query)

if item_details:
    print(f"Details for nameOrig {target_name_orig}:\n{item_details}")
else:
    print(f"No matching record found for nameOrig {target_name_orig}")

Details for nameOrig C1231006815:
{'_id': ObjectId('6604284316ae4e14301d7dd2'), 'step': 1, 'type': 'PAYMENT', 'amount': 9839.64, 'nameOrig': 'C1231006815', 'oldbalanceOrg': 170136.0, 'newbalanceOrig': 160296.36, 'nameDest': 'M1979787155', 'oldbalanceDest': 0.0, 'newbalanceDest': 0.0, 'isFraud': 0, 'isFlaggedFraud': 0}


In [55]:
pipeline = [
    {"$group": {"_id": "$isFraud", "count": {"$sum": 1}}}
]

result = collection.aggregate(pipeline)

for entry in result:
    print(f"Value: {entry['_id']}, Count: {entry['count']}")

Value: 1, Count: 8213
Value: 0, Count: 6354407


In [56]:
pipeline = [
    {"$group": {"_id": "$type", "count": {"$sum": 1}}}
]

result = collection.aggregate(pipeline)

for entry in result:
    print(f"Value: {entry['_id']}, Count: {entry['count']}")

Value: TRANSFER, Count: 532909
Value: PAYMENT, Count: 2151495
Value: DEBIT, Count: 41432
Value: CASH_OUT, Count: 2237500
Value: CASH_IN, Count: 1399284


In [57]:
pipeline = [
    {
        "$group": {
            "_id": {"type": "$type", "isFraud": "$isFraud"},
            "count": {"$sum": 1}
        }
    },
    {
        "$project": {
            "_id": 0,
            "type": "$_id.type",
            "isFraud": "$_id.isFraud",
            "count": "$count"
        }
    }
]

result = collection.aggregate(pipeline)
for entry in result:
    print(f"Type: {entry['type']}, isFraud: {entry['isFraud']}, Count: {entry['count']}")

Type: CASH_OUT, isFraud: 1, Count: 4116
Type: DEBIT, isFraud: 0, Count: 41432
Type: CASH_OUT, isFraud: 0, Count: 2233384
Type: TRANSFER, isFraud: 0, Count: 528812
Type: CASH_IN, isFraud: 0, Count: 1399284
Type: PAYMENT, isFraud: 0, Count: 2151495
Type: TRANSFER, isFraud: 1, Count: 4097
