In [1]:
!pip install luigi


Collecting luigi
  Downloading luigi-3.4.0.tar.gz (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting python-daemon (from luigi)
  Downloading python_daemon-3.0.1-py3-none-any.whl (31 kB)
Collecting lockfile>=0.10 (from python-daemon->luigi)
  Downloading lockfile-0.12.2-py2.py3-none-any.whl (13 kB)
Building wheels for collected packages: luigi
  Building wheel for luigi (setup.py) ... [?25l[?25hdone
  Created wheel for luigi: filename=luigi-3.4.0-py3-none-any.whl size=1085379 sha256=55967640b1cfa169929cce399fe7c4e7d94e310679670727e12888c5c8b4c14a
  Stored in directory: /root/.cache/pip/wheels/88/8a/ef/e4334a9a7b9355f9a1b0cb1a439a2bead64e74e4330b3a0c12
Successfully built luigi
Installing collected packages: lockfile, python-daemon, luigi
Successfully installed lockfile-0.12.2 luigi-3.4.0 python-daemon-3.0.1


In [2]:
# this function is to get missouri brewery data
# and return data in JSON
def extract()-> dict:
    """ This API extracts data from
    https://www.openbrewerydb.org/
    """
    API_URL = "https://api.openbrewerydb.org/v1/breweries?by_state=missouri"
    data = requests.get(API_URL).json()
    return data


Extract is designed to retrieve data on breweries in the state of Missouri from the Open Brewery Database API. The function utilizes the requests module to send an HTTP GET request to the API endpoint specified by the API_URL variable. The response, assumed to be in JSON format, is then converted into a Python dictionary using the .json() method. The function returns this dictionary containing information about Missouri breweries. The accompanying docstring briefly explains that the API extracts data from the Open Brewery Database. To ensure the code's functionality, it is essential to import the requests module and execute the code in an environment with internet access.

In [3]:
import luigi
import requests
import pandas as pd
from sqlalchemy import create_engine

class ExtractTask(luigi.Task):
    def output(self):
        return luigi.LocalTarget("extract.csv")

    def run(self):
        data = extract()
        df = pd.DataFrame(data)
        df.to_csv(self.output().path, index=False)

class TransformTask(luigi.Task):
    def requires(self):
        return ExtractTask()

    def output(self):
        return luigi.LocalTarget("transform.csv")

    def run(self):
        df = pd.read_csv(self.input().path)
        df["state"] = df["state"].replace("Missouri", "MO")
        df["postal_code"] = df["postal_code"].str.slice(stop=5)
        df.to_csv(self.output().path, index=False)

class LoadTask(luigi.Task):
    def requires(self):
        return TransformTask()

    def output(self):
        return luigi.LocalTarget("my_lite_store.db")

    def run(self):
        df = pd.read_csv(self.input().path)
        db_engine = create_engine(f"sqlite:///{self.output().path}")
        df.to_sql('mo_brewery', db_engine, if_exists='replace', index=False)

if __name__ == '__main__':
    luigi.build([LoadTask()], local_scheduler=True)


DEBUG: Checking if LoadTask() is complete
DEBUG:luigi-interface:Checking if LoadTask() is complete
DEBUG: Checking if TransformTask() is complete
DEBUG:luigi-interface:Checking if TransformTask() is complete
INFO: Informed scheduler that task   LoadTask__99914b932b   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   LoadTask__99914b932b   has status   PENDING
DEBUG: Checking if ExtractTask() is complete
DEBUG:luigi-interface:Checking if ExtractTask() is complete
INFO: Informed scheduler that task   TransformTask__99914b932b   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   TransformTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   ExtractTask__99914b932b   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   ExtractTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO:luigi-interface:Done scheduling tasks
INFO: Running Worker with 1 processes
INFO:luigi-interface:Running Wo

The pipeline consists of three tasks: ExtractTask, TransformTask, and LoadTask. Each task is responsible for a specific phase of the ETL (Extract, Transform, Load) process.

ExtractTask:

Output: This task generates a local target file named "extract.csv" using luigi.LocalTarget.
Run Method: Calls the extract() function (assuming it's defined elsewhere) to retrieve brewery data for Missouri from an external source (presumably the Open Brewery Database API). The data is then converted to a Pandas DataFrame (df) and saved as a CSV file using the specified output path.
TransformTask:

Requires: Depends on the completion of the ExtractTask before running.
Output: Produces a local target file named "transform.csv."
Run Method: Reads the CSV file generated by the ExtractTask, performs transformations on the DataFrame (e.g., abbreviating "Missouri" to "MO" in the "state" column and extracting the first five characters of the "postal_code" column), and saves the transformed DataFrame as a new CSV file using the specified output path.
LoadTask:

Requires: Depends on the completion of the TransformTask before running.
Output: Generates a SQLite database file named "my_lite_store.db" using luigi.LocalTarget.
Run Method: Reads the CSV file produced by the TransformTask, creates a SQLite database engine using SQLAlchemy, and loads the transformed DataFrame into a table named 'mo_brewery' in the SQLite database. The if_exists='replace' parameter ensures that if the table already exists, it will be replaced with the new data.
Main Block (__main__):

Calls luigi.build([LoadTask()], local_scheduler=True) to execute the Luigi pipeline. This builds and runs the specified task (LoadTask) and its dependencies, ensuring that tasks are executed in the correct order based on their dependencies.

In [4]:
%load_ext sql

In [5]:
%%sql
sqlite:///my_lite_store.db

In [6]:
%%sql
select * from mo_brewery limit 10;

 * sqlite:///my_lite_store.db
Done.


id,name,brewery_type,address_1,address_2,address_3,city,state_province,postal_code,country,longitude,latitude,phone,website_url,state,street
84337872-75da-4cdd-9f8f-1e1691f21642,21st St Brewers Bar,brewpub,2017 Chouteau Ave,,,Saint Louis,Missouri,63103,United States,-90.213781,38.624291,3142416969.0,http://www.21stbrew.com,MO,2017 Chouteau Ave
be0ec266-d637-42e4-94ba-4ce21e456054,2nd Shift Brewing Co,brewpub,1601 Sublette Ave,,,Saint Louis,Missouri,63110,United States,-90.280526,38.621943,6187910728.0,http://www.2ndshiftbrewing.com,MO,1601 Sublette Ave
cb56e27e-d113-4f74-925a-69305a56d082,3 Trails Brewing Co,micro,111 N Main St,,,Independence,Missouri,64050,United States,-94.41265,39.122385,8168866256.0,http://www.3trailsbrewing.com,MO,111 N Main St
4f70ac94-eca5-4cec-8581-92ec5afdf4f2,3Halves Brewing Co,brewpub,110 E Kansas St,,,Liberty,Missouri,64068,United States,-94.4190247,39.2461993,8164296886.0,http://www.3halvesbrewingco.com,MO,110 E Kansas St
b72f2ce6-e062-404d-ac31-c1060db2d870,4 By 4 Brewing Company,micro,2811 E Galloway St Ste A,,,Springfield,Missouri,65804,United States,-93.239889,37.147986,4178616400.0,http://www.4by4brewingcompany.com,MO,2811 E Galloway St Ste A
ac2f41ed-e1e8-4586-aa78-253543db7714,4 Hands Brewing Co,regional,1220 S 8th St,,,Saint Louis,Missouri,63104,United States,-90.19762012,38.61526593,3144361559.0,http://www.4handsbrewery.com,MO,1220 S 8th St
085c97d4-5a01-4e28-a1c6-683bf773b062,Alma Mader Brewing,micro,2635 Southwest Blvd,,,Kansas City,Missouri,64108,United States,-94.599389,39.082495,8169452589.0,http://almamaderbrewing.com,MO,2635 Southwest Blvd
0edb8050-2911-4a04-a99a-7927e05cd018,Alpha Brewing Co.,micro,4310 Fyler Ave,,,Saint Louis,Missouri,63116,United States,-90.26236838,38.59861817,3146212337.0,http://www.alphabrewingcompany.com,MO,4310 Fyler Ave
32dd19c9-33b8-4ed5-b5f0-1fb5f5d8ad02,Amerisports Brew Pub,brewpub,3200 Ameristar Dr,,,Kansas City,Missouri,64161,United States,-94.484526,39.150682,8164147435.0,http://www.ameristar.com,MO,3200 Ameristar Dr
d1a61260-d377-4e7d-8ce9-8851e30934ae,Anheuser-Busch InBev,large,1 Busch Pl,,,Saint Louis,Missouri,63118,United States,-90.2118998,38.5954536,3145772000.0,http://www.anheuser-busch.com,MO,1 Busch Pl


%load_ext sql: This line is a Jupyter magic command that loads the SQL extension, allowing you to execute SQL commands in the notebook.

%%sql: This is a cell-level magic command that indicates that the following code cell contains SQL commands.

sqlite:///my_lite_store.db: This line specifies the connection string for SQLite. It tells the SQL extension to connect to the SQLite database file named "my_lite_store.db" in the current directory.

%%sql: This is another cell-level magic command indicating that the following code cell contains additional SQL commands.

select * from mo_brewery limit 10;: This is a SQL query that selects all columns (*) from the table named 'mo_brewery' and limits the result to the first 10 rows using LIMIT 10. This query is executed in the SQLite database specified in the connection string.