Fix postgres part of pipeline example of tutorial (#21586)
(cherry picked from commit 40028f3)
KevinYanesG authored and ephraimbuddy committed Mar 22, 2022
1 parent 7fee30a commit 13cc488
Showing 1 changed file with 41 additions and 26 deletions.
67 changes: 41 additions & 26 deletions docs/apache-airflow/tutorial.rst
@@ -381,11 +381,30 @@ We need to have docker and postgres installed.
We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
Follow the instructions properly to set up Airflow.

Create a Employee table in postgres using this:
You can use the postgres_default connection:

- Conn id: postgres_default
- Conn Type: postgres
- Host: postgres
- Schema: airflow
- Login: airflow
- Password: airflow
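
As a rough illustration of how the fields listed above fit together, the sketch below assembles them into a connection URI of the kind Airflow accepts in an ``AIRFLOW_CONN_<CONN_ID>`` environment variable. The helper name ``build_conn_uri`` is hypothetical and is not part of this commit; only the field values come from the list.

```python
from urllib.parse import quote


def build_conn_uri(conn_type: str, login: str, password: str, host: str, schema: str) -> str:
    """Assemble a connection URI from individual fields (hypothetical helper)."""
    # Percent-encode the credentials so characters such as '@' or ':' cannot break the URI.
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"{conn_type}://{user}:{secret}@{host}/{schema}"


# Values taken from the postgres_default fields in the tutorial text.
uri = build_conn_uri("postgres", "airflow", "airflow", "postgres", "airflow")
print(uri)
```

Setting an environment variable named ``AIRFLOW_CONN_POSTGRES_DEFAULT`` to this value is an alternative to entering the fields in the UI.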


After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection. For


Open up a postgres shell:

.. code-block:: bash
./airflow.sh airflow db shell
Create the Employees table with:

.. code-block:: sql
CREATE TABLE "Employees"
CREATE TABLE EMPLOYEES
(
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
@@ -394,7 +413,11 @@ Create a Employee table in postgres using this:
"Leave" INTEGER
);
CREATE TABLE "Employees_temp"
Afterwards, create the Employees_temp table:

.. code-block:: sql
CREATE TABLE EMPLOYEES_TEMP
(
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
@@ -403,17 +426,9 @@ Create a Employee table in postgres using this:
"Leave" INTEGER
);
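
A note on the table renaming in this change: PostgreSQL folds unquoted identifiers to lower case, while double-quoted identifiers keep their exact case. The sketch below is a simplified model of that rule, not PostgreSQL's actual parser; ``resolve_identifier`` is a hypothetical helper and it ignores escaped quotes inside names.

```python
def resolve_identifier(name: str) -> str:
    """Approximate how PostgreSQL resolves a table name (simplified model)."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        # Quoted identifiers keep their exact case.
        return name[1:-1]
    # Unquoted identifiers are folded to lower case.
    return name.lower()


for ident in ('"Employees"', "EMPLOYEES", "employees", '"Employees_temp"', "EMPLOYEES_TEMP"):
    print(f"{ident!r:22} -> {resolve_identifier(ident)}")
```

Under this rule, the unquoted ``EMPLOYEES`` and ``EMPLOYEES_TEMP`` can be referenced in any letter case from later statements, whereas ``"Employees"`` would need the same quoting everywhere. The column names such as ``"Serial Number"`` contain a space and therefore stay quoted.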
We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
We are now ready to write the DAG.

- Conn id: LOCAL
- Conn Type: postgres
- Host: postgres
- Schema: <DATABASE_NAME>
- Login: airflow
- Password: airflow
- Port: 5432

After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG.

Let's break this down into 2 steps: get data & merge data:

@@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data:
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
"COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
@task
def merge_data():
query = """
DELETE FROM "Employees" e
USING "Employees_temp" et
DELETE FROM EMPLOYEES e
USING EMPLOYEES_TEMP et
WHERE e."Serial Number" = et."Serial Number";
INSERT INTO "Employees"
INSERT INTO EMPLOYEES
SELECT *
FROM "Employees_temp";
FROM EMPLOYEES_TEMP;
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
@@ -509,29 +524,29 @@ Lets look at our DAG:
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
"COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@task
def merge_data():
query = """
DELETE FROM "Employees" e
USING "Employees_temp" et
DELETE FROM EMPLOYEES e
USING EMPLOYEES_TEMP et
WHERE e."Serial Number" = et."Serial Number";
INSERT INTO "Employees"
INSERT INTO EMPLOYEES
SELECT *
FROM "Employees_temp";
FROM EMPLOYEES_TEMP;
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
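
The delete-then-insert merge performed by ``merge_data`` can be tried without a Postgres server. The sketch below is a stand-in built on several assumptions: it uses Python's built-in ``sqlite3`` instead of ``PostgresHook``, trims the tables to two columns, and replaces ``DELETE ... USING`` (which SQLite does not support) with an ``IN`` subquery. The sample rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the tutorial's Postgres instance.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Two-column versions of the tutorial tables, with invented sample rows.
cur.executescript(
    """
    CREATE TABLE employees ("Serial Number" NUMERIC PRIMARY KEY, "Company Name" TEXT);
    CREATE TABLE employees_temp ("Serial Number" NUMERIC PRIMARY KEY, "Company Name" TEXT);
    INSERT INTO employees VALUES (1, 'Old Co'), (2, 'Unchanged Co');
    INSERT INTO employees_temp VALUES (1, 'New Co'), (3, 'Added Co');
    """
)

# Merge: drop rows that also exist in the staging table, then copy the staging rows over.
cur.executescript(
    """
    DELETE FROM employees
    WHERE "Serial Number" IN (SELECT "Serial Number" FROM employees_temp);
    INSERT INTO employees SELECT * FROM employees_temp;
    """
)
conn.commit()

rows = cur.execute('SELECT * FROM employees ORDER BY "Serial Number"').fetchall()
print(rows)
conn.close()
```

Rows present in both tables take the staging values, rows present only in ``employees`` are left alone, and rows present only in ``employees_temp`` are added.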