In [2]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
import json

In [5]:
from Env_configs import raw_data_file_path

In [4]:
# Local Spark session for the pipeline prototype; 'local[*]' runs on all local cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Pipeline_prototype')
    .getOrCreate()
)

In [6]:
# Load the raw-layer JSON; no schema is supplied, so Spark infers it from the data.
raw_json_df = spark.read.json(path=raw_data_file_path)

In [8]:
# Print the schema Spark inferred for the raw JSON so the nested fields can be inspected.
raw_json_df.printSchema()

root
 |-- animation_original_url: string (nullable = true)
 |-- animation_url: string (nullable = true)
 |-- asset_contract: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- asset_contract_type: string (nullable = true)
 |    |-- buyer_fee_basis_points: long (nullable = true)
 |    |-- created_date: string (nullable = true)
 |    |-- default_to_fiat: boolean (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- dev_buyer_fee_basis_points: long (nullable = true)
 |    |-- dev_seller_fee_basis_points: long (nullable = true)
 |    |-- external_link: string (nullable = true)
 |    |-- image_url: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- nft_version: string (nullable = true)
 |    |-- only_proxied_transfers: boolean (nullable = true)
 |    |-- opensea_buyer_fee_basis_points: long (nullable = true)
 |    |-- opensea_seller_fee_basis_points: long (nullable = true)
 |    |-- opensea_version: string (nullable 

## Preprocessed Layer 
Input \
Source: Raw Layer \
Type: JSON file \
Content: All data about selected NFT collections

Output \
Target: Preprocessed Layer \
Type: Parquet file \
Content: 
1) CCompare_reference_DF: information required for data enrichment from CryptoCompare API 
2) EScan_reference_DF: information required for data enrichment from EtherScan API 

Transformations & Actions: \
Parse json from raw layer for desired information and create dataframe.
Create reference dataframes needed for next stage.

File Partitions: 
* Preprocessed
   * Date 
      * CCompare 
      * EScan 
         * NFT_#1
         * NFT_#N



In [9]:
# Read the preprocessed CryptoPunks reference data, one dataframe per enrichment source.
CCompare_preprocessed_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Preprocessed/01-19-22/CCompare/NFT=CryptoPunks/'
)
EScan_preprocessed_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Preprocessed/01-19-22/EScan/NFT=CryptoPunks/'
)

In [17]:
# Preview the CryptoCompare reference rows.
CCompare_preprocessed_df.show(n=5)

+-------------------+----------+
|           txn_date|      unix|
+-------------------+----------+
|2021-10-28 23:50:37|1635490237|
|2021-03-11 00:48:39|1615452519|
|2021-03-11 20:57:01|1615525021|
|2021-12-09 20:38:17|1639111097|
|2021-07-30 21:24:47|1627705487|
+-------------------+----------+
only showing top 5 rows



In [16]:
# Preview the EtherScan reference rows.
EScan_preprocessed_df.show(n=5)

+--------------------+
|       owner_address|
+--------------------+
|0x3b93cbfb99560ab...|
|0x9c5083dd4838e12...|
|0x266892ed0d40ea5...|
|0xfb5251bf63e2382...|
|0xce90a7949bb7889...|
+--------------------+
only showing top 5 rows



## Processed Layer 
Input \
Source: Preprocessed Layer \
Type: Parquet file \
Content: Reference data for enrichment

Output \
Target: Processed Layer \
Type: Parquet file \
Content: 
1) CCompare_USD_DF: ETH/USD Historical price information from CryptoCompare API 
2) EScan_Token_DF: User Token balance information from EtherScan API 
3) EScan_ETH_DF: User ETH balance from EtherScan API 
4) Opensea_DF: NFT Collection and Owner information from Opensea

Transformations & Actions: \
Apply UDF to reference dataframes to create new dataframe with enriched data


File Partitions: 
* Processed
   * Date 
      * OSea 
      * CCompare 
      * EScan 
         * NFT_#1
         * NFT_#N



In [89]:
# Read the processed-layer outputs of the 01-19-22 run.
# The 'NFT=**' glob picks up every per-collection partition directory at once.
CCompare_USD_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/CCompare/NFT=**/'
)
EScan_Token_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/EScan/Token_Balance/NFT=**/'
)
EScan_ETH_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/EScan/ETH/NFT=**/'
)
OSea_NFT_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/OSea/'
)

In [93]:
# Preview the ETH/USD rate rows.
CCompare_USD_df.show(n=5)

+-------------------+--------+
|           txn_date|usd_rate|
+-------------------+--------+
|2021-10-28 23:50:37|  4417.4|
|2021-03-11 00:48:39| 1827.06|
|2021-03-11 20:57:01| 1767.77|
|2021-12-09 20:38:17| 3902.49|
|2021-07-30 21:24:47| 2531.75|
+-------------------+--------+
only showing top 5 rows



In [94]:
# Preview the token balances without truncating the address and map columns.
EScan_Token_df.show(n=5, truncate=False)

+------------------------------------------+---------------------------------------------------------------------+
|owner_address                             |token_balance                                                        |
+------------------------------------------+---------------------------------------------------------------------+
|0x7eb413211a9de1cd2fe8b8bb6055636c43f7d206|{Usdc -> 0, Tether -> 0, Wrapped_eth -> 13000000000000000000}        |
|0x14ae8100ea85a11bbb36578f83ab1b5c1cfdd61c|{Usdc -> 435379993, Tether -> 0, Wrapped_eth -> 0}                   |
|0x542d891303f5b70bbca0ccba83811560f45968a4|{Usdc -> 0, Tether -> 100000000, Wrapped_eth -> 50000000000000000000}|
|0x3b93cbfb99560ab275827d502c49d57a0e5ec5ce|{Usdc -> 83790221294, Tether -> 0, Wrapped_eth -> 0}                 |
|0xe9a6aef13727ae948033614e50e20624e0162376|{Usdc -> 0, Tether -> 0, Wrapped_eth -> 0}                           |
+------------------------------------------+------------------------------------

In [95]:
# Preview the per-owner ETH balance rows.
EScan_ETH_df.show(n=5)

+--------------------+--------------------+
|       owner_address|                 ETH|
+--------------------+--------------------+
|0x7eb413211a9de1c...| 2498673348161281682|
|0x14ae8100ea85a11...|93051814339210406498|
|0x542d891303f5b70...|13959298706138184858|
|0x3b93cbfb99560ab...| 4690624628633285096|
|0xe9a6aef13727ae9...|  923768192354847426|
+--------------------+--------------------+
only showing top 5 rows



In [96]:
# Preview the OpenSea collection/owner rows.
OSea_NFT_df.show(n=5)

+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|              NFT|token_id|num_sales|      username|       owner_address|           txn_date|      payment_amt|payment_type|
+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|BoredApeYachtClub|    2087|        2|          null|0x066317b90509069...|2021-09-30 01:28:47|            769.0|         ETH|
|BoredApeYachtClub|    3749|        3|TheSandboxGame|0x7a9fe22691c811e...|2021-09-06 21:02:29|            740.0|         ETH|
|BoredApeYachtClub|    8585|        3|          null|0x86292f24afffe8d...|2021-10-19 23:15:59|696.9689999999999|         ETH|
|BoredApeYachtClub|    7090|        2|   j1mmy_vault|0x8ad272ac86c6c88...|2021-09-02 14:59:41|            600.0|         ETH|
|BoredApeYachtClub|    8135|        3|  MetalinkLabs|0x222a50ddb9126d3...|2021-09-17 16:43:57|            550.0|      

## Structured Layer 
Input \
Source: Processed Layer \
Type: Parquet file \
Content: Enriched data for NFT Collections and User Wallet

Output \
Target: Structured Layer \
Type: Parquet file \
Content: 
1) CCompare_USD_DF: ETH/USD Historical price information
2) EScan_Token_DF: User Token balance information 
3) EScan_ETH_DF: User ETH balance information 
4) Opensea_DF: NFT Collection and Owner information

Transformations & Actions: \
Validate, enforce schema, format, and structure data


File Partitions: 
* Structured
   * Date 
      * OSea 
      * CCompare 
      * EScan 
         * NFT_#1
         * NFT_#N

In [80]:
import datetime

# Run date in MM-DD-YY form; later cells insert it between a layer root and the
# source sub-directory (e.g. f'{processed_data_path}{today}/OSea/').
today = datetime.date.today().strftime('%m-%d-%y')

# Root directories of the data layers read further down.
processed_data_path = '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/'
# NOTE(review): later cells read `structured_data_path`, but no visible cell defines it,
# so a fresh kernel fails with NameError. The value below is inferred from the
# Preprocessed/Processed directory naming - confirm it matches the real layout.
structured_data_path = '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Structured/'

# CC_processed_filepaths=[f'{processed_data_path}{today}/CCompare/NFT={file}/' for file in ['BoredApeYachtClub','Cool Cats', 'CryptoPunks','Doodles','Meebits']]
# ES_processed_filepaths=[f'{processed_data_path}{today}/EScan/NFT={file}/' for file in ['BoredApeYachtClub','Cool Cats', 'CryptoPunks','Doodles','Meebits']]
# OS_processed_filepaths=[f'{processed_data_path}{today}/OSea/NFT={file}/' for file in ['BoredApeYachtClub','Cool Cats', 'CryptoPunks','Doodles','Meebits']]


In [106]:
# Read the processed OpenSea data for the 01-19-22 run from the OSea root,
# rather than from a single collection directory as in the earlier attempt below.
# NFT_df=spark.read.parquet('/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/OSea/NFT=BoredApeYachtClub/')
OSea_NFT_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/OSea/'
)

In [107]:
# Preview the processed OpenSea rows.
OSea_NFT_df.show(n=5)

+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|              NFT|token_id|num_sales|      username|       owner_address|           txn_date|      payment_amt|payment_type|
+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|BoredApeYachtClub|    2087|        2|          null|0x066317b90509069...|2021-09-30 01:28:47|            769.0|         ETH|
|BoredApeYachtClub|    3749|        3|TheSandboxGame|0x7a9fe22691c811e...|2021-09-06 21:02:29|            740.0|         ETH|
|BoredApeYachtClub|    8585|        3|          null|0x86292f24afffe8d...|2021-10-19 23:15:59|696.9689999999999|         ETH|
|BoredApeYachtClub|    7090|        2|   j1mmy_vault|0x8ad272ac86c6c88...|2021-09-02 14:59:41|            600.0|         ETH|
|BoredApeYachtClub|    8135|        3|  MetalinkLabs|0x222a50ddb9126d3...|2021-09-17 16:43:57|            550.0|      

In [111]:
# Read the processed ETH/USD rates from every per-collection partition directory.
CCompare_USD_df = spark.read.parquet(
    '/home/roger/SB/Capstone/NFT_ETH_pipeline/Local/Data/Processed/01-19-22/CCompare/NFT=**/'
)

In [112]:
# Preview the processed ETH/USD rate rows.
CCompare_USD_df.show(n=5)

+-------------------+--------+
|           txn_date|usd_rate|
+-------------------+--------+
|2021-10-28 23:50:37|  4417.4|
|2021-03-11 00:48:39| 1827.06|
|2021-03-11 20:57:01| 1767.77|
|2021-12-09 20:38:17| 3902.49|
|2021-07-30 21:24:47| 2531.75|
+-------------------+--------+
only showing top 5 rows



In [140]:
# Read the structured-layer ETH balances for the current run date and preview them.
EScan_ETH_DF = spark.read.parquet(
    f'{structured_data_path}{today}/EScan/ETH_Balance/'
)
EScan_ETH_DF.show(n=5)

+--------------------+-----+
|       owner_address|  ETH|
+--------------------+-----+
|0x3b93cbfb99560ab...| 4.69|
|0x9c5083dd4838e12...|14.48|
|0x266892ed0d40ea5...| 0.00|
|0xfb5251bf63e2382...|25.99|
|0xce90a7949bb7889...|14.39|
+--------------------+-----+
only showing top 5 rows



In [143]:
# Read the processed OpenSea data for the current run date and preview it.
OSea_NFT_df = spark.read.parquet(
    f'{processed_data_path}{today}/OSea/'
)
OSea_NFT_df.show(n=5)

+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|              NFT|token_id|num_sales|      username|       owner_address|           txn_date|      payment_amt|payment_type|
+-----------------+--------+---------+--------------+--------------------+-------------------+-----------------+------------+
|BoredApeYachtClub|    2087|        2|          null|0x066317b90509069...|2021-09-30 01:28:47|            769.0|         ETH|
|BoredApeYachtClub|    3749|        3|TheSandboxGame|0x7a9fe22691c811e...|2021-09-06 21:02:29|            740.0|         ETH|
|BoredApeYachtClub|    8585|        3|          null|0x86292f24afffe8d...|2021-10-19 23:15:59|696.9689999999999|         ETH|
|BoredApeYachtClub|    7090|        2|   j1mmy_vault|0x8ad272ac86c6c88...|2021-09-02 14:59:41|            600.0|         ETH|
|BoredApeYachtClub|    8135|        3|  MetalinkLabs|0x222a50ddb9126d3...|2021-09-17 16:43:57|            550.0|      

In [146]:
import datetime

# Recompute the run date (MM-DD-YY) used as the per-run directory name.
today = datetime.date.today().strftime('%m-%d-%y')

# Read the structured-layer OpenSea data for that date and display the first rows.
Osea_struct_df = spark.read.parquet(
    f'{structured_data_path}{today}/OSea/'
)
Osea_struct_df.show()

+-----------------+--------+---------+-----------------+--------------------+-------------------+-----------------+------------+
|              NFT|token_id|num_sales|         username|       owner_address|           txn_date|      payment_amt|payment_type|
+-----------------+--------+---------+-----------------+--------------------+-------------------+-----------------+------------+
|BoredApeYachtClub|    2087|        2|             null|0x066317b90509069...|2021-09-30 01:28:47|            769.0|         ETH|
|BoredApeYachtClub|    3749|        3|   TheSandboxGame|0x7a9fe22691c811e...|2021-09-06 21:02:29|            740.0|         ETH|
|BoredApeYachtClub|    8585|        3|             null|0x86292f24afffe8d...|2021-10-19 23:15:59|696.9689999999999|         ETH|
|BoredApeYachtClub|    7090|        2|      j1mmy_vault|0x8ad272ac86c6c88...|2021-09-02 14:59:41|            600.0|         ETH|
|BoredApeYachtClub|    8135|        3|     MetalinkLabs|0x222a50ddb9126d3...|2021-09-17 16:43:57|

In [148]:
# Read the structured-layer ETH/USD rates for the current run date and display them.
CC_struct_df = spark.read.parquet(
    f'{structured_data_path}{today}/CCompare/'
)
CC_struct_df.show()

+-------------------+--------+
|           txn_date|usd_rate|
+-------------------+--------+
|2021-10-28 23:50:37|  4417.4|
|2021-03-11 00:48:39| 1827.06|
|2021-03-11 20:57:01| 1767.77|
|2021-12-09 20:38:17| 3902.49|
|2021-07-30 21:24:47| 2531.75|
|2021-09-11 18:41:23| 3405.32|
|2021-07-30 18:36:56| 2531.75|
|2021-08-24 06:34:02| 3172.57|
|2021-08-06 18:18:56| 3162.43|
|2021-09-04 21:39:01| 3952.33|
|2022-01-06 23:52:03| 3196.47|
|2021-08-28 14:24:55| 3246.78|
|2021-07-30 23:45:46| 2531.75|
|2021-07-31 14:11:48| 2531.75|
|2021-02-19 14:27:08| 1957.47|
|2021-07-30 23:46:38| 2531.75|
|2021-04-27 15:44:22| 2667.93|
|2021-01-23 19:12:33| 1392.45|
|2021-02-22 01:01:30| 1778.07|
|2021-07-05 22:49:37| 2322.63|
+-------------------+--------+
only showing top 20 rows



In [164]:
# Attach the ETH/USD rate to each OpenSea sale by matching on the transaction timestamp.
#
# A left join is used instead of a full outer join for two reasons:
#   * Spark cannot execute a full outer join as a broadcast hash join, so the
#     broadcast hint on the rate table was being ignored.
#   * A full outer join would also emit rate rows with no matching sale; because the
#     rate table's txn_date is dropped below, those rows would carry no timestamp.
# The left join keeps every sale row and leaves usd_rate null where no rate exists.
Final_NFT_DF = (
    Osea_struct_df
    .join(
        F.broadcast(CC_struct_df),
        Osea_struct_df['txn_date'] == CC_struct_df['txn_date'],
        'left',
    )
    # Drop the duplicate join key from the rate side; the sale's txn_date is retained.
    .drop(CC_struct_df['txn_date'])
)
Final_NFT_DF.printSchema()

root
 |-- NFT: string (nullable = true)
 |-- token_id: string (nullable = true)
 |-- num_sales: long (nullable = true)
 |-- username: string (nullable = true)
 |-- owner_address: string (nullable = true)
 |-- txn_date: timestamp (nullable = true)
 |-- payment_amt: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- usd_rate: string (nullable = true)

