In [33]:
# For additional information or assistance with running this code, please reach out to contact [at] scartozzi [dot] eu.
# This code is released under GNU General Public License v3.0. Feel free to use it as you wish.

import os
import re
import time
import pandas as pd
import sqlite3
import nltk
nltk.download('vader_lexicon')
nltk.download('punkt')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sent_i = SentimentIntensityAnalyzer()

[nltk_data] Downloading package vader_lexicon to C:\Users\MSI-
[nltk_data]     CMS\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package punkt to C:\Users\MSI-
[nltk_data]     CMS\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [34]:
## GLOBAL VARIABLES
# Directory containing the .txt files to analyse (every file ending in ".txt" is imported):
TEXT_FILES_DIRECTORY = "./Analysis/Text_files"
# Set to True to load the documents from the SQL database below instead of
# importing the .txt files from TEXT_FILES_DIRECTORY:
LOAD_DATABSE = False
# Path to the SQLite database file:
DATABASE_PATH = "./Analysis/Output/database"
# Name of the table used when loading from / saving to the SQL database:
TABLE_NAME = 'documents'
# Path to the assessment framework spreadsheet (an .ods file, despite "DIRECTORY" in the name):
ASSESSMENT_FRAMEWORK_DIRECTORY = "./Analysis/Input/Assessment_framework.ods"
# Directory where the result files (.tsv) are saved:
OUTPUT_DIRECTORY = "./Analysis/Output"
# Processing options:
TEXT_CLEANING = True  # remove URLs, collapse repeated dots and re-join lines broken before a lowercase letter
SENTIMENT_ANALYSIS = True  # add VADER compound scores and sentiment flags for every sentence
# Choose what to export:
EXPORT_SENTENCE_LEVEL_DATA = True
EXPORT_DOCUMENT_DATA = True
# Print an estimated run time (measured on a 100-sentence sample) before the analysis starts:
PRINT_RUN_TIME = True

In [35]:
## LOAD AND PREPARE DATA FOR ANALYSIS
# IMPORT TXT FILES FROM TEXT_FILES_DIRECTORY
def txt_to_sql(directory, db_path, table_name='documents'):
    """Import every ``.txt`` file in ``directory`` and store the result in SQLite.

    Args:
        directory: Folder scanned (non-recursively) for files ending in ``.txt``.
        db_path: Path of the SQLite database file to write to.
        table_name: Table that receives the documents; it is replaced if it
            already exists. Defaults to ``'documents'``.

    Returns:
        A DataFrame with one row per file and the columns ``document`` (file
        name without the trailing ``.txt``) and ``text`` (file content read as
        UTF-8). The frame's shape is printed as a side effect.
    """
    extension = ".txt"
    data = {"document": [], "text": []}

    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore every downstream export) reproducible across runs.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(extension):
            continue
        filepath = os.path.join(directory, filename)
        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()
        # Remove only the trailing extension: str.replace would also delete
        # ".txt" occurring in the middle of a name such as "notes.txt.v2.txt".
        data["document"].append(filename[:-len(extension)])
        data["text"].append(content)

    df = pd.DataFrame(data)
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists='replace', index=False)
    finally:
        # Release the database file even when the write fails.
        conn.close()
    print(df.shape)
    return df

# IMPORT ASSESSMENT FRAMEWORK
def create_dataframes(ods_file):
    """Read the assessment framework spreadsheet into five processed DataFrames.

    Each sheet is read with the ``odf`` engine and passed through
    ``process_dataframe``.

    Returns:
        A tuple ``(variables, set_search_strings, set_co_occurrences,
        set_doc_conditionals, set_keywords)`` built from the sheets
        ``variables``, ``search_strings``, ``co_occurrences``,
        ``doc_conditionals`` and ``taxonomy`` respectively.
    """
    def _read(sheet):
        return pd.read_excel(ods_file, sheet_name=sheet, engine='odf')

    rule_sheets = ('variables', 'search_strings', 'co_occurrences', 'doc_conditionals')
    frames = [process_dataframe(_read(sheet)) for sheet in rule_sheets]
    # The taxonomy sheet is processed with drop_na explicitly switched off.
    frames.append(process_dataframe(_read('taxonomy'), drop_na=False))
    return tuple(frames)

# Helper function to process the data from each sheet in the ODS file
def process_dataframe(df, lowercase_columns=True, drop_na=False):
    """Normalise one sheet of the assessment framework.

    Args:
        df: Sheet content. Its column labels are rewritten in place when
            ``lowercase_columns`` is true.
        lowercase_columns: Lower-case and strip the column labels.
        drop_na: Drop every row that contains a missing value.

    Returns:
        A new DataFrame whose string cells are lower-cased and stripped and
        whose ``query`` column (if present) has been passed through
        ``format_logical_expression``.
    """
    if lowercase_columns:
        df.columns = list(map(convert_to_lowercase, df.columns))
    normalised = df.map(convert_to_lowercase)
    if 'query' in normalised.columns:
        normalised['query'] = normalised['query'].apply(format_logical_expression)
    return normalised.dropna() if drop_na else normalised

# Helper function to format logical expressions in the search strings' query column
def format_logical_expression(expression):
    """Put single spaces around the logical operators of a query string.

    The lower-case operators ``and``, ``or`` and ``not`` are surrounded by
    spaces and all runs of whitespace are collapsed, so ``'"a"and"b"'``
    becomes ``'"a" and "b"'``.

    Args:
        expression: Query text; non-string values (e.g. NaN from an empty
            cell) are returned unchanged.

    Returns:
        The normalised query string, or ``expression`` itself when it is not a
        string.
    """
    if not isinstance(expression, str):
        return expression
    # Guard with \w instead of letters only: digits and underscores are part of
    # a name too, so '"not_relevant"' or '"or_2"' must not be split into
    # '" not _relevant"' (which would no longer match its column name).
    expression = re.sub(r"(?<!\w)(and|or|not)(?!\w)", r" \1 ", expression)
    return re.sub(r"\s+", " ", expression).strip()

# Helper function to convert strings to lowercase
def convert_to_lowercase(l):
    """Return ``l`` lower-cased and stripped if it is a string, otherwise unchanged."""
    return l.lower().strip() if isinstance(l, str) else l

# ORGANIZE KEYWORDS IN DICTIONARY
def organize_keywords(df):
    """Turn the taxonomy sheet into a ``{list name: [keywords]}`` dictionary.

    Every column becomes one entry keyed by the lower-cased column label.
    Missing cells are skipped; remaining values are converted to ``str`` and
    stripped, and values that are empty after stripping are left out.
    """
    def _keywords(column):
        stripped = (str(cell).strip() for cell in df[column].dropna())
        return [term for term in stripped if term]

    return {column.lower(): _keywords(column) for column in df.columns}

# CLEAN TEXT DATA
def clean_text(df, content):
    """Clean the raw text stored in column ``content`` of ``df`` in place.

    For every cell, in this order: URLs (``http...``, ``www...``) are removed,
    runs of two or more dots are reduced to a single dot, and a newline that
    is directly followed by a lower-case letter is replaced by a space.
    Nothing is returned; the column is overwritten.
    """
    def _scrub(text):
        without_urls = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
        single_dots = re.sub(r'\.{2,}', '.', without_urls)
        return re.sub(r'\n(?=[a-z])', ' ', single_dots)

    df[content] = df[content].apply(_scrub)
# SPLIT TEXT INTO SENTENCES
def split_sentences(df):
    """Split the ``text`` column into sentences, one row per sentence.

    A ``sentences`` column holding the list returned by ``nltk.sent_tokenize``
    is added to ``df`` itself; the returned frame is that column exploded, so
    the other columns (and the index label) repeat for every sentence.
    """
    tokenized = df["text"].apply(nltk.sent_tokenize)
    df["sentences"] = tokenized
    exploded = df.explode("sentences")
    return exploded

# CLEAN AND DROP SENTENCES
def clean_rows(df):
    """Lower-case the sentences and drop rows that carry no usable text.

    Removed rows are: missing sentences (for example a document whose text
    produced no sentence at all), sentences made only of digits, digits
    followed only by punctuation/underscores, or only punctuation/underscores.
    Newline characters inside the remaining sentences are deleted.

    Args:
        df: Frame with a ``sentences`` column. It is not modified.

    Returns:
        A new DataFrame with the cleaned ``sentences`` column.
    """
    sentences = df["sentences"].str.lower()
    # na=False keeps the mask strictly boolean: negating a mask that contains
    # NaN (from missing sentences) would fail.
    is_noise = sentences.str.match(r'^\d+$|^\d+[\W_]+$|^[\W_]+$', na=False)
    keep = sentences.notna() & ~is_noise
    # Copy the filtered rows so the assignment below neither touches the
    # caller's frame nor writes into a slice of it.
    cleaned = df[keep].copy()
    cleaned["sentences"] = sentences[keep].str.replace('\n', '', regex=False)
    return cleaned

## RUN CONTENT ANALYSIS
# CHECK PRESENCE OF LISTS FROM TAXONOMY IN SENTENCES
def check_groups(df, word_dict):
    """Flag, for every keyword list, the sentences that contain one of its terms.

    Terms are matched as whole words; a ``*`` inside a term stands for any run
    of word characters (``warm*`` matches ``warming``). All other characters
    are matched literally.

    Args:
        df: Frame with a ``sentences`` column. One boolean column per key of
            ``word_dict`` is added to it in place.
        word_dict: Mapping ``{list name: [terms]}``.

    Returns:
        ``df`` with the added boolean columns.
    """
    for key, words in word_dict.items():
        if not words:
            # Joining zero patterns gives '', which str.contains finds in every
            # sentence; an empty list must match nothing instead.
            df[key] = False
            continue
        patterns = [r"\b{}\b".format(re.escape(word).replace(r'\*', r'\w*')) for word in words]
        combined_pattern = '|'.join(patterns)
        # na=False: a missing sentence contains no keyword (keeps the column boolean).
        df[key] = df['sentences'].str.contains(combined_pattern, regex=True, na=False)
    return df

# CHECK PRESENCE OF CO-OCCURRENCES IN SENTENCES BASED ON check_groups
def initiate_co_occurrences(df, set_co_occurrences, word_dict):
    """Add one boolean column per co-occurrence rule to ``df``.

    For every row of ``set_co_occurrences`` the sentences are tested with
    ``find_co_occurrences`` using the keyword lists named in ``first list`` and
    ``second list`` and the limit in ``distance between lists``.

    Args:
        df: Frame with a ``sentences`` column; result columns are added in place.
        set_co_occurrences: Rules table. Its ``name of co-occurrence`` column is
            cast to ``object`` and missing names are replaced by ``''`` in place.
        word_dict: Mapping ``{list name: [terms]}``.

    Returns:
        ``df`` with one added column per rule, named after the rule.
    """
    set_co_occurrences['name of co-occurrence'] = set_co_occurrences['name of co-occurrence'].astype(object)
    compiled_patterns = {}
    for key, words in word_dict.items():
        if words:
            # Same term-to-regex translation as check_groups: escape everything,
            # then turn the escaped '*' wildcard into \w*.
            patterns = [r"\b{}\b".format(re.escape(word).replace(r'\*', r'\w*')) for word in words]
            compiled_patterns[key] = re.compile('|'.join(patterns))
        else:
            # An empty alternation would match at every position; (?!) never matches.
            compiled_patterns[key] = re.compile(r'(?!)')
    for index, row in set_co_occurrences.iterrows():
        name = row['name of co-occurrence']
        if pd.isnull(name):
            # Use the replacement for the result column too; `row` is a
            # snapshot and would still hold the missing value.
            name = ''
            set_co_occurrences.at[index, 'name of co-occurrence'] = name
        pattern1 = compiled_patterns[row['first list']]
        pattern2 = compiled_patterns[row['second list']]
        distance = row['distance between lists']
        df[name] = df['sentences'].apply(
            lambda sentence: find_co_occurrences(sentence, pattern1, pattern2, distance)
        )
    return df

# Helper function to find co-occurrences
def find_co_occurrences(sentence, pattern1, pattern2, distance):
    """Tell whether matches of two patterns lie within ``distance`` words.

    For every pair of match start positions, the text between the earlier and
    the later start is split on whitespace and its pieces are counted. The
    result is whether the smallest count is at most ``distance``; ``False``
    when either pattern has no match.
    """
    first_starts = [match.start() for match in pattern1.finditer(sentence)]
    second_starts = [match.start() for match in pattern2.finditer(sentence)]
    gaps = [
        len(sentence[min(a, b):max(a, b)].split())
        for a in first_starts
        for b in second_starts
    ]
    if not gaps:
        return False
    return min(gaps) <= distance

# CHECK DOCUMENT-LEVEL CONDITIONALS
def initiate_document_conditionals(df, set_doc_conditionals):
    """Add document-level flags to ``df``.

    For each rule, the new column (named by ``name of document-level
    conditional``) is true on every row of a document in which the column
    named by ``list`` is true on at least one row. ``df`` is modified in place
    and returned.
    """
    rule_names = set_doc_conditionals['name of document-level conditional']
    source_columns = set_doc_conditionals['list']
    for name, conditional in zip(rule_names, source_columns):
        per_document = df.groupby('document')[conditional]
        # transform broadcasts the per-document result back onto each row
        df[name] = per_document.transform(lambda flags: flags.any())
    return df

# SENTIMENT ANALYSIS BASED ON VADER
def vadar_sentiment_analysis(text_series):
    """Score every text with the module-level VADER analyzer ``sent_i``.

    Returns:
        A DataFrame, indexed like ``text_series``, with the compound score in
        ``vadar_compound`` and five boolean columns derived from its sign
        (positive, neutral, neutral-or-positive, negative, neutral-or-negative).
    """
    def _compound(text):
        return sent_i.polarity_scores(text)['compound']

    compound = text_series.apply(_compound)
    sign_flags = {
        'sentiment_positive': compound > 0,
        'sentiment_neutral': compound == 0,
        'sentiment_neutralpositive': compound >= 0,
        'sentiment_negative': compound < 0,
        'sentiment_neutralnegative': compound <= 0,
    }
    return pd.DataFrame({'vadar_compound': compound, **sign_flags})

# RUN QUERIES LISTED IN ASSESSMENT_FRAMEWORK
def run_queries(df, set_search_strings):
    """Evaluate every search string on every row of ``df``.

    Returns:
        A copy of ``df`` with one extra column per query, named by the
        ``query name`` column and filled with the result of ``evaluate_query``
        for each row.
    """
    results = df.copy()
    query_names = set_search_strings['query name']
    query_texts = set_search_strings['query']
    for variable, query in zip(query_names, query_texts):
        # Upper-case operators are rewritten to Python's lower-case keywords
        python_query = query.replace("AND", "and").replace("OR", "or").replace("NOT", "not")
        results[variable] = df.apply(
            lambda record, expr=python_query: evaluate_query(record, expr), axis=1
        )
    return results

# Helper function to evaluate the query
def evaluate_query(row, query):
    """Evaluate a boolean query for one sentence row.

    Every ``"column name"`` token in ``query`` is replaced by the string form
    of that column's value in ``row``; the resulting expression (for example
    ``True and not False``) is then evaluated.

    Args:
        row: A pandas Series whose index holds the column names.
        query: Boolean expression with column names in double quotes.

    Returns:
        The truth value of the expression as a ``bool``. If the expression
        cannot be evaluated, the error is printed and ``False`` is returned.
    """
    for column in row.index:
        query = query.replace(f'"{column}"', str(row[column]))
    try:
        # NOTE(security): eval executes text that comes from the assessment
        # framework spreadsheet. Built-ins are withheld so a query cannot call
        # functions such as __import__ or open; valid queries only need
        # True/False/and/or/not. This is a mitigation, not a sandbox - a
        # dedicated boolean-expression parser would be the safe replacement.
        return bool(eval(query, {"__builtins__": {}}, {}))
    except Exception as e:
        print(f"Error evaluating query: {query} - {e}")
        return False
    
# AGGREGATE VARIABLES LISTED IN ASSESSMENT_FRAMEWORK
def define_variables(results, variables):
    """Aggregate query columns into the variables of the assessment framework.

    Each ``aggregation query`` has the form ``<names joined by +> > <integer>``.
    The names that exist as columns of ``results`` are summed row-wise and the
    new column is true where that sum exceeds the integer. The column is named
    ``"<variable number> <variable>"``, with a dot appended to the number when
    it contains none. ``results`` is modified in place and returned.
    """
    for _, spec in variables.iterrows():
        expression, raw_threshold = spec['aggregation query'].split('>')
        threshold = int(raw_threshold.strip())
        # Strip spaces, parentheses and quotes around each summand
        candidates = (piece.strip(" ()\"'") for piece in expression.split('+'))
        child_vars = [candidate for candidate in candidates if candidate in results.columns]
        number_label = str(spec['variable number'])
        if '.' not in number_label:
            number_label = number_label + '.'
        parent_name = f"{number_label} {spec['variable']}"
        results[parent_name] = sum(results[child] for child in child_vars) > threshold
    return results


## EXPORT RESULTS
def get_v_list(df):
    """List the columns of ``df`` whose label starts with digits followed by a dot."""
    numbered_label = re.compile(r'^[0-9]+\.[0-9]*')
    selected = []
    for col in df.columns:
        if numbered_label.match(str(col)):
            selected.append(col)
    return selected

def results_document(df, v_list):
    """Count, per document, the rows on which each variable in ``v_list`` is true.

    Returns a frame with a ``document`` column and one integer column per variable.
    """
    totals_per_document = df.groupby('document')[v_list].sum()
    return totals_per_document.astype(int).reset_index()

def results_document_clipped(df, v_list):
    """Per document, report 1 if a variable is true on any row and 0 otherwise.

    The per-document sums of ``v_list`` are capped at 1; the result has a
    ``document`` column plus one column per variable.
    """
    totals_per_document = df.groupby('document')[v_list].sum()
    capped = totals_per_document.clip(upper=1)
    return capped.reset_index()

def results_document_percentage_of_sentences(df, v_list):
    """Give, per document, the percentage of sentences on which each variable is true.

    The per-document sums of ``v_list`` are divided by the number of non-missing
    ``sentences`` of that document and multiplied by 100. The result has a
    ``document`` column plus one column per variable.
    """
    by_document = df.groupby('document')
    sentence_counts = by_document['sentences'].count().reset_index()
    hits = by_document[v_list].sum().reset_index()
    merged = hits.merge(sentence_counts, on='document', how='outer')
    merged[v_list] = merged[v_list].div(merged['sentences'], axis=0).multiply(100)
    # The helper count column is only needed for the division above
    return merged.drop('sentences', axis=1)

def results_document_sentiment(df, v_list):
    """Average the compound sentiment of the sentences matching each variable.

    For every variable in ``v_list`` the mean ``vadar_compound`` of the rows on
    which the variable is true is computed per document. Documents without a
    matching row get a missing value. The result has a ``document`` column plus
    one column per variable.
    """
    codebook_sentiment = df[['document']].drop_duplicates().set_index('document')
    for var in v_list:
        matching_rows = df.loc[df[var], ['document', 'vadar_compound']]
        mean_by_document = matching_rows.groupby('document')['vadar_compound'].mean()
        codebook_sentiment[var] = codebook_sentiment.index.map(mean_by_document)
    return codebook_sentiment.reset_index()

### SQL FUNCTIONS MANAGEMENT
def load_data_from_sql(db_path, table_name):
    """Read a whole table of the SQLite database at ``db_path`` into a DataFrame.

    Args:
        db_path: Path of the SQLite database file.
        table_name: Name of the table to read.

    Returns:
        A DataFrame with every row and column of the table.
    """
    # Table names cannot be bound as SQL parameters, so quote the identifier
    # (doubling embedded quotes); this also supports names containing spaces.
    quoted_table = '"' + str(table_name).replace('"', '""') + '"'
    conn = sqlite3.connect(db_path)
    try:
        return pd.read_sql(f'SELECT * FROM {quoted_table}', conn)
    finally:
        # Close the connection even when the query fails.
        conn.close()

def save_data_to_sql(df, db_path, table_name):
    """Write ``df`` to table ``table_name`` of the SQLite database at ``db_path``.

    An existing table of that name is replaced; the DataFrame index is not stored.
    """
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists='replace', index=False)
    finally:
        # Close the connection even when the write fails.
        conn.close()


## CHECK FOR USER-MADE ERRORS IN ASSESSMENT FRAMEWORK
def preliminary_checks(set_search_strings, set_doc_conditionals, set_co_occurrences, taxonomy):
    """Check that the assessment framework file exists.

    Only the module-level ``ASSESSMENT_FRAMEWORK_DIRECTORY`` path is inspected;
    the four arguments are accepted but not used.

    Returns:
        ``True`` if the path exists, otherwise ``False`` after printing an error.
    """
    framework_found = os.path.exists(ASSESSMENT_FRAMEWORK_DIRECTORY)
    if framework_found:
        return True
    print(f"Error: The Assessment framework file '{ASSESSMENT_FRAMEWORK_DIRECTORY}' not found.")
    return False

# Check the search strings for errors
def check_search_strings(search_strings, taxonomy, doc_conditionals, co_occurrences,variables):
    """Validate the assessment framework tabs before the analysis runs.

    The checks are, in order: required column headers of every tab; unique
    query names; well-formed queries (balanced parentheses, only known names
    inside double quotes, nothing but ``and``/``or``/``not`` and parentheses
    outside quotes); co-occurrence and document-level rules that refer to
    existing taxonomy columns; aggregation queries that refer to existing
    query names.

    Args:
        search_strings: Tab with the columns ``query name`` and ``query``.
        taxonomy: Tab whose column headers are the keyword list names.
        doc_conditionals: Tab with ``name of document-level conditional`` and ``list``.
        co_occurrences: Tab with ``name of co-occurrence``, ``first list``,
            ``distance between lists`` and ``second list``.
        variables: Tab with the column ``aggregation query``.

    Returns:
        ``True`` when every check passes; otherwise ``False`` after printing a
        message that describes the first problem found.
    """
    sentiment_terms = ['sentiment_positive', 'sentiment_neutral', 'sentiment_neutralpositive', 'sentiment_negative', 'sentiment_neutralnegative']

    # Column headers first: the checks further down index these columns, so a
    # missing header must be reported here rather than surface as a KeyError.
    required_columns = ['query name', 'query']
    if not all(column in search_strings.columns for column in required_columns):
        print("Error: The search_strings tab does not have the required columns. Keep the column headers included in the template.")
        return False
    required_co_occurrence_columns = ['name of co-occurrence', 'first list', 'distance between lists', 'second list']
    if not all(column in co_occurrences.columns for column in required_co_occurrence_columns):
        print("Error: co_occurrences tab does not have the required columns.")
        return False
    required_doc_conditional_columns = ['name of document-level conditional', 'list']
    if not all(column in doc_conditionals.columns for column in required_doc_conditional_columns):
        print("Error: doc_conditionals tab does not have the required columns.")
        return False
    if 'aggregation query' not in variables.columns:
        print("Error: variables tab does not have the required 'aggregation query' column.")
        return False

    # Search strings
    valid_terms = set(taxonomy.columns) | set(doc_conditionals['name of document-level conditional']) | set(co_occurrences['name of co-occurrence'])
    if search_strings['query name'].duplicated().any():
        print("Error: The 'query name' column in the search_strings tab contains duplicate entries.")
        return False
    for query in search_strings['query']:
        if not isinstance(query, str):
            # e.g. NaN from an empty cell; the string checks below need text
            print(f"Error: The search_strings tab contains an empty or non-text query: '{query}'.")
            return False
        if query.count("(") != query.count(")"):
            print(f"Error: Mismatched parentheses in '{query}'.")
            return False
        strings_in_quotes = re.findall(r'"([^"]*)"', query)
        for string in strings_in_quotes:
            if string not in valid_terms and string not in sentiment_terms:
                print(f"Error: The string '{string}' inside quotation marks is not found in the provided taxonomy, document-level conditionals, or co-occurrences.")
                return False
        # Whatever remains after removing quoted names, operators and
        # parentheses is text that was not quoted properly.
        query_simplified = re.sub(r'"[^"]*"', '', query)
        query_simplified = re.sub(r'\b(and|or|not)\b', '', query_simplified, flags=re.IGNORECASE)
        query_simplified = query_simplified.replace("(", "").replace(")", "")
        if query_simplified.strip() != "":
            print(f"Error: Query contains invalid or improperly quoted strings: {query_simplified.strip()}")
            return False
        if '""' in query or re.search(r'"\s+"', query):
            print("Error: Found strings not properly enclosed in quotation marks or missing quotation marks.")
            return False

    # Co-occurrence and document-level rules must point at taxonomy columns
    valid_terms_for_co_occurrences = set(taxonomy.columns)
    for index, row in co_occurrences.iterrows():
        if row['first list'] not in valid_terms_for_co_occurrences or row['second list'] not in valid_terms_for_co_occurrences:
            print(f"Error: Check if these strings in co_occurrences '{row['first list']}' or '{row['second list']}' are present in the taxonomy as column headers.")
            return False
    for index, row in doc_conditionals.iterrows():
        if row['list'] not in valid_terms_for_co_occurrences:
            print(f"Error: The string '{row['list']}' in doc_conditionals is not present in the taxonomy as column headers.")
            return False

    # Aggregation queries must only reference existing query names
    all_variables_in_search_strings = set(search_strings['query name'])
    child_variables = set()
    for query in variables['aggregation query']:
        if not isinstance(query, str):
            print(f"Error: The variables tab contains an empty or non-text aggregation query: '{query}'.")
            return False
        child_variables.update(re.findall(r'"(.*?)"', query))
    missing_variables = child_variables - all_variables_in_search_strings
    if missing_variables:
        print(f"Error: The following queries are not present in the search_strings 'query name' column: {missing_variables}. Note that query names cannot be numerical")
        return False
    return True

## ESTIMATE RUN TIME
def estimate_run_time(sample_df, full_run=False): 
    """Time the content analysis on a sample of sentences.

    The assessment framework is loaded from ``ASSESSMENT_FRAMEWORK_DIRECTORY``
    (not timed); keyword matching, co-occurrences, document-level conditionals,
    optional sentiment analysis, queries and variable aggregation are then run
    on the sample and timed.

    Args:
        sample_df: Frame with ``document`` and ``sentences`` columns. It is
            copied, so the caller's frame is left untouched.
        full_run: Accepted for compatibility; not used.

    Returns:
        Elapsed time of the timed steps in seconds.
    """
    variables, set_search_strings, set_co_occurrences, set_doc_conditionals, set_keywords = create_dataframes(ASSESSMENT_FRAMEWORK_DIRECTORY)
    key_dict = organize_keywords(set_keywords)
    # The analysis steps add columns in place; work on a private copy so the
    # caller's sample (typically a slice of the full frame) is not modified.
    sample_df = sample_df.copy()
    # perf_counter is a monotonic, high-resolution clock intended for measuring
    # durations; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    sample_df = check_groups(sample_df, key_dict)
    sample_df = initiate_co_occurrences(sample_df, set_co_occurrences, key_dict)
    sample_df = initiate_document_conditionals(sample_df, set_doc_conditionals)
    if SENTIMENT_ANALYSIS:
        sentiment_df = vadar_sentiment_analysis(sample_df['sentences'])
        sample_df = pd.concat([sample_df, sentiment_df], axis=1)
    sample_results = run_queries(sample_df, set_search_strings)
    sample_results = define_variables(sample_results, variables)
    return time.perf_counter() - start_time

In [36]:
def main():  
    """Run the whole pipeline: load, validate, analyse and export.

    Documents are imported from ``TEXT_FILES_DIRECTORY`` (or loaded from the
    SQL database when ``LOAD_DATABSE`` is true), the assessment framework is
    validated, the text is split into sentences and analysed, and the results
    are written to ``OUTPUT_DIRECTORY`` and to the SQL database.

    Returns:
        The sentence-level results DataFrame, or ``None`` when a framework
        check fails.
    """
    ## LOAD DATA
    if not LOAD_DATABSE:
        df = txt_to_sql(TEXT_FILES_DIRECTORY, DATABASE_PATH)
    else:
        df = load_data_from_sql(DATABASE_PATH, TABLE_NAME) 
    variables, set_search_strings, set_co_occurrences, set_doc_conditionals, set_keywords = create_dataframes(ASSESSMENT_FRAMEWORK_DIRECTORY)
    key_dict = organize_keywords(set_keywords)

    ## CHECK FOR USER-MADE ERRORS IN ASSESSMENT FRAMEWORK
    # Arguments follow the declared parameter order of preliminary_checks
    # (search strings, document conditionals, co-occurrences, taxonomy).
    if not preliminary_checks(set_search_strings, set_doc_conditionals, set_co_occurrences, set_keywords):
        print("Preliminary checks failed. Exiting...")
        return  
    if not check_search_strings(set_search_strings, set_keywords, set_doc_conditionals, set_co_occurrences, variables):
        print("Search strings check failed.")
        return
    
    ## PREPARE DATA
    if TEXT_CLEANING:
        clean_text(df, 'text')
    df = split_sentences(df)
    df = clean_rows(df)
    # explode() repeats the document's index label on every sentence; give each
    # sentence its own label so later label-based alignment is unambiguous.
    df = df.drop('text', axis=1).reset_index(drop=True)

    ## ESTIMATE RUN TIME
    sample_size = 100
    if PRINT_RUN_TIME:
        total_sentences = len(df)
        if total_sentences < sample_size:
            print(f"The dataset has only {total_sentences} sentences, which is insufficient to accurately estimate runtime.")
        else:
            sample = df.sample(n=sample_size)
            time_for_sample = estimate_run_time(sample, full_run=False)
            estimated_total_time = (time_for_sample / sample_size) * total_sentences
            print(f"Estimated run time for {total_sentences} sentences: {estimated_total_time} seconds (excluding time to export results).")

    ## RUN CONTENT ANALYSIS
    df = check_groups(df, key_dict)
    df = initiate_co_occurrences(df, set_co_occurrences, key_dict)
    df = initiate_document_conditionals(df, set_doc_conditionals)
    if SENTIMENT_ANALYSIS:
        sentiment_df = vadar_sentiment_analysis(df['sentences'])
        df = pd.concat([df, sentiment_df], axis=1)
    df = run_queries(df, set_search_strings)
    df = define_variables(df, variables)

    ## EXPORT RESULTS
    os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)
    v_list = get_v_list(df)
    if EXPORT_DOCUMENT_DATA:
        codebook = results_document(df, v_list)
        codebook.to_csv(f'{OUTPUT_DIRECTORY}/results_document.tsv', sep='\t', index=False)
        code_bool = results_document_clipped(df, v_list)
        code_bool.to_csv(f'{OUTPUT_DIRECTORY}/results_document_clipped.tsv', sep='\t', index=False)
        code_sent_percent = results_document_percentage_of_sentences(df, v_list)
        code_sent_percent.to_csv(f'{OUTPUT_DIRECTORY}/results_document_percentage_of_sentences.tsv', sep='\t', index=False)
        if SENTIMENT_ANALYSIS:
            codebook_sentiment = results_document_sentiment(df, v_list)
            codebook_sentiment.to_csv(f'{OUTPUT_DIRECTORY}/results_document_sentiment.tsv', sep='\t', index=False)
    if EXPORT_SENTENCE_LEVEL_DATA:
        df.to_csv(f'{OUTPUT_DIRECTORY}/results_sentences.tsv', sep='\t', index=False)
    # Store the results in their own table. Writing them to TABLE_NAME would
    # replace the raw documents (which have the 'text' column) with
    # sentence-level results, and the next run with LOAD_DATABSE = True would
    # then fail because 'text' is gone.
    save_data_to_sql(df, DATABASE_PATH, f'{TABLE_NAME}_results')
    return df

if __name__ == "__main__":
    main_results = main()

(7, 2)
Estimated run time for 1589 sentences: 1.59293470621109 seconds (excluding time to export results).
