In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("ArxivRAG")
    .getOrCreate()
)

# Read the arXiv metadata snapshot (alternative input kept for reference).
# df = spark.read.json("../data/reduce.json")
df = spark.read.json("../data/arxiv-metadata-oai-snapshot.json")

# Inspect the inferred schema
# df.printSchema()

# Number of records that were loaded.
row_total = df.count()
print(row_total)

# Preview a handful of rows
# df.select("id", "title", "abstract", "categories", "update_date", "versions").show(5, truncate=False)

In [None]:


# Alternative load path, kept for reference (the snapshot is read in the first cell).
# df = spark.read.json("../arxiv-metadata-oai-snapshot.json")

# Show schema
# df.printSchema()

# Print Spark's summary statistics for the raw snapshot frame.
df.describe().show()
# Scratch code: row count, and writing a 10,000-row extract to "test3.json".
# print(df.count())
# df_copy = df.limit(10000)
# df_copy.write.format("json") \
#     .mode("append") \
#     .save("test3.json")

In [None]:
from pyspark.sql.functions import col, concat_ws, lower, regexp_replace, trim
from pyspark.sql.functions import monotonically_increasing_id, col


# Build the cleaned corpus from the raw snapshot. Added columns:
#   document - title + abstract joined by a space, with every character that is
#              not an ASCII letter, digit or whitespace removed, then
#              lower-cased and trimmed
#   row_id   - unique, increasing (not necessarily consecutive) id per row
#   year     - the 4 characters starting 17 from the end of the first version's
#              `created` string (assumes that string ends in
#              "YYYY HH:MM:SS GMT" - TODO confirm against the data)
df_cleaned = (
    df.select("id", "title", "abstract", "versions", "authors")
    .withColumn("document", concat_ws(" ", col("title"), col("abstract")))
    .withColumn(
        "document",
        lower(regexp_replace(col("document"), r"[^a-zA-Z0-9\s]", "")),
    )
    .withColumn("document", trim(col("document")))
    .withColumn("row_id", monotonically_increasing_id())
    .withColumn("year", col("versions")[0]["created"].substr(-17, 4))
)
print(df_cleaned.count())

# Filter out empty documents
# df_cleaned = df_cleaned.filter(col("document") != "")
# print(df_cleaned.count())

# Drop papers without an abstract. The condition must be a Column expression:
# `col("abstract") is not None` is a plain Python identity test that always
# evaluates to True, so it never expressed a null check to Spark.
df_cleaned = df_cleaned.filter(col("abstract").isNotNull())
print(df_cleaned.count())

# df_cleaned.groupBy("year").count().orderBy("count").show(100)
# Optional: Sample 10,000 rows for development
# df_sample = df_cleaned.limit(1)

# df_sample = df_cleaned.filter((col("year") == 1988) | (col("year") == 1986)|(col("year") == 1989)|(col("year") == 1990)|(col("year") == 1991)|(col("year") == 1992))

# Show a few processed rows
# df_sample.show()

In [None]:

# Number of distinct values in the derived `year` column (shown as the cell output).
df_cleaned.select("year").distinct().count()

In [None]:
# df_cleaned.describe().show()
# Show how many distinct years there are, then list them.
# The count is printed explicitly: a bare expression that is not the last line
# of a cell is evaluated (running a Spark job) but its value is never displayed.
distinct_years = df_cleaned.select("year").distinct()
print(distinct_years.count())
distinct_years.show()

In [None]:
# from sentence_transformers import SentenceTransformer
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

import pandas as pd

# `df_sample` is otherwise only assigned in a later cell, so on a fresh kernel
# (Restart & Run All) this cell fails with NameError. Define the development
# sample here; SAMPLE_SIZE controls how many papers are pulled to the driver
# and embedded downstream.
SAMPLE_SIZE = 10_000
df_sample = df_cleaned.limit(SAMPLE_SIZE)

# Convert Spark DataFrame to Pandas (only the columns the vector store needs)
pandas_df = df_sample.select("id", "document", "year", "title", "authors").toPandas()

In [None]:
# (rows, columns) of the frame that is about to be embedded.
pandas_df.shape
# Scratch check: value counts restricted to rows whose year lies strictly between 1960 and 2026.
# pandas_df[pandas_df["year"].apply(lambda x: int(x) > 1960 and int(x) < 2026)].value_counts()

In [None]:
# Load the sentence-transformer embedding model.
# Earlier alternative, kept for reference:
# model = SentenceTransformer('all-MiniLM-L6-v2')
# `embeddings` is the object passed as `embedding=` to the Chroma calls in later cells.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

In [None]:
import os

import chromadb

# Connect to the remote Chroma server. The endpoint can be overridden with the
# CHROMA_HOST / CHROMA_PORT environment variables so every cell that builds a
# client can be pointed at the same server without editing the notebook; the
# defaults are the values this cell has always used.
chroma_client = chromadb.HttpClient(
    host=os.environ.get("CHROMA_HOST", "34.163.106.5"),
    port=int(os.environ.get("CHROMA_PORT", "8000")),
)
# Connectivity check; its return value is shown as the cell output.
chroma_client.heartbeat()

In [None]:
# Earlier approach (direct SentenceTransformer encoding), kept for reference:
# embeddings = model.encode(pandas_df['document'].tolist(), show_progress_bar=True)

# One metadata dict per document, in the same row order as the texts and ids.
metadata = pandas_df[["id", "year", "title", "authors"]].to_dict(orient="records")
# print(metadata, "test")

# Embed the documents and create the vector store on the remote Chroma server.
vectorstore = Chroma.from_texts(
    pandas_df["document"].tolist(),
    embedding=embeddings,
    metadatas=metadata,
    ids=pandas_df["id"].tolist(),
    client=chroma_client,
)


In [None]:

# Take a single row whose row_id is at least 1020 as the next sample to index.
df_sample = df_cleaned.filter(col("row_id") >= 1020).drop("row_id").limit(1)

# Show the processed row(s). `categories` is not selected here: df_cleaned is
# built from id/title/abstract/versions/authors plus the derived columns, so
# asking for `categories` makes Spark fail to resolve the column.
df_sample.select("id", "document", "year", "title", "authors").show(5, truncate=150)

In [None]:
# Pull the current Spark sample to the driver as a pandas frame.
pandas_df = df_sample.select("id", "document", "year", "title", "authors").toPandas()

# One metadata dict per document, in the same row order as the texts and ids.
metadata = pandas_df[["id", "year", "title", "authors"]].to_dict(orient="records")

# Append the sample to the existing vector store.
# NOTE(review): `embedding=` looks redundant here, since the store was already
# built with an embedding function - confirm against the installed langchain version.
vectorstore.add_texts(
    pandas_df["document"].tolist(),
    embedding=embeddings,
    metadatas=metadata,
    ids=pandas_df["id"].tolist(),
)

In [None]:
# Wrap the vector store in a retriever and run a smoke-test query.
retriever = vectorstore.as_retriever()

# test query
query = "field"
retrieved_docs = retriever.get_relevant_documents(query)

# Print each hit with its position, followed by blank lines as a separator.
for rank, doc in enumerate(retrieved_docs):
    print(f"{rank} {doc.page_content}\n\n")

In [None]:
! pip install --upgrade google-cloud-storage

In [None]:
# Display the SparkContext behind the active session as the cell output.
spark.sparkContext

In [None]:
from google.cloud import storage
import os

# Configuration for the GCS-hosted Chroma data.
# MODEL_NAME, REPO_ID and LOCAL_PERSIST_PATH are not read by any live code in
# this cell (LOCAL_PERSIST_PATH appears only in the commented-out download below).
MODEL_NAME = 'sentence-transformers/all-mpnet-base-v2'
BUCKET_NAME = "arxiv-researcher-bucket"
GCS_PERSIST_PATH = "chroma_db/"
LOCAL_PERSIST_PATH = "./local_chromadb/"

REPO_ID = "mistralai/Mistral-7B-Instruct-v0.2"

# Create a storage client and list the objects under the Chroma prefix of the bucket.
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
blobs = bucket.list_blobs(prefix=GCS_PERSIST_PATH)

# Print the name of every object under the prefix.
# The download to LOCAL_PERSIST_PATH is currently disabled (commented out below).
for blob in blobs:
    print(blob.name)
    # if not blob.name.endswith("/"):  # Avoid directory blobs
    #     relative_path = os.path.relpath(blob.name, GCS_PERSIST_PATH)
    #     local_file_path = os.path.join(LOCAL_PERSIST_PATH, relative_path)
    #     os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
    #     blob.download_to_filename(local_file_path)

In [None]:
import os

import chromadb

# Connect to the remote Chroma server. The endpoint can be overridden with the
# CHROMA_HOST / CHROMA_PORT environment variables; the defaults are the values
# this cell has always used.
# NOTE(review): this default host differs from the one used by the other
# client cells in this notebook - confirm which server is intended.
chroma_client = chromadb.HttpClient(
    host=os.environ.get("CHROMA_HOST", "34.163.92.97"),
    port=int(os.environ.get("CHROMA_PORT", "8000")),
)
# Connectivity check; its return value is shown as the cell output.
chroma_client.heartbeat()

In [None]:
import arxiv
from datetime import datetime, timedelta
# Construct the default API client.
client = arxiv.Client()

# Query window: the seven days ending today. The datetime objects keep their
# own names so `today` / `d` are only ever the formatted strings.
window_end = datetime.today()
print(window_end)
# now = datetime.datetime.now()
window_start = window_end - timedelta(days=7)
print(window_start)

# Both bounds are formatted as YYYYMMDD followed by a fixed "0600".
today = f"{window_end.year:04d}{window_end.month:02d}{window_end.day:02d}0600"
d = f"{window_start.year:04d}{window_start.month:02d}{window_start.day:02d}0600"
print(f"submittedDate:[{d}+TO+{today}]")

# Search for the 10 most recent articles matching the keyword "quantum."
# search = arxiv.Search(
#   query = "quantum",
#   max_results = 10,
#   sort_by = arxiv.SortCriterion.SubmittedDate
# )

# results = client.results(search)

# # `results` is a generator; you can iterate over its elements one by one...
# for r in client.results(search):
#   print(r.title)
# # ...or exhaust it into a list. Careful: this is slow for large results sets.
# all_results = list(results)
# print([r.title for r in all_results])

# For advanced query syntax documentation, see the arXiv API User Manual:
# https://arxiv.org/help/api/user-manual#query_details
# submittedDate:[202301010600+TO+202401010600]
search = arxiv.Search(query=f"submittedDate:[{d} TO {today}]")

# Fetch every match and keep only the fields needed downstream. `results` is
# assigned once: a separate `client.results(search)` call whose value was
# overwritten on the very next statement has been dropped.
# NOTE(review): `year` here is an int taken from `r.updated`, while the snapshot
# pipeline earlier in this notebook derives `year` as a string from the first
# version's `created` field - confirm both are meant to feed the same collection.
results = [
    {
        "title": r.title,
        "year": r.updated.year,
        "authors": ", ".join(a.name for a in r.authors),
        "summary": r.summary,
        "id": r.entry_id.split("abs/")[-1],  # keep the full ID (including the category when present)
    }
    for r in client.results(search)
]

# Show the size and a short preview rather than dumping every record into the
# cell output; the full list stays available in `results`.
print(len(results))
results[:5]


In [None]:
# Step 2: turn the fetched records into a Spark DataFrame
df = spark.createDataFrame(results)

# Build `document` in a single expression: join title and summary with a space,
# remove every character that is not an ASCII letter, digit or whitespace,
# lower-case the result and trim surrounding whitespace.
df_cleaned = (
    df.select("id", "title", "summary", "authors", "year")
    .withColumn(
        "document",
        trim(
            lower(
                regexp_replace(
                    concat_ws(" ", col("title"), col("summary")),
                    r"[^a-zA-Z0-9\s]",
                    "",
                )
            )
        ),
    )
)

# (Optional) display the data
df_cleaned.show(truncate=False)

In [None]:
# Collect the cleaned frame to the driver as pandas, keeping only the columns
# used for the vector store (text, ids and metadata).
pandas_df = df_cleaned.select("id", "document", "year", "title", "authors").toPandas()


In [None]:
import os

import chromadb

# Embedding model passed to the vector store calls.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

# Connect to the remote Chroma server. The endpoint can be overridden with the
# CHROMA_HOST / CHROMA_PORT environment variables; the defaults are the values
# this cell has always used.
chroma_client = chromadb.HttpClient(
    host=os.environ.get("CHROMA_HOST", "34.163.106.5"),
    port=int(os.environ.get("CHROMA_PORT", "8000")),
)
# Connectivity check; its return value is shown as the cell output.
chroma_client.heartbeat()

In [None]:

# One metadata dict per document, in the same row order as the texts and ids.
metadata = pandas_df[["id", "year", "title", "authors"]].to_dict(orient="records")
# print(metadata, "test")

# Embed the documents and create the vector store on the remote Chroma server.
vectorstore = Chroma.from_texts(
    pandas_df["document"].tolist(),
    embedding=embeddings,
    metadatas=metadata,
    ids=pandas_df["id"].tolist(),
    client=chroma_client,
)
