In [11]:
import os
import re

from dotenv import load_dotenv

# Load environment variables from a .env file (presumably the OpenAI API key)
# before openai is imported, so the client can pick them up.
load_dotenv()
import openai

In [12]:
# Collect the schema DDL files that get embedded in the prompt.
# Only regular `.sql` files are kept: stray entries such as sub-directories or
# `.DS_Store` would otherwise make the reading loop fail
# (IsADirectoryError / UnicodeDecodeError). Sorting makes the order
# deterministic, since os.listdir returns entries in arbitrary order.
schema_files = sorted(
    entry
    for entry in os.listdir('../schema')
    if entry.endswith('.sql') and os.path.isfile(os.path.join('../schema', entry))
)

In [13]:
# Schema file name (e.g. 'product.sql') -> raw file contents; populated by the reading loop.
all_schemas = dict()

In [14]:
# Read every schema file into memory, keyed by its file name.
# `with` closes each handle as soon as it has been read (a bare open() leaks
# one handle per file). The explicit UTF-8 encoding matters because the DDL
# contains non-ASCII typographic quotes, and the default codec is
# platform-dependent.
for schema_name in schema_files:
    schema_path = os.path.join('../schema', schema_name)
    with open(schema_path, 'r', encoding='utf-8') as schema_handle:
        all_schemas[schema_name] = schema_handle.read()

In [15]:
# System message for the chat completion: casts the model as a data engineer
# who should produce only a DAG skeleton, without the SQL details.
# NOTE: the triple-quoted literal keeps its leading newline, indentation and
# trailing spaces; that whitespace is sent to the model verbatim.
system_prompt = """
            You are a data engineer looking to generate an Airflow pipeline DAG skeleton 
            without the SQL details
            """

In [16]:
# User message: embeds the raw DDL of the source schema (`product.sql`) and the
# target schema (`product_scd_tbl.sql`) and lists the requirements for the DAG.
# Raises KeyError if either file was not loaded from ../schema.
# NOTE(review): "last season data from players table" does not match the
# product -> product_scd_tbl transformation; it looks copy-pasted from another
# exercise. Confirm the intended upstream dependency.
# NOTE(review): the prompt asks for Postgres, but product_scd_tbl.sql (as echoed
# in this notebook's output) uses `#product_temp` temp-table syntax and
# typographic quotes, neither of which Postgres accepts. Confirm the schema file.
user_prompt = f"""
                Generate a cumulative Airflow DAG that transforms 
                {all_schemas['product.sql']}
                into {all_schemas['product_scd_tbl.sql']}
                use markdown for output and Postgres for queries
                The DAG depends on last season data from players table 
                and the DAG depends on past is true
                All create table statements should include IF NOT EXISTS
            """

In [17]:
# Echo both prompts, one after the other, so the exact text sent to the model is visible.
print(system_prompt, user_prompt, sep='\n')


            You are a data engineer looking to generate an Airflow pipeline DAG skeleton 
            without the SQL details
            

                Generate a cumulative Airflow DAG that transforms 
                CREATE TABLE product (
  Product_ID INT PRIMARY KEY,
  Category_Name VARCHAR(50),
  Sub_Category_Name VARCHAR(50),
  Brand VARCHAR(50),
  Feature_Desc VARCHAR(100)
)

                into create temp table #product_temp
select a.Product_ID as Product_ID_New,
case when a.Category_Name <> b.Category_Name then ‘-Category_Name’ else ‘’ end ||
case when a.Sub_Category_Name <> b.Sub_Category_Name then ‘-Sub_Category_Name’ else ‘’ end ||
case when a.Brand <> b.Brand then ‘-Brand’ else ‘’ end ||
case when a.Feature_Desc <> b.Feature_Desc then ‘-Feature_Desc’ else ‘’ end as CHANGED_COLUMN_NEW
from Dim_Product a join Stg_Product b
on a.Product_ID=b.Product_ID and a.current_flag=’Y’
where
a.Category_Name <> b.Category_Name or
a.Sub_Category_Name <> b.Sub_Category_Name or
a.Bra

In [18]:
# Request the DAG from the model. temperature=0 minimises run-to-run variation
# in the reply.
chat_messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": user_prompt},
]
response = openai.chat.completions.create(
    model="gpt-4",
    temperature=0,
    messages=chat_messages,
)
# Only the first returned choice is used.
top_choice = response.choices[0]
answer = top_choice.message.content

In [19]:
# Show the model's raw markdown reply.
print(str(answer))

```python
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2021, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'product_transformation', default_args=default_args, schedule_interval=timedelta(1))

t1 = PostgresOperator(
    task_id='create_product_table',
    postgres_conn_id='postgres_default',
    sql="""
    CREATE TABLE IF NOT EXISTS product (
        Product_ID INT PRIMARY KEY,
        Category_Name VARCHAR(50),
        Sub_Category_Name VARCHAR(50),
        Brand VARCHAR(50),
        Feature_Desc VARCHAR(100)
    );
    """,
    dag=dag)

t2 = PostgresOperator(
    task_id='create_product_temp_table',
    postgres_conn_id='postgres_default',
    sql="""
    CREATE TEMP T

In [20]:
# Make sure the destination directory for the generated DAG exists.
# exist_ok=True keeps the cell idempotent without a separate existence check,
# and still raises FileExistsError if `output` exists as a regular file
# (instead of failing later, when the DAG file is written).
os.makedirs('output', exist_ok=True)

In [21]:
# The model replies in markdown. Keep only the bodies of ```python / ```py
# fenced blocks, and drop the opening fence line with its language tag. Keeping
# the tag would make the saved file start with a bare `python` line, which
# raises NameError when Airflow imports the DAG. An unterminated final fence
# (e.g. a reply cut off at the token limit) is still captured up to the end
# of the text.
output = re.findall(r"```(?:python|py)[^\n]*\n(.*?)(?:```|\Z)", answer, flags=re.DOTALL)
if not output:
    # Fail loudly rather than writing an empty DAG file.
    raise ValueError("No ```python code block found in the model response")
# Write the extracted code as the DAG module.
with open('output/airflow_dag.py', 'w', encoding='utf-8') as dag_file:
    dag_file.write('\n'.join(output))