In [None]:
# Source file to ingest. To try another data format, swap in one of the
# commented-out alternatives below.
input_data_file = 'data/Example Data - JSON/sensors-africa.json'
#input_data_file = 'data/Example Data - JSON/airly.json'
#input_data_file = 'data/Example Data - CSVs/TTTR/clarity.csv'

## Import what we need and set up our AWS session

In [19]:
from dotenv import load_dotenv
import os
import boto3
import pandas as pd
import json
import re
import importlib
import sys
from datetime import datetime
from langchain.llms.bedrock import Bedrock
from langchain.prompts import PromptTemplate


In [3]:
# Load settings such as AWS_REGION, AWS_PROFILE and S3_BUCKET (read later via
# os.getenv) from a .env file. The trailing ';' suppresses the cell's output.
load_dotenv();

In [22]:
# One boto3 Session carries the region and profile configured in the environment;
# every client below is created from it so all AWS calls share those settings.
session = boto3.Session(
    region_name=os.getenv("AWS_REGION"), profile_name=os.getenv("AWS_PROFILE")
)

# Bedrock runtime client handed to the LangChain `Bedrock` LLM. It is built from
# `session` (previously the module-level boto3.client) so AWS_PROFILE is honoured
# here exactly as it is for SNS and S3; the region comes from the session.
bedrock_runtime = session.client(service_name='bedrock-runtime')

sns_client = session.client('sns')
s3 = session.client('s3')


## Functions

In [5]:
def preprocess_csv(csv_file):
    """Read a CSV into a DataFrame and normalise its missing values.

    Args:
        csv_file: Path or file-like object accepted by ``pd.read_csv``.

    Returns:
        The DataFrame, with NaN in non-numeric columns replaced by ``''`` and
        ``fillna(pd.NA)`` applied to the numeric columns.
    """
    # Read exactly once. A separate header-only pre-read (nrows=0) consumed
    # file-like inputs, so the second read_csv saw an exhausted stream.
    # NOTE(review): the former `df.iloc[:, :n_header_cols]` trim was dropped:
    # n_header_cols came from the same header, so it removed no columns. Rows
    # with extra fields are handled by pandas itself (implicit index or a
    # ParserError); pass `on_bad_lines=` to read_csv if they should be skipped.
    df = pd.read_csv(csv_file)

    # Split columns by kind so each gets an appropriate missing-value filler.
    numeric_cols = df.select_dtypes(include=['number']).columns
    string_cols = df.select_dtypes(exclude=['number']).columns

    # Non-numeric columns: missing values become empty strings.
    df[string_cols] = df[string_cols].fillna('')

    # NOTE(review): meant to mark missing numerics as pd.NA; for float64
    # columns pandas may keep NaN or upcast the column -- confirm the dtypes.
    df[numeric_cols] = df[numeric_cols].fillna(pd.NA)

    return df

In [6]:
def load_source_file(input_data_file, threshold=50000):
  """Load a JSON or CSV source file, returning an LLM-sized sample and the full data.

  Args:
    input_data_file: Path to a ``.json`` or ``.csv`` file; the extension is
      matched case-insensitively.
    threshold: Maximum number of characters of ``str(data)`` kept as the JSON
      sample (default 50000, the previously hard-coded value).

  Returns:
    ``(sample_data, data)``. For JSON, ``data`` is the parsed document and
    ``sample_data`` is ``data`` itself, or the first ``threshold`` characters of
    ``str(data)`` when that string is longer. For CSV, both are the DataFrame
    returned by ``preprocess_csv``.

  Raises:
    ValueError: if the extension is neither ``.json`` nor ``.csv`` (this used
      to surface as an UnboundLocalError).
  """
  ext = os.path.splitext(input_data_file)[1].lower()

  if ext == '.json':
    # Close the handle deterministically instead of leaking it.
    with open(input_data_file, encoding='utf-8') as fh:
      data = json.load(fh)
    text = str(data)
    # Large documents are truncated to keep the LLM prompt bounded.
    sample_data = text[:threshold] if len(text) > threshold else data
  elif ext == '.csv':
    data = preprocess_csv(input_data_file)
    sample_data = data
  else:
    raise ValueError(f"Unsupported input file type {ext!r}: {input_data_file}")

  return sample_data, data

In [7]:
def get_final_code(text):
  """Extract the Python source the LLM marked as its final answer.

  The prompts ask the model to reason in a <scratchpad> before answering, and
  that scratchpad may contain draft code blocks. So the ```python block inside
  <FinalCode>...</FinalCode> is preferred. If there is none, the first
  ```python block anywhere in ``text`` is used (the pivot prompt asks for no tag).

  Args:
    text: Raw LLM completion.

  Returns:
    The text between the opening ```python fence and the closing fence
    (including the newline that follows the opening fence), or '' if no
    such block exists.
  """
  pattern = r'```python(.*?)\n```'

  final_section = re.search(r'<FinalCode>(.*?)</FinalCode>', text, re.DOTALL)
  if final_section:
    match = re.search(pattern, final_section.group(1), re.DOTALL)
    if match:
      return match.group(1)

  # Fallback: original behaviour, first fenced python block anywhere.
  match = re.search(pattern, text, re.DOTALL)
  return match.group(1) if match else ''

In [8]:
def create_df(input_data_file):
  """Build the initial DataFrame for ``input_data_file``.

  JSON: a sample of the parsed document is sent to the LLM (via the notebook
  globals ``llm`` and ``p1_prompt_template``). The reply must contain a
  ``convert_to_df`` function; that code is exec'd into ``globals()`` and
  applied to the full data.
  CSV: the file is loaded with ``preprocess_csv`` and no code is generated.

  Args:
    input_data_file: Path to a ``.json`` or ``.csv`` file (extension matched
      case-insensitively).

  Returns:
    ``(df, convert_code)``, where ``convert_code`` is the generated source
    ('' for CSV).

  Raises:
    ValueError: for an unsupported extension, or when the LLM reply does not
      define ``convert_to_df``.
  """
  ext = os.path.splitext(input_data_file)[1].lower()
  if ext == '.json':
    with open(input_data_file, encoding='utf-8') as fh:
      data = json.load(fh)
    # Truncate very large documents so the prompt stays bounded.
    threshold = 50000
    text = str(data)
    sample_data = text[:threshold] if len(text) > threshold else data

    output = llm(p1_prompt_template.format(input=sample_data))
    convert_code = get_final_code(output)

    # Drop any convert_to_df left over from an earlier run. Otherwise a reply
    # without usable code would silently reuse stale code.
    globals().pop('convert_to_df', None)
    # SECURITY: this executes LLM-generated code with full notebook
    # privileges. Review the generated source before trusting its output.
    exec(convert_code, globals())
    if 'convert_to_df' not in globals():
      raise ValueError("LLM output did not define convert_to_df:\n" + output)
    df = convert_to_df(data)
  elif ext == '.csv':
    df = preprocess_csv(input_data_file)
    convert_code = ''
  else:
    raise ValueError(f"Unsupported input file type {ext!r}: {input_data_file}")
  return df, convert_code

In [9]:
def get_table_type(llm_output):
    """Return the contents of the first <tableType>...</tableType> tag.

    Args:
        llm_output: Raw completion from the table-type classification prompt.

    Returns:
        The tag contents (e.g. 'Type1' or 'Type2') when they form a single run
        of word characters, otherwise None.
    """
    found = re.search(r'<tableType>(\w+)</tableType>', llm_output)
    return found.group(1) if found is not None else None

In [21]:
def branch_LLM_invocations(file_name, is_existing):
    """Route a data file according to whether its format is already known.

    Args:
        file_name: Identifier of the file/format, forwarded to the operator
            notification.
        is_existing: True when the format has been seen before.

    Unknown formats trigger ``human_in_the_loop``; known formats currently do
    nothing.
    """
    if is_existing:
        # TODO: call some Python function later to output SQL tables.
        return
    human_in_the_loop(file_name)

def human_in_the_loop(file_name, topic_arn=None, client=None):
    """Notify an operator over SNS that a new data format needs review.

    Args:
        file_name: Identifier of the file/format to review; embedded in the
            message text.
        topic_arn: SNS topic ARN. Defaults to the notebook-level ``sns_topic_arn``.
        client: boto3 SNS client. Defaults to the notebook-level ``sns_client``.

    The message is published with ``MessageStructure='json'``, so ``Message``
    is a JSON object whose ``'default'`` value is the plain text. Previously the
    text was JSON-encoded a second time, so subscribers received it wrapped in
    literal quotes.
    """
    if topic_arn is None:
        topic_arn = sns_topic_arn
    if client is None:
        client = sns_client
    message = "New data format is detected! Please check the file: " + file_name
    client.publish(
        TopicArn=topic_arn,
        Message=json.dumps({'default': message}),
        MessageStructure='json')
    print("\nSNS published message: \n" + str(message))

## Templates

In [11]:
p1_template = """Given the following input file containing air quality data from sensors:
    <input> {input} </input> 
    Provide some python code to convert the data into a pandas dataframe. It should be a function that takes in one input, the input data, as a `dict`.
    `def convert_to_df(input_data):`
    The function should return a panda dataframe.
    <requirements>
    Make sure to import all libraries required.
    Flatten the json as required.
    The timestamp should always be stored as a timestamp. The code must handle conversion. Be mindful of string vs integer. Use a <scratchpad> to Think step by step and insure that the function will work properly when the input data is fed to it.
    </requirements>
    
    OUTPUT: Wrap your final code in the <FinalCode> XML tag and make sure it is formatted as a python code block in Markdown
    EXAMPLE: <FinalCode>```python
import pandas as pd
from datetime import datetime

def convert_to_df(input_data):
        ...
``` 
</FinalCode>
    """

p1_prompt_template = PromptTemplate(
    input_variables=["input"],
    template=p1_template
)

template_pivot = """Type1 table format:
- Each row contains a timestamp
- Each column represents a measurement taken at that time
- Each sensor reading has its own column 
- Type1 example: 
<Type1TableExample>
City,State,Country,Latitude,Longitude,pollution_ts,aqius,mainus,aqicn,maincn,wether_ts,pr,hu,ws,wd,ic
Accra,Greater Accra,Ghana,-0.186964,5.603717,2023-11-25T23:00:00.000Z,74,p2,33,p2,26,1011,82,4.12,272,04n
</Type1TableExample>

Type2 table format 
- Each row contains a timestamp
- One column contains the tag name or value type. This column will not contain the measurements, but instead a string of what the measurement is collecting.
- One column contains the measurement value for that tag. This column will not contain the tag name or value type, but simply the value.
- This table type will generally have only a few columns (3-9)
- Type2 example: 
<Type2TableExample>
timestamp;location;sensor;software_version;value_type;value
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P2;14.25
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P1;16.25
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P0;10.0
</Type2TableExample>

Given the following table:
<table>
{rawtable}
</table>


Analyze the input table and respond with:

1. The identified table type (Type1 or Type2). Write the answer in a <tableType> XML tag. Think step by step in <scratchpad> to properly identify the type.

2. If the table is Type2:
   - Provide a Python function to transform the table to Type1 format. 
   - The Python functin should be called convert_to_type1 and take in a single input, the input dataframe
   - Use markdown to write the function in a python codeblock

If already Type1, state no transformation needed.

<example>
table:
timestamp;location;sensor;software_version;value_type;value
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P2;14.25
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P1;16.25
2023-11-30 20:37:40.316811+00:00;3615;4829;NRZ-2020-129;P0;10.0
2023-11-30 20:37:08.424916+00:00;3615;4829;NRZ-2020-129;P2;12.0
2023-11-30 20:37:08.424916+00:00;3615;4829;NRZ-2020-129;P1;12.0
2023-11-30 20:37:08.424916+00:00;3615;4829;NRZ-2020-129;P0;9.0
2023-11-30 20:36:36.545464+00:00;3615;4829;NRZ-2020-129;P2;13.4
2023-11-30 20:36:36.545464+00:00;3615;4829;NRZ-2020-129;P1;16.0
2023-11-30 20:36:36.545464+00:00;3615;4829;NRZ-2020-129;P0;8.0
2023-11-30 20:36:04.634028+00:00;3615;4829;NRZ-2020-129;P2;10.5

<scratchpad>This table has the following key characteristics:
A timestamp column
Columns indicating metadata like location, sensor, software version
Columns for value_type and value
Each row contains a timestamp, metadata about the reading, the type of value, and the actual value. Different value types are captured in different rows for the same timestamp.
This matches the description of a Type 2 table format:
Timestamp column
One column for tag name/value type
One column for the measurement value
Multiple value types captured per timestamp across rows
Therefore, I would classify this as a Type 2 table format. Therefore I need to write a function to transform it to Type1. To do so, I need to look at the values in value_type and pivot them so they become columns. </scratchpad>
<tableType>Type2</tableType>

2: Here is the python code to transform the table to Type1 format:
```python
function:
import pandas as pd  

def convert_to_type1(df):
    df_pivoted = df.pivot(index=['timestamp', 'location', 'sensor', 'software_version'], 
                          columns='value_type',
                          values='value')
    
    df_pivoted.reset_index(inplace=True)

    return df_pivoted
```
</example>

Table type analysis and transformation function (if applicable):
"""

prompt_template_pivot = PromptTemplate(
    input_variables=["rawtable"],
    template=template_pivot
)

In [12]:
templateTransform = """Given this dataframe:
{input_df}
Perform the following operations to create a python function called `transform_df`:
<tasks>
For each column, use the column name and the values, to determine what the column likely contains. Provide a description. Store those desciptions in a <description> XML tag for every column in the dataframe. This does not belong in the function. It is to help you think.

<context>
For context, this is data from air quality sensors, so some common items are:
temperature (in Celsius), humidity, relative humidity, PM1 are extremely fine particulates with a diameter of fewer than 1 microns. PM2.5 (also known as fine particles) have a diameter of less than 2.5 microns. PM10 means the particles have a diameter less than 10 microns, or 100 times smaller than a millimeter.
in some dataframes, there will only be a location id (numerical), sometimes, a city. Store the most relevant location, if available, ALWAYS as a string.
<context>
The output file name for all rows will be: {input_filename}
When creating the new dataframe, make sure to properly define the datatype. If not certain, string is OK.
THEN, write a python function to convert it to a dataframe of this format:
<output_structure>
{output_structure}
</output_structure>
Do your best to match the input dataframe to the target. If there are no values for a column of the output_structure, write None

Output your code in python markdown, in <FinalCode> XML tag 
</tasks>
EXAMPLE:
<FinalCode> ```python
import pandas as pd

def transform_df(df):
    
    output_df = pd.DataFrame(columns=['deviceId', 'timestamp', 'locationId', 'geo_lat', 'geo_lon', 'pm1', 'pm10', 'pm25', 'temperature', 'pressure', 'humidity', 'sourcefile'])
    
    output_df['deviceId'] = df['sn']
    output_df['timestamp'] = df['timestamp']
    output_df['locationId'] = None
    output_df['geo_lat'] = df['lat']
    output_df['geo_lon'] = df['lon'] 
    output_df['pm1'] = df['pm1']
    output_df['pm10'] = df['pm10']
    output_df['pm25'] = df['pm25']
    output_df['temperature'] = df['temp'] 
    output_df['pressure'] = None 
    output_df['humidity'] = df['rh']
    output_df['sourcefile'] = df['url']
    
    return output_df
```
</FinalCode>

GO!
"""
output_structure = """
deviceId | timestamp | locationId | geo_lat | geo_lon | pm1 | pm10 | pm25 | temperature | pressure | humidity | sourcefile 

Make sure you store the data in the dataframe with the following datatype:
<datatype>
deviceId          object
timestamp    datetime64[ns]
locationId        object
geo_lat           object   
geo_lon           object
pm1               float64
pm10              float64
pm25              float64
temperature       float64
pressure          float64
humidity          float64
sourcefile        object
</datatype>

"""
prompt_template2 = PromptTemplate(
    input_variables=["input_df", "output_structure", "input_filename"],
    template=templateTransform
)

## Processing

Process the new data file type

In [None]:
# Filesystem/S3-safe stem of the input path: separators, reserved characters
# and dots become '_', the result is capped at 255 characters, and any
# leading/trailing spaces or dots are trimmed.
cleaned_filename = (
    re.sub(r'[\\/:*?"<>|.]+', '_', os.path.splitext(input_data_file)[0])[:255]
    .strip(' .')
)
# Format key (used as the folder name under functions/): the stem with runs of
# hyphens/spaces collapsed to '_'.
fileformat = re.sub(r'[\- ]+', '_', cleaned_filename)

In [10]:
# Claude v2.1 on Bedrock through LangChain. temperature 0 keeps the generated
# code as repeatable as the model allows.
llm = Bedrock(
    model_id="anthropic.claude-v2:1",
    client=bedrock_runtime,
    model_kwargs={
        "temperature": 0,
        "max_tokens_to_sample": 8000,
        "top_k": 250,
        "top_p": 1,
    },
)

In [32]:
# Create the SNS topic that human_in_the_loop publishes to when an operator
# must review a new data format.
sns_topic = sns_client.create_topic(Name="notify-operator")
sns_topic_arn = sns_topic['TopicArn']

# A folder under functions/ means code was already generated for this format.
# Known format: reuse the existing functions. Unknown format: notify an operator.
# (The branches were previously inverted, so operators were never notified
# and new formats were reported as existing.)
is_existing = os.path.isdir(os.path.join("functions", fileformat))
if is_existing:
    print("data format exists, will retrieve existing functions")
else:
    print(sns_topic_arn)
branch_LLM_invocations(fileformat, is_existing)



data format exists, will retrieve existing functions


In [14]:
# Build the initial DataFrame. JSON input goes through the LLM-generated
# convert_to_df; CSV input goes through preprocess_csv (convert_code is then '').
df, convert_code = create_df(input_data_file)
df.head(3)


Unnamed: 0,timestamp,id,location,sensor,software_version,value,value_type
0,2023-11-30 20:37:40.316811+00:00,63136960,3615,4829,NRZ-2020-129,14.25,P2
1,2023-11-30 20:37:40.316811+00:00,63136960,3615,4829,NRZ-2020-129,16.25,P1
2,2023-11-30 20:37:40.316811+00:00,63136960,3615,4829,NRZ-2020-129,10.0,P0


Pivot rows/columns into Type1 format (one column per measurement) if necessary

In [15]:
# Ask the LLM whether the sample is Type1 (one column per measurement) or Type2
# (one row per measurement) and, for Type2, for a convert_to_type1 pivot function.
rawtable = df.head(10).to_csv(sep=';', index=False)
output_transformation = llm(prompt_template_pivot.format(rawtable=rawtable))
table_type = get_table_type(output_transformation)

if table_type == 'Type1':
    dft1 = df
    code_pivot = ''
elif table_type == 'Type2':
    code_pivot = get_final_code(output_transformation)
    # Remove any convert_to_type1 from a previous run so a reply without code
    # fails loudly instead of silently reusing the stale function.
    globals().pop('convert_to_type1', None)
    # SECURITY: executes LLM-generated code in the notebook namespace; review it.
    exec(code_pivot, globals())
    if 'convert_to_type1' not in globals():
        raise ValueError('LLM output did not define convert_to_type1:\n' + output_transformation)
    dft1 = convert_to_type1(df)
else:
    raise ValueError(f'Invalid table type {table_type!r}:\n' + output_transformation)

dft1.head(3)

value_type,timestamp,id,location,sensor,software_version,P0,P1,P2
0,2023-11-30 20:32:42.592915+00:00,63136769,3615,4829,NRZ-2020-129,10.0,18.0,14.0
1,2023-11-30 20:33:14.522366+00:00,63136784,3615,4829,NRZ-2020-129,9.0,18.0,15.0
2,2023-11-30 20:33:46.429630+00:00,63136804,3615,4829,NRZ-2020-129,10.0,17.5,15.75


Map the table onto the final schema with standardized column names

In [16]:
# Ask the LLM for transform_df, which maps the Type1 columns onto the standard
# output schema, then apply it to the full Type1 frame.
output_tf = llm(prompt_template2.format(
    input_df=dft1.head(10).to_csv(sep=';', index=False),
    output_structure=output_structure,
    input_filename=input_data_file,
))
code_tf = get_final_code(output_tf)
# Remove any transform_df from a previous run so a reply without code fails
# loudly instead of silently reusing the stale function.
globals().pop('transform_df', None)
# SECURITY: executes LLM-generated code in the notebook namespace; review it.
exec(code_tf, globals())
if 'transform_df' not in globals():
    raise ValueError('LLM output did not define transform_df:\n' + output_tf)
df_tf = transform_df(dft1)
df_tf.head(3)


Unnamed: 0,deviceId,timestamp,locationId,geo_lat,geo_lon,pm1,pm10,pm25,temperature,pressure,humidity,sourcefile
0,63136769,2023-11-30 20:32:42.592915+00:00,3615,,,10.0,18.0,14.0,,,,sensors-africa.json
1,63136784,2023-11-30 20:33:14.522366+00:00,3615,,,9.0,18.0,15.0,,,,sensors-africa.json
2,63136804,2023-11-30 20:33:46.429630+00:00,3615,,,10.0,17.5,15.75,,,,sensors-africa.json


Save to S3

In [17]:
# Timestamp suffix (second resolution) distinguishes repeated runs on the same input.
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M%S")

bucket_name = os.getenv('S3_BUCKET')
if not bucket_name:
    # Fail with a clear message instead of passing Bucket=None to boto3.
    raise RuntimeError("S3_BUCKET is not set (check the .env file)")

key_suffix = cleaned_filename + "_" + timestamp + ".csv"

# Standardised output: df_tf in the target schema.
s3.put_object(
    Bucket=bucket_name,
    Key="processed/transformed/csv/" + key_suffix,
    Body=df_tf.to_csv(index=False));

# Pre-mapping table: dft1, i.e. the source data after any Type2 -> Type1 pivot
# (not the untouched source file).
s3.put_object(
    Bucket=bucket_name,
    Key="processed/raw/csv/" + key_suffix,
    Body=dft1.to_csv(index=False));

Store the generated functions so this data format can be recognized and reused

In [18]:
# Persist the generated code under functions/<fileformat>/. The presence of this
# folder is what the format check (os.path.isdir on functions/<fileformat>) uses
# to treat the format as known.
folder = os.path.join("functions", fileformat)
# exist_ok avoids the check-then-create race of `if not exists: makedirs`.
os.makedirs(folder, exist_ok=True)

generated_code = {
    "code_tf.py": code_tf,
    "code_pivot.py": code_pivot,
    "convert_code.py": convert_code,
}
for file_name, code in generated_code.items():
    # Explicit utf-8: LLM output may contain non-ASCII text, which the platform
    # default encoding might not handle.
    with open(os.path.join(folder, file_name), "w", encoding="utf-8") as f:
        f.write(code)