This repository has been archived by the owner on Aug 18, 2022. It is now read-only.

Merge branch 'master' into DATAPIPE-2008-abrar-license
abrarsheikh committed Nov 4, 2016
2 parents f9e26ab + e311ab2 commit 2269252
Showing 5 changed files with 114 additions and 77 deletions.
85 changes: 69 additions & 16 deletions README
````diff
@@ -1,21 +1,74 @@
-README for the replication handler
+# MySQL Streamer
 
-To make a virtualenv consistent with what will be running the batch and have all testing tools:
-make venv-dev
-
-Manual testing (TODO remove when acceptance testing works):
-fig build
-fig up -d
-mysql -h 127.0.0.1 -u yelpdev -D yelp -P 8001 < /scratch/ryani/business_small.sql
-python replication_handler/batch/parse_replication_stream.py -v --no-notification
+What is it?
+-----------
+MySQLStreamer is a database change data capture and publish system.
+It’s responsible for capturing each individual database change,
+enveloping it into a message and publishing it to Kafka.
 
-make sure it prints out the topic that would be published to kafka
-
-Setting up the docs on dev:
-cd docs/build/html
-python -m SimpleHTTPServer 8888
-visit http://<your dev machine>:8888/index.html, you will have mini docs server up, maybe useful.
+[Read More](https://engineeringblog.yelp.com/2016/08/streaming-mysql-tables-in-real-time-to-kafka.html)
 
-Why there is a directory devdbs/ instead of using standard dev database(s):
-When we started the development of replication handler, dbas have not upgrade the dbs to MySQL5.6,
-it seemed easier to just use Docker to setup some databases for development and testing.
+How to download
+---------------
+```
+git clone git@github.com:Yelp/mysql_streamer.git
+```
+
+
+Requirements
+------------
+For Ubuntu 14.04:
+```
+apt-get update && apt-get upgrade -y && apt-get install -y \
+    build-essential \
+    python-dev \
+    libmysqlclient-dev \
+    python-pkg-resources \
+    python-setuptools \
+    python-virtualenv \
+    python-pip \
+    libpq5 \
+    libpq-dev \
+    wget \
+    language-pack-en-base \
+    uuid-dev \
+    git-core \
+    mysql-client-5.5
+```
+
+
+Tests
+-----
+Running the unit tests:
+```
+make -f Makefile-opensource test
+```
+
+
+Running the integration tests:
+```
+make -f Makefile-opensource itest
+```
+
+
+Demo
+----
+```
+make -f Makefile-opensource interactive-streamer
+```
+![MySQL Streamer Demo](https://giant.gfycat.com/AdmiredLiveCopepod.gif)
+
+In the bottom pane, you can execute any MySQL statement on a barebones structure we’ve set up. Try creating a table and inserting into it to see the change be sent (visible in the replication handler logs in the top right) and then received (visible in the Kafka tailer’s logs in the top left). The received messages will probably look a little funky - this is because they’re Avro-encoded.
+
+
+License
+-------
+MySQL Streamer is licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
+
+
+Contributing
+------------
+Everyone is encouraged to contribute to MySQL Streamer by forking the GitHub repository and making a pull request or opening an issue.
````
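The new README describes the pipeline end to end: capture each database change, envelope it into a message, and publish it to Kafka. For a sense of what that looks like from the consuming side, here is a minimal sketch of tailing one published topic with kafka-python; the broker address and the topic name are illustrative assumptions, not values taken from this repository.

```python
# Minimal consumer sketch (assumes kafka-python is installed, a broker is
# reachable at localhost:9092, and the topic name below is hypothetical).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'refresh_primary.yelp.business',  # hypothetical topic name
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
)
for message in consumer:
    # Each message value is an Avro-encoded envelope for one row change.
    print(message.offset, repr(message.value))
```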
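The Demo section notes that received messages look funky because they are Avro-encoded. Below is a sketch of decoding one raw payload with fastavro, assuming the writer schema is known; the record schema here is a made-up stand-in, not one of the schemas this service actually registers with the Schematizer.

```python
# Decoding sketch (assumes fastavro is installed; WRITER_SCHEMA is a
# made-up stand-in for whatever schema was registered for the topic).
import io

from fastavro import schemaless_reader

WRITER_SCHEMA = {
    'type': 'record',
    'name': 'business',
    'fields': [
        {'name': 'id', 'type': 'int'},
        {'name': 'name', 'type': 'string'},
    ],
}


def decode(payload):
    """Decode one raw Kafka message value into a Python dict."""
    return schemaless_reader(io.BytesIO(payload), WRITER_SCHEMA)
```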
3 changes: 2 additions & 1 deletion replication_handler/components/recovery_handler.py
```diff
@@ -138,7 +138,8 @@ def get_latest_source_log_position(self):
 
     def recover(self):
         """ Handles the recovery procedure. """
-        if self.activate_mysql_dump_recovery:
+        if (self.activate_mysql_dump_recovery
+                and self.mysql_dump_handler.mysql_dump_exists()):
             self.mysql_dump_handler.recover()
         else:
             self._handle_pending_schema_event()
```
```diff
@@ -110,7 +110,6 @@ def _get_pending_schema_event_state(self, cluster_name):
         # In services we cant do expire_on_commit=False, so
         # if we want to use the object after the session commits, we
         # need to figure out a way to hold it. for more context:
-        # https://trac.yelpcorp.com/wiki/JulianKPage/WhyNoExpireOnCommitFalse
         return copy.copy(
             SchemaEventState.get_pending_schema_event_state(
                 session,
```
2 changes: 1 addition & 1 deletion setup.py
```diff
@@ -26,7 +26,7 @@
     description='',
     author='BAM',
     author_email='bam@yelp.com',
-    url='https://gitweb.yelpcorp.com/?p=replication_handler.git',
+    url='https://github.com/Yelp/mysql_streamer',
     packages=find_packages(exclude=['tests']),
     setup_requires=['setuptools'],
     install_requires=[
```
100 changes: 42 additions & 58 deletions tests/integration/failure_recovery_test.py
```diff
@@ -158,7 +158,8 @@ def mock_repl_handler_configs(
             'container_name': 'failure_test',
             'container_env': 'dev_box',
             'disable_meteorite': True,
-            'gtid_enabled': gtid_enabled
+            'gtid_enabled': gtid_enabled,
+            'activate_mysql_dump_recovery': True
         }
 
     @pytest.fixture
@@ -494,7 +495,7 @@ def test_unclean_shutdown_schema_event(
         self,
         containers_without_repl_handler,
         rbrsource,
-        rbrstate,
+        schematracker,
         schematizer,
         namespace,
         start_service,
@@ -503,11 +504,10 @@ def test_unclean_shutdown_schema_event(
         """This tests the recovery of the service if it fails when executing a
         schema event.
         A failure is triggered intentionally when an alter table to add column
-        event is being handled. The test asserts that the service marks that
-        as PENDING in the schema_event_state table and after it restarts, it
-        processes that event again and changes the state of that event to
-        COMPLETED. It also asserts that it doesn't miss out on any event and
-        doesn't process an event more than once.
+        event is being handled. The test asserts that after the service restarts
+        it picks that event up again and completes the schema change. It also
+        asserts that it doesn't miss out on any event and doesn't process an
+        event more than once.
         """
         table_name = "ministry_of_magic_{r}".format(r=self.get_random_string())
         create_query = """
@@ -569,20 +569,31 @@ def test_unclean_shutdown_schema_event(
             num_of_schema_events=0,
         )
 
-        saved_schema_state = execute_query_get_all_rows(
-            containers_without_repl_handler,
-            rbrstate,
-            "SELECT * FROM {table} WHERE table_name=\"{name}\"".format(
-                table='schema_event_state',
-                name=table_name
-            )
-        )
-        saved_alter_query = saved_schema_state[0]['query']
-        saved_status = saved_schema_state[0]['status']
-
-        assert ' '.join(saved_alter_query.split()) == ' '.join(
-            add_col_query.format(table=table_name).split())
-        assert saved_status == SchemaEventStatus.PENDING
+        execute_query_get_one_row(
+            containers_without_repl_handler,
+            schematracker,
+            "DROP TABLE {table}".format(table=table_name)
+        )
+
+        execute_query_get_one_row(
+            containers_without_repl_handler,
+            schematracker,
+            create_query.format(table=table_name)
+        )
+
+        tracker_create_table = execute_query_get_all_rows(
+            containers_without_repl_handler,
+            schematracker,
+            "SHOW CREATE TABLE {table}".format(table=table_name)
+        )[0]['Create Table']
+
+        source_create_table = execute_query_get_all_rows(
+            containers_without_repl_handler,
+            rbrsource,
+            "SHOW CREATE TABLE {table}".format(table=table_name)
+        )[0]['Create Table']
+
+        assert source_create_table != tracker_create_table
 
         start_service(
             resume_stream=True,
@@ -592,20 +603,13 @@ def test_unclean_shutdown_schema_event(
             num_of_schema_events=100,
         )
 
-        saved_schema_state = execute_query_get_all_rows(
-            containers_without_repl_handler,
-            rbrstate,
-            "SELECT * FROM {table} WHERE table_name=\"{name}\"".format(
-                table='schema_event_state',
-                name=table_name
-            )
-        )
-        saved_alter_query = saved_schema_state[0]['query']
-        saved_status = saved_schema_state[0]['status']
-
-        assert ' '.join(saved_alter_query).split() == ' '.join(
-            add_col_query.format(table=table_name)).split()
-        assert saved_status == SchemaEventStatus.COMPLETED
+        tracker_create_table = execute_query_get_all_rows(
+            containers_without_repl_handler,
+            schematracker,
+            "SHOW CREATE TABLE {table}".format(table=table_name)
+        )[0]['Create Table']
+
+        assert source_create_table == tracker_create_table
 
         _fetch_messages(
             containers_without_repl_handler,
@@ -615,10 +619,7 @@ def test_unclean_shutdown_schema_event(
             2
         )
 
-    @pytest.mark.skip(
-        reason="This will work only after the schema dump change is in"
-    )
-    def test_unclean_shutdown_processing_table_rename(
+    def test_processing_table_rename(
         self,
         containers_without_repl_handler,
         rbrsource,
@@ -628,6 +629,9 @@ def test_processing_table_rename(
         start_service,
         gtid_enabled
     ):
+        """
+        This test verifies that the service handles table renames.
+        """
         table_name_one = "hogwarts_{r}".format(r=self.get_random_string())
         table_name_two = "durmstrang_{r}".format(r=self.get_random_string())
         create_table_query = """
@@ -690,43 +694,23 @@ def test_processing_table_rename(
 
         start_service(
             resume_stream=True,
-            num_of_queries_to_process=3,
+            num_of_queries_to_process=5,
             end_time=30,
             is_schema_event_helper_enabled=False,
             num_of_schema_events=100,
         )
         show_query = "SHOW CREATE TABLE {name}"
-        old_source_schema = execute_query_get_one_row(
-            containers_without_repl_handler,
-            rbrsource,
-            show_query.format(name=table_name_two)
-        )
-        old_tracker_schema = execute_query_get_one_row(
-            containers_without_repl_handler,
-            schematracker,
-            show_query.format(name=table_name_one)
-        )
-        assert old_source_schema != old_tracker_schema
-
-        start_service(
-            resume_stream=True,
-            num_of_queries_to_process=3,
-            end_time=30,
-            is_schema_event_helper_enabled=False,
-            num_of_schema_events=100,
-        )
-
-        old_source_schema = execute_query_get_one_row(
+        source_schema = execute_query_get_one_row(
             containers_without_repl_handler,
             rbrsource,
             show_query.format(name=table_name_two)
         )
-        old_tracker_schema = execute_query_get_one_row(
+        tracker_schema = execute_query_get_one_row(
             containers_without_repl_handler,
             schematracker,
             show_query.format(name=table_name_two)
         )
-        assert old_source_schema == old_tracker_schema
+        assert source_schema == tracker_schema
 
         messages_two = _fetch_messages(
             containers_without_repl_handler,
```
