In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### PySpark Setup.

In [7]:
#######################################
### START INIT ENVIRONMENT
!ls /content/drive/MyDrive/DA231o-Detecting-Suspicious-Cryptocurrency-Transactions/SparkData/spark-3.5.2-bin-hadoop3.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf /content/drive/MyDrive/DA231o-Detecting-Suspicious-Cryptocurrency-Transactions/SparkData/spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark
!pip install -q pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"
### END INIT ENVIRONMENT

/content/drive/MyDrive/DA231o-Detecting-Suspicious-Cryptocurrency-Transactions/SparkData/spark-3.5.2-bin-hadoop3.tgz


### Mount the drive and load the data from the shared drive

All the data files are mounted at `/content/data`. You need to load the needed files while you're answering the question. Load the correct folder based on which scale factor of data you want to test on.

In [8]:
!mkdir -p /content/data
!rm -rf /content/data/*.pq
!ln -s /content/drive/MyDrive/DA231o-Detecting-Suspicious-Cryptocurrency-Transactions/data/*.pq /content/data/

In [10]:
ls /content/data

[0m[01;36mbitcoin.pq[0m@


### Create spark session

In [9]:
#######################################
### START OF PYSPARK INIT
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
input_type = 'sample'
spark = SparkSession.builder\
         .master("local")\
         .appName("Colab")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()
# Spark is ready to go within Colab!
### END OF PYSPARK INIT

### Create DataFrames for all relevant files

In [51]:
#######################################
### START COMMON USER IMPORTS
#######################################

from pyspark.sql import functions as F
from pyspark.sql.functions import col,sum
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import shutil
from pyspark.sql.window import Window
from scipy.stats import boxcox

from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    NaiveBayes,
    GBTClassifier,
    LinearSVC,
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
def read_pq(path_to_file):
  return spark.read.parquet(path_to_file)

df = read_pq('/content/data/bitcoin.pq')

In [None]:
df.show()

In [13]:
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- year: long (nullable = true)
 |-- day: long (nullable = true)
 |-- length: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- count: long (nullable = true)
 |-- looped: long (nullable = true)
 |-- neighbors: long (nullable = true)
 |-- income: double (nullable = true)
 |-- label: string (nullable = true)



Summary Statistics

In [14]:
df.describe().show()

+-------+--------------------+-----------------+-----------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+-----------+
|summary|             address|             year|              day|           length|              weight|             count|            looped|        neighbors|              income|      label|
+-------+--------------------+-----------------+-----------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+-----------+
|  count|             2916697|          2916697|          2916697|          2916697|             2916697|           2916697|           2916697|          2916697|             2916697|    2916697|
|   mean|                NULL|2014.475011288454| 181.457211016434|45.00859293920486|  0.5455192341637917| 721.6446428957139|238.50669884461772|2.206516137946451|4.4648890071859665E9|       NULL|
| stddev|                

Null Values count

In [17]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+-------+----+---+------+------+-----+------+---------+------+-----+
|address|year|day|length|weight|count|looped|neighbors|income|label|
+-------+----+---+------+------+-----+------+---------+------+-----+
|      0|   0|  0|     0|     0|    0|     0|        0|     0|    0|
+-------+----+---+------+------+-----+------+---------+------+-----+



Correlation Analysis

In [None]:
assembler = VectorAssembler(inputCols=["year", "day", "length", "weight", "count", "looped", "neighbors", "income"],
                            outputCol="features")
vector_df = assembler.transform(df).select("features")
correlation = Correlation.corr(vector_df, "features").head()[0]
print("Correlation Matrix:\n", correlation.toArray())

In [70]:
df.groupBy("year").count().orderBy("year").show()

+----+-----+
|year|count|
+----+-----+
|2011| 5438|
|2012| 6318|
|2013|12800|
|2014|15919|
|2015| 9154|
|2016|21107|
|2017| 9062|
|2018| 4998|
+----+-----+



In [73]:
df.groupBy("day").count().orderBy("day").show(10)

+---+-----+
|day|count|
+---+-----+
|  1|  140|
|  2|  176|
|  3|  194|
|  4|  173|
|  5|  175|
|  6|  175|
|  7|  152|
|  8|  133|
|  9|  139|
| 10|  175|
+---+-----+
only showing top 10 rows



Preproccesing of "label" column

In [40]:
# Down-sample the majority class
majority_class = df.filter(F.col("label") == "white")
minority_class = df.filter(F.col("label") != "white")
# change minority class label to "black"
minority_class = minority_class.withColumn("label", F.lit("black"))
# minority class has around 42k rows
majority_class_downsampled = majority_class.sample(fraction=0.015, seed=1024)
balanced_df = majority_class_downsampled.union(minority_class)
shutil.rmtree("/content/data/processed_data.pq", ignore_errors=True)
balanced_df.write.parquet("/content/data/processed_data.pq")

In [46]:
df = spark.read.parquet("/content/data/processed_data.pq")

# New features
df = df.withColumn("num_addresses", F.count("address").over(Window.partitionBy("address")))
df = df.withColumn(
    "day_of_week",
    F.dayofweek(F.to_date(F.concat_ws("-", F.col("year"), F.col("day")), "yyyy-D")),
)
df = df.withColumn("is_holiday", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))

# Interaction features
df = df.withColumn("length_weight_interaction", F.col("length") * F.col("weight"))
df = df.withColumn("income_count_interaction", F.col("income") * F.col("count"))

# Boxcox transformation
pandas_df = df.toPandas()
for feature in ["length", "income", "weight", "count", "neighbors"]:
    pandas_df[f"{feature}_boxcox"], _ = boxcox(pandas_df[feature] + 1)

# Convert back to Spark DataFrame
df = spark.createDataFrame(pandas_df)

shutil.rmtree("/content/data/feature_engineered_data.pq", ignore_errors=True)
df.write.parquet("/content/data/feature_engineered_data.pq")

In [72]:
df = spark.read.parquet("/content/data/feature_engineered_data.pq")
# white as 1 and black as 0
df = df.withColumn("label", (df["label"] == "white").cast("int"))
df.show(5)

+--------------------+----+---+------+------------------+-----+------+---------+------------+-----+-------------+-----------+----------+-------------------------+------------------------+-----------------+------------------+--------------------+------------------+------------------+
|             address|year|day|length|            weight|count|looped|neighbors|      income|label|num_addresses|day_of_week|is_holiday|length_weight_interaction|income_count_interaction|    length_boxcox|     income_boxcox|       weight_boxcox|      count_boxcox|  neighbors_boxcox|
+--------------------+----+---+------+------------------+-----+------+---------+------------+-----+-------------+-----------+----------+-------------------------+------------------------+-----------------+------------------+--------------------+------------------+------------------+
|11116n3FtiFDohakQ...|2014|296|   144| 0.209228476694109| 1159|     0|        1|     7.599E7|    1|            1|          5|         0|       30.12

Vector assembler for feature column

In [53]:
assembler = VectorAssembler(
    inputCols=[
        "year",
        "day",
        "length",
        "weight",
        "count",
        "looped",
        "neighbors",
        "income",
        "num_addresses",
        "day_of_week",
        "length_weight_interaction",
        "income_count_interaction",
        "length_boxcox",
        "income_boxcox",
        "weight_boxcox",
        "count_boxcox",
        "neighbors_boxcox",
    ],
    outputCol="features",
)
data = assembler.transform(df)

Data split code in 80:20 ratio

In [52]:
def spark_split(df, ratios: list = [0.8, 0.2], target_col: str = "target"):
    pos = df.filter(F.col(target_col) == 1)
    neg = df.filter(F.col(target_col) == 0)

    train_pos, test_pos = pos.randomSplit(ratios, seed=1024)
    train_neg, test_neg = neg.randomSplit(ratios, seed=1024)

    return train_pos.union(train_neg), test_pos.union(test_neg)

List of used machine learning algorithms

In [80]:
algorithms = {
    "LogisticRegression": LogisticRegression(featuresCol="features", labelCol="label"),
    "DecisionTreeClassifier": DecisionTreeClassifier(featuresCol="features", labelCol="label"),
    "RandomForestClassifier": RandomForestClassifier(featuresCol="features", labelCol="label"),
    "NaiveBayes": NaiveBayes(featuresCol="features", labelCol="label"),
    "GBTClassifier": GBTClassifier(featuresCol="features", labelCol="label"),
    "LinearSVC": LinearSVC(featuresCol="features", labelCol="label")
}

Split data into train and test data

In [55]:
train, test = spark_split(data, target_col="label")

In [81]:
for name, algorithm in algorithms.items():

    model = algorithm.fit(train)
    predictions = model.transform(test)
    evaluator = BinaryClassificationEvaluator(
        labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
    )
    auc = evaluator.evaluate(predictions)

    # Additional evaluators for other metrics
    precision_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
    )
    recall_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedRecall"
    )
    f1_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="f1"
    )
    accuracy_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    )

    precision = precision_evaluator.evaluate(predictions)
    recall = recall_evaluator.evaluate(predictions)
    f1 = f1_evaluator.evaluate(predictions)
    accuracy = accuracy_evaluator.evaluate(predictions)

    print(f"Algo = {name}")
    print(f"precision = {precision:.6f}")
    print(f"recall = {recall:.6f}")
    print(f"f1 = {f1:.6f}")
    print(f"accuracy = {accuracy:.6f}")


Algo = LogisticRegression
precision = 0.759590
recall = 0.753197
f1 = 0.750930
accuracy = 0.753197
Algo = DecisionTreeClassifier
precision = 0.903364
recall = 0.900583
f1 = 0.900519
accuracy = 0.900583
Algo = RandomForestClassifier
precision = 0.909422
recall = 0.909423
f1 = 0.909417
accuracy = 0.909423
Algo = NaiveBayes
precision = 0.489944
recall = 0.499912
f1 = 0.454319
accuracy = 0.499912
Algo = GBTClassifier
precision = 0.922342
recall = 0.921975
f1 = 0.921986
accuracy = 0.921975
Algo = LinearSVC
precision = 0.837814
recall = 0.764512
f1 = 0.749225
accuracy = 0.764512
