# Setup
Install the required libraries.

In [None]:
%pip uninstall protobuf -y


[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Found existing installation: protobuf 3.19.3
Uninstalling protobuf-3.19.3:
  Successfully uninstalled protobuf-3.19.3
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
%pip install torch transformers accelerate einops langchain xformers bitsandbytes faiss-gpu sentence_transformers sentencepiece openai==0.27.7 "pinecone-client[grpc]"==2.2.1 pinecone-client==2.2.1 langchain==0.0.162 tiktoken==0.4.0 datasets==2.12.0


[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting protobuf==3.19.3
  Using cached protobuf-3.19.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
Installing collected packages: protobuf
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.3
    Uninstalling protobuf-3.20.3:
      Successfully uninstalled protobuf-3.20.3
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
databricks-feature-store 0.16.1 requires pyspark<4,>=3.1.2, which is not installed.
tensorflow 2.14.0 requires protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.20.3, but you have protobuf 3.19.3 which is incompatible.
tensorboard 2.14.0 requires protobuf>=3.19.6, but you have protobuf 3.19.3 which is incompatible.
tensorboard-plugin-profile 2.14.0 requi

In [None]:
%pip install protobuf==3.20.3


[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting protobuf==3.20.3
  Using cached protobuf-3.20.3-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
Installing collected packages: protobuf
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.19.3
    Uninstalling protobuf-3.19.3:
      Successfully uninstalled protobuf-3.19.3
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
databricks-feature-store 0.16.1 requires pyspark<4,>=3.1.2, which is not installed.
Successfully installed protobuf-3.20.3
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
%pip freeze | grep protobuf


protobuf==3.20.3


In [None]:
# Restart the Python process so the packages changed by the %pip cells above are picked up,
# as the "you may need to restart the kernel" notes in the pip output recommend.
dbutils.library.restartPython()


# Download and extract data
- Download the Google Local Data (2021) for Maryland along with the metadata for businesses
- Save the data to the Databricks File System (DBFS) so that it does not need to be downloaded again in subsequent runs

In [None]:
# One-time step, left commented out so a full run does not re-download the data.
# Uncomment to fetch the Maryland review archive, decompress it, and keep the first
# 1,000,000 lines as /tmp/review_1M.json:
# !wget https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/review-Maryland_10.json.gz -O /tmp/review-Maryland_10.json.gz && gunzip /tmp/review-Maryland_10.json.gz && head -n 1000000 /tmp/review-Maryland_10.json > /tmp/review_1M.json


In [None]:
# One-time step, left commented out so a full run does not re-download the data.
# Uncomment to fetch and decompress the Maryland business metadata into /tmp/meta-Maryland.json:
# !wget https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/meta-Maryland.json.gz -O /tmp/meta-Maryland.json.gz && gunzip /tmp/meta-Maryland.json.gz


In [None]:
# One-time step, left commented out: after the downloads above, uncomment to move the
# files from local /tmp (file: scheme) into dbfs:/FileStore, where the read cell expects them.
# # Move files to Databricks file system
# dbutils.fs.mv("file:/tmp/review_1M.json", "dbfs:/FileStore/review-Maryland-1M.json")
# dbutils.fs.mv("file:/tmp/review-Maryland_10.json", "dbfs:/FileStore/review-Maryland_10.json")
# dbutils.fs.mv("file:/tmp/meta-Maryland.json", "dbfs:/FileStore/meta-Maryland.json")


# Read Data

Create Spark DataFrames for the review data and the business metadata, then join them on `gmap_id`.

In [None]:
from pyspark.sql.functions import *

# Load the review sample and the business metadata from DBFS.
review_df = spark.read.json("dbfs:/FileStore/review-Maryland-1M.json")

# The reviews also carry `name` and `gmap_id` columns, so the metadata columns are
# renamed before joining to keep the two sets distinguishable.
renamed_metadata_columns = {'name': 'business_name', 'gmap_id': 'business_gmap_id'}
metadata_df = (
    spark.read.json("dbfs:/FileStore/meta-Maryland.json")
    .withColumnsRenamed(renamed_metadata_columns)
)

# Default (inner) join: each review is paired with the metadata of the business it refers to.
join_condition = review_df.gmap_id == metadata_df.business_gmap_id
data = review_df.join(metadata_df, join_condition)


In [None]:
# Preview the first 10 joined rows.
data.show(n=10)


+--------------------+-----------------+--------------------+------+----+--------------------+-------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+-----------+------------------+--------------+-----+--------------------+-----------------+--------------------+
|             gmap_id|             name|                pics|rating|resp|                text|         time|             user_id|                MISC|             address|avg_rating|            category|         description|    business_gmap_id|               hours|  latitude|  longitude|     business_name|num_of_reviews|price|    relative_results|            state|                 url|
+--------------------+-----------------+--------------------+------+----+--------------------+-------------+--------------------+--------------------+--------------------+----------+--------------------+-----------------

# Run pipeline to map data and write to Pinecone

## Use a UDF to create the documents used for generating embeddings

In [None]:
import json
import hashlib
from pyspark.sql.types import StringType
from pyspark.sql.functions import to_json

def safe_get(val):
  """Return ``val`` unchanged, or the placeholder string "N/A" when ``val`` is None."""
  if val is None:
    return "N/A"
  return val

@udf(returnType=StringType())
def to_id(user_id, gmap_id):
    """Spark UDF: hex MD5 digest of "<user_id>|<gmap_id>", used as the document id."""
    key = f"{user_id}|{gmap_id}"
    return hashlib.md5(key.encode("utf-8")).hexdigest()

@udf(returnType=StringType())
def to_document(address, business_name, review_text, misc, hours, url, latitude, longitude, description, categories, avg_rating):
    """Spark UDF: flatten one joined review/business row into a single text document.

    Every field is rendered as its own "### <Label>: <value>" line; missing values
    are shown as "N/A" (or "{}" for Misc).

    Args:
        address, business_name, url, hours: emitted as-is, "N/A" when None.
        review_text: review body, truncated to 2000 characters.
        misc: struct-like value exposing ``asDict()``; entries whose value is None
            are dropped and the rest is serialised as JSON.
        latitude, longitude: emitted as "<lat>, <long>" only when both are present.
        description: business description, truncated to 1500 characters.
        categories: sequence of strings, joined with ", ".
        avg_rating: formatted with two decimals; a falsy value (None or 0) yields "N/A".

    Returns:
        The assembled multi-line document string.
    """
    address = safe_get(address)
    business_name = safe_get(business_name)
    hours = safe_get(hours)
    url = safe_get(url)

    # Free-text fields are truncated to a fixed maximum length.
    review_text = safe_get(review_text)[:2000]
    description = safe_get(description)[:1500]

    # Keep only the populated MISC attributes and serialise them as JSON.
    misc_str = "{}"
    if misc is not None:
        misc_data = {k: v for k, v in misc.asDict().items() if v is not None}
        misc_str = json.dumps(misc_data)

    has_coordinates = latitude is not None and longitude is not None
    lat_long = f"{latitude}, {longitude}" if has_coordinates else "N/A"
    categories = ", ".join(categories) if categories is not None and len(categories) > 0 else "N/A"
    rating = f"{float(avg_rating):.2f}" if avg_rating else "N/A"

    # Each section appears exactly once (the previous template emitted "### Hours" twice).
    sections = [
        f"### Address: {address}",
        f"### Name: {business_name}",
        f"### Review: {review_text}",
        f"### Average Rating: {rating}",
        f"### Hours: {hours}",
        f"### URL: {url}",
        f"### Description: {description}",
        f"### Categories: {categories}",
        f"### Latitude/Longitude: {lat_long}",
        f"### Misc: {misc_str}",
    ]
    return "\n".join(sections)

# Reduce the joined data to the two columns the embedding step consumes:
# the hashed document id and the flattened text document.
document_id_col = to_id("user_id", "business_gmap_id").alias("document_id")
document_col = to_document(
    "address", "business_name", "text", "MISC", "hours", "url",
    "latitude", "longitude", "description", "category", "avg_rating",
).alias("document")
pinecone_df = data.select(document_id_col, document_col)


## Setup Pinecone
- Initialize index if it doesn't exist

In [None]:
# Setup Pinecone
import pinecone

# The API key is read from a Databricks secret scope instead of being hard-coded.
PINECONE_API_KEY = dbutils.secrets.get(scope="business-retrieval-app", key="PINECONE_API_KEY")
environment = 'us-east-1-aws'
pinecone.init(api_key=PINECONE_API_KEY, environment=environment)

index_name = 'business-listings'

# Create the index only when it is not already listed, so re-running this cell reuses it.
existing_indexes = pinecone.list_indexes()
if index_name not in existing_indexes:
    # NOTE(review): dimension=384 presumably matches the output size of the embedding
    # model used later in this notebook — confirm if the model is changed.
    pinecone.create_index(name=index_name, metric='dotproduct', dimension=384)

index = pinecone.Index(index_name=index_name)


  from tqdm.autonotebook import tqdm


## Initialize the embedding model

In [None]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import Iterator,Tuple
from langchain.embeddings import HuggingFaceEmbeddings, SentenceTransformerEmbeddings

# Instantiate the embedding model once here; as the cell output shows, this
# downloads the model files when it runs.
embedder = SentenceTransformerEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)


Downloading (…)e9125/.gitattributes:   0%|          | 0.00/1.18k [00:00<?, ?B/s]

Downloading (…)_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Downloading (…)7e55de9125/README.md:   0%|          | 0.00/10.6k [00:00<?, ?B/s]

Downloading (…)55de9125/config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Downloading (…)ce_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

Downloading (…)125/data_config.json:   0%|          | 0.00/39.3k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

Downloading (…)nce_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading (…)e9125/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

Downloading (…)9125/train_script.py:   0%|          | 0.00/13.2k [00:00<?, ?B/s]

Downloading (…)7e55de9125/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)5de9125/modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

## Define map function to generate embeddings

In [None]:
import pyspark.pandas as ps
from typing import TypeAlias
import numpy as np

def create_embeddings(pdf)-> pd.DataFrame["id": str, "values": list[np.float32], "namespace": str, "metadata": str]:
    """Embed one pandas batch of documents and shape it for the Pinecone writer.

    Args:
        pdf: pandas DataFrame with the columns ``document_id`` and ``document``.

    Returns:
        A pandas DataFrame with one row per input row and the columns ``id``
        (the document id), ``values`` (the embedding vector), ``namespace``
        (always the empty string) and ``metadata`` (a JSON string holding
        ``doc_id`` and ``document``). The return annotation is the schema hint
        for the pandas-on-Spark ``apply_batch`` call that runs this function.
    """
    # The model is constructed inside the function, i.e. once per call.
    embedder = SentenceTransformerEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    documents = pdf.document.to_list()
    document_ids = pdf.document_id.to_list()
    embeddings = embedder.embed_documents(documents)
    # Build the metadata from the lists already extracted above rather than
    # iterating the frame row by row with iterrows().
    metadata = [
        json.dumps({'doc_id': str(doc_id), 'document': document})
        for doc_id, document in zip(document_ids, documents)
    ]
    return pd.DataFrame({"id": document_ids, "values": embeddings, "namespace": "", "metadata": metadata})


In [None]:
# Apply create_embeddings to pandas batches of pinecone_df, then convert the
# result back to a Spark DataFrame for the writer.
mapped_df = (
    pinecone_df
    .pandas_api()
    .pandas_on_spark.apply_batch(create_embeddings)
    .to_spark()
)


## Run the pipeline and write to Pinecone

In [None]:

# Look up the project name from the initialised Pinecone client.
project_name = pinecone.whoami().projectname

# Configure the Pinecone Spark data source and append the embedded documents to the index.
pinecone_writer = (
    mapped_df.write
    .format("io.pinecone.spark.pinecone.Pinecone")
    .option("pinecone.apiKey", PINECONE_API_KEY)
    .option("pinecone.environment", environment)
    .option("pinecone.projectName", project_name)
    .option("pinecone.indexName", index_name)
    .mode("append")
)
pinecone_writer.save()
