In [None]:
%load_ext autoreload
%autoreload 2

import asyncio
import os
import json
import pandas as pd
from loguru import logger
from glob import glob

from mistral_fine_tuning.levels import KnowledgeGraph, extract_graph
from mistral_fine_tuning.keywords import Topics, extract_keywords
from mistral_fine_tuning.refine import Refinement, Quality, Rewrite, refine_joke, rewrite_joke
from mistral_fine_tuning.utils import read_fine_tuning_file

In [None]:
# Gather the 'corrected_transcript' field of every record in every
# .jsonl file under the interim jokes folder into a one-column frame.
folder_path = '../data/interim/jokes'
jsonl_files = glob(os.path.join(folder_path, '*.jsonl'))

all_corrected_transcripts = []
for file in jsonl_files:
    # NOTE(review): files are decoded as latin-1; if they are actually UTF-8,
    # accented characters would come out garbled - confirm the encoding.
    with open(file, 'r', encoding='latin-1') as f:
        all_corrected_transcripts.extend(
            json.loads(line)['corrected_transcript'] for line in f
        )

# One row per distinct transcript.
df = pd.DataFrame(all_corrected_transcripts, columns=['text']).drop_duplicates()

## Word Count

In [None]:
# Make sure every joke text appears only once.
df = df.drop_duplicates()

# Number of whitespace-separated tokens in each joke.
df['word_count'] = df['text'].map(lambda joke: len(joke.split()))


In [None]:
# Summary statistics of the frame right after adding word_count.
df.describe()

In [None]:
# Histogram of word_count (100 bins) before any length filtering.
df['word_count'].hist(bins=100, figsize=(10, 6))

In [None]:
# Trim by length: drop jokes at or above the 95th percentile of word_count,
# and drop jokes with 10 words or fewer.
# NOTE: not idempotent - re-running this cell trims a further 5% off the top.
word_count_cap = df['word_count'].quantile(0.95)
# .copy() gives an independent frame, so later column assignments
# (e.g. df['graph'] = ... in levels()) do not hit SettingWithCopyWarning.
df = df[(df['word_count'] < word_count_cap) & (df['word_count'] > 10)].copy()
df.shape

In [None]:
# Summary statistics after the word-count filter.
df.describe()

In [None]:
# Histogram of word_count (25 bins) after the word-count filter.
df['word_count'].hist(bins=25, figsize=(10, 6))

## Levels of Intentionality

In [None]:
async def levels():
    """Extract a knowledge graph for every joke and save the annotated frame.

    Reads the 'text' column of the module-level ``df``, calls
    ``extract_graph(text, "es")`` for each entry with at most 3 calls in
    flight, stores each result as a JSON string in ``df['graph']`` (``None``
    where extraction or serialisation failed) and writes ``df`` to
    ``../data/interim/jokes_with_graphs.jsonl``.
    """
    limiter = asyncio.Semaphore(3)

    async def rate_limited_extract_graph(text: str) -> KnowledgeGraph:
        """Call extract_graph under the semaphore; log and yield None on error."""
        async with limiter:
            try:
                graph = await extract_graph(text, "es")
            except Exception as e:
                logger.error(f"Error processing text: {text}")
                logger.error(e)
                graph = None
        return graph

    def safe_model_dump(graph):
        """Serialise a graph to JSON; log and yield None if that fails.

        A ``None`` graph (failed extraction) also ends up here and is
        reported as a conversion error.
        """
        try:
            serialised = graph.model_dump_json()
        except Exception as e:
            logger.error(f"Error converting graph: {e}")
            serialised = None
        return serialised

    # gather() keeps results in input order, so they line up with df's rows.
    graphs = await asyncio.gather(
        *(rate_limited_extract_graph(text) for text in df['text'].tolist())
    )
    df['graph'] = list(map(safe_model_dump, graphs))

    df.to_json('../data/interim/jokes_with_graphs.jsonl', orient='records', lines=True)

In [None]:
# Run the graph extraction over every joke in df (see the time/cost note below).
await levels()

The `levels()` run above took 122 min and cost 56 USD.

In [None]:
# Drop rows with any missing value (e.g. a None graph left by a failed
# extraction or serialisation in levels()) and renumber the index.
df = df.dropna().reset_index(drop=True)

In [None]:
def count_nodes(json_str):
    """Return the number of entries under 'nodes' in a graph JSON string.

    Args:
        json_str: JSON text expected to decode to an object with a
            'nodes' collection.

    Returns:
        ``len`` of the 'nodes' value, or 0 when the input is not a string
        (e.g. None/NaN), is not valid JSON, does not decode to an object,
        or has a missing/null 'nodes' entry.
    """
    try:
        graph = json.loads(json_str)
    except (TypeError, ValueError):
        # TypeError: non-string input such as None or NaN.
        # ValueError: malformed JSON (json.JSONDecodeError is a subclass).
        return 0
    if not isinstance(graph, dict):
        return 0
    # `or []` also covers an explicit null under 'nodes'.
    return len(graph.get('nodes') or [])

In [None]:
# Node count of each joke's graph, as computed by count_nodes.
df['num_nodes'] = df['graph'].apply(count_nodes)

In [None]:
# Summary statistics now that num_nodes has been added.
df.describe()

In [None]:
# Histogram of num_nodes (13 bins).
df['num_nodes'].hist(bins=13, figsize=(10, 6))

In [None]:
# How many jokes there are for each node count.
df['num_nodes'].value_counts()

In [None]:
# Keep only jokes whose graph has between 3 and 6 nodes (both inclusive).
df = df[df['num_nodes'].between(3, 6)].reset_index(drop=True)
df.describe()

## Keywords

In [None]:
async def keywords():
    """Extract keywords for every joke and save the annotated frame.

    Reads the 'text' column of the module-level ``df``, calls
    ``extract_keywords(text, "es")`` for each entry with at most 3 calls in
    flight, stores each result as a JSON string in ``df['keywords']``
    (``None`` where extraction or serialisation failed) and writes ``df`` to
    ``../data/interim/jokes_with_keywords.jsonl``.
    """
    limiter = asyncio.Semaphore(3)

    async def rate_limited_extract_keywords(text: str) -> Topics:
        """Call extract_keywords under the semaphore; log and yield [] on error."""
        async with limiter:
            try:
                topics = await extract_keywords(text, "es")
            except Exception as e:
                logger.error(f"Error processing text: {text}")
                logger.error(e)
                # A plain list has no model_dump_json(), so safe_model_dump
                # below logs it and maps it to None.
                topics = []
        return topics

    def safe_model_dump(topics):
        """Serialise topics to JSON; log and yield None if that fails."""
        try:
            serialised = topics.model_dump_json()
        except Exception as e:
            logger.error(f"Error reading topics: {e}")
            serialised = None
        return serialised

    # gather() keeps results in input order, so they line up with df's rows.
    extracted = await asyncio.gather(
        *(rate_limited_extract_keywords(text) for text in df['text'].tolist())
    )
    df['keywords'] = [safe_model_dump(item) for item in extracted]

    df.to_json('../data/interim/jokes_with_keywords.jsonl', orient='records', lines=True)

In [None]:
# Run the keyword extraction over every joke currently in df.
await keywords()

In [None]:
# Drop rows with any missing value (e.g. None keywords left by a failed
# extraction or serialisation in keywords()) and renumber the index.
df = df.dropna().reset_index(drop=True)

In [None]:
def count_keywords(json_str):
    """Return the number of entries under 'keywords' in a topics JSON string.

    Args:
        json_str: JSON text expected to decode to an object with a
            'keywords' collection.

    Returns:
        ``len`` of the 'keywords' value, or 0 when the input is not a string
        (e.g. None/NaN), is not valid JSON, does not decode to an object,
        or has a missing/null 'keywords' entry.
    """
    try:
        topics = json.loads(json_str)
    except (TypeError, ValueError):
        # TypeError: non-string input such as None or NaN.
        # ValueError: malformed JSON (json.JSONDecodeError is a subclass).
        return 0
    if not isinstance(topics, dict):
        return 0
    # `or []` also covers an explicit null under 'keywords'.
    return len(topics.get('keywords') or [])

In [None]:
# Keyword count of each joke, as computed by count_keywords.
df['num_keywords'] = df['keywords'].apply(count_keywords)

In [None]:
# Summary statistics now that num_keywords has been added.
df.describe()

In [None]:
# Histogram of num_keywords (10 bins).
df['num_keywords'].hist(bins=10, figsize=(10, 6))

In [None]:
# How many jokes there are for each keyword count.
df['num_keywords'].value_counts()

In [None]:
# Peek at the first rows; 'keywords' still holds the raw JSON string here.
df.head()

In [None]:
def convert_keywords(keywords_str):
    """Convert a topics JSON string into the string form of its keyword list.

    Parses ``keywords_str`` as JSON, takes the value under 'keywords', renders
    it with ``str()`` and replaces every double quote in that rendering with a
    single quote.
    """
    rendered = str(json.loads(keywords_str)['keywords'])
    # NOTE(review): a keyword containing an apostrophe is rendered by str()
    # with double quotes, so this swap leaves an unbalanced quote - confirm
    # downstream readers tolerate that.
    return rendered.replace('"', "'")

# Replace each JSON string in 'keywords' with the output of convert_keywords.
# NOTE: not idempotent - the column no longer holds JSON after this runs,
# so re-running the cell fails inside json.loads.
df['keywords'] = df['keywords'].apply(convert_keywords)

In [None]:
# Peek at the first rows after the keywords conversion.
df.head()

In [None]:
# Save the full frame (all columns) as JSON Lines, one record per line.
df.to_json('../data/interim/jokes.jsonl', orient='records', lines=True)    

In [None]:
# Save only the text/keywords pair of each joke as JSON Lines.
df[['text', 'keywords']].to_json('../data/interim/jokes_fine_tuning.jsonl', orient='records', lines=True)  

## Creating the datasets for fine-tuning

In [None]:
# Reload the saved jokes from disk with pandas.
# NOTE(review): the next cell reassigns df from the same path via
# read_fine_tuning_file, so this load looks redundant - confirm which is intended.
df = pd.read_json('../data/interim/jokes.jsonl', orient='records', lines=True)
len(df)

In [None]:
# Load the saved jokes through the project helper read_fine_tuning_file;
# this overwrites the df loaded with pd.read_json in the previous cell.
df = read_fine_tuning_file('../data/interim/jokes.jsonl')
len(df)

In [None]:
# Candidates for refinement: jokes with more than 30 words and more than
# 3 graph nodes, keeping only their text and keywords.
df_quality = (
    df.loc[(df['word_count'] > 30) & (df['num_nodes'] > 3), ['text', 'keywords']]
    .reset_index(drop=True)
)
df_quality.head()

In [None]:
async def refinement():
    """Refine every candidate joke and save the result.

    For each (text, keywords) row of the module-level ``df_quality``, calls
    ``refine_joke(text, keywords, "es")`` with at most 3 calls in flight, then
    adds the columns 'quality', 'corrected_text' and 'corrected_keywords' and
    writes the frame to ``../data/interim/jokes_quality.jsonl``.
    """
    limiter = asyncio.Semaphore(3)

    async def rate_limited_refine_joke(text: str, keywords: str) -> Refinement:
        """Call refine_joke under the semaphore.

        On error the failure is logged and the joke is kept unchanged,
        labelled NOT_FUNNY and given no keywords.
        """
        async with limiter:
            try:
                refined = await refine_joke(text, keywords, "es")
            except Exception as e:
                logger.error(f"Error processing text: {text}")
                logger.error(e)
                refined = Refinement(quality=Quality.NOT_FUNNY, text=text, keywords=[])
        return refined

    rows = zip(df_quality['text'].tolist(), df_quality['keywords'].tolist())
    # gather() keeps results in input order, so they line up with the rows.
    refinements = await asyncio.gather(
        *(rate_limited_refine_joke(text, keywords) for text, keywords in rows)
    )

    df_quality['quality'] = [item.quality.value for item in refinements]
    df_quality['corrected_text'] = [item.text for item in refinements]
    df_quality['corrected_keywords'] = [item.keywords for item in refinements]

    df_quality.to_json('../data/interim/jokes_quality.jsonl', orient='records', lines=True)

In [None]:
# Run the refinement over every row of df_quality.
await refinement()

In [None]:
# Number of jokes that went through refinement.
len(df_quality)

In [None]:
# Check which columns df_quality has after refinement.
df_quality.columns

In [None]:
# Save original and corrected text/keywords side by side, plus the quality label.
df_quality[['text', 'corrected_text', 'keywords', 'corrected_keywords', 'quality']].to_json('../data/interim/quality_jokes_to_be_cleaned.jsonl', orient='records', lines=True)  

In [None]:
# Load the refined jokes back from disk as the input of the rewriting step.
# NOTE(review): the file name suggests a manual cleaning pass happens before
# this load - confirm.
df_refined = pd.read_json('../data/interim/quality_jokes_to_be_cleaned.jsonl', orient='records', lines=True)

In [None]:
# Peek at the first refined jokes.
df_refined.head()

In [None]:
async def rewriting():
    """Rewrite every refined joke and save the result.

    For each 'corrected_text' of the module-level ``df_refined``, calls
    ``rewrite_joke(text, "es")`` with at most 3 calls in flight, stores the
    ``rewritten_text`` of each result in ``df_refined['rewritten_text']`` and
    writes the frame to ``../data/interim/jokes_high_quality.jsonl``.
    """
    limiter = asyncio.Semaphore(3)

    async def rate_limited_rewrite_joke(text: str) -> Rewrite:
        """Call rewrite_joke under the semaphore; log and fall back on error."""
        async with limiter:
            try:
                rewrite = await rewrite_joke(text, "es")
            except Exception as e:
                logger.error(f"Error processing text: {text}")
                logger.error(e)
                # NOTE(review): the fallback is built with `text=` while the
                # result is read through `.rewritten_text` below - confirm
                # that Rewrite accepts this and exposes rewritten_text.
                rewrite = Rewrite(text=text)
        return rewrite

    # gather() keeps results in input order, so they line up with the rows.
    rewrites = await asyncio.gather(
        *(rate_limited_rewrite_joke(text) for text in df_refined['corrected_text'].tolist())
    )
    df_refined['rewritten_text'] = [item.rewritten_text for item in rewrites]

    df_refined.to_json('../data/interim/jokes_high_quality.jsonl', orient='records', lines=True)

In [None]:
# Run the rewriting over every row of df_refined.
await rewriting()

In [None]:
# Load the file written by rewriting() as the basis of the fine-tuning set.
df_fine_tuning = pd.read_json('../data/interim/jokes_high_quality.jsonl', orient='records', lines=True)
df_fine_tuning.head()

In [None]:
# How many jokes carry each quality label.
df_fine_tuning['quality'].value_counts()

In [None]:
# Keep only the jokes whose quality label is 'funny' and renumber the index.
df_fine_tuning = df_fine_tuning[df_fine_tuning['quality'] == 'funny'].reset_index(drop=True)

In [None]:
# Keep the rewritten text and corrected keywords, under their final names.
column_map = {'rewritten_text': 'text', 'corrected_keywords': 'keywords'}
df_fine_tuning = (
    df_fine_tuning[list(column_map)]
    .rename(columns=column_map)
    .reset_index(drop=True)
)

In [None]:
# Peek at the first rows of the renamed fine-tuning frame.
df_fine_tuning.head()

In [None]:
# Keywords already handed out by get_unique_keyword.
used_keywords = set()

def get_unique_keyword(keywords):
    """Return the first keyword not handed out before, or None if all are taken.

    The chosen keyword is recorded in the module-level ``used_keywords`` set,
    so each keyword is returned at most once across calls.
    """
    exhausted = object()  # sentinel: distinguishes "nothing left" from any real keyword
    pick = next((kw for kw in keywords if kw not in used_keywords), exhausted)
    if pick is exhausted:
        return None
    used_keywords.add(pick)
    return pick

# Give each joke the first of its keywords that no earlier row has taken
# (None when all of its keywords are already used).
df_fine_tuning['keyword'] = df_fine_tuning['keywords'].apply(get_unique_keyword)
# Drop rows with any missing value - this removes jokes left without a
# keyword - and renumber the index.
df_fine_tuning = df_fine_tuning.dropna().reset_index(drop=True)


In [None]:
# Write the final text/keyword pairs as JSON Lines to the processed folder.
df_fine_tuning[['text', 'keyword']].to_json('../data/processed/jokes.jsonl', orient='records', lines=True)