Skip to content

Commit

Permalink
Merge branch 'main' into hybrid_search_reword_and_rerank
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgxue committed Jan 9, 2024
2 parents ca74cdd + 65eb4fe commit 992beb5
Show file tree
Hide file tree
Showing 21 changed files with 478 additions and 737 deletions.
4 changes: 4 additions & 0 deletions airflow/dags/feedback/find_example_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any

from google.cloud import firestore
from include.utils.slack import send_failure_notification
from langchain.evaluation import StringEvaluator, load_evaluator
from langchain.evaluation.schema import EvaluatorType
from langsmith import Client
Expand Down Expand Up @@ -119,6 +120,9 @@ def process_run(run: dict[str, Any]):
start_date=datetime(2023, 1, 1),
default_args=default_args,
catchup=False,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def find_example_runs():
begin = EmptyOperator(task_id="begin")
Expand Down
33 changes: 18 additions & 15 deletions airflow/dags/ingestion/ask-astro-forum-load.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import datetime
import os

from include.tasks import split
from include.tasks.extract.astro_forum_docs import get_forum_df
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.date(2022, 1, 1)

Expand All @@ -22,6 +20,8 @@

# Task: fetch the Astro forum content by delegating to ``get_forum_df``.
@task
def get_astro_forum_content():
    # Imported inside the task body so the extractor module is only loaded
    # when the task actually runs, not when this DAG file is parsed.
    from include.tasks.extract.astro_forum_docs import get_forum_df

    forum_content = get_forum_df()
    return forum_content


Expand All @@ -31,21 +31,24 @@ def get_astro_forum_content():
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def ask_astro_load_astro_forum():
from include.tasks import split

split_docs = task(split.split_html).expand(dfs=[get_astro_forum_content()])

_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_docs])
)
_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_docs])


ask_astro_load_astro_forum()
31 changes: 16 additions & 15 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import os
from datetime import datetime

from include.tasks import split
from include.tasks.extract import airflow_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

airflow_docs_base_url = "https://airflow.apache.org/docs/"

Expand All @@ -27,29 +25,32 @@
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def ask_astro_load_airflow_docs():
"""
This DAG performs incremental load for any new Airflow docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)

split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])

_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)
_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_md_docs])


ask_astro_load_airflow_docs()
31 changes: 16 additions & 15 deletions airflow/dags/ingestion/ask-astro-load-astro-cli.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import datetime
import os

from include.tasks import split
from include.tasks.extract import astro_cli_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

default_args = {"retries": 3, "retry_delay": 30}

Expand All @@ -24,28 +22,31 @@
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def ask_astro_load_astro_cli_docs():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks.extract import astro_cli_docs

extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)()
split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs])

_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)
_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_md_docs])


ask_astro_load_astro_cli_docs()
30 changes: 16 additions & 14 deletions airflow/dags/ingestion/ask-astro-load-astro-sdk.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import datetime
import os

from include.tasks.extract.astro_sdk_docs import extract_astro_sdk_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.date(2023, 1, 19)

Expand All @@ -21,6 +20,8 @@

# Task: return whatever ``extract_astro_sdk_docs`` produces for the Astro SDK docs.
@task
def get_astro_sdk_content():
    # Imported inside the task body so the extractor module is only loaded
    # when the task actually runs, not when this DAG file is parsed.
    from include.tasks.extract.astro_sdk_docs import extract_astro_sdk_docs

    return extract_astro_sdk_docs()

Expand All @@ -31,19 +32,20 @@ def get_astro_sdk_content():
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def ask_astro_load_astro_sdk():
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[get_astro_sdk_content()])
)
_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_astro_sdk_content()])


ask_astro_load_astro_sdk()
52 changes: 52 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import datetime
import os

from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

# Deployment environment name, read from ASK_ASTRO_ENV; falls back to "dev".
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

# Airflow connection id for Weaviate, derived from the environment name
# (e.g. "weaviate_dev").
_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
# Weaviate class that receives the documents; overridable via WEAVIATE_CLASS.
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")


# Defaults applied to every task in the DAG.
# NOTE(review): retry_delay is a bare number, presumably seconds - confirm.
default_args = {"retries": 3, "retry_delay": 30}

# Cron schedule "0 5 * * *" only when the environment is "prod"; in any other
# environment the schedule is None.
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
    schedule_interval=schedule_interval,
    start_date=datetime.datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args,
    on_failure_callback=send_failure_notification(
        dag_id="{{ dag.dag_id }}",
        execution_date="{{ dag_run.execution_date }}",
    ),
)
def ask_astro_load_astronomer_docs():
    """
    This DAG performs incremental load for any new docs in astronomer docs.
    """
    # Project helpers are imported inside the DAG function rather than at
    # module import time.
    from include.tasks import split
    from include.tasks.extract.astro_docs import extract_astro_docs

    # Step 1: extract the Astronomer docs.
    extract_step = task(extract_astro_docs)
    extracted_docs = extract_step()

    # Step 2: split the extracted markdown; mapped over a one-element list.
    split_step = task(split.split_markdown)
    chunked_docs = split_step.expand(dfs=[extracted_docs])

    # Step 3: ingest the chunks into Weaviate. Settings shared by every mapped
    # task instance are bound with ``partial``; the data is supplied via ``expand``.
    ingest_settings = {
        "class_name": WEAVIATE_CLASS,
        "existing": "replace",
        "document_column": "docLink",
        "batch_config_params": {"batch_size": 1000},
        "verbose": True,
        "conn_id": _WEAVIATE_CONN_ID,
        "task_id": "WeaviateDocumentIngestOperator",
    }
    WeaviateDocumentIngestOperator.partial(**ingest_settings).expand(input_data=[chunked_docs])


# Call the @dag-decorated function so the DAG object is created when this file is parsed.
ask_astro_load_astronomer_docs()
30 changes: 16 additions & 14 deletions airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import datetime
import os

from include.tasks.extract.astronomer_providers_docs import extract_provider_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.date(2023, 1, 19)

Expand All @@ -21,6 +20,8 @@

# Task: return whatever ``extract_provider_docs`` produces for the Astronomer providers docs.
@task
def get_provider_content():
    # Imported inside the task body so the extractor module is only loaded
    # when the task actually runs, not when this DAG file is parsed.
    from include.tasks.extract.astronomer_providers_docs import extract_provider_docs

    return extract_provider_docs()

Expand All @@ -31,6 +32,9 @@ def get_provider_content():
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
on_failure_callback=send_failure_notification(
dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
),
)
def ask_astro_load_astronomer_providers():
"""
Expand All @@ -39,17 +43,15 @@ def ask_astro_load_astronomer_providers():
any existing documents that have been updated will be removed and re-added.
"""

_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[get_provider_content()])
)
_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_provider_content()])


ask_astro_load_astronomer_providers()
Loading

0 comments on commit 992beb5

Please sign in to comment.