In [None]:
# ! pip install -U sentence-transformers

In [None]:
import logging as log
import pickle
from itertools import chain
from multiprocessing import Lock, RawValue
from pathlib import Path

import boto3
import numpy as np
import srsly
import tensorflow as tf
import torch
from sentence_transformers import SentenceTransformer, util

# getLogger() without a name returns the root logger; raise its level to INFO so the
# progress messages emitted throughout the notebook are not filtered out.
logger = log.getLogger()
logger.setLevel(log.INFO)

In [None]:
# --- S3 locations -----------------------------------------------------------
S3_BUCKET = "makersdistillery"
S3_FULL_PREFIX_PATH = "finder/media_to_index/full"

# --- Local cache layout -----------------------------------------------------
CACHE_DIR = Path("cache")
GENRES_DATASETS_PATH = CACHE_DIR / "genres_dataset"
GENRES_SENTENCES_FILE_PATH = GENRES_DATASETS_PATH / "genre_sentences.pickle"
FILTERED_NFTS_FILE_PATH = GENRES_DATASETS_PATH / "filtered_nfts.pickle"
FILTERED_NFTS_METADATA_FILE_PATH = GENRES_DATASETS_PATH / "filtered_nfts_metadat.json"

# --- Tunables ---------------------------------------------------------------
# How many top-ranked sentences / genres are kept per item
SENTENCES_LIMIT = 3
GENRES_LIMIT = 3

# Maximum number of items processed from the jsonl export
LIMIT = 1_000
# When True the sentence cache is rebuilt even if the pickle file already exists
CLEAN_RUN = True

# Create the boto3 session from the AWS profile named "production",
# then derive the S3 client from that session
session = boto3.Session(profile_name="production")
s3_client = session.client("s3")

# Make sure both cache directories exist before anything is written to them
for directory in (CACHE_DIR, GENRES_DATASETS_PATH):
    directory.mkdir(exist_ok=True, parents=True)

In [None]:
# One genre label per line; blank lines are ignored when the list is parsed below.
genres_text = """
3D
Abstract
AI Generated Art
Generative Art
Animation
Black and White
Calligraphic
Collage Art
Color Field
Comic/Cartoon
Conceptual Art
Crypto Art
Dance
Dark
Digital Art
Digital Culture
Drawing
Ephemera
Fantasy
Fashion
Feminist
Figurative
Film/Video
Fine art
Folk Art
Glitch Art
Gothic Art
Graffiti/Street Art
Hyperrealism
Illustration
Immersive
Interactive
Landscape
Light Art
Mixed Media
Monochrome
Multi-media
Music
Nature
Net Art
Painting
Photography
Phygitals
Pixel art
Pop Art
Portrait
Psychedelic Art
science fiction
Sculpture
Sports
Surreal"""
# Normalised (stripped, lower-cased) genre labels in alphabetical order.
# A genre's position in this list is its genre id.
GENRES = sorted(line.strip().lower() for line in genres_text.split("\n") if line.strip() != "")

# Genre ids: 0 .. len(GENRES) - 1, parallel to GENRES
GENRE_IDS = [genre_id for genre_id, _ in enumerate(GENRES)]

In [None]:
def get_latest_files_from_s3(s3_client, s3_prefix, file_name, limit=1):
    """
    Download the most recently modified S3 objects named `file_name` found under `s3_prefix`.

    Every object below `s3_prefix` in `S3_BUCKET` is listed (all result pages, not just the
    first one), objects whose last key component equals `file_name` are kept, and the `limit`
    newest of them are downloaded into `CACHE_DIR`. A file that already exists locally under
    the same name is not downloaded again.

    Args:
        s3_client: boto3 S3 client used for listing and downloading.
        s3_prefix: Key prefix to search under.
        file_name: Exact file name (last key component) to match.
        limit: Maximum number of newest matching objects to fetch.

    Returns:
        List of local `Path`s ordered from oldest to newest, or an empty list when no object
        matches.
    """
    log.info(f"Getting latest file from s3 bucket: {S3_BUCKET} and prefix: {s3_prefix}")

    # A single list_objects_v2 call returns a limited page of keys, so walk every page.
    # "Contents" is missing from a page when nothing matches the prefix, hence the .get().
    listed_objects = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=s3_prefix):
        listed_objects.extend(page.get("Contents", []))
    # log total response count
    log.info(f"Total files fetched are {len(listed_objects)}")

    # filter out files matching the file name
    matching_files = [file for file in listed_objects if Path(file["Key"]).name == file_name]
    log.info(f"Found {len(matching_files)} files matching the file name: {file_name}")
    # Sort files by last modified date, to get the latest file first
    matching_files.sort(key=lambda x: x["LastModified"], reverse=True)
    log.info(f"Sorted files by last modified date {matching_files}")
    # Pick the latest n(limit) files
    latest_files = matching_files[:limit]
    # Sort files by last modified date to get the latest file last.
    # This sorting is needed to preserve the updates order
    latest_files.sort(key=lambda x: x["LastModified"])
    log.info(f"Latest {len(latest_files)} files: {latest_files}")

    if len(latest_files) == 0:
        log.error(f"File {file_name} not found in s3 bucket: {S3_BUCKET} and prefix: {s3_prefix}")
        return []

    # Iterate on latest files and download each one locally
    local_file_paths = []
    for latest_file in latest_files:
        prefix_path = latest_file["Key"]
        # Local name is "<parent>_<file>" built from the last two key components; a key
        # without any "/" simply keeps its own name instead of raising IndexError.
        local_file_name = "_".join(prefix_path.split("/")[-2:])
        local_file_path = CACHE_DIR / local_file_name
        # Download file only if it does not exist locally
        if not local_file_path.exists():
            log.info(f"Downloading latest file: {prefix_path} to local path: {local_file_path}")
            # Pass a plain string: not every boto3 release accepts path-like objects here
            s3_client.download_file(S3_BUCKET, prefix_path, str(local_file_path))
            log.info(f"Downloaded latest file: {prefix_path} to local path: {local_file_path}")
        local_file_paths.append(local_file_path)

    log.info(f"Local file paths: {local_file_paths}")

    return local_file_paths


def get_s3_file_url(s3_file_name):
    """Return the public URL of the 1000x JPEG preview stored under `s3_file_name`."""
    url_template = "https://makersdistillery.s3.us-west-2.amazonaws.com/1000x/{}.jpg"
    return url_template.format(s3_file_name)

In [None]:
# Fetch the newest "data.jsonl" export from S3 (already-downloaded files are reused)
local_file_paths = get_latest_files_from_s3(s3_client, S3_FULL_PREFIX_PATH, "data.jsonl", limit=1)
log.info(f"Reading jsonl files: {local_file_paths}")
# One lazy reader per downloaded file, chained into a single stream of records.
# The resulting iterator can only be consumed once.
jsonl_readers = [srsly.read_jsonl(local_path) for local_path in local_file_paths]
json_data_iterator = chain.from_iterable(jsonl_readers)

In [None]:
# Model used to rank description sentences against QUERY, and model used to match
# sentences/tags against the genre labels.
SENTENCES_FILTER_MODEL_ID = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
GENRE_MATCH_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"

# Prefer Apple's Metal backend ("mps", what this notebook was written for), but fall back
# to CUDA or the CPU so the notebook also runs on machines without it.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    MODEL_DEVICE = "mps"
elif torch.cuda.is_available():
    MODEL_DEVICE = "cuda"
else:
    MODEL_DEVICE = "cpu"
log.info(f"Loading sentence-transformers models on device: {MODEL_DEVICE}")

# Load the models
sentence_filter_model = SentenceTransformer(SENTENCES_FILTER_MODEL_ID, device=MODEL_DEVICE)
genre_match_model = SentenceTransformer(GENRE_MATCH_MODEL_ID, device=MODEL_DEVICE)

In [None]:
def save_dict_to_pickle_file(dict_to_save, pickle_file_path):
    """
    Pickle `dict_to_save` to `pickle_file_path`, replacing any existing file.

    The data is first written to a sibling "<name>.tmp" file and then moved over the
    target, so a failed or interrupted dump leaves the previous file untouched instead
    of deleting it.

    Args:
        dict_to_save: Object to serialise with pickle.
        pickle_file_path: Destination path (`str` or `Path`).

    Raises:
        Whatever `pickle.dump` or the file system raises; the target file is left as it
        was in that case.
    """
    target_path = Path(pickle_file_path)
    temp_path = target_path.with_name(target_path.name + ".tmp")

    try:
        with open(temp_path, "wb") as f:
            pickle.dump(dict_to_save, f)
        # Swap the finished file into place in one step
        temp_path.replace(target_path)
    finally:
        # The temp file only still exists here when the dump or the replace failed
        if temp_path.exists():
            temp_path.unlink()


def read_dict_from_pickle_file(pickle_file_path):
    """Load and return the object pickled in the file at `pickle_file_path`."""
    with open(pickle_file_path, "rb") as pickle_stream:
        loaded_object = pickle.load(pickle_stream)
    return loaded_object

In [None]:
# Question that description sentences are ranked against in get_genre_sentence: the
# sentences scoring highest against it are kept as the item's "genre sentences".
# Earlier wording of the question, kept for reference:
# QUERY = "What is the Genre of Art?"
QUERY = "What is the Art category or Genre or style?"
# Embed the query once here so it is not re-encoded for every item.
QUERY_EMBED = sentence_filter_model.encode(QUERY, show_progress_bar=False)


def get_genre_sentence(text):
    """
    Pick the sentences of `text` that score highest against QUERY.

    `text` is split on "." into sentences, blank pieces are dropped, and each remaining
    sentence is embedded and scored against QUERY_EMBED. At most SENTENCES_LIMIT
    sentences are returned, best first.

    Returns:
        Three parallel sequences `(sentences, scores, embeddings)`. They are empty lists
        when `text` contains no sentence, tuples otherwise.
    """
    stripped_parts = [part.strip() for part in text.strip().split(".")]
    cleaned_sentences = [part for part in stripped_parts if part != ""]

    if not cleaned_sentences:
        return [], [], []

    sentence_embeddings = sentence_filter_model.encode(cleaned_sentences, show_progress_bar=False)

    # Cosine similarity between the query and every sentence embedding.
    # NOTE(review): the model id ends in "dot-v1" and the previous comment here said
    # "dot score", yet cosine similarity is what is computed - confirm which is intended.
    similarity_row = util.cos_sim(QUERY_EMBED, sentence_embeddings)[0]
    similarities = similarity_row.cpu().tolist()

    # (sentence, score, embedding) triples
    sentences_and_scores = list(zip(cleaned_sentences, similarities, sentence_embeddings))

    logger.debug(f"Sentences and Scores: {sentences_and_scores}")

    # Best score first; slicing already copes with fewer than SENTENCES_LIMIT entries
    ranked = sorted(sentences_and_scores, key=lambda triple: triple[1], reverse=True)
    top_ranked = ranked[:SENTENCES_LIMIT]

    # Transpose the triples back into three parallel tuples
    sentences, scores, embeddings = zip(*top_ranked)

    return sentences, scores, embeddings

In [None]:
# Number of processed items between intermediate saves of the cache file
CHECKPOINT_INTERVAL = 1000

# item id -> record holding the item's text, tags, top-ranked sentences and preview file name
genre_sentences = {}

item_count = 0


if CLEAN_RUN or not GENRES_SENTENCES_FILE_PATH.exists():
    log.info("Creating genre_sentences file")

    # Open the jsonl files afresh instead of consuming a shared one-shot iterator:
    # re-running this cell would otherwise see an exhausted iterator and overwrite the
    # cache file with an empty dict.
    records = chain.from_iterable(srsly.read_jsonl(local_path) for local_path in local_file_paths)

    for data in records:
        # Stop once LIMIT items have been processed
        if item_count == LIMIT:
            break

        log.debug(f"Processing data: {data}")
        item_id = data["_id"]
        source = data["_source"]
        # Title and description together form the text the genre sentences are picked from
        text = f"{source['title']} {source['description']}"

        # Tags are optional; a missing or null value becomes an empty list so that the
        # later len(tags) checks cannot fail
        tags = source.get("tags") or []

        preview_file_name = source["finder_data"]["s3_preview_filename"]

        sentences, scores, embeddings = get_genre_sentence(text)

        log.debug(f"Result: {sentences, scores, embeddings}")

        genre_sentences[item_id] = {
            "text": text,
            "tags": tags,
            "genre_sentences": sentences,
            "genre_sentences_scores": scores,
            "genre_sentences_embeddings": embeddings,
            "preview_file_name": preview_file_name,
        }

        item_count += 1

        # Periodic checkpoint so a long run does not lose all progress
        if item_count % CHECKPOINT_INTERVAL == 0:
            log.info(f"Processed {item_count} items")
            save_dict_to_pickle_file(genre_sentences, GENRES_SENTENCES_FILE_PATH)

    # Final save of the freshly built data
    save_dict_to_pickle_file(genre_sentences, GENRES_SENTENCES_FILE_PATH)

else:
    # Nothing to rebuild: reuse the cached result (no need to write it back)
    log.info("Reading genre_sentences file")
    genre_sentences = read_dict_from_pickle_file(GENRES_SENTENCES_FILE_PATH)

In [None]:
# Encode the genre labels once; get_genres compares these embeddings against the
# embeddings of each item's sentences or tags.
genre_embeddings = genre_match_model.encode(GENRES, show_progress_bar=False)
# Log how many genre embeddings were produced
log.info(f"genre_embeddings: {len(genre_embeddings)}")

In [None]:
def get_genre_names(genre_ids):
    """Map `(genre_id, score)` pairs to the corresponding genre names, in the same order."""
    names = []
    for genre_id, _score in genre_ids:
        names.append(GENRES[genre_id])
    return names


def get_genres(text_array):
    """
    Score every genre against a collection of texts and return the best matches.

    Each text is embedded and compared with every genre embedding by cosine similarity.
    A genre's score is its highest similarity over all the texts.

    Args:
        text_array: Sequence of strings (genre sentences or tags).

    Returns:
        Up to GENRES_LIMIT `(genre_id, score)` pairs, highest score first, with scores
        rounded to 2 decimal places. An empty list when `text_array` is empty.
    """
    if len(text_array) == 0:
        return []

    text_embeddings = genre_match_model.encode(text_array, show_progress_bar=False)

    # One row of similarities per genre, one column per text
    similarity_rows = util.cos_sim(genre_embeddings, text_embeddings).cpu().tolist()
    log.debug(f"All genre_scores: {(similarity_rows)}")

    # Keep each genre's best similarity and pair it with the genre id
    best_per_genre = list(zip(GENRE_IDS, map(max, similarity_rows)))
    logger.debug(f"Genre Scores: {best_per_genre}")

    # Highest score first
    sorted_genres = sorted(best_per_genre, key=lambda pair: pair[1], reverse=True)
    log.debug(f"Sorted genre_scores: {(sorted_genres)}")

    # Restrict scores to 2 decimal places
    rounded_genres = [(genre_id, np.round(score, 2)) for genre_id, score in sorted_genres]
    log.debug(f"Rounded genre_scores: {(rounded_genres)}")

    # Slicing already copes with fewer than GENRES_LIMIT entries
    return rounded_genres[:GENRES_LIMIT]


# Attach genre scores to every item. Text-derived and tag-derived scores are computed
# independently, so an item whose text produced no sentences still gets scores from its
# tags (previously such items were skipped entirely).
for key, value in genre_sentences.items():
    preview_url = get_s3_file_url(value["preview_file_name"])

    # Genres matched from the item's top-ranked description sentences
    genre_sentences_text = value["genre_sentences"]
    text_genres = []
    if genre_sentences_text:
        text_genres = get_genres(genre_sentences_text)
        value["text_genre_scores"] = text_genres
        log.debug(f"Text Top {GENRES_LIMIT} for {key}: {get_genre_names(text_genres)} Preview: {preview_url}")

    # Genres matched from the item's user-supplied tags
    tags = value["tags"]
    tags_genres = []
    if tags:
        tags_genres = get_genres(tags)
        value["tags_genre_scores"] = tags_genres
        log.debug(f"Tags Top {GENRES_LIMIT} for {key}: {get_genre_names(tags_genres)} Preview: {preview_url}")

    # Side-by-side summary, only meaningful when both sources produced scores
    if text_genres and tags_genres:
        log.debug(
            f"Top {GENRES_LIMIT} for {key}: Text: {get_genre_names(text_genres)} Genre Tags: {get_genre_names(tags_genres)} User Tags: {tags} Preview: {preview_url}"
        )


# Persist the records, now including the genre scores
save_dict_to_pickle_file(genre_sentences, GENRES_SENTENCES_FILE_PATH)

In [None]:
# log all genres along with their respective genre ids
# Print the id -> name table so a FILTER_GENRE_ID can be looked up
for genre_id in range(len(GENRES)):
    genre = GENRES[genre_id]
    log.info(f"{genre_id}: {genre}")

In [None]:
# Id (index into the alphabetically sorted GENRES list) of the genre to export.
# With the genre list defined in this notebook, 41 is "photography"; the cell above
# logs every id/name pair.
FILTER_GENRE_ID = 41
# Genre name in lower case with spaces replaced by underscores
FILTER_GENRE_NAME = GENRES[FILTER_GENRE_ID].strip().lower().replace(" ", "_")
# Minimum score for FILTER_GENRE_ID an item needs to be kept by filter_genre
display_threshold = 0.9

# Reload the scored items from the pickle file written by the genre-matching step
genre_sentences = read_dict_from_pickle_file(GENRES_SENTENCES_FILE_PATH)

# Filled in place by filter_genre: item id -> item record
filtered_nfts = {}


def filter_genre(genre_id, threshold):
    """
    Copy into the module-level `filtered_nfts` every item of `genre_sentences` that has a
    score for `genre_id` of at least `threshold`.

    Both the text-derived ("text_genre_scores") and tag-derived ("tags_genre_scores")
    score lists are considered; the best score for `genre_id` across the two decides.
    Nothing is returned: matching items are stored in `filtered_nfts` under their key.

    Args:
        genre_id: Id of the genre to filter on.
        threshold: Minimum score required for an item to be kept.
    """
    score_fields = ("text_genre_scores", "tags_genre_scores")

    for key, value in genre_sentences.items():
        # Gather the (genre_id, score) pairs from whichever score lists the item has
        candidate_pairs = []
        for field in score_fields:
            if field in value and len(value[field]) != 0:
                candidate_pairs.extend(value[field])

        # Scores recorded for the requested genre only
        matching_scores = [pair[1] for pair in candidate_pairs if pair[0] == genre_id]

        # Skip items without a score for this genre or whose best score is too low
        if not matching_scores or max(matching_scores) < threshold:
            continue

        log.debug(f"genre_scores: {(matching_scores)}")

        filtered_nfts[key] = value


# Populate filtered_nfts with the items matching the selected genre
filter_genre(FILTER_GENRE_ID, display_threshold)

# Persist the selection ...
save_dict_to_pickle_file(filtered_nfts, FILTERED_NFTS_FILE_PATH)

# ... together with a one-record description of how it was produced
run_summary = dict(
    genre=GENRES[FILTER_GENRE_ID],
    genre_id=FILTER_GENRE_ID,
    threshold=display_threshold,
    count=len(filtered_nfts),
    sentence_limit=SENTENCES_LIMIT,
    genre_limit=GENRES_LIMIT,
    sentence_filter_model=SENTENCES_FILTER_MODEL_ID,
    genre_match_model=GENRE_MATCH_MODEL_ID,
)
metadata = [run_summary]
# Written in JSON-lines form (one JSON object per line)
srsly.write_jsonl(FILTERED_NFTS_METADATA_FILE_PATH, metadata)

In [None]:
def _scores_and_ids(record, field):
    """Return (copy of the score pairs, their genre ids) for `field`; two empty lists if absent or empty."""
    if field in record and len(record[field]) != 0:
        pairs = list(record[field])
        return pairs, [pair[0] for pair in pairs]
    return [], []


# Log each filtered item with its genre scores, preview image url and genre sentences
for key, value in filtered_nfts.items():
    text_genre_scores, text_genre_ids = _scores_and_ids(value, "text_genre_scores")
    tags_genre_scores, tags_genre_ids = _scores_and_ids(value, "tags_genre_scores")

    # Genre ids suggested by both the text and the tags
    common_genre_ids = set(text_genre_ids).intersection(tags_genre_ids)

    # Same message as a triple-quoted block would give, with the line breaks and the
    # trailing spaces spelled out explicitly
    report = (
        f"\n{GENRES[FILTER_GENRE_ID]} Common Genres: {common_genre_ids}"
        f"  Text Genre Scores: {text_genre_scores}"
        f"  Tags Genre Scores: {tags_genre_scores}"
        f" Preview: {get_s3_file_url(value['preview_file_name'])} \n"
        f"             Genre Sentences: {value['genre_sentences']} "
    )
    log.info(report)