In [None]:
%pip install -r /Workspace/Repos/dev@nftpriceestimatoroutlook.onmicrosoft.com/nft-price-estimator/requirements.txt

In [None]:
# Azure Databricks config: OAuth (client-credentials) settings for a service principal that
# accesses Azure storage through the ABFS driver. `configs` is not used in this cell —
# presumably it is passed as extra_configs to dbutils.fs.mount elsewhere (TODO confirm).
# Credentials come from environment variables when set; the placeholder literals are only
# fallbacks. Never commit real secrets here — prefer a Databricks secret scope
# (dbutils.secrets.get) in production.
import os

app_id = os.environ.get("AZURE_APP_ID", "API_ID")
client_secret = os.environ.get("AZURE_CLIENT_SECRET", "CLIENT_SECRET")
tenant = os.environ.get("AZURE_TENANT_ID", "TENANT")

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": app_id,
       "fs.azure.account.oauth2.client.secret": client_secret,
       "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant}/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

In [None]:
import json
import requests
from requests.sessions import Session
import os

def filter_typename(dict):
  """Return True when the node's ``__typename`` is ``"AssetQuantityType"``.

  The input is presumably a GraphQL response node (it carries ``__typename``) — TODO confirm.
  Raises KeyError if ``__typename`` is missing.
  """
  node_type = dict["__typename"]
  return node_type == "AssetQuantityType"

def filter_images(dict):
  """Return True when the node's ``__typename`` is ``"AssetType"``.

  Raises KeyError if ``__typename`` is missing.
  """
  node_type = dict["__typename"]
  return node_type == "AssetType"

def filter_metadata(dict):
  """Project an asset node onto ``{'id', 'name', 'image'}``.

  ``id`` is ``tokenId`` converted to int and ``image`` is ``displayImageUrl``.
  Returns None when the node has no ``tokenId`` key.
  """
  if "tokenId" not in dict:
    return None
  return {
    'id': int(dict["tokenId"]),
    'name': dict["name"],
    'image': dict["displayImageUrl"],
  }

def filter_quantityInEth_exists(dict):
  """Return True when the node has a ``quantityInEth`` key, else False."""
  return "quantityInEth" in dict

def get_floor_price_in_eth(dict):
  """Convert the node's ``quantity`` to ETH by dividing by 10**18.

  ``quantity`` is presumably an amount in wei (string or number) — TODO confirm.
  """
  wei_per_eth = 10 ** 18
  amount = float(dict["quantity"])
  return amount / wei_per_eth

def format_result(pair):
  """Flatten a ``(price, metadata)`` pair into ``{price, id, name, image}``.

  ``metadata`` must be a mapping with ``id``, ``name`` and ``image`` keys.
  """
  price = pair[0]
  meta = pair[1]
  result = {"price": price}
  result["id"] = meta["id"]
  result["name"] = meta["name"]
  result["image"] = meta["image"]
  return result

def get(endpoint, headers, session:Session, timeout=None):
  """GET ``endpoint`` and return the response body decoded as JSON.

  Args:
    endpoint: Full URL to request.
    headers: HTTP headers to send (e.g. ``X-API-KEY``).
    session: An open ``requests.Session`` to reuse its connection pool, or a falsy
      value (e.g. ``None``) to issue a one-off ``requests.get``.
    timeout: Optional ``requests`` timeout in seconds (or a ``(connect, read)`` tuple).
      The default ``None`` waits indefinitely, as before. Pass a value when scraping
      so that a stalled request cannot hang the caller forever.

  Returns:
    The parsed JSON payload. HTTP error statuses are not raised; their body is
    decoded like any other response.

  Raises:
    json.JSONDecodeError: if the body is not valid JSON.
    requests.exceptions.RequestException: on connection errors or timeouts.
  """
  if not session:
    response = requests.get(endpoint, headers=headers, timeout=timeout)
    return json.loads(response.text)
  # Context-managing the response releases the pooled connection back to the session.
  with session.get(endpoint, headers=headers, timeout=timeout) as response:
    return json.loads(response.text)

def write_data_to_file(filename, data):
  """Serialize ``data`` as a pretty-printed (indent=4) UTF-8 JSON document into ``filename``.

  The file is created or truncated. The previous implementation passed ``json.dumps(...)``
  to ``json.dump``. That wrote the document as a quoted JSON *string literal*
  (double-encoded), so reading the file back gave a str instead of the data.
  """
  # ensure_ascii=False keeps non-ASCII names readable, so the encoding must be explicit.
  with open(filename, 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)

def write_json_to_file(filename, json_arr):
  """Write ``json_arr`` as JSON Lines: one JSON document per line, each followed by a newline.

  Missing parent directories of ``filename`` are created. A bare filename (no directory
  part) is written to the current working directory. An existing file is truncated.
  """
  directory = os.path.dirname(filename)
  if directory:  # os.makedirs('') raises FileNotFoundError, so skip it for bare filenames
    os.makedirs(directory, exist_ok=True)
  with open(filename, 'w') as f:
    for item in json_arr:
      json.dump(item, f)
      f.write("\n")
  
def download_image(folder: str, filename: str, image_url: str):
  """Download ``image_url`` and save the raw response bytes to ``folder/filename``.

  ``folder`` is created if needed. The image is fetched *before* the target file is
  opened, so a failed request no longer leaves an empty file behind. HTTP error
  statuses raise ``requests.HTTPError`` instead of saving the error page as the image.
  """
  os.makedirs(folder, exist_ok=True)  # exist_ok avoids the exists()/makedirs() race
  response = requests.get(image_url)
  response.raise_for_status()
  with open(os.path.join(folder, filename), 'wb') as f:
    f.write(response.content)
   
# dir_path = os.path.dirname(os.path.realpath(__file__)) # get current directory
# scraper_utils.download_image('{}/assets/cool-cats-nft'.format(dir_path), '4807.png', 'https://lh3.googleusercontent.com/VwXwtH0-4-Np1DrsK5X1u102Je_Ju2FUH8yltByPOEKONeEDBNcs6poEjElhKWeAKquhzpdQwqS_hJGV-O-a3iy7GgPjYVLduxAWKdg')


In [None]:
# Metadata scraping using Python
import json
import requests
import urllib.request as req
import os
import multiprocessing

api = 'https://api.opensea.io/api/v1'
# OpenSea API key, sent as X-API-KEY by getAsset. It is read from the environment when set;
# the literal is only a placeholder. Never commit a real key to the notebook.
API_KEY = os.environ.get('OPENSEA_API_KEY', 'API_KEY')
# Sample contract address and page URL for manual testing (not used by the functions below).
test_address = '0xb47e3cd837ddf8e4c57f05d70ab865de6e193bbb'
url = "https://api.opensea.io/api/v1/assets?order_direction=desc&asset_contract_address={test_address}&limit=20&cursor=LXBrPTUzMTA1Mg%3D%3D&include_orders=false".format(test_address=test_address)

# Add your custom target collections for scraping
final_collections = []

# Token ids already seen for the collection being scraped. haveSoldBeforeFilter uses it to
# de-duplicate assets across pages, and scrape_collection_api clears it after a collection.
hashSet = set()
def haveSoldBeforeFilter(asset):
    """Predicate for ``filter``: keep the first occurrence of a token that has sold for a positive price.

    Every token id is recorded in the module-level ``hashSet`` whether or not it passes,
    so repeated ids (e.g. an asset returned on two pages) are rejected. The result is
    truthy/falsy rather than a strict bool: it may be ``None``, ``''``, etc.
    """
    token_id = int(asset['token_id'])
    if token_id in hashSet:
        return False
    hashSet.add(token_id)
    sale = asset['last_sale']
    return sale and sale['total_price'] and float(sale['total_price']) > 0

def getMetadata(asset):
    """Build the stored record for a sold OpenSea asset.

    Args:
        asset: One entry of the API's ``assets`` list. It must have a non-null
            ``last_sale`` with ``total_price`` and ``payment_token.symbol``.

    Returns:
        dict with ``id`` (int token id), ``name``, ``image`` (image URL), ``price``
        (last sale price in whole units of the payment token) and ``token``
        (payment token symbol).
    """
    last_sale = asset['last_sale']
    payment_token = last_sale['payment_token']
    # total_price is an integer amount in the token's smallest unit. ETH/WETH use 18
    # decimals, but stablecoins such as USDC use 6. Honour the token's own ``decimals``
    # when present, falling back to the previously hard-coded 18.
    decimals = payment_token.get('decimals')
    if decimals is None:
        decimals = 18
    return {
        'id': int(asset['token_id']),
        'name': asset['name'],
        'image': asset['image_url'],
        'price': float(last_sale['total_price']) / 10 ** int(decimals),
        'token': payment_token['symbol']
    }

def getMetadataOfCurrentSet(dict):
    """Return ``getMetadata`` records for the assets of one API page that pass ``haveSoldBeforeFilter``.

    All assets are filtered first, which updates ``hashSet``. Only then are the
    survivors converted.
    """
    sold = [asset for asset in dict if haveSoldBeforeFilter(asset)]
    return [getMetadata(asset) for asset in sold]

def getCollectionContractAddress(slug):
    """Return the primary contract address of OpenSea collection ``slug``, or None.

    None is returned when the JSON payload lacks the
    ``collection.primary_asset_contracts[0].address`` path (missing keys, empty list,
    or null values). Network errors and non-JSON responses still propagate.
    """
    collection_endpoint = api + '/collection/{}'.format(slug)
    payload = json.loads(requests.get(collection_endpoint).text)
    try:
        return payload['collection']['primary_asset_contracts'][0]['address']
    except (KeyError, IndexError, TypeError):
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit and genuine
        # bugs are no longer silently turned into "no address".
        return None

def getAsset(contract_address, c_limit=None):
    """Collect metadata for every sold asset of a collection, following OpenSea's pagination.

    The first page is fetched with a one-off request. Later pages reuse one
    ``requests.Session`` and follow the ``next`` cursor until it is empty or the page
    budget is exhausted.

    Args:
        contract_address: Primary contract address of the collection.
        c_limit: Maximum number of follow-up page requests after the first page.
            ``None`` keeps the previous fixed budget of 20. The old code overwrote the
            argument with 20, so explicit values were silently ignored. Pass
            ``math.inf`` to walk the whole collection.

    Returns:
        List of ``getMetadata`` records for assets with a positive last sale.
        Errors while paginating are printed and the records gathered so far are
        returned. A failure on the very first request propagates.
    """
    if c_limit is None:
        c_limit = 20
    limit = 50  # page size requested from the API
    assets_endpoint = api + '/assets?order_direction=desc&asset_contract_address={}&limit={}&include_orders=false'.format(contract_address, limit)
    LAMBDA_ASSET_ENDPOINT_WITH_CURSOR = lambda cursorX: api + '/assets?order_direction=desc&asset_contract_address={}&limit={}&cursor={}&include_orders=false'.format(contract_address, limit, cursorX)
    headers = {
        "Accept": "application/json",
        "X-API-KEY": API_KEY
    }
    response = get(assets_endpoint, headers=headers, session=None)
    data: list = getMetadataOfCurrentSet(response['assets'])
    cursor = response['next']
    c = 0  # follow-up page requests made so far (including empty responses)
    with requests.Session() as session:
        try:
            while cursor and c < c_limit:
                encoded_cursor = req.pathname2url(cursor)
                assets_endpoint_cursor = LAMBDA_ASSET_ENDPOINT_WITH_CURSOR(encoded_cursor)
                response = get(assets_endpoint_cursor, headers, session)

                if not response: # empty payload: count the attempt and retry the same cursor
                    c += 1
                    continue
                cursor, curr = response['next'], response['assets']
                if cursor: # if we have not reached the end of collection
                    print('Iteration {}: '.format(c) + cursor)
                # Always keep this page's assets. The old code only extended when `next` was
                # non-empty, which silently dropped the collection's final page.
                data.extend(getMetadataOfCurrentSet(curr))
                c += 1
        except Exception as err:
            print("ERROR getAsset() for {}: ".format(contract_address) + str(err))

    return data

# Local /dbfs folder holding one JSON-lines file per scraped collection, named <slug>.json
# (written by scrape_collection_api below).
cloud_dir_path = "/dbfs/mnt/data/collections"

def scrape_collection_api(slug: str, c_limit=None):
    """Scrape the sold assets of collection ``slug`` and save them as JSON lines.

    Output goes to ``{cloud_dir_path}/{slug}.json``. ``c_limit`` is forwarded to getAsset.
    The module-level ``hashSet`` used for de-duplication is reset before the scrape and
    again in a ``finally``. This stops ids from a failed run leaking into the next
    collection processed by the same process.
    """
    print("-------- Test for test_slug = {} --------".format(slug))
    addr = getCollectionContractAddress(slug)
    print("Address ={}".format(addr))
    if addr is None:
        print("Address is not availabled for \'{}\'".format(slug))
        return
    hashSet.clear()
    try:
        data = getAsset(addr, c_limit)
        write_json_to_file("{dir}/{slug}.json".format(dir=cloud_dir_path, slug=slug), data)
        print(len(data), "NFTs scraped for \'{}\'".format(slug))
    finally:
        hashSet.clear()

def parallelized_scrape():
    """Run scrape_collection_api for every slug in ``final_collections`` in a process pool.

    ``multiprocessing.Pool()`` defaults to one worker per CPU. Each worker process has
    its own copy of module globals, including ``hashSet``. The context manager tears the
    pool down after ``map`` returns; previously the pool was never closed or joined.
    """
    with multiprocessing.Pool() as pool:
        pool.map(scrape_collection_api, final_collections)

In [None]:
# Image downloading using Python
import requests
import json
import os

# Sample batch of OpenSea collection slugs to download images for. Each slug needs an
# existing "<cloud_dir_path>/<slug>.json" file, which download_collection opens.
collection_list = [
    'tiny-dinos-eth',
    'premint-collector',
    'doodles-official',
    'sandbox',
    'froyokittenscollection',
    'cool-cats-nft',
    'rtfkt-mnlth',
    'milady',
    'akutar-mint-pass',
    'proof-collective',
    'everai-heroes-duo',
    'alien-frens-evolution',
    'notbanksyechoes',
    'the-art-of-seasons',
    'nft-worlds',
    'vaynersports-pass-vsp',
    'veefriends',
    'cyberbrokers',
    'veefriends-series-2',
    'thelobstars',
    'world-of-women-galaxy',
    'quirkiesoriginals',
    'adidasoriginals',
    'worldwidewebbland',
    'bored-ape-kennel-club',
    'alienfrensnft',
    'quantum-access-pass',
    'unemployables',
    'mfers',
    'akumaorigins',
    'the-picaroons',
    'toxic-skulls-club',
    '0xogpass',
    'bored-ape-chemistry-club',
]

# Folder with the scraped <slug>.json files (JSON lines) read by download_collection below.
# It is redefined here so this cell does not depend on the scraping cell having run.
cloud_dir_path = "/dbfs/mnt/data/collections"

def download_collection(slug, timeout=60):
    """Download the image of every asset listed in a collection's JSON-lines file.

    Reads ``{cloud_dir_path}/{slug}.json`` (one JSON object per line with ``id`` and
    ``image`` keys). Each image is saved as
    ``/dbfs/mnt/data/scraped_images/{slug}/{slug}-#{id}.png``. Files already on disk are
    skipped, so re-running resumes an interrupted download. Per-asset failures
    (bad line, network error, HTTP error status, timeout) are printed and skipped.

    Args:
        slug: OpenSea collection slug.
        timeout: Seconds passed to ``requests.get`` so one stalled image server cannot
            hang the whole run.
    """
    filename = "{dir}/{slug}.json".format(dir=cloud_dir_path, slug=slug)
    collection_folder = "/dbfs/mnt/data/scraped_images/{slug}".format(slug=slug)
    os.makedirs(collection_folder, exist_ok=True)
    print('Downloading collection {slug}'.format(slug=slug))
    with open(filename, 'r') as f:
        # Get each image url, fetch the image and write to file as png.
        # enumerate keeps the counter correct even for skipped lines.
        for count, asset in enumerate(f):
            try:
                data = json.loads(asset)
                asset_id, image_url = data['id'], data['image']
                image_file = collection_folder + '/{slug}-#{id}.png'.format(slug=slug, id=asset_id)
                print('{} : Iteration: {}'.format(slug, count))
                if os.path.exists(image_file): # skip images alr downloaded
                    print('Image file({}) already present. Skipped!'.format(image_file))
                    continue
                response = requests.get(image_url, timeout=timeout)
                # Never persist an HTTP error page. It would count as "already present"
                # on the next run and the real image would never be retried.
                response.raise_for_status()
                with open(image_file, 'wb') as file:
                    file.write(response.content)
            except Exception as err:
                print('Failed for {} iteration {}: {}'.format(slug, count, err))

    print('Completed download of collection {slug}'.format(slug=slug))

from multiprocessing import Pool

def download_all():
    """Download the images of every collection in ``collection_list``, one after another."""
    for slug in collection_list:
        download_collection(slug)

# Executing this cell immediately downloads every collection in collection_list, sequentially.
download_all()

In [None]:
# Second batch of collection slugs. This rebinds collection_list, so the later Spark cells
# that loop over collection_list use this list rather than the sample list above.
collection_list = [
    'where-my-vans-go',
     'lazy-lions',
     'psychonautapedivision',
     'spaceridersnft',
     'bapetaverse-official',
     'the-squishiverse',
     'mirandus',
     'the-meta-kongz',
     'etherthings',
     'space-boo-official-nft',
     'officialkenkyo',
     'felinefiendznft',
     'women-unite-10k-assemble',
     'chromie-squiggle-by-snowfro',
     'lilium',
     'pixel-interfaces',
     'mypethooligan',
     'shatteredeon-colonist',
     'frankfrank',
     'beari-collection',
     'crypto-unicorns-market',
     'max-pain-and-frens-by-xcopy',
     'pixel-vault-mintpass',
     'guttercatgang',
     'pjppfl',
     'clonex-mintvial',
     'fools-nft',
     'notbanksyrain',
     'metahero-generative',
     'nifty-tailor-genesis-mintpass',
     'llamaboost',
     'headtripz',
     'layer-zero-punks-eth',
     'pixels-farm',
     'dourdarcels',
     'cyberkongz-vx',
     'alphakongsclub',
     'mad-meerkat-burrow',
     'treeverse',
     'moonbirdpunks',
     'grayboys',
     'theshiboshis',
     'smiliesgenesis',
     'fvck-avatar-essence',
     'space-doodles-official',
     'pet-rock',
     'chain-runners-nft',
     'smallbros',
     'raidparty',
     'mirlclub',
     'ballies',
     'mad-hare-society-2',
     'mad-hare-society-1',
     'frenlypandas',
]

In [None]:
# Image downloading using Spark
# Folder with the scraped <slug>.json files (JSON lines). It is redefined so this cell runs
# on its own.
cloud_dir_path = "/dbfs/mnt/data/collections"

import requests
import json
import os

def download_collection(slug, timeout=60):
    """Download the image of every asset listed in a collection's JSON-lines file.

    Reads ``{cloud_dir_path}/{slug}.json`` (one JSON object per line with ``id`` and
    ``image`` keys). Each image is saved as
    ``/dbfs/mnt/data/scraped_images/{slug}/{slug}-#{id}.png``. Files already on disk are
    skipped, so re-running resumes an interrupted download. Per-asset failures
    (bad line, network error, HTTP error status, timeout) are printed and skipped.

    Args:
        slug: OpenSea collection slug.
        timeout: Seconds passed to ``requests.get`` so one stalled image server cannot
            hang the whole run.
    """
    filename = "{dir}/{slug}.json".format(dir=cloud_dir_path, slug=slug)
    collection_folder = "/dbfs/mnt/data/scraped_images/{slug}".format(slug=slug)
    os.makedirs(collection_folder, exist_ok=True)
    print('Downloading collection {slug}'.format(slug=slug))
    with open(filename, 'r') as f:
        # Get each image url, fetch the image and write to file as png.
        # enumerate keeps the counter correct even for skipped lines.
        for count, asset in enumerate(f):
            try:
                data = json.loads(asset)
                asset_id, image_url = data['id'], data['image']
                image_file = collection_folder + '/{slug}-#{id}.png'.format(slug=slug, id=asset_id)
                print('{} : Iteration: {}'.format(slug, count))
                if os.path.exists(image_file): # skip images alr downloaded
                    print('Image file({}) already present. Skipped!'.format(image_file))
                    continue
                response = requests.get(image_url, timeout=timeout)
                # Never persist an HTTP error page. It would count as "already present"
                # on the next run and the real image would never be retried.
                response.raise_for_status()
                with open(image_file, 'wb') as file:
                    file.write(response.content)
            except Exception as err:
                print('Failed for {} iteration {}: {}'.format(slug, count, err))
    print('Completed download of collection {slug}'.format(slug=slug))

from urllib.request import urlretrieve
from pyspark.sql.functions import *
def download_image(collection, name, url, root="/dbfs/mnt/data/scraped_images"):
    """Save the resource at ``url`` as ``{root}/{collection}-{name}.jpg`` using urllib.

    NOTE: this redefines the ``download_image(folder, filename, image_url)`` helper from
    the utilities cell with a different signature. Code run after this cell gets this one.

    Args:
        collection: Collection slug, used as the filename prefix.
        name: Asset identifier (e.g. token id), used as the filename suffix.
        url: Source URL.
        root: Target folder; created if it does not exist yet. Defaults to the
            previously hard-coded DBFS path.
    """
    target = f"{root}/{collection}-{name}.jpg"
    os.makedirs(os.path.dirname(target), exist_ok=True)
    urlretrieve(url, target)

# collection_list from previous cell
# For each collection, read its JSON-lines file with Spark and download every asset's image on
# the executors. Each Row is one asset record (id, image, ...), not a slug. The old code passed
# the Row to download_collection(slug), which then tried to open "<dir>/Row(...).json" and failed.
def _download_row_image(slug, row):
    """Best-effort download of one asset row's image; errors are printed (executor stdout) instead of failing the Spark job."""
    try:
        download_image(slug, row['id'], row['image'])
    except Exception as err:
        print('Failed for {}: {}'.format(slug, err))

for col in collection_list:
    print('--> Running for {}'.format(col))
    colDf = spark.read.json('/mnt/data/collections/{}.json'.format(col))
    # Bind the slug as a default argument so the closure shipped to executors carries this collection's value.
    colDf.foreach(lambda row, slug=col: _download_row_image(slug, row))

In [None]:
# Price update using Spark
import requests
import json
import os
from pyspark.sql.functions import *
from urllib.request import urlretrieve

# Folder with the scraped <slug>.json files (JSON lines) that update_collection rewrites in place.
cloud_dir_path = "/dbfs/mnt/data/collections"

def getSingleAsset(contract_address, id):
    """Fetch one asset from the OpenSea v1 ``/asset`` endpoint and return the decoded JSON payload.

    The previous version combined an f-string with ``.format``. The f-string was
    evaluated first and referenced an undefined name ``address``, so every call raised
    NameError.
    """
    url = f'https://api.opensea.io/api/v1/asset/{contract_address}/{id}/?include_orders=false'
    response = requests.get(url)
    return json.loads(response.text)

def update_collection(slug):
    """Refresh the last-sale price of every asset in ``{cloud_dir_path}/{slug}.json``.

    The file is read as JSON lines (one asset record per line, as written by
    ``write_json_to_file``). For each record the live asset is fetched from OpenSea, and
    ``price`` (plus ``token`` when the sale reports a symbol) is replaced with its latest
    sale. Results are written to ``{slug}-tmp.json`` and then atomically replace the
    original file.

    Records whose live lookup fails are kept unchanged instead of dropped, so a transient
    API error, or an asset without a sale, no longer deletes rows from the dataset. If the
    collection's contract address cannot be resolved, the file is left untouched.
    """
    filename = "{dir}/{slug}.json".format(dir=cloud_dir_path, slug=slug)
    tmp_file = "{dir}/{slug}-tmp.json".format(dir=cloud_dir_path, slug=slug)
    print('Reading collection {slug}'.format(slug=slug))
    # The address is the same for every asset of the collection, so resolve it once rather
    # than once per line (which also multiplied API calls).
    contract_address = getCollectionContractAddress(slug)
    if contract_address is None:
        print('Address is not available for \'{}\'; collection left unchanged'.format(slug))
        return
    results: list = []
    with open(filename, 'r') as f:
        # Get each asset id, fetch the live price and collect the (possibly updated) record
        for count, asset in enumerate(f, start=1):
            data = json.loads(asset)
            try:
                live_sale = getSingleAsset(contract_address, data['id'])['last_sale']
                payment_token = live_sale.get('payment_token') or {}
                # total_price is in the payment token's smallest unit: 18 decimals (ETH/WETH)
                # unless the token reports otherwise (e.g. USDC uses 6).
                decimals = payment_token.get('decimals')
                if decimals is None:
                    decimals = 18
                live_price = float(live_sale['total_price']) / 10 ** int(decimals)
                # Mutate the record only after everything above succeeded.
                data['price'] = live_price
                if payment_token.get('symbol'):
                    data['token'] = payment_token['symbol']
                print('{} : Iteration: {}'.format(slug, count))
            except Exception as err:
                print('Failed for {} iteration {}: {}'.format(slug, count, err))
            results.append(data)
    # Write to a temp file, then swap it in. os.replace overwrites in one step, unlike remove+rename.
    write_json_to_file(tmp_file, results)
    os.replace(tmp_file, filename)
    print('Completed update of collection {slug}'.format(slug=slug))

# collection_list from 2 cells prior
# update_collection(slug) reads and rewrites the whole <slug>.json file itself, so it runs once
# per collection on the driver. The old colDf.foreach(lambda row: update_collection(row)) passed
# each Spark Row as the "slug", so the first row raised when opening "<dir>/Row(...).json".
# To parallelise across collections instead, consider
# spark.sparkContext.parallelize(collection_list).foreach(update_collection).
for col in collection_list:
    print('--> Running for {}'.format(col))
    update_collection(col)