In [None]:
%run Imports.ipynb
#install boto3, s3fs

AWS_ACCESS_KEY_ID = "<access_key>"
AWS_SECRET_ACCESS_KEY = "<secret_key>"

In [None]:
# Create (or reuse) the Spark session named "s3app1" and ask Spark to pull the
# hadoop-aws / hadoop-common 3.2.0 jars needed by the s3a:// connector.
# NOTE(review): these jar versions presumably need to match the Hadoop build
# bundled with the installed Spark - confirm.
spark = (
    SparkSession.builder
    .config("spark.app.name", "s3app1")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,org.apache.hadoop:hadoop-common:3.2.0")
    .getOrCreate()
)

In [None]:
# Apply the S3A connector settings to the session's Hadoop configuration.
# The settings are applied in the order listed.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

s3a_settings = [
    ("fs.s3a.access.key", AWS_ACCESS_KEY_ID),
    ("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY),
    # NOTE(review): this looks like an AWS SDK JVM system property rather than
    # a Hadoop option - confirm it has any effect when set here.
    ("com.amazonaws.services.s3.enableV4", "true"),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    # NOTE(review): this provider list does not name S3A's key-based provider;
    # verify whether the access/secret key set above is actually consulted.
    (
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    ),
    ("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A"),
]

for setting_name, setting_value in s3a_settings:
    hadoop_conf.set(setting_name, setting_value)

In [None]:
# Glob matching everything under the bucket's source/ prefix, addressed through
# the s3a:// scheme configured above.
s3_path = "s3a://nft-membership-nyu-dune/source/*"

In [None]:
# Read every file matched by s3_path as Parquet into one Spark DataFrame.
df = spark.read.parquet(s3_path)

# Removing Bot Activities

In [None]:
# Register the raw DataFrame as the temp view "Load" so it can be queried with Spark SQL.
df.createOrReplaceTempView("Load")

In [None]:
# Bot filter 1: drop every row whose buyer made more than one purchase within
# the same calendar minute.
#
# The grouping key is the timestamp truncated to the minute, so it includes the
# date. Grouping only by hour-of-day and minute-of-hour would also count
# purchases made on different days at the same HH:MM as "the same minute".
#
# NULL buyers are left out of the subquery so that NOT IN never compares
# against NULL; rows whose own buyer is NULL are still dropped by NOT IN
# whenever the subquery returns at least one buyer.
df_stage_1 = spark.sql("""
    SELECT *
    FROM Load
    WHERE buyer NOT IN (
        SELECT buyer
        FROM Load
        WHERE buyer IS NOT NULL
        GROUP BY buyer, date_trunc('minute', block_time)
        HAVING count(*) > 1
    )
""")

In [None]:
# Register the first-stage filtered rows as the temp view "Load1" for the next SQL filter.
df_stage_1.createOrReplaceTempView("Load1")

In [None]:
# Bot filter 2: drop every row whose buyer bought a token and then sold that
# same token again less than one minute later.
#
# A purchase (a) is paired with a sale (b) when:
#   * the purchaser of a is the seller of b,
#   * both rows refer to the same token id AND the same contract address
#     (the same identity rule the main holdings query uses), and
#   * b happens strictly after a but less than one minute after it.
#
# Only a.buyer is projected and no ORDER BY is used, because the result is
# consumed solely as a NOT IN membership list.
df_stage_2 = spark.sql("""
    SELECT *
    FROM Load1
    WHERE buyer NOT IN (
        SELECT a.buyer
        FROM Load1 a
        INNER JOIN Load1 b
            ON  a.buyer = b.seller
            AND a.nft_token_id = b.nft_token_id
            AND a.nft_contract_address = b.nft_contract_address
            AND a.block_time < b.block_time
            AND b.block_time < a.block_time + INTERVAL 1 MINUTE
    )
""")

In [None]:
import warnings
# Install a global "ignore" filter: Python warnings raised from here on are not shown.
warnings.simplefilter('ignore')

# NOTE(review): seaborn is not referenced by any cell visible here - confirm it is still needed.
import seaborn as sns
# Bucketizer is used by the ranking cells below to turn per-buyer metrics into bin indices.
from pyspark.ml.feature import Bucketizer
# avg is used for the average holding duration (Ranking 5).
from pyspark.sql.functions import avg

In [None]:
# Register the fully filtered rows as the temp view "LoadData" used by the main query.
df_stage_2.createOrReplaceTempView("LoadData")

# Main Query

In [None]:
# Build the "holdings" table by self-joining LoadData: each purchase row (n1) is
# paired with every later sale row (n2) where n1's buyer is n2's seller and both
# rows share the same non-null token id and the same contract address.
# Output columns: buyer, original_currency, token id / contract address, the
# original and USD amounts of both legs, both timestamps, both platforms, and
# net = sale USD amount - purchase USD amount.
# NOTE(review): a purchase is joined to *every* later qualifying sale, so an
# address that traded the same token repeatedly yields several rows per
# purchase - confirm this is intended before summing `net`.
df_all = spark.sql("SELECT * FROM ( SELECT n1.buyer, n1.original_currency, n1.nft_token_id, n1.nft_contract_address, n1.original_amount as Bought, n2.original_amount as Sold, n1.block_time as Bought_Timestamp, n2.block_time as Sold_Timestamp,n1.usd_amount as Bought_amount, n2.usd_amount as Sold_amount, (n2.usd_amount - n1.usd_amount) as net, n1.platform as n1platform, n2.platform as n2platform FROM LoadData n1, LoadData n2 where n1.buyer = n2.seller AND  n2.block_time > n1.block_time AND  n1.nft_token_id is not null AND n1.nft_token_id = n2.nft_token_id AND n1.nft_contract_address = n2.nft_contract_address) holdings")

In [None]:
# Creates a default SparkConf object.
# NOTE(review): `conf` is not referenced by any later cell visible here; this
# looks like a leftover - confirm before removing.
conf = pyspark.SparkConf()

# Ranking 1 - Total Earnings

In [None]:
# Ranking 1 metric: total earnings per buyer, i.e. the sum of `net` over all of
# that buyer's rows in df_all. A pandas copy is pulled to the driver for plotting.
# NOTE(review): `sum` must be the Spark SQL aggregate here (presumably brought
# in by Imports.ipynb), not the Python builtin - confirm.
holdings_by_buyer = df_all.groupBy("buyer")
df_avg = holdings_by_buyer.agg(sum("net").alias("total_earnings"))
df_avg_pd = df_avg.toPandas()

In [None]:
# Ranking 1 histogram: bucket each buyer's total earnings and plot how many
# buyers land in each bucket.
bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4', 'Range 5']

# Fixed inner edges with open-ended outer edges. Using +/-inf instead of the
# data's min/max means:
#   * no value can fall outside the bins (min/max read back through a 32-bit
#     float cast may not cover the true extremes),
#   * the edges always increase, whatever the data looks like, and
#   * no extra Spark jobs are needed just to build the bin list.
custom_bins = [-float("inf"), -50, 50, 1000, 10000, float("inf")]

# right=False gives left-closed bins [low, high), the same convention Spark's
# Bucketizer uses, so the plot and the bin indices agree at the edges.
bucket_labels = pd.cut(df_avg_pd["total_earnings"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_avg_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including ranges that are empty, so the
# bars always line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_avg_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

colors = ['red', 'blue', 'green', 'orange', 'purple']

# Plot the histogram
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 1")


In [None]:
# Draw the bucket counts currently held in `histogram` as a bar chart.
# NOTE(review): this repeats the plot drawn at the end of the previous cell.
colors = ['red', 'blue', 'green', 'orange', 'purple']

fig, ax = plt.subplots()
ax.bar(
    histogram["buckets"],
    histogram["count"],
    width=1,
    edgecolor="black",
    color=colors,
)
ax.set(xlabel="Buckets", ylabel="Count")
ax.set_title("Histogram of Binned Values for Ranking 1")

In [None]:
# Ranking 1 bins: map each buyer's total_earnings to a bin index 0..4.
#
# Splits are the fixed thresholds with open-ended outer edges. The data's
# min/max are deliberately not used as splits:
#   * Bucketizer requires strictly increasing splits, which min/max only
#     satisfy when the data happens to straddle the fixed thresholds, and
#   * extra min/max splits add an empty bucket at the bottom and put the single
#     largest value in a bucket of its own at the top, shifting this ranking's
#     indices relative to the other rankings.
# Bin indices therefore run 0..4, like the other rankings.
range_boundaries = [-float("inf"), -50, 50, 1000, 10000, float("inf")]

# Bucketizer assigns value v to bin i when splits[i] <= v < splits[i + 1].
bucketizer = Bucketizer(splits=range_boundaries, inputCol="total_earnings", outputCol="bins_sum_net")
df_bucketized_sum_net = bucketizer.transform(df_avg)


# Ranking 2 - Amount Spent

In [None]:
# Ranking 2 metric: total usd_amount per buyer over the filtered transactions.
# NOTE(review): `sum` must be the Spark SQL aggregate (presumably from
# Imports.ipynb), not the Python builtin - confirm.
df_avg_spent = df_stage_2.groupBy("buyer").agg(sum("usd_amount").alias("amount_spent"))
df_avg_spent_pd = df_avg_spent.toPandas()

bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4', 'Range 5']
# Fixed inner edges with open-ended outer edges, so every value is covered and
# the edges always increase, without extra Spark jobs for min/max. The next
# cell reuses this list as Bucketizer splits.
custom_bins = [-float("inf"), 100, 1000, 10000, 100000, float("inf")]

# right=False gives left-closed bins [low, high), matching Spark's Bucketizer.
bucket_labels = pd.cut(df_avg_spent_pd["amount_spent"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_avg_spent_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including empty ranges, so the bars always
# line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_avg_spent_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

colors = ['red', 'blue', 'green', 'orange', 'purple']

# Plot the histogram
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 2")


In [None]:
# Ranking 2 bins: map each buyer's amount_spent to a bin index 0..4.
#
# The splits are spelled out here rather than taken from the plotting cell's
# variable, so this cell does not depend on which histogram cell ran last.
# Open-ended outer edges keep every value in range and keep the splits strictly
# increasing regardless of the data.
range_boundaries = [-float("inf"), 100, 1000, 10000, 100000, float("inf")]

# Bucketizer assigns value v to bin i when splits[i] <= v < splits[i + 1].
bucketizer = Bucketizer(splits=range_boundaries, inputCol="amount_spent", outputCol="bins_spent")
df_bucketized_spent = bucketizer.transform(df_avg_spent)


# Ranking 3 - number of transactions

In [None]:
# Ranking 3 metric: number of rows per buyer in the filtered transactions.
# A pandas copy is pulled to the driver for plotting.
txns_by_buyer = df_stage_2.groupBy("buyer")
df_count = txns_by_buyer.agg(count("*").alias("number_txns"))
df_count_pd = df_count.toPandas()
# To inspect the distribution: df_count.select("number_txns").describe().show()

In [None]:
# Spread of the metric: how many buyers have each distinct transaction count.
unique_values = df_count_pd['number_txns'].value_counts().to_frame().reset_index()
unique_values.columns = ['unique_values', 'count']

bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4', 'Range 5']
# Fixed inner edges with open-ended outer edges, so every count is covered and
# the edges always increase, without extra Spark jobs for min/max. The next
# cell reuses this list as Bucketizer splits.
custom_bins = [-float("inf"), 2, 5, 10, 20, float("inf")]

# right=False gives left-closed bins [low, high), matching Spark's Bucketizer.
# With right-closed bins starting at the minimum, buyers holding exactly the
# minimum count would fall outside every bin.
bucket_labels = pd.cut(df_count_pd["number_txns"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_count_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including empty ranges, so the bars always
# line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_count_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

colors = ['red', 'blue', 'green', 'orange', 'purple']
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 3")

In [None]:
# Ranking 3 bins: map each buyer's number_txns to a bin index 0..4.
#
# The splits are spelled out here rather than taken from the plotting cell's
# variable, so this cell does not depend on which histogram cell ran last.
# Open-ended outer edges keep the splits strictly increasing even when no buyer
# exceeds the top threshold.
range_boundaries = [-float("inf"), 2, 5, 10, 20, float("inf")]

# NOTE(review): the variable is named "ranking2" although this is Ranking 3;
# the name is kept in case other cells refer to it.
bucketizer_ranking2 = Bucketizer(splits=range_boundaries, inputCol="number_txns", outputCol="bins_number_txns")
df_bucketized_number_txns = bucketizer_ranking2.transform(df_count)

# Ranking 4 - Diversity: number of unique currencies transacted in

In [None]:
# Ranking 4 metric: number of distinct original_currency values per buyer,
# computed over the buy/sell pairs in df_all. A pandas copy is pulled to the
# driver for plotting.
currency_by_buyer = df_all.groupBy("buyer")
df_currency = currency_by_buyer.agg(countDistinct("original_currency").alias("num_currency"))
df_currency_pd = df_currency.toPandas()
# To inspect the distribution: df_currency.select("num_currency").describe().show()

In [None]:
# Spread of the metric: how many buyers have each distinct currency count.
unique_values = df_currency_pd['num_currency'].value_counts().to_frame().reset_index()
# Rename the two columns to the metric value and the number of buyers having it.
unique_values.columns = ['unique_values', 'count']
print(unique_values)

In [None]:
# Ranking 4 histogram: bucket each buyer's distinct-currency count and plot the
# bucket sizes.
bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4']
# Fixed inner edges with open-ended outer edges. A top edge derived from the
# data's maximum duplicates or undercuts the edge at 4 whenever no buyer used
# more than two currencies; +/-inf always yields valid, increasing edges and
# needs no extra Spark jobs. The next cell reuses this list as Bucketizer splits.
custom_bins = [-float("inf"), 2, 3, 4, float("inf")]

# right=False gives left-closed bins [low, high), matching Spark's Bucketizer.
# With right-closed bins starting at the minimum, buyers holding exactly the
# minimum count would fall outside every bin.
bucket_labels = pd.cut(df_currency_pd["num_currency"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_currency_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including empty ranges, so the bars always
# line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_currency_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

colors = ['red', 'blue', 'green', 'yellow']
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 4")

In [None]:
# Ranking 4 bins: map each buyer's num_currency to a bin index 0..3.
#
# The splits are spelled out here rather than taken from the plotting cell's
# variable, so this cell does not depend on which histogram cell ran last.
# Open-ended outer edges keep the splits strictly increasing even when every
# buyer used only one or two currencies.
range_boundaries = [-float("inf"), 2, 3, 4, float("inf")]

# Bucketizer assigns value v to bin i when splits[i] <= v < splits[i + 1].
bucketizer_ranking4 = Bucketizer(splits=range_boundaries, inputCol="num_currency", outputCol="bins_num_currency")
df_bucketized_df_currency = bucketizer_ranking4.transform(df_currency)

# Ranking 5 - Average Holding Duration (in seconds)

In [None]:
# Add `duration`: seconds elapsed between the purchase and the paired sale,
# computed from the two timestamps as Unix epoch seconds.
sold_epoch = unix_timestamp(col("Sold_Timestamp"))
bought_epoch = unix_timestamp(col("Bought_Timestamp"))
df_all = df_all.withColumn("duration", sold_epoch - bought_epoch)

In [None]:
# Ranking 5 metric: mean `duration` (seconds) per buyer over df_all.
# A pandas copy is pulled to the driver for plotting.
duration_by_buyer = df_all.groupBy("buyer")
df_avg_duration = duration_by_buyer.agg(avg("duration").alias("avg_duration"))
# To inspect the distribution: df_avg_duration.select("avg_duration").describe().show()
df_avg_duration_pd = df_avg_duration.toPandas()

In [None]:
# Ranking 5 histogram: bucket each buyer's average holding duration (seconds)
# and plot the bucket sizes.
bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4', 'Range 5']
# Edges in seconds: 0, one day, one week, about one month (365 days / 12),
# about one year, then open-ended. An infinite top edge keeps the edges
# increasing even when no buyer's average exceeds a year and guarantees the
# largest value is covered. The next cell reuses this list as Bucketizer splits.
custom_bins = [0, 86400, 604800, 2628000, 31540000, float("inf")]

# right=False gives left-closed bins [low, high), matching Spark's Bucketizer
# (a duration of exactly 0 seconds lands in the first range).
bucket_labels = pd.cut(df_avg_duration_pd["avg_duration"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_avg_duration_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including empty ranges, so the bars always
# line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_avg_duration_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

# Five colours, one per range (the same palette as the other five-bin rankings).
colors = ['red', 'blue', 'green', 'orange', 'purple']
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 5")

In [None]:
# Ranking 5 bins: map each buyer's avg_duration (seconds) to a bin index 0..4.
#
# Splits in seconds: 0, one day, one week, about one month, about one year,
# open-ended. They are spelled out here rather than taken from the plotting
# cell's variable, so this cell does not depend on which histogram cell ran
# last. The infinite top edge keeps the splits strictly increasing even when no
# average exceeds a year.
range_boundaries = [0, 86400, 604800, 2628000, 31540000, float("inf")]

# Bucketizer assigns value v to bin i when splits[i] <= v < splits[i + 1].
bucketizer_ranking5 = Bucketizer(splits=range_boundaries, inputCol="avg_duration", outputCol="bins_avg_duration")
df_bucketized_avg_duration = bucketizer_ranking5.transform(df_avg_duration)

# Ranking 6 - Number of NFT Contract Addresses

In [None]:
# Ranking 6 metric: total number_of_items per buyer over the filtered
# transactions.
# NOTE(review): the section heading says "Number of NFT Contract Addresses",
# but this sums number_of_items - confirm which metric is intended.
# NOTE(review): `sum` must be the Spark SQL aggregate (presumably from
# Imports.ipynb), not the Python builtin - confirm.
df_nft = df_stage_2.groupBy("buyer").agg(sum("number_of_items").alias("number_of_items"))
df_nft_pd = df_nft.toPandas()

bin_labels = ['Range 1', 'Range 2', 'Range 3', 'Range 4', 'Range 5']
# Fixed edges with an open-ended top edge, so the edges always increase even
# when no buyer holds more than 100 items, and the largest value is always
# covered. The next cell reuses this list as Bucketizer splits.
custom_bins = [0, 10, 20, 50, 100, float("inf")]

# right=False gives left-closed bins [low, high), matching Spark's Bucketizer
# (a total of exactly 0 lands in the first range).
bucket_labels = pd.cut(df_nft_pd["number_of_items"], bins=custom_bins, labels=bin_labels, right=False)
# Stored as plain strings; rows with a missing metric become the string 'nan'.
df_nft_pd["buckets"] = bucket_labels.astype(str)

# One row per label, in label order, including empty ranges, so the bars always
# line up with `colors`. 'nan' rows are deliberately not plotted.
bucket_counts = df_nft_pd["buckets"].value_counts()
histogram = pd.DataFrame({
    "buckets": bin_labels,
    "count": [int(bucket_counts.get(label, 0)) for label in bin_labels],
})

colors = ['red', 'blue', 'green', 'orange', 'purple']

# Plot the histogram
fig, ax = plt.subplots()
ax.bar(histogram["buckets"], histogram["count"], width=1, edgecolor="black", color=colors)
ax.set_xlabel("Buckets")
ax.set_ylabel("Count")
ax.set_title("Histogram of Binned Values for Ranking 6")


In [None]:
# Ranking 6 bins: map each buyer's number_of_items total to a bin index 0..4.
#
# The splits are spelled out here rather than taken from the plotting cell's
# variable, so this cell does not depend on which histogram cell ran last.
# The infinite top edge keeps the splits strictly increasing even when no buyer
# holds more than 100 items.
range_boundaries = [0, 10, 20, 50, 100, float("inf")]

# Bucketizer assigns value v to bin i when splits[i] <= v < splits[i + 1].
bucketizer = Bucketizer(splits=range_boundaries, inputCol="number_of_items", outputCol="bins_nft")
df_bucketized_nft = bucketizer.transform(df_nft)

# Joining Bins

In [None]:
# Combine the six per-buyer bucket tables into one row per buyer.
# Every join is an inner join on `buyer`, so only buyers present in all six
# tables survive. The tables are joined in the order listed.
joined_df = df_bucketized_spent
for bucket_table in (
    df_bucketized_avg_duration,
    df_bucketized_df_currency,
    df_bucketized_number_txns,
    df_bucketized_nft,
    df_bucketized_sum_net,
):
    joined_df = joined_df.join(bucket_table, on='buyer', how='inner')

In [None]:
# Preview the first 5 joined rows.
joined_df.show(5)

In [None]:
# Weights applied to each bin index when computing the score.
# NOTE(review): the weight numbers do not follow the ranking numbers used in
# the section headings (e.g. weight_1 is applied to bins_spent) - confirm each
# weight is attached to the intended metric.
weight_1 = 10
weight_2 = 10
weight_3 = 5
weight_4 = 10
weight_5 = 10
weight_6 = 10

weighted_bins = [
    ("bins_spent", weight_1),
    ("bins_avg_duration", weight_2),
    ("bins_num_currency", weight_3),
    ("bins_number_txns", weight_4),
    ("bins_nft", weight_5),
    ("bins_sum_net", weight_6),
]

# Score = sum over the six metrics of (bin index * weight), accumulated left to
# right as one Spark column expression.
score_expr = None
for bin_column, bin_weight in weighted_bins:
    weighted_term = col(bin_column) * bin_weight
    score_expr = weighted_term if score_expr is None else score_expr + weighted_term

joined_df = joined_df.withColumn("Score", score_expr)


In [None]:
# Preview the first 10 rows, now including the Score column.
joined_df.show(10)

In [None]:
# Approximate 20th and 80th percentiles of Score (relative error 0.01);
# percentiles[0] and percentiles[1] are the tier cut-offs used in the next cell.
percentiles = joined_df.approxQuantile("Score", [0.2, 0.8], 0.01)

In [None]:
# Label each buyer with a tier based on where Score sits relative to the two
# percentile cut-offs:
#   Score <= lower cut-off              -> "tier1"
#   lower cut-off < Score <= upper one  -> "tier2"
#   anything else                       -> "tier3"
score = col("Score")
lower_cut, upper_cut = percentiles[0], percentiles[1]

tier_expr = (
    when(score <= lower_cut, lit("tier1"))
    .when((score > lower_cut) & (score <= upper_cut), lit("tier2"))
    .otherwise(lit("tier3"))
)

joined_df = joined_df.withColumn("tier", tier_expr)
