In [4]:
import pandas as pd
import json
import sys
import os
import time
from tqdm import tqdm

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
spark = SparkSession.builder \
                    .appName("example") \
                    .config("spark.executor.memory", "10g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.driver.memory", "10g").getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [10]:
from_file_path = "transfer_from-001.json"

from_data = json.load(open(from_file_path, "r"))

print("Loaded json")
# spark_from_df = spark.createDataFrame(from_df)

Loaded json


In [4]:
schema = StructType([
    StructField("contract_address", StringType(), True),
    StructField("transaction_hash", StringType(), True),
    StructField("log_index", IntegerType(), True),
    StructField("block_number", IntegerType(), True),
    StructField("from_address", StringType(), True),
    StructField("to_address", StringType(), True),
    StructField("value", FloatType(), True)
])

In [3]:
len(from_data)

6159195

In [41]:
from_df.show(3)

+--------------------+--------------------+---------+------------+--------------------+--------------------+--------+
|    contract_address|    transaction_hash|log_index|block_number|        from_address|          to_address|   value|
+--------------------+--------------------+---------+------------+--------------------+--------------------+--------+
|0xdac17f958d2ee52...|0x578be3b736d74ff...|      354|    18584924|0x40be1331dc553b6...|0x9c4593831c33330...|   500.0|
|0xdac17f958d2ee52...|0x8b29fe0c65e6a7d...|      378|    18584924|0x991377861891c60...|0xd9c1e3c7e1c046d...|841.6164|
|0xa0b86991c6218b3...|0x00a3afa81b2bc5a...|       32|    18584925|0x2f45724d7e384b3...|0x9008d19f58aabd9...|  5000.0|
+--------------------+--------------------+---------+------------+--------------------+--------------------+--------+
only showing top 3 rows



In [6]:
from_data[0:5]

[{'contract_address': '0xdac17f958d2ee523a2206206994597c13d831ec7',
  'transaction_hash': '0x578be3b736d74ffe00ae99702d29f165ae643b857b67ff956e1308cbd10f436a',
  'log_index': 354,
  'block_number': 18584924,
  'from_address': '0x40be1331dc553b6962b3db197edb2ea371a19b98',
  'to_address': '0x9c4593831c33330ba75bbed426ccf572e914f322',
  'value': 500.0},
 {'contract_address': '0xdac17f958d2ee523a2206206994597c13d831ec7',
  'transaction_hash': '0x8b29fe0c65e6a7de1f855471faab98c7d8c8f242bf392740cece9beff75b41fa',
  'log_index': 378,
  'block_number': 18584924,
  'from_address': '0x991377861891c60d2f21373ec6c04eda9700d950',
  'to_address': '0xd9c1e3c7e1c046da41353700b4987f07d7a1c212',
  'value': 841.616373},
 {'contract_address': '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
  'transaction_hash': '0x00a3afa81b2bc5a376476d1c51d17c84d0b1a9bf947baebe487777999e7dffe8',
  'log_index': 32,
  'block_number': 18584925,
  'from_address': '0x2f45724d7e384b38d5c97206e78470544304887f',
  'to_address': '0

In [8]:
to_interact = from_df.groupby(['to_address','from_address'])["from_address"].count().reset_index(name='from_add_count').rename(columns={"to_address": "user", "from_address": "other_user", "from_add_count":"count"})

In [11]:
from_df = pd.DataFrame.from_records(from_data)

In [12]:
from_interact = from_df.groupby(['from_address','to_address'])["to_address"].count().reset_index(name='to_add_count').rename(columns={"from_address": "user", "to_address": "other_user", "to_add_count":"count"})

In [13]:
%store from_interact

Stored 'from_interact' (DataFrame)


In [9]:
%store to_interact

Stored 'to_interact' (DataFrame)


In [12]:
to_interact.shape

(2181346, 3)

In [10]:
from_interact.shape

(2354785, 3)

In [14]:
to_interact["idx"] = to_interact["user"]+to_interact["other_user"]
to_interact.head()

Unnamed: 0,user,other_user,count,idx
0,0x00000000000030e5959659622cb7eb50aa20ee52,0x01949723055a451229c7ba3a817937c966748f76,2,0x00000000000030e5959659622cb7eb50aa20ee520x01...
1,0x00000000000030e5959659622cb7eb50aa20ee52,0x058d79a4c6eb5b11d0248993ffa1faa168ddd3c0,4,0x00000000000030e5959659622cb7eb50aa20ee520x05...
2,0x00000000000030e5959659622cb7eb50aa20ee52,0x06b1655b9d560de112759b4f0bf57d6f005e72fe,2,0x00000000000030e5959659622cb7eb50aa20ee520x06...
3,0x00000000000030e5959659622cb7eb50aa20ee52,0x06bfac47528d0d6a2de3b1ae5d8214ad45b4b945,4,0x00000000000030e5959659622cb7eb50aa20ee520x06...
4,0x00000000000030e5959659622cb7eb50aa20ee52,0x0928592f80d63d474257a7b797120e301ba2d987,10,0x00000000000030e5959659622cb7eb50aa20ee520x09...


In [15]:
to_interact_dict = to_interact.set_index("idx").to_dict('index')

In [16]:
from_interact["idx"] = from_interact["user"] + from_interact["other_user"]

In [18]:
from_interact_dict = from_interact.set_index("idx").to_dict('index')

In [19]:
%store from_interact_dict

Stored 'from_interact_dict' (dict)


In [2]:
%store -r

In [5]:
interact_dict = {}

# Merge to_interact_dict
for key, value in tqdm(to_interact_dict.items(), desc="Merging to_interact_dict", unit="item"):
    interact_dict[key] = value

# Merge from_interact_dict
for key, value in tqdm(from_interact_dict.items(), desc="Merging from_interact_dict", unit="item"):
    if key in interact_dict:
        # If key already exists, update the count
        interact_dict[key]['count'] += value['count']
    else:
        # If key doesn't exist, add the item
        interact_dict[key] = value

print(len(interact_dict))

Merging to_interact_dict: 100%|███████████████████████████████████████| 2181346/2181346 [00:01<00:00, 1839940.67item/s]
Merging from_interact_dict: 100%|█████████████████████████████████████| 2354785/2354785 [00:01<00:00, 1486076.02item/s]

4224872





In [14]:
transfer_interact_info = list(interact_dict.values())

In [15]:
# Dump the list into the JSON file
with open("transfer_interact_info.json", 'w') as json_file:
    json.dump(transfer_interact_info, json_file)

print(f"The list has been dumped")

The list has been dumped


In [None]:
start_time = time.time()
def process_group(group):
    # Calculate the total amount for each (token, to_address) pair
    aggregated_df = group.groupby(["contract_address"])['value'].sum().reset_index()

    # Collect the necessary information for the sending_info dictionary
    token_list = aggregated_df['contract_address'].tolist()
    amount_list = aggregated_df['value'].tolist()
    time_list = group['block_number'].tolist()
    addresses_list = group['to_address'].unique().tolist()

    # Create the final dictionary for the current from_address
    result_dict = {
        "id": f"0x1_{group.name}",
        "address": group.name,
        "sending": {
            "token": token_list,
            "amount": amount_list,
            "time": time_list,
            "addresses": addresses_list
        }
    }

    return result_dict

# Filter the DataFrame for the selected addresses and apply the custom function
res_from = from_df.groupby('to_address').apply(process_group).tolist()

print(len(res))
end_time = time.time()

# Print the execution time
execution_time = end_time - start_time
print(f"Execution Time: {execution_time} seconds")

In [None]:
start_time = time.time()
def process_group(group):
    # Calculate the total amount for each (token, to_address) pair
    aggregated_df = group.groupby(["contract_address"])['value'].sum().reset_index()

    # Collect the necessary information for the sending_info dictionary
    token_list = aggregated_df['contract_address'].tolist()
    amount_list = aggregated_df['value'].tolist()
    time_list = group['block_number'].tolist()
    addresses_list = group['to_address'].unique().tolist()

    # Create the final dictionary for the current from_address
    result_dict = {
        "id": f"0x1_{group.name}",
        "address": group.name,
        "receiving": {
            "token": token_list,
            "amount": amount_list,
            "time": time_list,
            "addresses": addresses_list
        }
    }

    return result_dict

# Filter the DataFrame for the selected addresses and apply the custom function
res_to = to_df.groupby('from_address').apply(process_group).tolist()

print(len(res))
end_time = time.time()

# Print the execution time
execution_time = end_time - start_time
print(f"Execution Time: {execution_time} seconds")

In [12]:
# Dump the list into the JSON file
with open("transfer_to.json", 'w') as json_file:
    json.dump(res_to, json_file)

print(f"The list has been dumped")

The list has been dumped


In [None]:
# Dump the list into the JSON file
with open("transfer_to.json", 'w') as json_file:
    json.dump(res_from, json_file)

print(f"The list has been dumped")