In [7]:
import os
import warnings

from dotenv import load_dotenv

# Populate os.environ from a local .env file.
load_dotenv()

# Settings for the Qwen OpenAI-compatible API; each is None when the variable is unset.
qwen_api_key = os.getenv('QWEN_API_KEY')
qwen_openai_sdk_base_url = os.getenv('QWEN_OPENAI_SDK_BASE_URL')

# Warn (non-fatally) about missing settings: later cells pass these values straight
# to OpenAI(...), so a missing value otherwise shows up only as a confusing failure
# far from its cause.
missing_env_vars = [
    name
    for name, value in (
        ('QWEN_API_KEY', qwen_api_key),
        ('QWEN_OPENAI_SDK_BASE_URL', qwen_openai_sdk_base_url),
    )
    if not value
]
if missing_env_vars:
    warnings.warn(f"Missing environment variable(s): {', '.join(missing_env_vars)}; LLM requests are likely to fail.")

In [19]:
from openai import OpenAI

# Smoke test: check that the Qwen endpoint answers through the OpenAI-compatible SDK.
# The user message asks the model "Who are you?".
client = OpenAI(api_key=qwen_api_key, base_url=qwen_openai_sdk_base_url)
completion = client.chat.completions.create(
    model='qwen-turbo',
    temperature=0.1,
    messages=[
        {'role': 'system', 'content': 'You are a Spark SQL expert.'},
        {'role': 'user', 'content': '你是谁'},
    ],
)
print(completion.choices[0].message.content)

我是阿里云的智能助手，专注于提供关于阿里云产品和服务的信息。虽然我的主要专长不是Spark SQL，但我可以提供一些基本指导和信息。如果你有关于Spark SQL的具体问题或需要帮助，请告诉我！


In [6]:
#!python -m pip install -i https://mirrors.aliyun.com/pypi/simple/ openai

## 1. Extract the calculation logic from source data to target result

In [2]:
import os
import logging

def read_spark_sql_source_code(filePath, encoding='utf-8'):
    """Read a Spark SQL / Spark source file and return its text.

    Args:
        filePath: Path to the source file.
        encoding: Text encoding used to decode the file (default 'utf-8', so the
            result no longer depends on the machine's locale).

    Returns:
        The file contents as a string, or '' when ``filePath`` is not an existing
        regular file (an error is logged in that case).
    """
    # logging config (basicConfig only takes effect if the root logger has no handlers yet)
    logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')
    # isfile rather than exists: a directory "exists" but cannot be opened for reading.
    if not os.path.isfile(filePath):
        logging.error(f'{filePath} does not exist, please check it again!')
        return ''
    with open(filePath, 'r', encoding=encoding) as file:
        return file.read()

In [5]:
def extract_table_from_spark_sql(SQL:str, llm_model:str='qwen2.5-72b-instruct', api_key=None, base_url=None):
    """Ask an LLM to extract the data lineage of a piece of Spark SQL code.

    Args:
        SQL: Spark SQL / Spark source code to analyse; inserted verbatim into the prompt.
        llm_model: Model name sent to the OpenAI-compatible chat-completions endpoint.
        api_key: API key for the endpoint. When None, the notebook-level
            ``qwen_api_key`` is used.
        base_url: Endpoint base URL. When None, the notebook-level
            ``qwen_openai_sdk_base_url`` is used.

    Returns:
        The model's JSON text with surrounding whitespace stripped:
        - if the reply contains a fenced block (```json ... ``` or a bare ``` ... ```),
          the body of the first such block;
        - otherwise, the whole reply.
        If the request fails or the reply is empty, an empty fenced json block
        placeholder is returned (unchanged from earlier versions) and the error is logged.
    """
    import logging
    import re
    from openai import OpenAI
    
    msg = f"""Spark SQL Code: ```{SQL}```
    requirements:
    1. Give the result in the Json format.
        the json contains 'spark_data_lineage`, which is a list containing
        `target_result`: targe result table name or file name,
        `source_data`: the list of table names or source file names used to get the target result , 
        `transformation`: the calculation steps show how `target_result` is calculated from the `source_data`. 
                                The step should be a description sentence including operation/transformation and the dataframe/table/view involved
                                take care of the column match.
    2. Consider the input data from DB or files as `source_data`. consider the calcaluted data that is shown to user or saved as the `target_result`.
    3. The intermediate view or table or dataframe cannot be views as `source_data`.
    4. Give the json result only.
    """
    try:
        client = OpenAI(
            # Fall back to the notebook globals so existing callers keep working.
            api_key=api_key if api_key is not None else qwen_api_key,
            base_url=base_url if base_url is not None else qwen_openai_sdk_base_url,
        )
        completion = client.chat.completions.create(
            model=llm_model,
            messages=[
                {'role': 'system', 'content': 'You are a Spark SQL expert. Given a Spark SQL, please help user to identify the datasource and target result and analize the calculation logic from datasource to targe result'},
                {'role': 'user', 'content': msg}],
            )
        content = completion.choices[0].message.content
    except Exception as exc:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate, and the failure is logged instead of silently discarded.
        logging.error(f'Spark SQL lineage extraction with {llm_model} failed: {exc!r}')
        return "```json\n\n```"
    if not content:
        logging.error(f'{llm_model} returned an empty reply')
        return "```json\n\n```"
    # The prompt asks for "the json result only", so the model may omit the code
    # fence. An unfenced reply is returned as-is rather than discarded.
    matches = re.findall(r'```(?:json)?\s*(.*?)```', content, re.DOTALL | re.IGNORECASE)
    return (matches[0] if matches else content).strip()

In [6]:
# Extract the lineage of one Spark SQL script and print the raw JSON text.
code = read_spark_sql_source_code('./data/spark_sql_1.txt')
result = extract_table_from_spark_sql(SQL=code, llm_model='qwen2.5-72b-instruct')
print(result)


{
  "spark_data_lineage": [
    {
      "target_result": "result_table1",
      "source_data": ["datas/agent.log"],
      "transformation": [
        "Read the data from 'datas/agent.log' and split each line into three columns (t1, t2, t3).",
        "Filter the rows where the value in column t2 is even.",
        "Create a DataFrame df1 with the filtered data and define the schema (t1: StringType, t2: IntegerType, t3: IntegerType).",
        "Register df1 as a temporary view 't'.",
        "Execute SQL query to select all columns from view 't' and add a new column 'tp' with a constant value 'r'.",
        "The result of the SQL query is stored in DataFrame result_table1."
      ]
    },
    {
      "target_result": "result_table2",
      "source_data": ["datas/agent.log"],
      "transformation": [
        "Read the data from 'datas/agent.log' and split each line into three columns (t1, t2, t3).",
        "Filter the rows where the value in column t3 is even.",
        "Create a Data

## 2. Compare the differences between two versions of Spark SQL code

In [273]:
def compare_spark_code_difference(original_sql_lineage:str, revised_sql_lineage:str, llm_model:str='qwen2.5-72b-instruct', api_key=None, base_url=None):
    """Ask an LLM to describe how two versions of Spark SQL code / data lineage differ.

    Args:
        original_sql_lineage: Lineage JSON (or raw Spark SQL code) of the original version;
            inserted verbatim into the prompt.
        revised_sql_lineage: Lineage JSON (or raw Spark SQL code) of the revised version.
        llm_model: Model name sent to the OpenAI-compatible chat-completions endpoint.
        api_key: API key for the endpoint. When None, the notebook-level
            ``qwen_api_key`` is used.
        base_url: Endpoint base URL. When None, the notebook-level
            ``qwen_openai_sdk_base_url`` is used.

    Returns:
        The model's JSON text with surrounding whitespace stripped:
        - if the reply contains a fenced block (```json ... ``` or a bare ``` ... ```),
          the body of the first such block;
        - otherwise, the whole reply.
        If the request fails or the reply is empty, an empty fenced json block
        placeholder is returned (unchanged from earlier versions) and the error is logged.
    """
    import logging
    import re
    from openai import OpenAI
    
    # Prompt fixes: `transformation_logic_change` previously compared
    # `original_target_result` with itself, and `revised_source_data` was
    # described as a target result.
    msg = f"""
    Original Spark SQL Code Data Lineage: ```{original_sql_lineage}```
    Revised Spark SQL Code Data Lineage: ```{revised_sql_lineage}```

    requirements:
    1. Give the result in the Json format. Skip irrelated calculation steps.
        the json contains 'spark_data_lineage`, which is a list containing
        `original_target_result`: targe result table name or file name of the original spark code,
        `original_source_data`: the list of table names or source file names used to get the target result,
        `revised_target_result`: targe result table name or file name of the revised spark code,
        `revised_source_data`: the list of table names or source file names used to get the revised target result,
        `transformation_logic_change`:show the difference between the `original_target_result` and `revised_target_result`. And explain why they differs by listing the changes in the transformation process. Show the differences only.
    2. Consider the input data from DB or files as source_data. consider the calcaluted data that is shown to user or saved as the target_result.
    3. The intermediate view or table or dataframe cannot be views as source_data.
    4. Give the json result only
    """
    try:
        client = OpenAI(
            # Fall back to the notebook globals so existing callers keep working.
            api_key=api_key if api_key is not None else qwen_api_key,
            base_url=base_url if base_url is not None else qwen_openai_sdk_base_url,
        )
        completion = client.chat.completions.create(
            model=llm_model,
            messages=[
                {'role': 'system', 'content': """You are a Spark SQL expert. 
                 Given two data lineages from two Spark SQLs, one is `Original Spark SQL Code Data Lineage`, the other one is `Revised Spark SQL Code Data Lineage`, 
                 Please help user to identify the changes between these two spark sql code data lineages.
                 The changes include the change of ```source_data``` and ```target_result``` and ```transformation_logic```"""},
                {'role': 'user', 'content': msg}],
            )
        content = completion.choices[0].message.content
    except Exception as exc:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate, and the failure is logged instead of silently discarded.
        logging.error(f'Spark SQL lineage comparison with {llm_model} failed: {exc!r}')
        return "```json\n\n```"
    if not content:
        logging.error(f'{llm_model} returned an empty reply')
        return "```json\n\n```"
    # The prompt asks for "the json result only", so the model may omit the code
    # fence. An unfenced reply is returned as-is rather than discarded.
    matches = re.findall(r'```(?:json)?\s*(.*?)```', content, re.DOTALL | re.IGNORECASE)
    return (matches[0] if matches else content).strip()

In [291]:

# Option 1: hand the raw before/after source code straight to the comparison prompt.
llm_model = 'qwen2.5-72b-instruct'
code_before, code_after = (
    read_spark_sql_source_code(path)
    for path in ('./data/spark_sql_before_1.txt', './data/spark_sql_after_1.txt')
)
result = compare_spark_code_difference(
    original_sql_lineage=code_before,
    revised_sql_lineage=code_after,
    llm_model=llm_model,
)

In [292]:
# Show the comparison JSON produced by the option-1 cell above.
print(result)


{
  "spark_data_lineage": [
    {
      "original_target_result": "result_table1",
      "original_source_data": ["datas/agent.log"],
      "revised_target_result": "result_table2",
      "revised_source_data": ["datas/agent.log"],
      "transformation_logic_change": {
        "original_transformation": "rowRDD.filter(row => row.getInt(1) % 2 == 0)",
        "revised_transformation": "rowRDD.filter(row => row.getInt(2) % 2 == 0)",
        "difference": "The filter condition has changed from filtering rows where the second column (t2) is even to filtering rows where the third column (t3) is even."
      }
    }
  ]
}



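### Option 2: extract each data lineage first, then compare the two lineages (the next cell redefines `compare_spark_code_difference` with a shorter prompt)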

In [286]:
def compare_spark_code_difference(original_sql_lineage:str, revised_sql_lineage:str, llm_model:str='qwen2.5-72b-instruct', api_key=None, base_url=None):
    """Ask an LLM to describe how two Spark SQL data lineages differ (option-2 prompt).

    NOTE: this cell redefines `compare_spark_code_difference`. This variant's prompt
    omits the source/target classification rules because its inputs are expected to
    be lineages that are already extracted.

    Args:
        original_sql_lineage: Lineage JSON of the original version; inserted verbatim
            into the prompt.
        revised_sql_lineage: Lineage JSON of the revised version.
        llm_model: Model name sent to the OpenAI-compatible chat-completions endpoint.
        api_key: API key for the endpoint. When None, the notebook-level
            ``qwen_api_key`` is used.
        base_url: Endpoint base URL. When None, the notebook-level
            ``qwen_openai_sdk_base_url`` is used.

    Returns:
        The model's JSON text with surrounding whitespace stripped:
        - if the reply contains a fenced block (```json ... ``` or a bare ``` ... ```),
          the body of the first such block;
        - otherwise, the whole reply.
        If the request fails or the reply is empty, an empty fenced json block
        placeholder is returned (unchanged from earlier versions) and the error is logged.
    """
    import logging
    import re
    from openai import OpenAI
    
    # Prompt fixes: `transformation_logic_change` previously compared
    # `original_target_result` with itself, and `revised_source_data` was
    # described as a target result.
    msg = f"""
    Original Spark SQL Code Data Lineage: ```{original_sql_lineage}```
    Revised Spark SQL Code Data Lineage: ```{revised_sql_lineage}```

    requirements:
    1. Give the result in the Json format.
        the json contains 'spark_data_lineage`, which is a list containing
        `original_target_result`: targe result table name or file name of the original spark code,
        `original_source_data`: the list of table names or source file names used to get the target result,
        `revised_target_result`: targe result table name or file name of the revised spark code,
        `revised_source_data`: the list of table names or source file names used to get the revised target result,
        `transformation_logic_change`: show the difference between the `original_target_result` and `revised_target_result`. And explain why they differs by listing the changes in the transformation process. Only show the differences.
    2. Give the json result only """
    try:
        client = OpenAI(
            # Fall back to the notebook globals so existing callers keep working.
            api_key=api_key if api_key is not None else qwen_api_key,
            base_url=base_url if base_url is not None else qwen_openai_sdk_base_url,
        )
        completion = client.chat.completions.create(
            model=llm_model,
            messages=[
                {'role': 'system', 'content': """You are a Spark SQL expert. 
                 Given two data lineages from two Spark SQLs, one is `Original Spark SQL Code Data Lineage`, the other one is `Revised Spark SQL Code Data Lineage`, 
                 Please help user to identify the changes between these two spark sql code data lineages.
                 The changes include the change of ```source_data``` and ```target_result``` and ```transformation_logic```"""},
                {'role': 'user', 'content': msg}],
            )
        content = completion.choices[0].message.content
    except Exception as exc:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate, and the failure is logged instead of silently discarded.
        logging.error(f'Spark SQL lineage comparison with {llm_model} failed: {exc!r}')
        return "```json\n\n```"
    if not content:
        logging.error(f'{llm_model} returned an empty reply')
        return "```json\n\n```"
    # The prompt asks for "the json result only", so the model may omit the code
    # fence. An unfenced reply is returned as-is rather than discarded.
    matches = re.findall(r'```(?:json)?\s*(.*?)```', content, re.DOTALL | re.IGNORECASE)
    return (matches[0] if matches else content).strip()

In [287]:
# Model used by the option-2 extraction and comparison cells that follow.
llm_model = 'qwen-turbo'

In [288]:
# Option 2, step 1a: extract the lineage of the original script.
code_before = read_spark_sql_source_code('./data/spark_sql_before_1.txt')
data_lineage_before = extract_table_from_spark_sql(SQL=code_before, llm_model=llm_model)
print(data_lineage_before)


{
  "spark_data_lineage": [
    {
      "target_result": "result_table1",
      "source_data": ["datas/agent.log"],
      "transformation": [
        "Read data from datas/agent.log into an RDD",
        "Filter rows where t2 is even and create DataFrame df1 with columns t1, t2, t3",
        "Create a temporary view 't' from df1",
        "Select all columns from view 't' and add a new column 'tp' with value 'r'",
        "The final result is stored in result_table1"
      ]
    }
  ]
}



In [289]:
# Option 2, step 1b: extract the lineage of the revised script.
code_after = read_spark_sql_source_code('./data/spark_sql_after_1.txt')
data_lineage_after = extract_table_from_spark_sql(SQL=code_after, llm_model=llm_model)
print(data_lineage_after)


{
  "spark_data_lineage": [
    {
      "target_result": "result_table2",
      "source_data": ["datas/agent.log"],
      "transformation": [
        "Read data from datas/agent.log into an RDD named rowRDD",
        "Filter rows from rowRDD where r3 (third column) is even and create a new RDD named tableRDD2",
        "Create a DataFrame named df2 from tableRDD2 with schema (r1, r2, r3)",
        "Create a temporary view named r from df2",
        "Select columns r1, r2, r3 from r and add a new column tp with value 'r'",
        "The final result is stored in the DataFrame named result_table2"
      ]
    }
  ]
}



In [290]:
# Option 2, step 2: compare the two extracted lineages.
result = compare_spark_code_difference(
    original_sql_lineage=data_lineage_before,
    revised_sql_lineage=data_lineage_after,
    llm_model=llm_model,
)
print(result)


{
  "spark_data_lineage": [
    {
      "original_target_result": "result_table1",
      "original_source_data": ["datas/agent.log"],
      "revised_target_result": "result_table2",
      "revised_source_data": ["datas/agent.log"],
      "transformation_logic_change": [
        "The transformation logic differs in the following ways:",
        "- The original code filters rows where t2 is even, while the revised code filters rows where r3 is even.",
        "- The original code uses columns t1, t2, t3, while the revised code uses columns r1, r2, r3.",
        "- The original code creates a temporary view named 't', while the revised code creates a temporary view named 'r'.",
        "- The final result is stored in different tables: result_table1 vs result_table2."
      ]
    }
  ]
}

