In [None]:
!pip3 -q install pyspark weaviate-client > /dev/null

In [34]:
!pip show pyspark

Name: pyspark
Version: 3.4.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


In [None]:
!sudo apt-get install scala

In [None]:
!wget https://repo1.maven.org/maven2/io/weaviate/spark-connector_2.13/1.2.4/spark-connector_2.13-1.2.4.jar

In [None]:
!mv /content/spark-connector_2.13-1.2.4.jar /usr/local/lib/python3.10/dist-packages/pyspark/jars

In [36]:
from pyspark.sql import SparkSession
import os
#.config("spark.jars",
 #       "weaviate-spark-connector-assembly-v0.1.2.jar" )
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("weaviate")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

In [None]:
!wget https://storage.googleapis.com/sphere-demo/sphere.100k.jsonl.tar.gz

In [None]:
!gunzip /content/sphere.100k.jsonl.tar.gz

In [None]:
!tar -xvf /content/sphere.100k.jsonl.tar

sphere.100k.jsonl


In [None]:
df = spark.read.load("sphere.100k.jsonl", format="json")

In [None]:
import weaviate
from weaviate.embedded import EmbeddedOptions

client = weaviate.Client(embedded_options=EmbeddedOptions())

embedded weaviate is already listing on port 6666


In [38]:
new_df = df.limit(5)

In [43]:
client.schema.delete_all()

client.schema.create_class(
    {
        "class": "Sphere",
        "properties": [
            {
                "name": "raw",
                "dataType": ["string"]
            },
            {
                "name": "sha",
                "dataType": ["string"]
            },
            {
                "name": "title",
                "dataType": ["string"]
            },
            {
                "name": "url",
                "dataType": ["string"]
            },
        ],
     "vectorizer":"text2vec-huggingface"
    }
)

In [None]:
df.limit(1500).withColumnRenamed("id", "uuid").write.format("io.weaviate.spark.Weaviate") \
    .option("batchSize", 200) \
    .option("scheme", "http") \
    .option("host", "localhost:8080") \
    .option("id", "uuid") \
    .option("className", "Sphere") \
    .option("vector", "vector") \
    .mode("overwrite").save()

Here's what happens behind the scenes when the client uses the embedded options in the instantiation call:

The client downloads a Weaviate release from GitHub and caches it
It then spawns a Weaviate process with a data directory configured to a specific location, and listening to the specified port (by default 6666)

The server's STDOUT and STDERR are piped to the client

The client connects to this server process (e.g. to http://127.0.0.1:6666) and runs the client code
After running the code (when the application terminates), the client shuts down the Weaviate process

The data directory is preserved, so subsequent invocations have access to the data from all previous invocations, across all clients using the embedded option.

https://ai.facebook.com/blog/introducing-sphere-meta-ais-web-scale-corpus-for-better-knowledge-intensive-nlp/

https://huggingface.co/facebook/dpr-question_encoder-single-nq-base

https://weaviate.io/blog/sphere-dataset-in-weaviate

In [None]:
import time
# Configure a batch process
with client.batch as batch:
    batch.batch_size=5
    batch = 0
    for iterator in new_df.collect():
        print(f"processing batch {batch}")
        properties = {
            "raw": iterator['raw'],
            "sha": iterator['sha'],
            "title": iterator['title'],
            "url": iterator['url'],
        }
        time.sleep(8)
        client.batch.add_data_object(properties,"Sphere")

In [45]:
nearText = {"concepts":["Clearcreek chapel"]}

result = (
    client.query
    .get("Sphere", ["url", "raw"])
    .with_near_text(nearText)
    .with_limit(2)
    .do()
)

In [46]:
result

{'data': {'Get': {'Sphere': []}}}

In [47]:
client.schema.create_class(
    {
        "class": "Sphere_vector",
        "description" : "Sphere vectorizer pipeline",
        "moduleConfig": {
        "text2vec-huggingface": {
          "model": "sentence-transformers/all-MiniLM-L6-v2",
          "options": {
            "waitForModel": True,
            "useGPU": False,
            "useCache": True
            }
          }
        },
        "properties": [
            {
                "name": "raw",
                "dataType": ["string"]
            },
            {
                "name": "sha",
                "dataType": ["string"]
            },
            {
                "name": "title",
                "dataType": ["string"]
            },
            {
                "name": "url",
                "dataType": ["string"]
            },
        ],
     "vectorizer":"text2vec-huggingface"
    }
)

In [48]:
import time
# Configure a batch process
with client.batch as batch:
    batch.batch_size=5
    batch = 0
    for iterator in new_df.collect():
        print(f"processing batch {batch}")
        properties = {
            "raw": iterator['raw'],
            "sha": iterator['sha'],
            "title": iterator['title'],
            "url": iterator['url'],
        }
        time.sleep(8)
        client.batch.add_data_object(properties,
                                     "Sphere_vector")

processing batch 0
processing batch 0
processing batch 0
processing batch 0
processing batch 0


In [51]:
nearText = {"concepts":["Clearcreek chapel"]}

result = (
    client.query
    .get("Sphere_vector", ["url", "raw"])
    .with_near_text(nearText)
    .with_limit(2)
    .do()
)
result

{'data': {'Get': {'Sphere_vector': [{'raw': 'Clearcreek Chapel Counseling: Blog ). Boundaries seems to focus more on what is comfortable for the victim of abuse, rather than calling the Christian towards holiness in the midst of significant suffering. Many counseling systems start with the philosophy of reclaiming what is rightfully yours and regaining control of your own life, which you believe was taken by your abuser. This approach is born out of the objective to forbid any additional abuse. While this objective is appropriate, if it becomes your sole focus you will become primarily concerned with self, personal comfort, and total control. You will ultimately lack a God-centered worldview in',
     'url': 'http://chapelcounseling.org/blog/the-problem-with-boundaries-and-feelings-based-counseling/'},
    {'raw': 'Discover event places and locations in Bulgaria Sofia across from the Alexander Nevsky Cathedral, the Radisson Blu Grand Hotel Sofia offers free Wi-Fi and a fitness center. 

In [52]:
client.schema.create_class(
    {
        "class": "Sphere_auto",
        "description" : "Sphere vectorizer auto Schema",
        "moduleConfig": {
        "text2vec-huggingface": {
          "model": "sentence-transformers/all-MiniLM-L6-v2",
          "options": {
            "waitForModel": True,
            "useGPU": False,
            "useCache": True
            }
          }
        },
        "properties": [],
    }
)

In [53]:
import time
# Configure a batch process
with client.batch as batch:
    batch.batch_size=5
    batch = 0
    for iterator in new_df.collect():
        print(f"processing batch {batch}")
        properties = {
            "raw": iterator['raw'],
            "sha": iterator['sha'],
            "title": iterator['title'],
            "url": iterator['url'],
        }
        time.sleep(8)
        client.batch.add_data_object(properties,
                                     "Sphere_auto",
                                     vector=iterator['vector'])

processing batch 0
processing batch 0
processing batch 0
processing batch 0
processing batch 0


In [56]:
nearText = {"concepts":["Clearcreek chapel"]}

result = (
    client.query
    .get("Sphere_auto", ["url", "raw"])
    .with_near_text(nearText)
    .with_limit(2)
    .do()
)
result

{'errors': [{'locations': [{'column': 27, 'line': 1}],
   'message': 'Unknown argument "nearText" on field "Sphere_auto" of type "GetObjectsObj". Did you mean "nearObject" or "nearVector"?',
   'path': None}]}