Skip to content

Commit

Permalink
Support MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
ayemos committed Jun 30, 2017
1 parent b8f9f78 commit cc68ca7
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 32 deletions.
2 changes: 1 addition & 1 deletion akagi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# -*- coding: utf-8 -*-
__author__ = """Yuichiro Someya"""
__email__ = 'ayemos.y@gmail.com'
__version__ = '0.1.15'
__version__ = '0.1.16'


from akagi import data_file
Expand Down
9 changes: 3 additions & 6 deletions akagi/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
class DataSource(object):
'''DataSource is an base class of all data sources
'''

def __init__(self, bundle):
self.bundle = bundle

def save(self, tar_dir, force=False):
paths = []

Expand All @@ -35,7 +31,8 @@ def __enter__(self):
return self

def __exit__(self, *exc):
raise NotImplementedError
return False

def __iter__(self):
return iter(self.bundle)
raise NotImplementedError

35 changes: 16 additions & 19 deletions akagi/data_sources/mysql_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,39 @@ def for_query(cls, query, db_conf={}):

return MySQLDataSource(query, db_conf)

def __init__(self, query, db_conf={}):
def __init__(self, query, db_conf={}, keep_connection=False):
self.query = query
self.__db_conf = db_conf
self.__cursor = None
self.__connection = None
self._keep_connection = keep_connection

@property
def _connection(self):
return MySQLdb.connect(**self._db_conf)
if self._keep_connection:
if self.__connection is None:
self.__connection = MySQLdb.connect(**self._db_conf)

return self.__connection
else:
return MySQLdb.connect(**self._db_conf)

def __iter__(self):
self.__result = []
logger.info("Query sent to Redshift")
logger.info("\n" + self.query.body + "\n")

self._cursor.execute(self.query.body)

return iter(self._cursor.fetchall())
c = self._connection.cursor()
logger.info("Executing query...")
c.execute(self.query.body)
logger.info("Finished.")

@property
def _cursor(self):
if self.__cursor is None:
self.__cursor = self._connection.cursor()

return self.__cursor
return iter(c.fetchall())

@property
def _db_conf(self):
conf = {
'host': self.__db_conf.get('host') or os.getenv('MYSQL_DB_HOST', 'localhost'),
'user': self.__db_conf.get('user') or os.getenv('MYSQL_DB_USER'),
'passwd': self.__db_conf.get('password') or os.getenv('MYSQL_DB_PASS'),
'db': self.__db_conf.get('db') or os.getenv('MYSQL_DB_PASS', os.getenv('USER')),
'db': self.__db_conf.get('db') or os.getenv('MYSQL_DB_NAME'),
'port': self.__db_conf.get('port') or os.getenv('MYSQL_DB_PORT', 3306),
'unix_socket': self.__db_conf.get('unix_socket') or os.getenv('MYSQL_DB_SOCKET')
}
return {k: v for k, v in six.iteritems(conf) if v is not None}

def __exit__(self, *exc):
return False
8 changes: 6 additions & 2 deletions akagi/data_sources/redshift_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def for_query(cls, query, schema, table, bucket_name, db_conf={}, sort=False, ac
return RedshiftDataSource(bundle, query, db_conf, activate)

def __init__(self, bundle, query, db_conf={}, activate=True):
super(RedshiftDataSource, self).__init__(bundle)
self.bundle = bundle
self.query = query
self.__db_conf = db_conf
self.__pgpass = None
Expand All @@ -33,9 +33,10 @@ def __init__(self, bundle, query, db_conf={}, activate=True):
def activate(self):
logger.info("Deleting old files on s3...")
self.bundle.clear()
logger.info("Sending query to Redshift")
logger.info("Executing query on Redshift")
logger.debug("\n" + self.query.body + "\n") # avoid logging unload query since it has raw credentials inside
self._cursor.execute(self.query.sql)
logger.info("Finished")

@property
def _connection(self):
Expand Down Expand Up @@ -74,3 +75,6 @@ def _pgpass(self):
def __exit__(self, *exc):
self.bundle.clear()
return False

def __iter__(self):
return self.bundle
4 changes: 2 additions & 2 deletions akagi/data_sources/s3_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ def for_key(cls, bucket_name, key, file_format=FileFormat.BINARY):
def __init__(self, bundle):
self.bundle = bundle

def __exit__(self, *exc):
return False
def __iter__(self):
return self.bundle
Binary file removed classes_akagi.akagi.png
Binary file not shown.
Binary file removed packages_akagi.akagi.png
Binary file not shown.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.15
current_version = 0.1.16
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

setup(
name='akagi',
version='0.1.15',
version='0.1.16',
description="Codenize your data sources",
long_description=readme,
author="Yuichiro Someya",
Expand Down

0 comments on commit cc68ca7

Please sign in to comment.