Data Pipeline Clientlib


What is it?

Data Pipeline Clientlib provides an interface to tail and publish to data pipeline topics.


How to download

git clone git@github.com:Yelp/data_pipeline.git

Tests

Running unit tests

make -f Makefile-opensource test

Configuration

Include the data_pipeline namespace in the module_env_config section of your config.yaml and configure the following values for kafka_ip, zk_ip, and schematizer_ip:

module_env_config:
    ...
    - namespace: data_pipeline
      config:
        kafka_broker_list:
            - <kafka_ip>:9092
        kafka_zookeeper: <zk_ip>:2181
        schematizer_host_and_port: <schematizer_ip>:8888
    ...

Usage

Registering a simple schema with the Schematizer service.

from data_pipeline.schematizer_clientlib.schematizer import get_schematizer
test_avro_schema_json = {
    "type": "record",
    "namespace": "test_namespace",
    "source": "test_source",
    "name": "test_name",
    "doc": "test_doc",
    "fields": [
        {"type": "string", "doc": "test_doc1", "name": "key1"},
        {"type": "string", "doc": "test_doc2", "name": "key2"}
    ]
}
schema_info = get_schematizer().register_schema_from_schema_json(
    namespace="test_namespace",
    source="test_source",
    schema_json=test_avro_schema_json,
    source_owner_email="test@test.com",
    contains_pii=False
)
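
If the Schematizer deduplicates identical schemas (an assumption about the service's behavior, not stated in this README), re-registering the same schema should return the existing id rather than create a duplicate. A quick sanity check under that assumption:

from data_pipeline.schematizer_clientlib.schematizer import get_schematizer
# Re-register the identical schema. The assert documents the assumed
# idempotent behavior: the same schema_id comes back for the same schema.
schema_info_again = get_schematizer().register_schema_from_schema_json(
    namespace="test_namespace",
    source="test_source",
    schema_json=test_avro_schema_json,
    source_owner_email="test@test.com",
    contains_pii=False
)
assert schema_info_again.schema_id == schema_info.schema_id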

Creating a simple Data Pipeline Message from payload data.

from data_pipeline.message import Message
message = Message(
    schema_id=schema_info.schema_id,
    payload_data={
        'key1': 'value1',
        'key2': 'value2'
    }
)

Starting a Producer and publishing messages with it.

from data_pipeline.producer import Producer
with Producer() as producer:
    producer.publish(message)
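
Putting the pieces together, a minimal end-to-end publish might look like the sketch below. It uses only the calls shown above, assumes the Configuration section's settings are in place, and reuses the test_avro_schema_json from the registration example.

from data_pipeline.message import Message
from data_pipeline.producer import Producer
from data_pipeline.schematizer_clientlib.schematizer import get_schematizer

# Register (or look up) the schema, then publish one message against it.
schema_info = get_schematizer().register_schema_from_schema_json(
    namespace="test_namespace",
    source="test_source",
    schema_json=test_avro_schema_json,
    source_owner_email="test@test.com",
    contains_pii=False
)
with Producer() as producer:
    producer.publish(Message(
        schema_id=schema_info.schema_id,
        payload_data={'key1': 'value1', 'key2': 'value2'}
    ))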

Starting a Consumer named my_consumer that listens for messages in all topics within test_namespace and test_source. In this example, the consumer fetches messages one at a time, processes each, and commits its offset.

from data_pipeline.consumer import Consumer
from data_pipeline.consumer_source import TopicInSource
consumer_source = TopicInSource("test_namespace", "test_source")
with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=consumer_source
) as consumer:
    while True:
        message = consumer.get_message()
        if message is not None:
            # ... do stuff with message ...
            consumer.commit_message(message)
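
For a long-running worker you will usually want the loop above to stop cleanly. The sketch below uses only get_message and commit_message from the example above; the shutdown flag and signal handlers are illustrative additions, not part of the clientlib.

import signal

from data_pipeline.consumer import Consumer
from data_pipeline.consumer_source import TopicInSource

running = True

def _request_stop(signum, frame):
    # Flip the flag so the loop exits after the in-flight message is committed.
    global running
    running = False

signal.signal(signal.SIGTERM, _request_stop)
signal.signal(signal.SIGINT, _request_stop)

with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=TopicInSource("test_namespace", "test_source")
) as consumer:
    while running:
        message = consumer.get_message()
        if message is not None:
            # Process the message, then commit its offset so it is not redelivered.
            consumer.commit_message(message)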

Disclaimer

We're still in the process of setting up this package as a stand-alone project. Additional work may be required to run Producers and Consumers and to integrate the clientlib with other applications.

License

Data Pipeline Clientlib is licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Contributing

Everyone is encouraged to contribute to Data Pipeline Clientlib by forking the GitHub repository and making a pull request or opening an issue.
