<img src = "https://github.com/singlestore-labs/spaces-notebooks/blob/e551e274bb67bb1e5081131ee1150cdba713fc43/common/images/singlestore-jupyter.png?raw=true">

<div id="singlestore-header" style="display: flex; background-color: rgba(235, 249, 245, 0.25); padding: 5px;">
    <div id="icon-image" style="width: 90px; height: 90px;">
        <img width="100%" height="100%" src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/common/images/header-icons/browser.png" />
    </div>
    <div id="text" style="padding: 5px; margin-left: 10px;">
        <div id="badge" style="display: inline-block; background-color: rgba(0, 0, 0, 0.15); border-radius: 4px; padding: 4px 8px; align-items: center; margin-top: 6px; margin-bottom: -2px; font-size: 80%">SingleStore Notebooks</div>
        <h1 style="font-weight: 500; margin: 8px 0 0 4px;">Using Apache Spark Structured Streaming with SingleStore Notebooks</h1>
    </div>
</div>

In [4]:
# Empty pip's download cache, then install a Java runtime (openjdk) and PySpark
# from the conda-forge channel; -y accepts the install prompt automatically.
!pip cache purge --quiet
!conda install -y --quiet -c conda-forge openjdk pyspark

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.



In [9]:
!pip install openai==0.28 --quiet
!pip install nltk --quiet

In [11]:
import getpass
import os  # needed here: os.environ is used below, before any later cell imports os

import openai

# Ask for the key interactively so it is never stored in the notebook, then
# publish it through the environment for the code that reads it later.
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

OpenAI API Key: ········


In [12]:
import os

# Create the two working folders if they are missing; existing folders are left alone.
for folder in ("jars", "data"):
    os.makedirs(folder, exist_ok = True)

In [14]:
import requests

def download_jar(url, destination, timeout = 60):
    """Download the file at `url` and write it to `destination`.

    Args:
        url: Address of the file to fetch.
        destination: Local path the downloaded bytes are written to
            (opened in binary write mode, so an existing file is replaced).
        timeout: Seconds to wait on the server before giving up. Defaults
            to 60 so a stalled connection cannot hang the notebook forever.

    Raises:
        requests.HTTPError: If the server replies with an error status.
            Nothing is written to `destination` in that case.
    """
    response = requests.get(url, timeout = timeout)
    # Check the status before touching the disk: without this, a 404/500
    # error page would be saved under a .jar name and only fail later,
    # when Spark tries to load it.
    response.raise_for_status()
    with open(destination, "wb") as f:
        f.write(response.content)

# (download URL, local path) pairs for the JARs that are later passed to Spark.
# NOTE(review): spray-json_3 looks like the Scala 3 build while the connector
# artifact is suffixed _2.12 - confirm this is the intended artifact.
jar_urls = [
    (
        "https://repo1.maven.org/maven2/com/singlestore/singlestore-jdbc-client/1.2.1/singlestore-jdbc-client-1.2.1.jar",
        "jars/singlestore-jdbc-client-1.2.1.jar",
    ),
    (
        "https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.5-spark-3.5.0/singlestore-spark-connector_2.12-4.1.5-spark-3.5.0.jar",
        "jars/singlestore-spark-connector_2.12-4.1.5-spark-3.5.0.jar",
    ),
    (
        "https://repo1.maven.org/maven2/org/apache/commons/commons-dbcp2/2.12.0/commons-dbcp2-2.12.0.jar",
        "jars/commons-dbcp2-2.12.0.jar",
    ),
    (
        "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar",
        "jars/commons-pool2-2.12.0.jar",
    ),
    (
        "https://repo1.maven.org/maven2/io/spray/spray-json_3/1.3.6/spray-json_3-1.3.6.jar",
        "jars/spray-json_3-1.3.6.jar",
    ),
]

# Fetch every JAR into its local path, in list order
for jar_url, jar_path in jar_urls:
    download_jar(jar_url, jar_path)

print("JAR files downloaded successfully")

JAR files downloaded successfully


In [16]:
from pyspark.sql import SparkSession

# Comma-separated list of the local JAR paths downloaded above
jar_paths = ",".join(local_path for _, local_path in jar_urls)

# Build (or reuse) the Spark session with those JARs attached
session_builder = SparkSession.builder.config("spark.jars", jar_paths)
session_builder = session_builder.appName("Spark Streaming Test")
spark = session_builder.getOrCreate()

# Only show errors from here on
spark.sparkContext.setLogLevel("ERROR")

24/04/02 08:39:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [18]:
import nltk
import random
from nltk.corpus import wordnet as wn
from nltk.tokenize import word_tokenize

# Fetch the NLTK resources, in the same order as listed
for nltk_package in ("punkt", "averaged_perceptron_tagger", "wordnet", "omw"):
    nltk.download(nltk_package)

# Folder that receives the generated text files
output_dir = "data"

# Generate meaningful sentences

# Every WordNet synset, loaded once on first use. Building this list walks the
# whole corpus, so it is kept between calls instead of being rebuilt per sentence.
_synset_pool = []

def generate_meaningful_sentence():
    """Return the definition of a randomly chosen WordNet synset as a sentence.

    The definition is tokenised, its first token is capitalised, a period is
    appended to its last token, and the tokens are joined with single spaces
    (so punctuation tokens end up surrounded by spaces).

    Returns:
        str: The resulting sentence.
    """
    # Fill the pool lazily so the cost is paid once, not on every call
    if not _synset_pool:
        _synset_pool.extend(wn.all_synsets())

    # Choose a random synset from WordNet
    synset = random.choice(_synset_pool)

    # Use its dictionary definition as the raw sentence
    definition = synset.definition()
    tokens = word_tokenize(definition)

    # Capitalise the first word and end with a period
    tokens[0] = tokens[0].capitalize()
    tokens[-1] = tokens[-1] + "."

    return " ".join(tokens)

# How many files to write, and how many sentences go into each one
num_files = 5
num_sentences_per_file = 1

# Write file_1.txt .. file_<num_files>.txt into output_dir, one sentence per line
for file_number in range(1, num_files + 1):
    file_path = os.path.join(output_dir, f"file_{file_number}.txt")
    with open(file_path, "w") as text_file:
        text_file.writelines(
            generate_meaningful_sentence() + "\n"
            for _ in range(num_sentences_per_file)
        )

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data] Downloading package omw to /home/jovyan/nltk_data...


In [20]:
%%bash

# Print each generated text file: a header with its path, its contents, a separator
for path in data/*.txt
do
    printf 'File: %s\n' "$path"
    cat "$path"
    printf '%s\n' "----------------------"
done

File: data/file_1.txt
A small vehicle with four wheels in which a baby or child is pushed around.
----------------------
File: data/file_2.txt
Make a duplicate or duplicates of.
----------------------
File: data/file_3.txt
A symbol in a logical or mathematical expression that can be replaced by the name of any member of specified set.
----------------------
File: data/file_4.txt
Lean dried meat pounded fine and mixed with melted fat ; used especially by North American Indians.
----------------------
File: data/file_5.txt
Take liquid out of a container or well.
----------------------


In [30]:
# SingleStore connection details; replace the <...> placeholders before running
host, port = "<HOST>", "3306"
password = "<PASSWORD>"

# "host:port" endpoint string
cluster = f"{host}:{port}"

In [31]:
# SingleStore connector settings, applied to the Spark session in this order
singlestore_settings = {
    "spark.datasource.singlestore.ddlEndpoint": cluster,
    "spark.datasource.singlestore.user": "admin",
    "spark.datasource.singlestore.password": password,
    "spark.datasource.singlestore.disablePushdown": "false",
}
for setting_name, setting_value in singlestore_settings.items():
    spark.conf.set(setting_name, setting_value)

In [34]:
%%sql
-- Start from a clean slate: remove any existing spark_demo database ...
DROP DATABASE IF EXISTS spark_demo;
-- ... and create it again, empty.
CREATE DATABASE IF NOT EXISTS spark_demo;

In [36]:
%%sql
USE spark_demo;

-- Recreate the target table for the stream:
--   value     : the text that was read
--   file_name : the file the text came from
--   embedding : 1536-element vector; presumably the size of the
--               text-embedding-3-small vectors generated later - confirm
DROP TABLE IF EXISTS streaming;
CREATE TABLE IF NOT EXISTS streaming (
     value TEXT,
     file_name TEXT,
     embedding VECTOR(1536) NOT NULL
);

In [40]:
from pyspark.sql.functions import input_file_name, udf
from pyspark.sql.types import StringType

# Hand the key stored in the OPENAI_API_KEY environment variable to the openai client
openai.api_key = os.environ.get("OPENAI_API_KEY")

# Generate embeddings for text
def generate_embeddings(text):
    """Return the OpenAI embedding for `text`.

    Sends `text` to openai.Embedding.create with the "text-embedding-3-small"
    engine and returns the `embedding` attribute of the first item in the
    response's `data`.
    """
    response = openai.Embedding.create(
        input = text,
        engine = "text-embedding-3-small"
    )
    first_item = response.data[0]
    return first_item.embedding

# Wrap generate_embeddings as a Spark UDF whose declared return type is a string.
# NOTE(review): the Python function returns the embedding object itself, not a
# str; presumably the string form is what the VECTOR column receives - confirm.
generate_embeddings_udf = udf(f = generate_embeddings, returnType = StringType())

In [42]:
import time

# The stream reads from the folder the text files were written to
input_dir = output_dir

# Open the folder as a streaming text source
text_stream = spark.readStream.format("text").option("path", input_dir).load()

# Record which file each row came from
df = text_stream.withColumn("file_name", input_file_name())

# Add an "embedding" column computed from the "value" column of each row
df_with_embeddings = df.withColumn("embedding", generate_embeddings_udf("value"))

# Write each batch of data to SingleStore
def write_to_singlestore(df_with_embeddings, epoch_id):
    """Append one micro-batch to the spark_demo.streaming table.

    Used as the foreachBatch callback of the streaming query.

    Args:
        df_with_embeddings: The batch DataFrame to write.
        epoch_id: Batch identifier passed in by the caller; not used here.
    """
    batch_writer = df_with_embeddings.write.format("singlestore")
    batch_writer = batch_writer.option("loadDataCompression", "LZ4")
    batch_writer = batch_writer.mode("append")
    batch_writer.save("spark_demo.streaming")

# Start the stream; each micro-batch is passed to write_to_singlestore
stream_writer = df_with_embeddings.writeStream.foreachBatch(write_to_singlestore)
query = stream_writer.start()

# Wait for the query to finish processing.
# processAllAvailable() blocks until everything currently in the source has been
# processed and committed to the sink. The previous polling of
# query.status["isDataAvailable"] could see False before the first micro-batch
# had been planned and stop the query without writing anything.
try:
    query.processAllAvailable()
finally:
    # Always release the stream, including when a batch raised an error
    query.stop()

                                                                                

In [45]:
%%sql
USE spark_demo;

-- Preview what the stream wrote, shortened for display:
--   value     : first 30 characters of the text
--   file_name : last 10 characters of the path (e.g. file_1.txt)
--   embedding : first 50 characters of the vector's text form
SELECT
    SUBSTR(value, 1, 30) AS value,
    SUBSTR(file_name, LENGTH(file_name) - 9) AS file_name,
    SUBSTR(embedding, 1, 50) AS embedding
FROM streaming;

value,file_name,embedding
Take liquid out of a container,file_5.txt,"[0.0102875428,0.00111957663,-0.0203577913,0.019013"
A small vehicle with four whee,file_1.txt,"[0.0256429948,-0.00674873963,-0.0440164097,0.03244"
Lean dried meat pounded fine a,file_4.txt,"[0.00122959982,0.00205766363,-0.0188605282,0.02135"
A symbol in a logical or mathe,file_3.txt,"[0.0347843841,-0.0457727574,-0.0495568328,0.009357"
Make a duplicate or duplicates,file_2.txt,"[0.0421960168,-0.0164516736,0.0258004908,-0.007278"


In [46]:
# Shut down the Spark session now that the demo is complete
spark.stop()