In [18]:
import threading
import func_timeout
import time
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from kafka import KafkaProducer, KafkaConsumer
from utils.meeyland_util import transferMeeyland
import json
from tqdm import tqdm
from consume.utils import Redis
from dotenv import load_dotenv
import os
load_dotenv(override=True)

True

In [19]:
class Kafka:
    def __init__(self, broker_id):
        self.kafka_host = os.getenv('KAFKA_HOST')
        self.broker_id = broker_id
        self.kafka_port = os.getenv(f'KAFKA_PORT_{self.broker_id}')
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'])
        #self.consumer = KafkaConsumer(bootstrap_servers=[f'{self.kafka_host}:{self.kafka_port}'], auto_offset_reset='earliest', enable_auto_commit=True, group_id=self.kafka_group_id,value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    def kafka_consumer(self, kafka_group_id, kafka_topic):
        """_summary_

        Args:
            kafka_group_id (_type_): group id of consumer
            kafka_topic (_type_): list topic to subscribe

        Returns:
            _type_: consumer
        """
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            group_id=kafka_group_id,
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            max_poll_records=10
        )
        consumer.subscribe(kafka_topic)
        return consumer

    def send_data(self, data,kafka_topic):
        """_summary_

        Args:
            data (_type_): data to send to kafka
            kafka_topic (_type_): topic to send data

        Returns:
            _type_: False if send fail, True if send success
        """
        status = self.producer.send(kafka_topic, value = json.dumps(data).encode('utf-8'))
        self.producer.flush()
        if status.is_done == True:
            return True
        else:
            return False


    def create_consumer_and_subscribe(self, kafka_group_id, kafka_topic):
        """_summary_

        Args:
            kafka_group_id (_type_): group id of consumer
            kafka_topic (_type_): list topic to subscribe

        Returns:
            _type_ : consumer
        """
        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'], auto_offset_reset='earliest', enable_auto_commit=True, group_id=kafka_group_id,value_deserializer=lambda x: x.decode('utf-8'))
        consumer.subscribe(kafka_topic)
        return consumer

In [20]:
import google.generativeai as genai

import time
import gradio as gr


genai.configure(api_key="AIzaSyAiHLi5BQN2Truo7mrSpDRRo6G2TnnUGsA")

for m in genai.list_models():
  if 'generateContent' in m.supported_generation_methods:
    print(m.name)

model = genai.GenerativeModel('gemini-pro')

models/gemini-1.0-pro
models/gemini-1.0-pro-001
models/gemini-1.0-pro-latest
models/gemini-1.0-pro-vision-latest
models/gemini-1.5-flash
models/gemini-1.5-flash-001
models/gemini-1.5-flash-latest
models/gemini-1.5-pro
models/gemini-1.5-pro-001
models/gemini-1.5-pro-latest
models/gemini-pro
models/gemini-pro-vision


In [21]:
def generate_content_gemini(input_sentence):
    response = model.generate_content(
        input_sentence,
        safety_settings={
            'HARM_CATEGORY_SEXUALLY_EXPLICIT':'block_none',
            'HARM_CATEGORY_HATE_SPEECH':'block_none',
            'HARM_CATEGORY_HARASSMENT':'block_none',
            'HARM_CATEGORY_DANGEROUS_CONTENT':'block_none'
        }
    )
    try:
    # print(response.text)
        return response.text
    except:
        print(response.prompt_feedback)
        # return None
        return ""

In [22]:
# generate_content_gemini("Crawl data")

In [23]:
test_insights = """
Q: How to build MLOPs for predict realestate price in production?
A: First. You have to crawl data. You extract, transform and insert to database. Moreover, you have to build training dataset to build AI model. To more efficiently, you can ensemble model to make predict result more stable"
-----------

Q: What should I do after collecting data?
A: Since the collected data has a lot of noise, the collected data needs to be cleaned first and put into a certain format. After the data cleaning step, the cleaned data can be stored in the database and used in the next stages.

Q: With the data collected and newly updated into the database, it is possible to build a training set to train the model and continue to update the knowledge for the correct AI service?
A: Of course. You can do anything on this clean data file, including training AI model. The process of processing data and building datasets for AI services, people go there is the process of building offline batch data: engineer feature / extract feature, transform feature, ...

"""

In [24]:
def _crawl_data(source = 'meeyland'):
    return f"Start to crawl data from {source}"

def _clean_data(source = 'meeyland'):
    return f"Start to clean data from {source}"

def _insert_data(source = 'meeyland'):
    return f"Start to insert clean data to database"

def _build_offline_batch_data():
    return f"Build Offline batch data to train model"

def _train_price_prediction_model(model_name):
    return f"Start to train {model_name}"

def _get_information_about_train_experiment(experiment_id):
    return f"Get all metrics from {experiment_id}"

def _train_ensemble_model():
    return f"Start to train ensemble model"


In [25]:
functions_description = """
Function: _crawl_data
    Description:
        Crawl realestate data from source
    Params:
        source
        - Enum: ['meeyland', 'muaban']
        - Default: 'meeyland'
        - Sample: 'meeyland'
    Output:
        - None


Function: _clean_data
    Description:
        - Clean raw realestate data
    Params:
        source
        - Enum: ['meeyland', 'muaban']
        - Default: 'meeyland'
        - Sample: 'meeyland'
    Output:
        - None

Function: _insert_data
    Description:
        - Insert clean data to database
    Params:
        source
        - Enum: ['meeyland', 'muaban']
        - Default: 'meeyland'
        - Sample: 'meeyland'
    Output:
        - None

Function: _build_offline_batch_data
    Decription:
        - Build batch data for training AI model: extract feature, transform feature for training AI model phrase
    Params:
    Output:
        - None

Function: _train_price_prediction_model
    Description:
        - Training Price Prediction Model. Support models: lightgbm, catboost, xgboost
    Params:
        source
        - Enum: ['cat', 'lgbm', 'xgb']
        - Default: 'meeyland'
        - Sample: 'meeyland'
    Output:
        - None

Function: _get_information_about_train_experiment
    Description:
        Get machine learning metrics about train experiment:
            - explained_variance
            - neg_mean_absolute_percentage_error
            - neg_root_mean_squared_error
            - max_error
    Params:
        experiment_id
        - string
        - Default: "hcm_knr_realestate_DATN_V4"
        - Sample: "hcm_knr_realestate_DATN_V4"
    Output:
        - Information about each training metrics

Function: _train_ensemble_model
    Description:
        - Train ensemble model from single pretrained models: lgbm, xgb, ...
    Params:
    Output:
        - None
"""

In [26]:
CONTROLLER_PROMPT_TEMPLATE = """You are a controller, you receive below query from user, utilize the insights and choose what is the action from given functions

Query: $$QUERY$$

Insights: $$INSIGHTS$$

List function:
$$FUNCTIONS_DECRIPTION$$

The response should be exactly like format and don't say anything else:
```json
{
    "observation": <what is the current situation, what should follow>,
    "guidelines": <what is the most suitable action in this situation and why>,
    "actions": [{
        "fn": <function name 1>,
        "params": <function param 1>
    }, {
        "fn": <function name 2>,
        "params": <function param 2>
    }]
}
```
RESPONSE:
```json
"""


In [27]:
faulty_insights = """"""

In [28]:
import json

In [29]:
def get_best_candidate(obj):
    actions = obj['actions']
    try:
        if len(actions):
            return actions[0]
    except:
        return actions

In [35]:
query = "I want to train model to predict realestate price. I have a clean database in previous phrase. Now  i want to extract feature and build data for training phrase"
test_inputs = CONTROLLER_PROMPT_TEMPLATE.replace("$$QUERY$$", query).replace("$$INSIGHTS$$", test_insights).replace("$$FUNCTIONS_DECRIPTION$$", functions_description)
# eval(generate_content_gemini(test_inputs))
result = generate_content_gemini(test_inputs)

print(result)
result = json.loads(result.replace("`", "").replace("\n", ""))
# func_obj = get_best_candidate(result)
# print(func_obj)

# if func_obj['fn'] == '_crawl_data':
#     print("ok")

{
    "observation": "The user has a clean database from previous phrase and now wants to extract feature and build data for training phrase.",
    "guidelines": "To extract feature and build data for training phrase, the most suitable action is to build offline batch data.",
    "actions": [{
        "fn": "_build_offline_batch_data",
        "params": {}
    }]
}
```


In [31]:
# query = "Train ensemble model"
# test_inputs = CONTROLLER_PROMPT_TEMPLATE.replace("$$QUERY$$", query).replace("$$INSIGHTS$$", test_insights).replace("$$FUNCTIONS_DECRIPTION$$", functions_description)
# # eval(generate_content_gemini(test_inputs))
# print(generate_content_gemini(test_inputs))


In [32]:
def get_func_obj_by_response(response):
    result = json.loads(response.replace("`", "").replace("\n", ""))
    func_obj = get_best_candidate(result)
    return func_obj

In [33]:
# from tqdm import tqdm

# from get_raw_data import crawl_meeyland_by_page
# from clean_raw_data import processMeeyland

In [37]:
import time
import gradio as gr
import os
import requests



KafkaInstance = Kafka(broker_id = 0)
MAX_THREAD = 10

def slow_echo(message, history):
    query = message
    promp_with_input = CONTROLLER_PROMPT_TEMPLATE.replace("$$QUERY$$", query).replace("$$INSIGHTS$$", test_insights).replace("$$FUNCTIONS_DECRIPTION$$", functions_description)
    response = generate_content_gemini(promp_with_input)
    try:
        func_obj = get_func_obj_by_response(response)
    except:
        for retry in range(3):
            func_obj = get_func_obj_by_response(response)
            break

    if func_obj['fn'] == '_crawl_data':
        # total_data_count = 0
        # for page in tqdm(range(200, 203)):
        #     data = crawl_meeyland_by_page(page)
        #     total_data_count += len(data)
        #     if len(data):
        #         reply = f'Crawling batch {len(data)} realestates - Here is a Realestate Title Example: {data[0]}'
        #         yield reply
        #     else:
        #         yield "Crawling..."
        # yield f"Crawled {total_data_count}"

        os.system("tmux new-session -d -s crawl 'python src/helpers/chat_get_data.py'")
        yield "Crawl Job Starting..."

    elif func_obj['fn'] == "_clean_data":
        # consumer = KafkaInstance.kafka_consumer("raw_meeyland", ["raw_meeyland"])
        # cnt = 0
        # for msg in tqdm(consumer):

        #     if Redis().check_id_exist(f'meeyland_offset_{msg.offset}', 'meeyland_clean_rawdata'):
        #         print("Ignore Processed Messages")
        #         continue
        #     Redis().add_id_to_set(f'meeyland_offset_{msg.offset}', 'meeyland_clean_rawdata')
        #     clean_msg = processMeeyland(msg)
        #     try:
        #         yield f'{clean_msg["propertyBasicInfo"]}'
        #     except:pass
        #     cnt += 1
        #     if cnt >= 3:
        #         break
        os.system("tmux new-session -d -s clean 'python src/helpers/chat_clean_data.py'")
        yield "Clean Job Starting..."
    elif func_obj['fn'] == "_insert_data":
        os.system("tmux new-session -d -s insert 'python src/helpers/chat_insert_data.py'")
        yield "Insert to Database Job Starting..."

    elif func_obj["fn"] == "_build_offline_batch_data":
        yield "Extract Feature Job Starting..."

        bkprice_server = os.getenv("BKPRICE_SERVER")
        url = f"{bkprice_server}/build-offline-batch-data"


        payload = {}
        headers = {}

        response = requests.request("POST", url, headers=headers, data=payload)

        response = response.json()
        example = response['sample_data']

        url = f"{bkprice_server}/build-offline-batch-data"

        payload = {}
        headers = {}
        response = requests.request("POST", url, headers=headers, data=payload)
        response = response.json()

        yield f"Here is an example: {example}"


    # yield str(func_obj)
gr.ChatInterface(slow_echo).launch(share=True)

[[34m2024-06-25T18:18:14.242+0000[0m] {[34mconn.py:[0m380} INFO[0m - <BrokerConnection node_id=bootstrap-1 host=localhost:9093 <connecting> [IPv4 ('127.0.0.1', 9093)]>: connecting to localhost:9093 [('127.0.0.1', 9093) IPv4][0m
[[34m2024-06-25T18:18:14.271+0000[0m] {[34mconn.py:[0m1205} INFO[0m - Probing node bootstrap-1 broker version[0m
[[34m2024-06-25T18:18:14.273+0000[0m] {[34mconn.py:[0m410} INFO[0m - <BrokerConnection node_id=bootstrap-1 host=localhost:9093 <connecting> [IPv4 ('127.0.0.1', 9093)]>: Connection complete.[0m
[[34m2024-06-25T18:18:14.383+0000[0m] {[34mconn.py:[0m1267} INFO[0m - Broker version identified as 2.5.0[0m
[[34m2024-06-25T18:18:14.385+0000[0m] {[34mconn.py:[0m1268} INFO[0m - Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup[0m


[[34m2024-06-25T18:18:14.488+0000[0m] {[34mconn.py:[0m919} INFO[0m - <BrokerConnection node_id=0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. [0m
Running on local URL:  http://127.0.0.1:7863
[[34m2024-06-25T18:18:15.536+0000[0m] {[34m_client.py:[0m1026} INFO[0m - HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"[0m
[[34m2024-06-25T18:18:15.546+0000[0m] {[34m_client.py:[0m1026} INFO[0m - HTTP Request: GET http://127.0.0.1:7863/startup-events "HTTP/1.1 200 OK"[0m
[[34m2024-06-25T18:18:17.602+0000[0m] {[34m_client.py:[0m1026} INFO[0m - HTTP Request: HEAD http://127.0.0.1:7863/ "HTTP/1.1 200 OK"[0m
[[34m2024-06-25T18:18:18.476+0000[0m] {[34m_client.py:[0m1026} INFO[0m - HTTP Request: GET https://api.gradio.app/v2/tunnel-request "HTTP/1.1 200 OK"[0m
Running on public URL: https://666202a90cfcc93288.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio d



[[34m2024-06-25T18:23:14.561+0000[0m] {[34mconn.py:[0m380} INFO[0m - <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4][0m
[[34m2024-06-25T18:23:14.562+0000[0m] {[34mconn.py:[0m410} INFO[0m - <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.[0m
[[34m2024-06-25T18:23:14.563+0000[0m] {[34mconn.py:[0m919} INFO[0m - <BrokerConnection node_id=bootstrap-1 host=localhost:9093 <connected> [IPv4 ('127.0.0.1', 9093)]>: Closing connection. [0m


duplicate session: clean
