In [1]:
import datetime

# Reference instant for this feature snapshot: midnight on 2023-05-18 (naive, no tzinfo).
# It is used both as the ingestion feature_time and as the "now" for computing post age.
FEATURE_TIME = datetime.datetime(2023, 5, 18)

In [2]:
import findspark

# Point findspark at the local Spark install so `pyspark` becomes importable.
# /opt/spark is expected to be a symlink to the real distribution, e.g.:
#   $ cd /opt
#   /opt$ sudo ln -s ~/apps/spark-3.4.0-bin-hadoop3 spark
SPARK_HOME_DIR = "/opt/spark"
findspark.init(SPARK_HOME_DIR)

In [3]:
from pyspark.sql import SparkSession

# A single SparkSession drives every DataFrame in this notebook; getOrCreate()
# returns the already-active session when the cell is re-run.
spark = (
    SparkSession.builder
    .appName("LensFeatures")
    .getOrCreate()
)

23/05/18 09:47:09 WARN Utils: Your hostname, VIJAYs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.87.240 instead (on interface en0)
23/05/18 09:47:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/18 09:47:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Raw export of Lens `public_profile_post` (a directory of CSV part files).
POSTS_CSV_DIR = "/tmp/lens_bigquery_csvs/public_profile_post/"

posts_df = spark.read.csv(
    POSTS_CSV_DIR,
    header=True,
    inferSchema=True,
    # Records may contain embedded newlines, and quotes inside quoted fields
    # use '"' as the escape character.
    multiLine=True,
    escape="\"",
)

                                                                                

In [None]:
# posts_df.show(5, truncate=False)

In [5]:
# count() is a Spark action: it runs a job over the posts CSV export.
# (The previous message used JS-style "${...}", which printed a literal "$".)
print(f"total number of records {posts_df.count()}")

[Stage 2:>                                                          (0 + 1) / 1]

total number of records $452441


                                                                                

In [6]:
# Inspect the schema Spark inferred from the raw posts CSV export.
posts_df.printSchema()

root
 |-- post_id: string (nullable = true)
 |-- contract_publication_id: string (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- content_uri: string (nullable = true)
 |-- s3_metadata_location: string (nullable = true)
 |-- collect_nft_address: string (nullable = true)
 |-- reference_implementation: string (nullable = true)
 |-- reference_return_data: string (nullable = true)
 |-- is_related_to_post: string (nullable = true)
 |-- is_related_to_comment: string (nullable = true)
 |-- is_metadata_processed: boolean (nullable = true)
 |-- has_error: boolean (nullable = true)
 |-- metadata_error_reason: string (nullable = true)
 |-- tx_hash: string (nullable = true)
 |-- is_hidden: boolean (nullable = true)
 |-- timeout_request: boolean (nullable = true)
 |-- app_id: string (nullable = true)
 |-- block_timestamp: timestamp (nullable = true)
 |-- created_block_hash: string (nullable = true)
 |-- metadata_version: string (nullable = true)
 |-- language: string (nullable = tru

In [7]:
# Columns needed to derive the post features; everything else in the export is discarded.
POSTS_SELECTED_COLUMNS = [
    "post_id",
    "profile_id",
    "is_related_to_post",
    "is_related_to_comment",
    "is_hidden",
    "is_gated",
    "block_timestamp",
    "language",
    "region",
    "content_warning",
    "main_content_focus",
    "tags_vector",
    "custom_filters_gardener_flagged",
]
posts_df = posts_df.select(*POSTS_SELECTED_COLUMNS)

In [8]:
from pyspark.sql.functions import col, when, lit, datediff
from pyspark.sql.types import IntegerType, BooleanType, TimestampType

# Derive post-level feature columns and pin the types of the flag/timestamp columns.
posts_df = (
    posts_df
    # A post is "original" when it is neither linked to a post nor to a comment.
    .withColumn(
        "is_original",
        when(
            col("is_related_to_post").isNull() & col("is_related_to_comment").isNull(),
            lit(True),
        ).otherwise(lit(False)),
    )
    # True whenever any content warning value is present.
    .withColumn(
        "is_content_warning",
        when(col("content_warning").isNull(), lit(False)).otherwise(lit(True)),
    )
    # Explicit casts instead of trusting CSV schema inference.
    .withColumn("is_hidden", col("is_hidden").cast(BooleanType()))
    .withColumn("is_gated", col("is_gated").cast(BooleanType()))
    .withColumn(
        "custom_filters_gardener_flagged",
        col("custom_filters_gardener_flagged").cast(BooleanType()),
    )
    .withColumn("block_timestamp", col("block_timestamp").cast(TimestampType()))
    # Age of the post in days, measured at FEATURE_TIME.
    .withColumn("age", datediff(lit(FEATURE_TIME), col("block_timestamp")))
)

In [9]:
# The reply-link columns were only needed to compute `is_original`.
posts_df = posts_df.drop("is_related_to_post", "is_related_to_comment")

In [10]:
# Schema after deriving the feature columns and dropping the reply-link columns.
posts_df.printSchema()

root
 |-- post_id: string (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- is_hidden: boolean (nullable = true)
 |-- is_gated: boolean (nullable = true)
 |-- block_timestamp: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- region: string (nullable = true)
 |-- main_content_focus: string (nullable = true)
 |-- tags_vector: string (nullable = true)
 |-- custom_filters_gardener_flagged: boolean (nullable = true)
 |-- is_original: boolean (nullable = false)
 |-- age: integer (nullable = true)



In [11]:
# Snapshot of Lens `public_publication_stats` (engagement counters per publication).
PUB_STATS_PARQUET_PATH = "/tmp/lens_bigquery_csvs/public_publication_stats_05102023.pqt"
pub_stats_df = spark.read.parquet(PUB_STATS_PARQUET_PATH)

In [None]:
# pub_stats_df.show(5, truncate=False)

In [12]:
# count() is a Spark action: it runs a job over the publication stats parquet.
# (The previous message used JS-style "${...}", which printed a literal "$".)
print(f"total number of records {pub_stats_df.count()}")

total number of records $8747109


In [13]:
# Inspect the publication stats schema (includes a nested datastream_metadata struct).
pub_stats_df.printSchema()

root
 |-- publication_id: string (nullable = true)
 |-- total_amount_of_collects: long (nullable = true)
 |-- total_amount_of_mirrors: long (nullable = true)
 |-- total_amount_of_comments: long (nullable = true)
 |-- total_upvotes: long (nullable = true)
 |-- total_downvotes: long (nullable = true)
 |-- datastream_metadata: struct (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- source_timestamp: long (nullable = true)



In [14]:
# datastream_metadata is replication bookkeeping, not a feature.
pub_stats_df = pub_stats_df.drop("datastream_metadata")

In [15]:
# Confirm only the publication id and the engagement counters remain.
pub_stats_df.printSchema()

root
 |-- publication_id: string (nullable = true)
 |-- total_amount_of_collects: long (nullable = true)
 |-- total_amount_of_mirrors: long (nullable = true)
 |-- total_amount_of_comments: long (nullable = true)
 |-- total_upvotes: long (nullable = true)
 |-- total_downvotes: long (nullable = true)



In [16]:
# Attach engagement stats to each post. The left outer join keeps every post;
# posts without a matching stats row get null stats columns.
posts_features_df = posts_df.join(
    pub_stats_df,
    on=posts_df.post_id == pub_stats_df.publication_id,
    how="leftouter",
)

In [17]:
# Schema of the joined post + stats frame (both join keys are kept).
posts_features_df.printSchema()

root
 |-- post_id: string (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- is_hidden: boolean (nullable = true)
 |-- is_gated: boolean (nullable = true)
 |-- block_timestamp: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- region: string (nullable = true)
 |-- main_content_focus: string (nullable = true)
 |-- tags_vector: string (nullable = true)
 |-- custom_filters_gardener_flagged: boolean (nullable = true)
 |-- is_original: boolean (nullable = false)
 |-- age: integer (nullable = true)
 |-- publication_id: string (nullable = true)
 |-- total_amount_of_collects: long (nullable = true)
 |-- total_amount_of_mirrors: long (nullable = true)
 |-- total_amount_of_comments: long (nullable = true)
 |-- total_upvotes: long (nullable = true)
 |-- total_downvotes: long (nullable = true)



In [None]:
# posts_features_df.show(5, truncate=False)

In [18]:
# Should equal the posts count; a larger number would mean duplicate publication ids
# in the stats table fanned out the left join.
# (The previous message used JS-style "${...}", which printed a literal "$".)
print(f"total number of records {posts_features_df.count()}")

                                                                                

total number of records $452441


In [19]:
# Interactive, one-time browser login that creates Application Default Credentials (ADC).
# Credentials are saved under ~/.config/gcloud/application_default_credentials.json and
# are used by any library that requests ADC (e.g. google-cloud-aiplatform below).

! gcloud auth application-default login

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2F&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=vKqPT3tW2XsCCaR5CYtlQib7PRZdST&access_type=offline&code_challenge=KcH0qYT3nS_9yFvyhdDChR0B1E4fSJoNyszEr8vaUjc&code_challenge_method=S256


Credentials saved to file: [/Users/vijay/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).

Quota project "boxwood-well-386122" was added to ADC which can be used by Google client libraries for billing and quota. Note that some services may still bill the project owning the

In [20]:
import os

from google.cloud import aiplatform

# GCP settings. The defaults reproduce the original run; set the environment
# variables to target another project without editing the notebook.
PROJECT_ID = os.environ.get("LENS_GCP_PROJECT_ID", "boxwood-well-386122")
REGION = os.environ.get("LENS_GCP_REGION", "us-central1")
# Staging bucket handed to the Vertex AI SDK.
BUCKET_URI = os.environ.get("LENS_GCS_STAGING_BUCKET", "gs://vijay-lens-feature-store-temp")

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [21]:
from google.cloud.aiplatform import Feature, Featurestore

FEATURESTORE_ID = "lens_featurestore_d2"

# Handle to the featurestore that receives the post features; print its
# resource metadata to confirm it is reachable.
fs = Featurestore(featurestore_name=FEATURESTORE_ID)
print(fs.gca_resource)

name: "projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2"
create_time {
  seconds: 1684349445
  nanos: 286219000
}
update_time {
  seconds: 1684349445
  nanos: 525844000
}
etag: "AMEw9yN_VdUOhyR47h1eX-UHdwocdcU-ipwNT9XwDt4kxAhP0MBvGiAd0MDcNaGYnzFI"
online_serving_config {
}
state: STABLE



In [22]:
# Handle to the "posts" entity type; its registered features determine what is ingested.
posts_entity_type = fs.get_entity_type(entity_type_id="posts")

In [23]:
# Ids of every feature currently registered on the "posts" entity type.
POSTS_FEATURES_IDS = [
    registered_feature.name
    for registered_feature in posts_entity_type.list_features()
]
print(POSTS_FEATURES_IDS)



In [24]:
# Features registered on the entity type that have no source column in POSTS_DF
# ('recommend' is not among the selected/derived columns) and so cannot be ingested.
EXCLUDED_POSTS_FEATURE_IDS = {"recommend"}

# Filter instead of list.remove(): the cell stays safe to re-run and does not raise
# ValueError when an excluded id is absent.
POSTS_FEATURES_IDS = [
    feature_id
    for feature_id in POSTS_FEATURES_IDS
    if feature_id not in EXCLUDED_POSTS_FEATURE_IDS
]
print(POSTS_FEATURES_IDS)



In [25]:
from pyspark.sql.functions import date_format

# toPandas() errors on timestamps that carry nanoseconds, so block_timestamp is
# rendered as a second-resolution string before collecting into pandas.
POSTS_DF = (
    posts_features_df
    .withColumn(
        "block_timestamp",
        date_format("block_timestamp", "yyyy-MM-dd HH:mm:ss"),
    )
    .toPandas()
)

                                                                                

In [26]:
# Check pandas dtypes and non-null counts after collecting from Spark.
POSTS_DF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 452441 entries, 0 to 452440
Data columns (total 20 columns):
 #   Column                           Non-Null Count   Dtype 
---  ------                           --------------   ----- 
 0   post_id                          452441 non-null  object
 1   profile_id                       452441 non-null  object
 2   is_hidden                        452441 non-null  bool  
 3   is_gated                         452441 non-null  bool  
 4   block_timestamp                  452441 non-null  object
 5   language                         424234 non-null  object
 6   region                           362 non-null     object
 8   main_content_focus               450923 non-null  object
 9   tags_vector                      69448 non-null   object
 10  custom_filters_gardener_flagged  452441 non-null  bool  
 11  is_original                      452441 non-null  bool  
 13  age                              452441 non-null  int32 
 14  publication_id  

In [27]:
# Featurestore feature id -> POSTS_DF column name, listed only for the features
# whose id differs from the DataFrame column that holds their values.
POSTS_SRC_FIELDS = dict(
    collects="total_amount_of_collects",
    upvotes="total_upvotes",
    mirrors="total_amount_of_mirrors",
    downvotes="total_downvotes",
    comments="total_amount_of_comments",
)

In [28]:
# POSTS_DF column whose values become the featurestore entity ids.
POSTS_ENTITY_ID_FIELD: str = "post_id"

In [29]:
# Integer feature columns are converted to pandas' nullable "Int64" dtype. The stats
# columns come from a left outer join and may contain missing values, which a plain
# int64 column cannot represent.
POSTS_INT64_COLUMNS = [
    "age",
    "total_amount_of_collects",
    "total_amount_of_mirrors",
    "total_amount_of_comments",
    "total_upvotes",
    "total_downvotes",
]
POSTS_DF = POSTS_DF.astype({column: "Int64" for column in POSTS_INT64_COLUMNS})

In [None]:
# Featurestore ingest requires Cloud Resource Manager API to be enabled
# Uncomment below line to enable once for your gcp project
# ! gcloud services enable cloudresourcemanager.googleapis.com

In [30]:
# Write the posts features into the "posts" entity type, every value stamped with
# FEATURE_TIME. Entity ids come from the POSTS_ENTITY_ID_FIELD column; features whose
# id differs from the DataFrame column name are remapped through POSTS_SRC_FIELDS.
posts_entity_type.ingest_from_df(
    df_source=POSTS_DF,
    entity_id_field=POSTS_ENTITY_ID_FIELD,
    feature_ids=POSTS_FEATURES_IDS,
    feature_source_fields=POSTS_SRC_FIELDS,
    feature_time=FEATURE_TIME,
)

Importing EntityType feature values: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts


INFO:google.cloud.aiplatform.featurestore._entity_type:Importing EntityType feature values: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts


Import EntityType feature values backing LRO: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts/operations/726482723002122240


INFO:google.cloud.aiplatform.featurestore._entity_type:Import EntityType feature values backing LRO: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts/operations/726482723002122240


EntityType feature values imported. Resource name: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts


INFO:google.cloud.aiplatform.featurestore._entity_type:EntityType feature values imported. Resource name: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x122a46b20> 
resource name: projects/1181216607/locations/us-central1/featurestores/lens_featurestore_d2/entityTypes/posts

23/05/18 09:59:15 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /private/var/folders/jg/2ktpnbqx0_1b_cw7pfxlbbbc0000gn/T/blockmgr-63afe0bb-53a2-4905-973c-b7f55c679eab. Falling back to Java IO way
java.io.IOException: Failed to delete: /private/var/folders/jg/2ktpnbqx0_1b_cw7pfxlbbbc0000gn/T/blockmgr-63afe0bb-53a2-4905-973c-b7f55c679eab
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:177)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:113)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.Indexe

In [None]:
# TODO 
# 1. featurestore ingest_from_df
# 2. "recommend" label
# 3. read from bigquery
# 4. checkpoint max(block_timestamp) for incremental reads from BigQuery