Skip to content

Commit

Permalink
Merge pull request #7 from decodingml/module-3
Browse files Browse the repository at this point in the history
Module 3
  • Loading branch information
alexandruvesa committed Apr 20, 2024
2 parents 7c884df + f872fc9 commit 4633cb3
Show file tree
Hide file tree
Showing 43 changed files with 5,269 additions and 6 deletions.
Binary file removed .DS_Store
Binary file not shown.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,6 @@ cython_debug/
# IDEs
.idea/
.vscode

# MacOS
.DS_Store
Binary file removed course/.DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion course/module-1/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

def handler(event, context: LambdaContext) -> dict[str, Any]:
first_name, last_name = lib.user_to_names(event.get("user"))

user = UserDocument.get_or_create(first_name=first_name, last_name=last_name)

link = event.get("link")
Expand Down
4 changes: 3 additions & 1 deletion course/module-2/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from db.mongo import MongoDatabaseConnector

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


def stream_process():
Expand Down
12 changes: 10 additions & 2 deletions course/module-2/data_flow/mq.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ class RabbitMQConnection:
_instance = None

def __new__(
cls, host: str = None, port: int = None, username: str = None, password: str = None, virtual_host: str = "/"
cls,
host: str = None,
port: int = None,
username: str = None,
password: str = None,
virtual_host: str = "/",
):
if not cls._instance:
cls._instance = super().__new__(cls)
Expand Down Expand Up @@ -45,7 +50,10 @@ def connect(self):
credentials = pika.PlainCredentials(self.username, self.password)
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host, port=self.port, virtual_host=self.virtual_host, credentials=credentials
host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials,
)
)
except pika.exceptions.AMQPConnectionError as e:
Expand Down
4 changes: 3 additions & 1 deletion course/module-2/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ def __new__(cls, *args, **kwargs):
print(f"Couldn't connect to the database: {str(e)}")
raise

print(f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful")
print(
f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful"
)
return cls._instance

def get_database(self):
Expand Down
1 change: 0 additions & 1 deletion course/module-2/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


class AppSettings(BaseSettings):

# MongoDB configs
MONGO_DATABASE_HOST: str = "mongodb://localhost:30001,localhost:30002,localhost:30003/?replicaSet=my-replica-set"
MONGO_DATABASE_NAME: str = "admin"
Expand Down
21 changes: 21 additions & 0 deletions course/module-3/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
EMBEDDING_MODEL_ID="sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_MODEL_MAX_INPUT_LENGTH=256
EMBEDDING_SIZE=384
EMBEDDING_MODEL_DEVICE="cpu"

# MongoDB configs
MONGO_DATABASE_HOST="mongodb://localhost:30001,localhost:30002,localhost:30003/?replicaSet=my-replica-set"
MONGO_DATABASE_NAME="scrabble"

# QdrantDB config
QDRANT_DATABASE_HOST="localhost"
QDRANT_DATABASE_PORT=6333
CLEANED_DATA_OUTPUT_COLLECTION_NAME="cleaned_posts"
QDRANT_APIKEY=

# MQ config
RABBITMQ_DEFAULT_USERNAME="guest"
RABBITMQ_DEFAULT_PASSWORD="guest"
RABBITMQ_HOST="localhost"
RABBITMQ_PORT= 5673

161 changes: 161 additions & 0 deletions course/module-3/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

data/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
17 changes: 17 additions & 0 deletions course/module-3/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
help:
@grep -E '^[a-zA-Z0-9 -]+:.*#' Makefile | sort | while read -r l; do printf "\033[1;32m$$(echo $$l | cut -f 1 -d':')\033[00m:$$(echo $$l | cut -f 2- -d'#')\n"; done

local-start-infra: # Buil and start mongodb, mq and qdrant.
docker-compose -f docker-compose.yml up --build -d

local-stop-infra: # Stop mongodb, mq and qdrant.
docker-compose -f docker-compose.yml down

local-start-cdc: # Start CDC system
poetry run python cdc.py

local-insert-data-mongo: #Insert data to mongodb
poetry run python insert_data_mongo.py

local-bytewax: # Run bytewax pipeline
poetry run python -m bytewax.run data_flow/bytewax_pipeline
1 change: 1 addition & 0 deletions course/module-3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TBD
33 changes: 33 additions & 0 deletions course/module-3/cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json

from bson import json_util

from data_flow.mq import RabbitMQConnection
from db.mongo import MongoDatabaseConnector


def stream_process():
mq_connection = RabbitMQConnection()
mq_connection.connect()

client = MongoDatabaseConnector()

db = client["scrabble"]

changes = db.watch(
[{"$match": {"operationType": {"$in": ["insert"]}}}]
) # Filter for inserts only
for change in changes:
data_type = change["ns"]["coll"]
entry_id = str(change["fullDocument"]["_id"]) # Convert ObjectId to string
change["fullDocument"].pop("_id")
change["fullDocument"]["type"] = data_type
change["fullDocument"]["entry_id"] = entry_id

# Use json_util to serialize the document
data = json.dumps(change["fullDocument"], default=json_util.default)
mq_connection.publish_message(data=data, queue="mongo_data")


if __name__ == "__main__":
stream_process()
Empty file.
30 changes: 30 additions & 0 deletions course/module-3/data_flow/bytewax_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from data_flow.stream_input import RabbitMQSource
from data_flow.stream_output import QdrantOutput
from data_logic.dispatchers import (
ChunkingDispatcher,
CleaningDispatcher,
EmbeddingDispatcher,
RawDispatcher,
)
from db.qdrant import connection

flow = Dataflow("Streaming ingestion pipeline")
stream = op.input("input", flow, RabbitMQSource())
stream = op.map("raw dispatch", stream, RawDispatcher.handle_mq_message)
stream = op.map("clean dispatch", stream, CleaningDispatcher.dispatch_cleaner)
op.output(
"cleaned data insert to qdrant",
stream,
QdrantOutput(connection=connection, sink_type="clean"),
)
stream = op.flat_map("chunk dispatch", stream, ChunkingDispatcher.dispatch_chunker)
stream = op.map(
"embedded chunk dispatch", stream, EmbeddingDispatcher.dispatch_embedder
)
op.output(
"embedded data insert to qdrant",
stream,
QdrantOutput(connection=connection, sink_type="vector"),
)
Loading

0 comments on commit 4633cb3

Please sign in to comment.