<a href="https://colab.research.google.com/github/aravind-chilakamarri/CS5001-Fundamentals-of-Data-Engineering/blob/main/Project_10_Luigi_ETL_data_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import requests # needed to send HTTP request to API server
import pandas as pd
from sqlalchemy import create_engine # Python SQL and DB toolkit

In [3]:
!pip install luigi

Collecting luigi
  Downloading luigi-3.4.0.tar.gz (1.2 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.2 MB[0m [31m7.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━[0m [32m0.6/1.2 MB[0m [31m9.4 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━[0m [32m1.1/1.2 MB[0m [31m11.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting python-daemon (from luigi)
  Downloading python_daemon-3.0.1-py3-none-any.whl (31 kB)
Collecting lockfile>=0.10 (from python-daemon->luigi)
  Downloading lockfile-0.12.2-py2.py3-none-any.whl (13 kB)
Building wheels for collected p

In [19]:
import luigi
import pandas as pd
import requests
from sqlalchemy import create_engine
import os

In [20]:


def extract():
    """Fetch the Missouri brewery records from the Open Brewery DB API.

    Returns:
        The decoded JSON body of the HTTP response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    API_URL = "https://api.openbrewerydb.org/v1/breweries?by_state=missouri"
    # Without a timeout a stalled connection would block the pipeline forever.
    response = requests.get(API_URL, timeout=30)
    # Fail loudly on an error status instead of parsing an error payload as data.
    response.raise_for_status()
    return response.json()

def transform(df):
    """Return a cleaned copy of the brewery frame.

    Two columns are rewritten:

    * ``state``: values equal to ``"Missouri"`` become ``"MO"``.
    * ``postal_code``: each string is truncated to its first five characters.

    The input frame is left untouched; a new DataFrame is returned so that
    re-running a cell or reusing the raw frame never sees half-transformed data.

    Args:
        df: DataFrame with at least the ``state`` and ``postal_code`` columns;
            ``postal_code`` must support the ``.str`` accessor.

    Returns:
        A new DataFrame with the same columns, in the same order, as ``df``.
    """
    # assign() builds a new frame, so the caller's DataFrame is never mutated.
    return df.assign(
        state=df["state"].replace("Missouri", "MO"),
        postal_code=df["postal_code"].str.slice(stop=5),
    )

def load(df, db_path):
    """Write ``df`` to the ``mo_brewery`` table of a SQLite database file.

    Any existing ``mo_brewery`` table is replaced, and the DataFrame index is
    not written.

    Args:
        df: DataFrame to persist.
        db_path: Filesystem path of the SQLite database file.
    """
    db_engine = create_engine(f"sqlite:///{db_path}")
    try:
        df.to_sql('mo_brewery', db_engine, if_exists='replace', index=False)
    finally:
        # Release the engine's pooled connections so the database file is not
        # kept open after the load finishes (or fails).
        db_engine.dispose()

class ExtractTask(luigi.Task):
    """Luigi task that stores the raw API response as ``extract.csv``."""

    def output(self):
        """Return the local target for the extracted CSV file."""
        return luigi.LocalTarget("extract.csv")

    def run(self):
        """Call ``extract()`` and write the result to the output CSV.

        The file is written to a temporary path first and only moved to the
        final location when the write finishes without an exception. Luigi
        decides completeness from the existence of the output file, so this
        keeps a failed run from leaving a partial file that looks "done".
        """
        data = extract()
        df = pd.DataFrame(data)
        with self.output().temporary_path() as tmp_path:
            df.to_csv(tmp_path, index=False)

class TransformTask(luigi.Task):
    """Luigi task that cleans the extracted CSV and writes ``transform.csv``."""

    def requires(self):
        """Depend on the extract step, whose output is this task's input."""
        return ExtractTask()

    def output(self):
        """Return the local target for the transformed CSV file."""
        return luigi.LocalTarget("transform.csv")

    def run(self):
        """Read the extracted CSV, apply ``transform()``, and write the result.

        The output is written to a temporary path and moved into place only
        when the write finishes without an exception, so a failed run does not
        leave a partial file that Luigi would treat as complete.
        """
        # Postal codes are identifiers, not numbers: force a string column so
        # the .str accessor used by transform() works even if every value in
        # the file happens to look numeric.
        df = pd.read_csv(self.input().path, dtype={"postal_code": str})
        transformed_df = transform(df)
        with self.output().temporary_path() as tmp_path:
            transformed_df.to_csv(tmp_path, index=False)

class LoadTask(luigi.Task):
    """Luigi task that loads the transformed CSV into ``my_lite_store.db``."""

    def requires(self):
        """Depend on the transform step, whose output is this task's input."""
        return TransformTask()

    def output(self):
        """Return the local target for the SQLite database file."""
        return luigi.LocalTarget("my_lite_store.db")

    def run(self):
        """Read the transformed CSV and load it into the SQLite database.

        The database is built at a temporary path and moved to the final
        location only when ``load()`` returns without an exception. Luigi
        decides completeness from the existence of the output file, so a
        failed load must not leave a database file behind at the final path.
        """
        df = pd.read_csv(self.input().path)
        # NOTE(review): the move happens after load() returns; on Windows this
        # presumably requires that load() has released its SQLite connection
        # by then — confirm if the pipeline must run there.
        with self.output().temporary_path() as tmp_db_path:
            load(df, tmp_db_path)

if __name__ == "__main__":
    # Requesting the final task is enough: Luigi resolves the upstream tasks
    # through each task's requires() method.
    pipeline_tasks = [LoadTask()]
    luigi.build(pipeline_tasks, local_scheduler=True)


DEBUG: Checking if LoadTask() is complete
DEBUG:luigi-interface:Checking if LoadTask() is complete
INFO: Informed scheduler that task   LoadTask__99914b932b   has status   DONE
INFO:luigi-interface:Informed scheduler that task   LoadTask__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO:luigi-interface:Done scheduling tasks
INFO: Running Worker with 1 processes
INFO:luigi-interface:Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG:luigi-interface:Asking scheduler for work...
DEBUG: Done
DEBUG:luigi-interface:Done
DEBUG: There are no more tasks to run at this time
DEBUG:luigi-interface:There are no more tasks to run at this time
INFO: Worker Worker(salt=7269379387, workers=1, host=7a27bb3f3fad, username=root, pid=251) was stopped. Shutting down Keep-Alive thread
INFO:luigi-interface:Worker Worker(salt=7269379387, workers=1, host=7a27bb3f3fad, username=root, pid=251) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =

In [21]:
# Load the `sql` IPython extension; it provides the %%sql cell magic used in
# the cells below to query the SQLite database.
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [22]:
%%sql
sqlite:///my_lite_store.db

In [23]:
%%sql
select * from mo_brewery limit 10;

 * sqlite:///my_lite_store.db
Done.


index,id,name,brewery_type,address_1,address_2,address_3,city,state_province,postal_code,country,longitude,latitude,phone,website_url,state,street
0,84337872-75da-4cdd-9f8f-1e1691f21642,21st St Brewers Bar,brewpub,2017 Chouteau Ave,,,Saint Louis,Missouri,63103,United States,-90.213781,38.624291,3142416969,http://www.21stbrew.com,MO,2017 Chouteau Ave
1,be0ec266-d637-42e4-94ba-4ce21e456054,2nd Shift Brewing Co,brewpub,1601 Sublette Ave,,,Saint Louis,Missouri,63110,United States,-90.280526,38.621943,6187910728,http://www.2ndshiftbrewing.com,MO,1601 Sublette Ave
2,cb56e27e-d113-4f74-925a-69305a56d082,3 Trails Brewing Co,micro,111 N Main St,,,Independence,Missouri,64050,United States,-94.41265,39.122385,8168866256,http://www.3trailsbrewing.com,MO,111 N Main St
3,4f70ac94-eca5-4cec-8581-92ec5afdf4f2,3Halves Brewing Co,brewpub,110 E Kansas St,,,Liberty,Missouri,64068,United States,-94.4190247,39.2461993,8164296886,http://www.3halvesbrewingco.com,MO,110 E Kansas St
4,b72f2ce6-e062-404d-ac31-c1060db2d870,4 By 4 Brewing Company,micro,2811 E Galloway St Ste A,,,Springfield,Missouri,65804,United States,-93.239889,37.147986,4178616400,http://www.4by4brewingcompany.com,MO,2811 E Galloway St Ste A
5,ac2f41ed-e1e8-4586-aa78-253543db7714,4 Hands Brewing Co,regional,1220 S 8th St,,,Saint Louis,Missouri,63104,United States,-90.19762012,38.61526593,3144361559,http://www.4handsbrewery.com,MO,1220 S 8th St
6,085c97d4-5a01-4e28-a1c6-683bf773b062,Alma Mader Brewing,micro,2635 Southwest Blvd,,,Kansas City,Missouri,64108,United States,-94.599389,39.082495,8169452589,http://almamaderbrewing.com,MO,2635 Southwest Blvd
7,0edb8050-2911-4a04-a99a-7927e05cd018,Alpha Brewing Co.,micro,4310 Fyler Ave,,,Saint Louis,Missouri,63116,United States,-90.26236838,38.59861817,3146212337,http://www.alphabrewingcompany.com,MO,4310 Fyler Ave
8,32dd19c9-33b8-4ed5-b5f0-1fb5f5d8ad02,Amerisports Brew Pub,brewpub,3200 Ameristar Dr,,,Kansas City,Missouri,64161,United States,-94.484526,39.150682,8164147435,http://www.ameristar.com,MO,3200 Ameristar Dr
9,d1a61260-d377-4e7d-8ce9-8851e30934ae,Anheuser-Busch InBev,large,1 Busch Pl,,,Saint Louis,Missouri,63118,United States,-90.2118998,38.5954536,3145772000,http://www.anheuser-busch.com,MO,1 Busch Pl
