In [1]:
!pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
## Set up
import os
import sys
import subprocess   
import time
import pandas as pd
from dotenv import load_dotenv
import openai
import os

load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

source_schema_path = './01_input/01_ctr_schema.csv'
target_schema_path = './01_input/02_target_schema.csv'

In [3]:
df_source_schema = pd.read_csv(source_schema_path)
df_target_schema = pd.read_csv(target_schema_path, sep=';')


In [10]:
df_source_schema.head()

Unnamed: 0,TRIMMED COLUMN NAME,Entry #,Subject Area,Current CTR Definition Field Name,Data Type,Business Definition,Usage,Required Field? (Y/N),Notes/Values,CHR?,Table Name,Column Name,Optum Extract Data Name,Optum Notes
0,CUSTOMER_ID,1,Customer,CTR-CUSTOMER-ID,9(10),Unique customer identifier,B,Y,Primary key,Y,"ECOM001_CUSTOMERS, ECOM002_ADDRESSES, ECOM007_...",CUSTOMER_ID,PCUST-ID,Customer primary key
1,PRODUCT_ID,2,Product,CTR-PRODUCT-ID,9(10),Unique product identifier,B,Y,Primary key,Y,"ECOM005_PRODUCTS, ECOM006_INVENTORY, ECOM008_O...",PRODUCT_ID,PPROD-ID,Product primary key
2,ORDER_ID,3,Order,CTR-ORDER-ID,9(12),Unique order identifier,B,Y,Primary key,Y,"ECOM007_ORDERS, ECOM008_ORDER_ITEMS, ECOM009_P...",ORDER_ID,PORD-ID,Order primary key
3,ADDRESS_ID,4,Address,CTR-ADDRESS-ID,9(10),Unique address identifier,B,Y,Primary key,Y,ECOM002_ADDRESSES,ADDRESS_ID,PADDR-ID,
4,CATEGORY_ID,5,Product,CTR-CATEGORY-ID,9(8),Unique category identifier,B,Y,Primary key,Y,"ECOM003_CATEGORIES, ECOM005_PRODUCTS",CATEGORY_ID,PCAT-ID,Category reference in products


In [4]:
df_target_schema

Unnamed: 0,Table Name,Column Name,Data Type,Description,Sample Value
0,TAGRET01,CUSTOMER_ID,BIGINT,Unique customer identifier - Primary Key,1001
1,TAGRET01,UNIT_PRICE_ABOVE_100,"DECIMAL(9,2)",How much of the product unit price is above 10...,2499.99
2,TAGRET01,ORDER_DATE_YR,BIGINT,Year in which the order was placed,2024
3,TAGRET01,IS_STATUS_DELETED,BULLION,TRUE if Customer status is ( I=Inactive or D=...,TRUE


In [5]:
# Convert DataFrame to JSON (records format)
json_source_schema = df_source_schema.to_json(orient='records', lines=False)
json_target_schema = df_target_schema.to_json(orient='records', lines=False)

# Save JSON to a file
with open('./02_wip/source_schema.json', 'w') as f:
    f.write(json_source_schema)
    
with open('./02_wip/target_schema.json', 'w') as f:
    f.write(json_target_schema)

In [6]:
def generate_transformation_rules(json_source_schema, json_target_schema, extra_instructions=""):
    prompt = (
        f"Given the following source table schema in JSON:\n{json_source_schema}\n\n"
        f"And the following target table schema in JSON:\n{json_target_schema}\n\n"
        "For each field in the target table, provide a transformation rule in the following JSON format:\n"
        "{\n"
        "  \"target_table\": \"<target_table_name>\",\n"
        "  \"target_field\": \"<target_field_name>\",\n"
        "  \"source_table\": \"<source_table_name>\",\n"
        "  \"source_field\": \"<source_field_name>\",\n"
        "  \"transformation\": \"<describe transformation or mapping logic>\"\n"
        "}\n"
        "If multiple source fields or tables are needed, list them as arrays. "
        "Be explicit about any type conversions, calculations, or renaming. "
        f"{extra_instructions}\n"
        "Output only the transformation rules as a JSON array."
    )
    response = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a data engineer who writes data transformation rules."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=1000,
        temperature=0.2
    )
    return response.choices[0].message.content



In [7]:
json_target_schema

'[{"Table Name":"TAGRET01","Column Name":"CUSTOMER_ID","Data Type":"BIGINT","Description":"Unique customer identifier - Primary Key","Sample Value":"1001"},{"Table Name":"TAGRET01","Column Name":"UNIT_PRICE_ABOVE_100","Data Type":"DECIMAL(9,2)","Description":"How much of the product unit price is above 100 i.e product price minus 100","Sample Value":"2499.99"},{"Table Name":"TAGRET01","Column Name":"ORDER_DATE_YR","Data Type":"BIGINT","Description":"Year in which the order was placed","Sample Value":"2024"},{"Table Name":"TAGRET01","Column Name":"IS_STATUS_DELETED","Data Type":"BULLION","Description":"TRUE if Customer status is ( I=Inactive or  D=Deleted)","Sample Value":"TRUE"}]'

In [9]:
# Example usage:
rules = generate_transformation_rules(json_source_schema,json_target_schema)
print(rules)

```json
[
  {
    "target_table": "TAGRET01",
    "target_field": "CUSTOMER_ID",
    "source_table": "ECOM001_CUSTOMERS",
    "source_field": "CUSTOMER_ID",
    "transformation": "Direct mapping"
  },
  {
    "target_table": "TAGRET01",
    "target_field": "UNIT_PRICE_ABOVE_100",
    "source_table": ["ECOM005_PRODUCTS"],
    "source_field": ["UNIT_PRICE"],
    "transformation": "CASE WHEN UNIT_PRICE > 100 THEN UNIT_PRICE - 100 ELSE 0 END"
  },
  {
    "target_table": "TAGRET01",
    "target_field": "ORDER_DATE_YR",
    "source_table": "ECOM007_ORDERS",
    "source_field": "ORDER_DATE",
    "transformation": "CAST(SUBSTRING(ORDER_DATE, 1, 4) AS BIGINT)"
  },
  {
    "target_table": "TAGRET01",
    "target_field": "IS_STATUS_DELETED",
    "source_table": "ECOM001_CUSTOMERS",
    "source_field": "STATUS",
    "transformation": "CASE WHEN STATUS IN ('I', 'D') THEN TRUE ELSE FALSE END"
  }
]
```
