In [12]:
import os
from urllib.parse import quote

import requests

# GitHub repository details
owner = 'Srivalli2024'
repo = 'Pentaho_Files_Input'
branch = 'main'  # Change this to the desired branch if different
download_dir = 'simplepentaho'  # local folder the selected .ktr file is saved into
request_timeout = 30  # seconds; fail instead of hanging the cell if GitHub does not answer

# Personal access token, read from the environment (e.g. `set GITHUB_TOKEN=...` before
# starting Jupyter). Never hardcode it in the notebook: a token pasted into a cell ends up
# in the saved .ipynb and its history. Any token that was previously committed here must
# be revoked on GitHub.
token = os.environ.get('GITHUB_TOKEN')

# GitHub API URL to list files in the repository
api_url = f'https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1'

# Authenticate only when a token is available; public repositories work without one.
headers = {'Authorization': f'token {token}'} if token else {}
response = requests.get(api_url, headers=headers, timeout=request_timeout)
if response.status_code == 200:
    tree_listing = response.json()
    if tree_listing.get('truncated'):
        # The recursive trees endpoint caps the size of its result; some files may be missing.
        print("Warning: the repository tree listing was truncated; not all files are shown.")
    files = tree_listing.get('tree', [])
    # Keep only blobs (regular files) whose path ends with .ktr
    ktr_files = [file['path'] for file in files
                 if file.get('type') == 'blob' and file['path'].endswith('.ktr')]

    if ktr_files:
        # Display the list of .ktr files to the user
        print("Select a .ktr file to download:")
        for idx, file in enumerate(ktr_files):
            print(f"{idx + 1}: {file}")

        # Prompt user to select a file (1-based in the UI, 0-based in the list)
        while True:
            try:
                selection = int(input("Enter the number of the file to download: ")) - 1
                if 0 <= selection < len(ktr_files):
                    selected_file = ktr_files[selection]
                    break
                else:
                    print("Invalid selection. Please try again.")
            except ValueError:
                print("Invalid input. Please enter a number.")

        # GitHub URL to download the file. quote() percent-escapes spaces, '#', '?' etc.
        # in the repository path while keeping the '/' separators intact.
        download_url = f'https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{quote(selected_file)}'
        download_response = requests.get(download_url, headers=headers, timeout=request_timeout)

        if download_response.status_code == 200:
            # Save the file locally, creating the target folder on first use
            os.makedirs(download_dir, exist_ok=True)
            local_file_path = os.path.join(download_dir, os.path.basename(selected_file))
            with open(local_file_path, 'wb') as f:
                f.write(download_response.content)
            print(f"File downloaded and saved to: {local_file_path}")
        else:
            print(f"Failed to download the file. Status code: {download_response.status_code}")
    else:
        print("No .ktr files found in the repository.")
else:
    print(f"Failed to retrieve the file list. Status code: {response.status_code}")

Select a .ktr file to download:
1: Get list of tables.ktr
2: Process one table.ktr
3: save list of all result files.ktr
4: set variables.ktr
File downloaded and saved to: simplepentaho\set variables.ktr


In [2]:
import pandas as pd
import json
import openai
import os
import shutil
import xml.etree.ElementTree as ET
import requests
import logging
from logging.handlers import RotatingFileHandler

# Configure logging: console output via basicConfig on the root logger, plus a
# size-capped rotating log file (10 MB, one backup) attached to this module's logger.
log_filename = 'app.log'
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger(__name__)

# Re-running this notebook cell must not stack a second file handler on the same
# logger; otherwise every message is written to app.log once per re-run.
_log_path = os.path.abspath(log_filename)
if not any(isinstance(h, RotatingFileHandler) and h.baseFilename == _log_path
           for h in logger.handlers):
    # Explicit UTF-8 so non-ASCII file names / messages don't fail under a
    # platform default encoding such as cp1252.
    handler = RotatingFileHandler(log_filename, maxBytes=10*1024*1024, backupCount=1,
                                  encoding='utf-8')
    handler.setFormatter(logging.Formatter(log_format))
    logger.addHandler(handler)

file_path = None
folder_name = None

def get_elements_from_file(xml_file_path, element_name):
    """Return the XML text of every ``element_name`` descendant of the document root.

    The root element itself is not considered (the search path is ``.//tag``).
    Each entry is produced by ``ET.tostring(..., encoding='unicode')``, which
    also emits the element's trailing text (e.g. indentation whitespace that
    follows it in the source file).

    Args:
        xml_file_path: path of the XML document (e.g. a Pentaho .ktr file).
        element_name: tag name to look for.

    Returns:
        A list of strings, empty when no element matches.
    """
    document = ET.parse(xml_file_path)
    matches = document.getroot().findall(f'.//{element_name}')
    serialized = []
    for element in matches:
        serialized.append(ET.tostring(element, encoding='unicode'))
    return serialized

def save_elements_to_file(input_filename, output_filename):
    """Extract one kind of XML element from ``input_filename`` into a text file.

    Despite its name, ``output_filename`` is the XML tag to extract. The
    destination is ``steps.txt`` when the tag is ``'step'``; every other tag
    goes to ``hop_order.txt``. Each serialized element is written followed by
    a newline, using UTF-8.

    Args:
        input_filename: path of the XML (.ktr) file to read.
        output_filename: tag name to extract, e.g. ``'step'`` or ``'order'``.
    """
    if output_filename == 'step':
        destination = f'{output_filename}s.txt'
    else:
        destination = 'hop_order.txt'

    serialized_elements = get_elements_from_file(f'{input_filename}', output_filename)
    logger.info(f"{output_filename} elements extracted")

    with open(destination, 'w', encoding='utf-8') as sink:
        for serialized in serialized_elements:
            sink.write(f"{serialized}\n")

    saved_message = f"The output has been saved to '{destination}'"
    logger.info(saved_message)
    print(saved_message)

def openai_sequence_steps(hop_order_file_path, steps_file_path):
    """Ask OpenAI to translate a Pentaho transformation into PySpark code.

    Credentials come from ``config_file.json`` in the current working directory,
    which must contain the keys ``openai_api_type``, ``openai_api_base``,
    ``openai_api_version`` and ``openai_api_key``. The call uses
    ``openai.ChatCompletion``, which exists only in the pre-1.0 ``openai`` package.

    Args:
        hop_order_file_path: text file with the serialized ``<order>`` (hop) XML.
        steps_file_path: text file with the serialized ``<step>`` XML elements.

    Returns:
        The message content of the first choice in the chat completion response.

    Raises:
        Any exception raised by the API call is printed, logged with its
        traceback, and re-raised.
    """
    # Read the configuration file for OpenAI credentials. Explicit encoding: the
    # platform default (cp1252 on Windows) would mis-decode non-ASCII values;
    # 'utf-8-sig' also tolerates a BOM written by Windows editors.
    with open('config_file.json', encoding='utf-8-sig') as f:
        json_data = json.load(f)

    # Set up OpenAI configuration (module-level globals) with the loaded credentials
    openai.api_type = json_data["openai_api_type"]
    openai.api_base = json_data["openai_api_base"]
    openai.api_version = json_data["openai_api_version"]
    openai.api_key = json_data['openai_api_key']

    # Read the contents of the hop order file
    with open(hop_order_file_path, 'r', encoding='utf-8') as file:
        hop_order = file.read()

    # Read the contents of the steps file
    with open(steps_file_path, 'r', encoding='utf-8') as file:
        steps = file.read()

    try:
        # Make the API call with the hop order and the steps; the low temperature
        # asks for less random output.
        response = openai.ChatCompletion.create(
            engine="gpt-4-32k",
            temperature=0.1,
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert in converting Pentaho code to Pyspark code.\n\nYou will receive a hop order file and a steps file. Analyze the hop order to determine the sequence, then convert the corresponding steps into Pyspark code following that sequence.\n\nReturn the Pyspark code for the entire flow step by step as per the steps file.\n\nGive sequence number to each step. Give output in python not in scala"
                },
                {   "role": "user",
                    "content": f"Here is the Pentaho hop order file: {hop_order} and the steps file {steps}"
                }
            ],
        )

        # Extract the output from the response
        output = response["choices"][0]["message"]["content"]
        return output
    except Exception as e:
        print(e)
        # logger.exception keeps the traceback in app.log (logger.error dropped it)
        logger.exception(f"{e}")
        raise

def copy_specific_files(source_folder, target_folder, file_names):
    """Copy the named files from ``source_folder`` into ``target_folder``.

    The target folder (and any missing parents) is created when needed. A name
    that does not refer to a regular file in ``source_folder`` is reported and
    skipped. Progress is printed per file.

    Args:
        source_folder: directory to copy from; ``""`` means the current directory.
        target_folder: directory to copy into; ``""`` means the current directory.
        file_names: iterable of file names relative to ``source_folder``.
    """
    # exist_ok removes the check-then-create race. "" (current directory) is
    # skipped because os.makedirs("") raises.
    if target_folder:
        os.makedirs(target_folder, exist_ok=True)

    for file_name in file_names:
        source_path = os.path.join(source_folder, file_name)
        target_path = os.path.join(target_folder, file_name)
        # isfile rather than exists: shutil.copy cannot copy a directory.
        if os.path.isfile(source_path):
            shutil.copy(source_path, target_path)
            print(f"Copied '{file_name}' to '{target_folder}'.")
        else:
            print(f"File '{file_name}' not found in '{source_folder}'.")

In [3]:

def process_ktr(file_path, folder_name):
    """Convert one Pentaho ``.ktr`` transformation to PySpark code via OpenAI.

    1. Extract the ``<order>`` and ``<step>`` XML elements into ``hop_order.txt``
       and ``steps.txt`` in the current working directory.
    2. Send both to ``openai_sequence_steps`` and write the reply to
       ``pyspark_code.txt``.
    3. Copy the three working files into ``spark_code/<folder_name>`` and delete
       the working copies.

    Args:
        file_path: path to the ``.ktr`` file.
        folder_name: sub-folder under ``spark_code`` that receives the results.
    """
    hop_order_file_path = "hop_order.txt"
    steps_file_path = "steps.txt"
    output_file_path = "pyspark_code.txt"
    target_folder = os.path.join('spark_code', folder_name)

    save_elements_to_file(file_path, 'order')
    save_elements_to_file(file_path, 'step')

    code_output = openai_sequence_steps(hop_order_file_path, steps_file_path)
    print(code_output)

    # 'w', not 'a': a pyspark_code.txt left behind by an interrupted earlier run
    # would otherwise be prepended to this transformation's generated code.
    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        output_file.write(code_output)

    source_folder = ""  # the working files live in the current directory
    file_names = [hop_order_file_path, steps_file_path, output_file_path]
    copy_specific_files(source_folder, target_folder, file_names)
    for file_name in file_names:
        os.remove(os.path.join(source_folder, file_name))
    logger.info(f"The converted code is saved in {target_folder} folder")


In [8]:
def process_directory(directory_path):
    """Run ``process_ktr`` on every ``.ktr`` regular file directly inside ``directory_path``.

    Sub-directories are not descended into, and entries that are not regular
    files are skipped even if their names end in ``.ktr``. Each result goes to
    a folder named after the file without its extension.

    Args:
        directory_path: directory to scan (non-recursive, ``os.listdir`` order).
    """
    for entry_name in os.listdir(directory_path):
        candidate = os.path.join(directory_path, entry_name)
        # Guard clause: only regular files with the .ktr suffix are converted
        if not os.path.isfile(candidate) or not candidate.endswith('.ktr'):
            continue
        print('file_path is', candidate)
        stem = os.path.splitext(os.path.basename(candidate))[0]
        process_ktr(candidate, stem)

# Start the processing with the main directory.
# Set the PENTAHO_KTR_DIR environment variable to point elsewhere instead of
# editing this machine-specific default path.
main_directory_path = os.environ.get(
    'PENTAHO_KTR_DIR',
    r'C:\Multiple_KTR_Updated\KJB_and_KTR_updated\simplepentaho',
)
process_directory(main_directory_path)

2024-06-11 14:57:06,409 - INFO - order elements extracted
2024-06-11 14:57:06,409 - INFO - The output has been saved to 'hop_order.txt'
2024-06-11 14:57:06,409 - INFO - step elements extracted
2024-06-11 14:57:06,409 - INFO - The output has been saved to 'steps.txt'


file_path is C:\Multiple_KTR_Updated\KJB_and_KTR_updated\simplepentaho\Process one table.ktr
The output has been saved to 'hop_order.txt'
The output has been saved to 'steps.txt'


2024-06-11 14:57:41,996 - INFO - The converted code is saved in spark_code\Process one table folder


Based on the hop order and steps file, the sequence of operations is as follows:

1. Number of rows in ${TABLENAME}
2. rows-${TABLENAME}.txt

Here is the corresponding Pyspark code:

```python
# Step 1: Number of rows in ${TABLENAME}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CountRows").getOrCreate()

# Assuming that the table is already loaded into a DataFrame named df
# Replace 'df' with the actual DataFrame variable
df = spark.table("TABLENAME") # Replace 'TABLENAME' with the actual table name
count = df.count()

# Step 2: rows-${TABLENAME}.txt
# Writing the count to a text file
with open("/tmp/rows-TABLENAME.txt", "w") as file: # Replace 'TABLENAME' with the actual table name
    file.write(str(count))
```

Please replace 'TABLENAME' with the actual table name in your database. Also, ensure that the DataFrame 'df' is pointing to the correct table data.
Copied 'hop_order.txt' to 'spark_code\Process one table'.
Copied 'steps.txt' to 'spark_code\Proce

In [10]:
import os

def convert_txt_to_py(directory):
    """Write a commented-out ``pyspark_code.py`` next to every ``pyspark_code.txt``.

    The search is recursive (``os.walk``). Every line of the .txt file becomes a
    ``# `` comment line below a one-line header, so the .py file is a readable
    copy of the text rather than executable code. Errors on one file are printed,
    and the walk continues with the next file.

    Args:
        directory: root folder to search.
    """
    for root, _, files in os.walk(directory):
        for file in files:
            if file == 'pyspark_code.txt':
                txt_file_path = os.path.join(root, file)
                py_file_path = os.path.join(root, 'pyspark_code.py')

                try:
                    # Read the whole txt file first so a read error leaves no partial .py.
                    # UTF-8 matches how process_ktr writes pyspark_code.txt; the locale
                    # default (cp1252 on Windows) would fail or garble non-ASCII text.
                    with open(txt_file_path, 'r', encoding='utf-8') as txt_file:
                        lines = txt_file.readlines()

                    # Write the content to the py file, with each line commented
                    with open(py_file_path, 'w', encoding='utf-8') as py_file:
                        py_file.write("# This file was converted from a .txt file\n\n")
                        for line in lines:
                            py_file.write(f"# {line}")

                    print(f"Converted: {txt_file_path} to {py_file_path}")

                except Exception as e:
                    # Deliberate best-effort: report and move on to the next file
                    print(f"An error occurred while converting {txt_file_path}: {e}")

# Example usage: convert the generated spark_code/**/pyspark_code.txt files.
# Set the PENTAHO_SPARK_CODE_DIR environment variable to point elsewhere instead
# of editing this machine-specific default path.
directory = os.environ.get(
    'PENTAHO_SPARK_CODE_DIR',
    r'C:\Multiple_KTR_Updated\KJB_and_KTR_updated\spark_code',
)
convert_txt_to_py(directory)

Converted: C:\Multiple_KTR_Updated\KJB_and_KTR_updated\spark_code\Process one table\pyspark_code.txt to C:\Multiple_KTR_Updated\KJB_and_KTR_updated\spark_code\Process one table\pyspark_code.py
