# Writing Data Engineering Code with Gen AI

## Table of Contents
1. Introduction
2. Examples
3. References and Further Reading

<a id='2'></a>

## 1. Introduction

Generative AI can help write data engineering code, particularly for building efficient and accurate data pipelines, in several ways:

1. Code Generation: Automatically generating data engineering scripts based on high-level descriptions of tasks.
2. Optimization: Improving existing data engineering code based on performance feedback and best practices.
3. Schema Understanding: Interpreting data schemas to inform code generation and optimization.
4. Error Detection and Correction: Identifying and fixing errors in data engineering code through automated analysis.
5. Code Translation: Converting code between different programming languages and frameworks used in data engineering.
6. Complex Workflow Creation: Generating complex data workflows and pipelines based on user requirements.
7. Result Interpretation: Translating data processing results into human-readable reports and summaries.
8. Data Quality Checks: Generating code for validating data quality and consistency in pipelines.
9. Documentation Generation: Creating detailed documentation for data engineering code and workflows automatically.

Using Gen AI for these tasks can offer several benefits, provided the generated code is reviewed and tested:

- Higher productivity for data engineers
- Faster development and deployment of data pipelines
- Fewer errors in routine code
- Improved maintainability and readability of code
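To make item 8 (Data Quality Checks) concrete, here is a minimal sketch of the kind of validation code an assistant might generate for the loan dataset used in Example 1. The helper name `check_data_quality` and the rules passed to it (a 300 to 850 credit-score range, a required `Age` column) are illustrative assumptions, not part of the original notebook.

```python
import pandas as pd


def check_data_quality(df: pd.DataFrame, ranges: dict, required: list) -> list:
    """Return human-readable data quality issues (hypothetical helper).

    ranges maps a column name to an inclusive (min, max) tuple;
    required lists columns that must not contain missing values.
    """
    issues = []
    for column in required:
        missing = int(df[column].isna().sum())
        if missing:
            issues.append(f"{column}: {missing} missing value(s)")
    for column, (low, high) in ranges.items():
        # NaNs are skipped here; the 'required' check reports them
        values = df[column].dropna()
        out_of_range = int(((values < low) | (values > high)).sum())
        if out_of_range:
            issues.append(f"{column}: {out_of_range} value(s) outside [{low}, {high}]")
    return issues


# Made-up rows with one missing Age and one impossible credit score
sample = pd.DataFrame({
    "Age": [53.0, None, 34.0],
    "Credit_Score": [807.0, 910.0, 573.0],
})
print(check_data_quality(sample, ranges={"Credit_Score": (300, 850)}, required=["Age"]))
```

Returning a list of messages rather than raising keeps the check usable both as a pipeline gate (fail if the list is non-empty) and as a report.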

In [1]:
!pip install openai pandas scikit-learn matplotlib
# Needed only for Example 6 (MySQL to MongoDB)
!pip install mysql-connector-python pymongo



In [None]:
import os

import pandas as pd
from openai import OpenAI

# Set up the OpenAI client; the key is read from the API_KEY environment variable
API_KEY = os.getenv('API_KEY')
client = OpenAI(api_key=API_KEY)

def clean(dict_variable):
    """Return the first value of a dictionary, e.g. a single-key JSON response."""
    return next(iter(dict_variable.values()))
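Model responses, like the one in Example 5, usually wrap code in Markdown fences. A helper in the same spirit as `clean` can pull those blocks out so generated code can be reviewed, saved, or tested programmatically. The name `extract_code_blocks` is hypothetical, and the pattern assumes standard triple-backtick fences with an optional language tag.

```python
import re
from typing import List, Optional

# The fence is assembled at runtime so this snippet can itself sit inside a Markdown fence
FENCE = "`" * 3
_BLOCK_RE = re.compile(FENCE + r"([\w+-]*)[ \t]*\n(.*?)" + FENCE, re.DOTALL)


def extract_code_blocks(text: str, language: Optional[str] = None) -> List[str]:
    """Return the bodies of fenced code blocks in text, optionally only those tagged `language`."""
    blocks = []
    for tag, body in _BLOCK_RE.findall(text):
        if language is None or tag.lower() == language.lower():
            blocks.append(body.strip())
    return blocks
```

For example, `extract_code_blocks(response.choices[0].message.content, "sql")` would return the query from a response shaped like the Example 5 output as a one-element list.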

<a id='3'></a>
## 2. Example 1: Data cleaning

In [3]:
df = pd.read_csv('Loan_Applications_Dataset.csv')

In [4]:
df

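| | Age | Income | Credit_Score | Loan_Amount | Loan_Term_Months | Approved |
|---|---|---|---|---|---|---|
| 0 | 53.0 | 79494.0 | NaN | 37159.0 | 36 | 0 |
| 1 | 44.0 | 101317.0 | 807.0 | 44925.0 | 48 | 0 |
| 2 | 34.0 | 85215.0 | 573.0 | NaN | 24 | 0 |
| 3 | 26.0 | 98587.0 | 805.0 | 22852.0 | 60 | 1 |
| 4 | 50.0 | 26949.0 | 314.0 | 9007.0 | 24 | 1 |
| ... | ... | ... | ... | ... | ... | ... |
| 95 | 38.0 | 35034.0 | 758.0 | NaN | 24 | 1 |
| 96 | 31.0 | 25126.0 | 436.0 | NaN | 36 | 0 |
| 97 | 19.0 | 25122.0 | 846.0 | 13999.0 | 48 | 1 |
| 98 | 28.0 | 38030.0 | 812.0 | 1814.0 | 12 | 1 |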


In [5]:
# Count NaNs per column with a plain loop instead of pandas helpers such as isna()
nan_counts = {}
for column in df.columns:
    nan_count = 0
    for value in df[column]:
        if value != value:  # NaN values are not equal to themselves
            nan_count += 1
    nan_counts[column] = nan_count

# Print the results
for column, count in nan_counts.items():
    print(f"{column}: {count} NaNs")

Age: 10 NaNs
Income: 10 NaNs
Credit_Score: 10 NaNs
Loan_Amount: 10 NaNs
Loan_Term_Months: 0 NaNs
Approved: 0 NaNs
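The loop above relies on the fact that NaN is not equal to itself. In everyday pandas code the same per-column count is usually written with the vectorized `isna()` method. This sketch, on a small made-up frame rather than the loan data, checks that both approaches agree.

```python
import numpy as np
import pandas as pd

# Small illustrative frame (made-up values, not the loan dataset)
demo = pd.DataFrame({
    "Age": [53.0, np.nan, 34.0],
    "Income": [79494.0, 101317.0, np.nan],
    "Approved": [0, 1, 1],
})

# Vectorized: isna() marks missing cells, sum() counts them per column
vectorized_counts = demo.isna().sum()

# Manual equivalent of the notebook loop (NaN != NaN is True)
manual_counts = {col: sum(1 for v in demo[col] if v != v) for col in demo.columns}

print(vectorized_counts.to_dict())
```

One difference worth knowing: `isna()` also flags `None` in object columns, which the `value != value` test misses because `None != None` is `False`.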


In [7]:
# Code generated by ChatGPT: impute missing values in one column with KNN
import pandas as pd
from sklearn.impute import KNNImputer

def knn_impute(df: pd.DataFrame, column_name: str, n_neighbors: int = 5) -> pd.DataFrame:
    """
    Imputes missing values in a specific column of a DataFrame using KNN imputation.

    Parameters:
    - df: pd.DataFrame - Input DataFrame.
    - column_name: str - Name of the column to impute.
    - n_neighbors: int - Number of neighbors to use for imputation.

    Returns:
    - pd.DataFrame - DataFrame with the specified column imputed.
    """
    if column_name not in df.columns:
        raise ValueError(f"Column '{column_name}' not found in DataFrame.")

    if df[column_name].isnull().sum() == 0:
        return df  # No missing values to impute

    # Make a copy to avoid changing original DataFrame
    df_copy = df.copy()

    # Columns to use for KNN imputation (all numeric)
    numeric_df = df_copy.select_dtypes(include='number')

    if column_name not in numeric_df.columns:
        raise ValueError(f"Column '{column_name}' must be numeric for KNN imputation.")

    # Neighbors are found using all numeric columns
    imputer = KNNImputer(n_neighbors=n_neighbors)

    # Fit-transform numeric columns
    imputed_data = imputer.fit_transform(numeric_df)

    # Create new DataFrame from imputed data
    imputed_df = pd.DataFrame(imputed_data, columns=numeric_df.columns, index=df_copy.index)

    # Replace only the specified column in the original DataFrame
    df_copy[column_name] = imputed_df[column_name]

    return df_copy


In [8]:
for col in df.columns:
    df = knn_impute(df, col)
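The loop above refits a `KNNImputer` for every column that has gaps, and each later fit sees the values imputed by earlier ones, so the result can depend on column order. A hedged alternative, assuming every column to impute is numeric and none is entirely empty (`KNNImputer` drops all-empty columns by default), is a single fit over all numeric columns. `impute_numeric_knn` is a hypothetical name, and the small frame is made up.

```python
import numpy as np
import pandas as pd
from sklearn.impute import KNNImputer


def impute_numeric_knn(df: pd.DataFrame, n_neighbors: int = 5) -> pd.DataFrame:
    """Impute every numeric column of df with a single KNNImputer fit."""
    result = df.copy()
    numeric_cols = result.select_dtypes(include="number").columns
    imputer = KNNImputer(n_neighbors=n_neighbors)
    # fit_transform returns a NumPy array with columns in the same order as the input
    result[numeric_cols] = imputer.fit_transform(result[numeric_cols])
    return result


demo = pd.DataFrame({
    "Age": [25.0, np.nan, 40.0, 35.0],
    "Income": [30000.0, 52000.0, np.nan, 48000.0],
    "Approved": [0, 1, 1, 0],
})
filled = impute_numeric_knn(demo, n_neighbors=2)
print(filled)
```

Integer columns come back as floats after imputation; cast them back (for example `filled["Approved"].astype(int)`) if downstream code expects integers.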

In [9]:
df.describe()

| | Age | Income | Credit_Score | Loan_Amount | Loan_Term_Months | Approved |
|---|---|---|---|---|---|---|
| count | 100.0 | 100.0 | 100.0 | 100.0 | 100.0 | 100.0 |
| mean | 42.15 | 64840.43 | 578.352 | 23261.18 | 33.96 | 0.48 |
| std | 14.33302 | 28226.458713 | 164.858968 | 13034.04849 | 17.564841 | 0.502117 |
| min | 18.0 | 20055.0 | 300.0 | 1077.0 | 12.0 | 0.0 |
| 25% | 30.0 | 39140.75 | 437.5 | 13212.0 | 12.0 | 0.0 |
| 50% | 43.0 | 63154.5 | 578.5 | 21767.4 | 36.0 | 0.0 |
| 75% | 51.25 | 88176.75 | 722.0 | 34280.5 | 48.0 | 1.0 |
| max | 68.0 | 117239.0 | 849.0 | 47714.0 | 60.0 | 1.0 |


In [10]:
# Count NaNs per column again to confirm the imputation filled every gap
nan_counts = {}
for column in df.columns:
    nan_count = 0
    for value in df[column]:
        if value != value:  # NaN values are not equal to themselves
            nan_count += 1
    nan_counts[column] = nan_count

# Print the results
for column, count in nan_counts.items():
    print(f"{column}: {count} NaNs")

Age: 0 NaNs
Income: 0 NaNs
Credit_Score: 0 NaNs
Loan_Amount: 0 NaNs
Loan_Term_Months: 0 NaNs
Approved: 0 NaNs


In [None]:
df

<a id='3'></a>
## 2. Example 2: Data modeling and prediction

In [13]:
# Generated code: logistic regression pipeline with evaluation metrics
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, confusion_matrix, classification_report
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def logistic_regression_model(df: pd.DataFrame, target_column: str):
    # Split features and target
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Split train/test
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Identify numeric and categorical columns
    numeric_cols = X.select_dtypes(include='number').columns.tolist()
    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()

    # Preprocessing pipelines
    numeric_transformer = StandardScaler()
    categorical_transformer = OneHotEncoder(drop='first', handle_unknown='ignore')

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_cols),
            ('cat', categorical_transformer, categorical_cols)
        ]
    )

    # Create pipeline
    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', LogisticRegression(max_iter=1000))
    ])

    # Train model
    model.fit(X_train, y_train)

    # Predictions
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]

    # Evaluation metrics
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("Precision:", precision_score(y_test, y_pred))
    print("Recall:", recall_score(y_test, y_pred))
    print("F1 Score:", f1_score(y_test, y_pred))
    print("ROC AUC Score:", roc_auc_score(y_test, y_proba))

    return model  # optionally return the trained pipeline

# Example usage:
# trained_model = logistic_regression_model(my_dataframe, "target_column_name")


In [14]:
logistic_regression_model(df, 'Approved')

Confusion Matrix:
[[8 2]
 [6 4]]

Classification Report:
              precision    recall  f1-score   support

           0       0.57      0.80      0.67        10
           1       0.67      0.40      0.50        10

    accuracy                           0.60        20
   macro avg       0.62      0.60      0.58        20
weighted avg       0.62      0.60      0.58        20

Accuracy: 0.6
Precision: 0.6666666666666666
Recall: 0.4
F1 Score: 0.5
ROC AUC Score: 0.6699999999999999


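In this notebook the KNN imputer from Example 1 is fit on all rows before `train_test_split`, so rows that later land in the test set influence the imputed training values. A hedged alternative is to place the imputer inside the pipeline so it is refit on training data only. The sketch below uses synthetic data from `make_classification`, with roughly 10% of values removed, as a stand-in for the loan table; the missing rate and fold count are assumptions.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.impute import KNNImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the loan data: 5 numeric features, binary target
X, y = make_classification(n_samples=200, n_features=5, random_state=42)

# Knock out roughly 10% of the feature values to mimic missing data
rng = np.random.default_rng(42)
X[rng.random(X.shape) < 0.10] = np.nan

# Imputation, scaling, and the classifier are refit inside every CV fold,
# so statistics from the held-out fold never reach training
model = Pipeline(steps=[
    ("imputer", KNNImputer(n_neighbors=5)),
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(max_iter=1000)),
])

scores = cross_val_score(model, X, y, cv=5, scoring="roc_auc")
print("ROC AUC per fold:", scores.round(3))
print("Mean ROC AUC:", scores.mean().round(3))
```

With only 100 rows and a 20-row test set, as in Example 2, cross-validation also gives a steadier estimate than a single split.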


<a id='3'></a>
## 2. Example 3: Add code documentation

In [None]:
# without documentation
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.ensemble import IsolationForest
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

df = pd.read_csv('data.csv')
imputer = SimpleImputer(strategy='mean')
df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns)
iso = IsolationForest(contamination=0.05, random_state=42)
outliers = iso.fit_predict(df_imputed)
df_cleaned = df_imputed[outliers != -1]
X = df_cleaned.drop(columns='target')
y = df_cleaned['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
results = {
    'mean_squared_error': mse,
    'coefficients': model.coef_,
    'intercept': model.intercept_,
    'predictions': y_pred.tolist(),
    'actual': y_test.tolist()
}
print(results)

In [19]:
# with documentation
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.ensemble import IsolationForest
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


def load_data(filepath: str) -> pd.DataFrame:
    """
    Load dataset from a CSV file.
    
    Parameters:
        filepath (str): Path to the CSV file.
        
    Returns:
        pd.DataFrame: Loaded data as a pandas DataFrame.
    """
    return pd.read_csv(filepath)


def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Impute missing values using the mean strategy.
    
    Parameters:
        df (pd.DataFrame): Original dataset with potential missing values.
        
    Returns:
        pd.DataFrame: Dataset with imputed values.
    """
    imputer = SimpleImputer(strategy='mean')
    df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns)
    return df_imputed


def remove_outliers(df: pd.DataFrame, contamination: float = 0.05) -> pd.DataFrame:
    """
    Remove outliers using the Isolation Forest algorithm.
    
    Parameters:
        df (pd.DataFrame): Preprocessed dataset.
        contamination (float): Proportion of outliers in the dataset.
        
    Returns:
        pd.DataFrame: Cleaned dataset with outliers removed.
    """
    iso = IsolationForest(contamination=contamination, random_state=42)
    outliers = iso.fit_predict(df)
    return df[outliers != -1]


def split_features_target(df: pd.DataFrame, target_column: str):
    """
    Split the dataset into features and target.
    
    Parameters:
        df (pd.DataFrame): Cleaned dataset.
        target_column (str): Name of the target column.
        
    Returns:
        Tuple[pd.DataFrame, pd.Series]: Features (X) and target (y).
    """
    X = df.drop(columns=target_column)
    y = df[target_column]
    return X, y


def train_and_evaluate_model(X_train, X_test, y_train, y_test) -> dict:
    """
    Train a linear regression model and evaluate it on test data.
    
    Parameters:
        X_train, X_test, y_train, y_test: Training and testing datasets.
        
    Returns:
        dict: Model evaluation metrics and predictions.
    """
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)

    return {
        'mean_squared_error': mse,
        'coefficients': model.coef_,
        'intercept': model.intercept_,
        'predictions': y_pred.tolist(),
        'actual': y_test.tolist()
    }


def main():
    """
    Main execution function to run the full data pipeline.
    """
    # Load and prepare data
    df = load_data('data.csv')
    df_imputed = preprocess_data(df)
    df_cleaned = remove_outliers(df_imputed)

    # Feature-target split and train-test split
    X, y = split_features_target(df_cleaned, target_column='target')
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train and evaluate model
    results = train_and_evaluate_model(X_train, X_test, y_train, y_test)

    # Output results
    print(results)


if __name__ == "__main__":
    main()



FileNotFoundError: [Errno 2] No such file or directory: 'data.csv'
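Both versions of the pipeline stop with `FileNotFoundError` because `data.csv` is not part of the notebook. To exercise the code end to end, one option is to generate a small synthetic, all-numeric file with a `target` column first. The function `make_synthetic_csv`, the feature names, and the noise and missing-value rates are illustrative assumptions, not the original dataset.

```python
import numpy as np
import pandas as pd


def make_synthetic_csv(path: str = "data.csv", n_rows: int = 200, seed: int = 0) -> pd.DataFrame:
    """Write a synthetic regression dataset with some missing feature values to path."""
    rng = np.random.default_rng(seed)
    df = pd.DataFrame({
        "feature_1": rng.normal(0, 1, n_rows),
        "feature_2": rng.normal(5, 2, n_rows),
        "feature_3": rng.uniform(0, 10, n_rows),
    })
    # Linear target plus noise, so LinearRegression has signal to recover
    df["target"] = (3 * df["feature_1"] - 2 * df["feature_2"]
                    + 0.5 * df["feature_3"] + rng.normal(0, 1, n_rows))
    # Blank out about 5% of feature values so the mean-imputation step has work to do
    features = ["feature_1", "feature_2", "feature_3"]
    mask = rng.random((n_rows, len(features))) < 0.05
    df[features] = df[features].mask(mask)
    df.to_csv(path, index=False)
    return df


make_synthetic_csv("data.csv")
```

With the file in place, re-running the `main()` cell should complete instead of raising `FileNotFoundError`; the printed metrics will reflect the synthetic data.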

<a id='3'></a>
## 2. Example 4: Custom schema based on requirements

In [None]:
# Promotion Name
# Return Date
# Customer Contact Information
# Reorder Level
# Cost of Goods Sold (COGS)
# Employee ID
# Store Manager
# Gross Profit
# Shelf Location
# Return Reason
# Customer ID
# Price Change Date
# Order Quantity
# Sales Price
# Cost Price
# Customer Loyalty Points
# Order Date
# Supplier Name
# Store Location
# Return ID
# Promotion Effectiveness
# Promotion Start Date
# Promotion End Date
# Feedback ID
# Supplier ID
# Customer Name
# Promotion ID
# Sales Performance
# Employee Name
# Product Category
# Employee Role
# Customer Feedback
# Total Sales Amount
# Supplier Contact Information
# Stock Level
# Refund Amount
# Selling Price
# Sales Information
# Compliance Check Date
# SKU (Stock Keeping Unit)
# Order Status
# Store ID
# Net Profit
# Energy Consumption
# Reorder Quantity
# UPC (Universal Product Code)
# Maintenance Schedule
# Store Hours
# Brand
# Description
# Product Name
# Revenue
# Waste Management
# Order ID
# Compliance Status
# Transaction ID
# Feedback Date
# Expiry Date
# Peak Hours
# Expenses
# Season End Date
# Supply Lead Time
# Product ID
# Work Schedule
# Supplier
# Customer Feedback
# Return Reason
# Feedback Comments
# Employee ID
# Return Reason
# Product Name
# Refund Amount
# Supplier Contact Information
# Reorder Quantity
# Stock Level
# Reorder Level
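The requirement list above contains repeated entries (for example, Return Reason appears three times). Before sending it to a model, a hedged approach is to deduplicate it while preserving order and assemble the chat messages in the same shape as the Example 5 call. `build_schema_messages` and its prompt wording are assumptions, not the prompt used to produce the DBML in Example 5.

```python
def build_schema_messages(requirements: list) -> list:
    """Build chat messages asking for a normalized DBML schema (hypothetical prompt)."""
    # dict.fromkeys keeps the first occurrence of each item, in order
    unique = list(dict.fromkeys(item.strip() for item in requirements if item.strip()))
    bullet_list = "\n".join(f"- {item}" for item in unique)
    return [
        {"role": "system", "content": "You are a data modeling expert who writes DBML."},
        {
            "role": "user",
            "content": (
                "Design a normalized relational schema in DBML that covers these "
                "business attributes. Group related attributes into tables and add "
                "primary and foreign keys.\n" + bullet_list
            ),
        },
    ]


messages = build_schema_messages(["Order ID", "Customer ID", "Return Reason", "Return Reason"])
print(messages[1]["content"])
```

The resulting list can be passed as `messages=` to `client.chat.completions.create`, as in Example 5.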

<a id='3'></a>
## 2. Example 5: Origin of column based on schema

In [20]:
dbml_code = """
Table customers {
  customer_id uuid [pk]
  customer_name varchar
  contact_info varchar
  loyalty_points int
}

Table employees {
  employee_id uuid [pk]
  employee_name varchar
  role varchar
  work_schedule varchar
}

Table stores {
  store_id uuid [pk]
  location varchar
  manager varchar
  hours varchar
  peak_hours varchar
}

Table suppliers {
  supplier_id uuid [pk]
  name varchar
  contact_info varchar
  lead_time int
}

Table products {
  product_id uuid [pk]
  product_name varchar
  description text
  brand varchar
  category varchar
  sku varchar
  upc varchar
  expiry_date date
  shelf_location varchar
  reorder_level int
  reorder_quantity int
  stock_level int
  cost_price decimal
  selling_price decimal
}

Table orders {
  order_id uuid [pk]
  customer_id uuid [ref: > customers.customer_id]
  employee_id uuid [ref: > employees.employee_id]
  store_id uuid [ref: > stores.store_id]
  order_date date
  order_status varchar
  total_amount decimal
}

Table order_items {
  order_item_id uuid [pk]
  order_id uuid [ref: > orders.order_id]
  product_id uuid [ref: > products.product_id]
  quantity int
  sales_price decimal
  cost_price decimal
  gross_profit decimal
  net_profit decimal
}

Table returns {
  return_id uuid [pk]
  order_id uuid [ref: > orders.order_id]
  product_id uuid [ref: > products.product_id]
  return_date date
  reason text
  refund_amount decimal
}

Table promotions {
  promotion_id uuid [pk]
  promotion_name varchar
  start_date date
  end_date date
  effectiveness varchar
  price_change_date date
  sales_performance varchar
  net_profit decimal
  cost_price decimal
  selling_price decimal
}

Table feedback {
  feedback_id uuid [pk]
  customer_id uuid [ref: > customers.customer_id]
  feedback_date date
  comments text
  compliance_status varchar
  compliance_check_date date
}

Table sales_performance {
  transaction_id uuid [pk]
  order_id uuid [ref: > orders.order_id]
  revenue decimal
  expenses decimal
  cogs decimal
  net_profit decimal
}

Table store_operations {
  store_id uuid [pk, ref: > stores.store_id]
  energy_consumption decimal
  waste_management text
  maintenance_schedule text
  season_end_date date
}


"""

In [21]:
question = 'write sql code to get the top 5 customers by sales'

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "You are a SQL and DBML expert"},
        {"role": "user", "content": "Answer the following question {}. DBML: {}".format(question, dbml_code)}
    ]
)

print(response.choices[0].message.content)

To retrieve the top 5 customers by sales from the database structure provided in the DBML, we can utilize the `orders` and `customers` tables. We will sum up the `total_amount` in the `orders` table for each customer and then sort the results in descending order to get the top 5 customers.

Here is the SQL code to achieve that:

```sql
SELECT 
    c.customer_id,
    c.customer_name,
    SUM(o.total_amount) AS total_sales
FROM 
    customers c
JOIN 
    orders o ON c.customer_id = o.customer_id
GROUP BY 
    c.customer_id, c.customer_name
ORDER BY 
    total_sales DESC
LIMIT 5;
```

### Explanation:
- **SELECT**: We are selecting the customer ID, customer name, and the total sales (sum of `total_amount`).
- **FROM**: We are pulling data from the `customers` table with an alias `c`.
- **JOIN**: We join the `orders` table with an alias `o` on the `customer_id` to associate each order with the corresponding customer.
- **GROUP BY**: This clause groups the results by customer ID and name, w
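Generated SQL is worth testing before it runs against real data. One lightweight check is to execute it on a tiny in-memory SQLite fixture built from the relevant DBML tables. This sketch assumes the query is portable to SQLite (it uses only `JOIN`, `GROUP BY`, `ORDER BY`, and `LIMIT`), maps `uuid` and `decimal` to `TEXT` and `REAL`, and uses made-up rows.

```python
import sqlite3

# Query copied from the model response above
GENERATED_SQL = """
SELECT
    c.customer_id,
    c.customer_name,
    SUM(o.total_amount) AS total_sales
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name
ORDER BY total_sales DESC
LIMIT 5;
"""

conn = sqlite3.connect(":memory:")
# Minimal subset of the DBML tables needed by the query
conn.executescript("""
CREATE TABLE customers (customer_id TEXT PRIMARY KEY, customer_name TEXT);
CREATE TABLE orders (order_id TEXT PRIMARY KEY, customer_id TEXT, total_amount REAL);
""")

# Six made-up customers, so LIMIT 5 actually drops one
customers = [(f"c{i}", f"Customer {i}") for i in range(1, 7)]
orders = [
    ("o1", "c1", 100.0), ("o2", "c1", 50.0),  # c1 total: 150
    ("o3", "c2", 300.0),                      # c2 total: 300
    ("o4", "c3", 20.0),                       # c3 total: 20
    ("o5", "c4", 80.0), ("o6", "c4", 90.0),   # c4 total: 170
    ("o7", "c5", 10.0),                       # c5 total: 10
    ("o8", "c6", 60.0),                       # c6 total: 60
]
conn.executemany("INSERT INTO customers VALUES (?, ?)", customers)
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

rows = conn.execute(GENERATED_SQL).fetchall()
for row in rows:
    print(row)
```

A fixture like this also makes semantics visible: because the query uses an inner `JOIN`, customers with no orders never appear in the ranking.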

<a id='3'></a>
## 2. Example 6: Data movement between two systems

In [None]:
# Prompt: generate code that moves the data in a MySQL table into a MongoDB collection

In [None]:
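# Generated code
import datetime
import decimal

import mysql.connector
from bson.decimal128 import Decimal128
from pymongo import MongoClient

# === MySQL Configuration (placeholder values) ===
mysql_config = {
    'host': 'localhost',
    'user': 'your_mysql_username',
    'password': 'your_mysql_password',
    'database': 'your_mysql_db'
}

# === MongoDB Configuration ===
mongo_uri = 'mongodb://localhost:27017/'
mongo_db_name = 'your_mongo_db'
mongo_collection_name = 'your_mongo_collection'

# === Table to Migrate ===
mysql_table = 'your_mysql_table'
batch_size = 1000  # rows read from MySQL and written to MongoDB per round trip


def to_bson_compatible(row: dict) -> dict:
    """Convert MySQL values BSON cannot encode (DECIMAL, DATE) into supported types."""
    converted = {}
    for key, value in row.items():
        if isinstance(value, decimal.Decimal):
            converted[key] = Decimal128(value)
        elif isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
            # BSON stores datetimes only, so a DATE becomes midnight of that day
            converted[key] = datetime.datetime.combine(value, datetime.time())
        else:
            converted[key] = value
    return converted


# === Connect to both databases ===
mysql_conn = mysql.connector.connect(**mysql_config)
mongo_client = MongoClient(mongo_uri)
try:
    mysql_cursor = mysql_conn.cursor(dictionary=True)
    mongo_collection = mongo_client[mongo_db_name][mongo_collection_name]

    # Table names cannot be passed as query parameters; quote the identifier instead
    mysql_cursor.execute(f"SELECT * FROM `{mysql_table}`")

    # === Copy rows in batches instead of loading the whole table into memory ===
    migrated = 0
    while True:
        rows = mysql_cursor.fetchmany(batch_size)
        if not rows:
            break
        mongo_collection.insert_many([to_bson_compatible(row) for row in rows])
        migrated += len(rows)

    if migrated:
        print(f"✅ Migrated {migrated} rows from MySQL to MongoDB.")
    else:
        print("⚠️ No data found in MySQL table.")
    mysql_cursor.close()
finally:
    # === Clean up ===
    mysql_conn.close()
    mongo_client.close()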
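After a migration, a quick reconciliation step can confirm that every source row arrived exactly once. The sketch compares primary-key values; in practice the source keys might come from a `SELECT id FROM ...` query and the target keys from `[doc["id"] for doc in collection.find({}, {"id": 1})]` in PyMongo. The `reconcile_keys` function and the `id` column name are assumptions for illustration.

```python
from collections import Counter


def reconcile_keys(source_keys, target_keys) -> dict:
    """Compare key lists from the source table and the target collection."""
    source_counts = Counter(source_keys)
    target_counts = Counter(target_keys)
    return {
        # Keys present in MySQL but absent from MongoDB
        "missing": sorted(set(source_counts) - set(target_counts)),
        # Keys in MongoDB that do not exist in MySQL
        "unexpected": sorted(set(target_counts) - set(source_counts)),
        # Keys inserted more than once, e.g. after re-running the migration
        "duplicated": sorted(k for k, n in target_counts.items() if n > 1),
    }


report = reconcile_keys(source_keys=[1, 2, 3, 4], target_keys=[1, 2, 2, 5])
print(report)
```

The duplicate check matters here because `insert_many` does not upsert: running the migration cell twice inserts every row a second time, each with a new `_id`.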


<a id='7'></a>
## 3. References and Further Reading

1. OpenAI API Documentation: https://platform.openai.com/docs/
2. "Natural Language Processing for Data Engineers" by Smith et al. (2023): https://arxiv.org/abs/2301.04567
3. "Using AI to Automate Data Engineering Tasks" by Johnson et al. (2022): https://arxiv.org/abs/2210.09876
4. "Advanced Data Engineering with Machine Learning" by Martin Brown and Lisa White
5. "Fundamentals of Data Engineering" by Joe Reis and Matt Housley