First we need to import the necessary classes for our ETL process.

In [None]:
from ETL import BaseExtractor, BaseLoader, BaseTransformer
import logging
logging.getLogger().setLevel(logging.INFO)

Set up the parameters needed. The archives parameter lists the Stack Exchange archives we are going to extract; the remaining parameters are the connection details for the SQL Server database.

In [None]:
archives = ['woodworking','ai']
username = 'ticketswap_user'
password = 'xxx'
server = 'ticketswap-stackexchange-dwh.csboebiw4ypn.eu-west-3.rds.amazonaws.com'
port = '1433'
database = 'ticketswap-stackexchange-dwh'
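
The password above is only a placeholder. Below is a minimal sketch of one way to keep the real value out of the notebook, assuming it is stored in an environment variable. The variable name DWH_PASSWORD and the helper read_password are hypothetical and not part of the ETL package.

```python
import os


def read_password(env_var="DWH_PASSWORD", default=None):
    """Return the password stored in the environment variable `env_var`.

    `env_var` is an assumed name; use whatever your environment defines.
    """
    value = os.environ.get(env_var, default)
    if value is None:
        # Fail early with a clear message instead of connecting with no password.
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return value
```

With this in place, `password = read_password()` could replace the hard-coded value.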

First extract the data with the BaseExtractor. Because we want to download data from the Stack Exchange archives, we use the 'from_stackexchange' method.

In [None]:
extractor = BaseExtractor()
for archive in archives:
    extractor.from_stackexchange(archive)

Then transform the tables. \
In our case this means dropping some columns, either because they don't fit in the SQL tables or because they are not needed for our data analysis. \
Because the same transformation is applied to every table, I first define a helper function so that the same calls are not repeated for each table. \
Here you can see the add(transformation, params) method in action: you pass the name of the transformation and the parameters it needs, and it is registered on the transformer class as a partial function. When you run the 'transform' method, that partial function is executed with the DataFrame.
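
To make the add/transform idea concrete, here is a minimal sketch of how such a pattern could work with functools.partial. It is not the actual BaseTransformer code: the class SketchTransformer, the TRANSFORMATIONS registry and the use of a list of dicts in place of a DataFrame are all assumptions made for illustration.

```python
from functools import partial


def remove_columns(columns, rows):
    """Return copies of the rows without the given columns."""
    return [{k: v for k, v in row.items() if k not in columns} for row in rows]


# Hypothetical registry that maps a transformation name to its function.
TRANSFORMATIONS = {"remove_columns": remove_columns}


class SketchTransformer:
    def __init__(self):
        self._steps = []

    def add(self, name, *params):
        # Bind the parameters now; the data is supplied later by transform().
        self._steps.append(partial(TRANSFORMATIONS[name], *params))

    def transform(self, rows):
        # Run every queued partial in the order it was added.
        for step in self._steps:
            rows = step(rows)
        return rows


transformer = SketchTransformer()
transformer.add("remove_columns", ["text"])
result = transformer.transform([{"id": 1, "text": "hello"}])
```

Each transformation in this sketch takes the data as its last argument, so partial only has to fix the leading parameters.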

In [None]:

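def transform_stackexchange_table(archive, tablename, cols_to_drop):
    """Drop cols_to_drop from one table of the given archive."""
    transformer = BaseTransformer()
    # Queue the transformation; it runs when transform() is called.
    transformer.add('remove_columns', cols_to_drop)
    # Escape the backslash so the path matches the one used by the loader.
    transformer.transform(f'stackexchange\\{archive}', f'{tablename}.parquet')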
    
for archive in archives:
    transform_stackexchange_table(archive, 'Badges', [])
    transform_stackexchange_table(archive, 'Comments', ['text','user_display_name'])
    transform_stackexchange_table(archive, 'PostHistory', ['revision_guid','text','user_display_name','comment'])
    transform_stackexchange_table(archive, 'PostLinks', [])
    transform_stackexchange_table(archive, 'Posts', ['body', 'title', 'tags'])
    transform_stackexchange_table(archive, 'Tags', [])
    transform_stackexchange_table(archive, 'Users', ['about_me', 'location', 'profile_image_url', 'website_url'])
    transform_stackexchange_table(archive, 'Votes', [])
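
The folder names above use a Windows-style backslash. As a sketch, the same names could be built with pathlib so the separator does not have to be typed by hand. The helpers archive_dir and table_file are hypothetical, and whether the ETL classes accept anything other than a plain string is an assumption I have not checked, so the result is converted with str().

```python
from pathlib import PurePosixPath, PureWindowsPath


def archive_dir(archive, flavour=PureWindowsPath):
    """Return the folder of one archive as a string, e.g. stackexchange\\ai."""
    return str(flavour("stackexchange") / archive)


def table_file(tablename):
    """Return the parquet file name of one table."""
    return f"{tablename}.parquet"


windows_dir = archive_dir("ai")                        # backslash separator
posix_dir = archive_dir("ai", flavour=PurePosixPath)   # forward slash separator
```

On a single platform, pathlib.Path picks the right separator automatically; the pure variants are used here only to show both results side by side.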

Finally, we load the data into our SQL Server database with the to_sqlserver method.

In [None]:
for archive in archives:
    loader = BaseLoader()
    loader.to_sqlserver(f'stackexchange\\{archive}', un=username, pw=password, server=server, port=port, db=database)
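
How to_sqlserver uses these parameters is not shown in this notebook. Purely as an illustration, this is how the same values could be combined into a SQLAlchemy-style connection URL for SQL Server. The helper sqlserver_url and the default driver name are assumptions, and the sketch only builds the string without opening a connection.

```python
from urllib.parse import quote_plus


def sqlserver_url(un, pw, server, port, db,
                  driver="ODBC Driver 17 for SQL Server"):
    """Build an mssql+pyodbc URL; the driver name is an assumed default."""
    # URL-encode the user name, password and driver so special
    # characters such as '@' do not break the URL.
    return (
        f"mssql+pyodbc://{quote_plus(un)}:{quote_plus(pw)}"
        f"@{server}:{port}/{db}?driver={quote_plus(driver)}"
    )


example_url = sqlserver_url("u", "p@ss", "host", "1433", "db")
```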