<a href="https://colab.research.google.com/github/cchang-vassar/Semantic-Relations-in-Vector-Embeddings/blob/main/arguana_data_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extract Arguments from Arguana Corpus

## Imports

In [None]:
import os
import re
import zipfile
from google.colab import userdata
from enum import Enum
from typing import Optional
import pandas as pd
import pickle
import shutil
import subprocess

## OSF Setup

In [None]:
!pip install osfclient

Collecting osfclient
  Downloading osfclient-0.0.5-py2.py3-none-any.whl (39 kB)
Installing collected packages: osfclient
Successfully installed osfclient-0.0.5


In [None]:
# Export the Colab secret to the process environment (inherited by shell
# commands started from the notebook) and keep it as a notebook variable.
os.environ["OSF_USERNAME"] = OSF_USERNAME = userdata.get("OSF_USERNAME")

In [None]:
# Export the Colab secret to the process environment (inherited by shell
# commands started from the notebook) and keep it as a notebook variable.
os.environ["OSF_PASSWORD"] = OSF_PASSWORD = userdata.get("OSF_PASSWORD")

In [None]:
# Export the Colab secret to the process environment (inherited by shell
# commands started from the notebook) and keep it as a notebook variable.
os.environ["OSF_TOKEN"] = OSF_TOKEN = userdata.get("OSF_TOKEN")

In [None]:
# Export the Colab secret to the process environment and keep it as a
# notebook variable.
# NOTE(review): the `osf` commands in this notebook pass the project id
# literally (`-p sakjg`) instead of using this value - confirm they agree.
os.environ["OSF_PROJECT_ID"] = OSF_PROJECT_ID = userdata.get("OSF_PROJECT_ID")

## Grab Data from OSF

In [None]:
# Fetch the zipped corpus from OSF storage of project `sakjg`; the next cells
# expect it as `arguana_corpus.zip` in the working directory.
!osf -p sakjg fetch osfstorage/corpora/arguana_corpus.zip

100% 121M/121M [00:00<00:00, 128Mbytes/s]


In [None]:
# Fetch the zipped debate-path listings from OSF storage of project `sakjg`;
# a later cell expects them as `arguana_file_paths.zip` in the working directory.
!osf -p sakjg fetch osfstorage/corpora/arguana_file_paths.zip

  0% 0.00/90.7k [00:00<?, ?bytes/s]100% 90.7k/90.7k [00:00<00:00, 65.1Mbytes/s]


In [None]:
# Unpack the downloaded corpus archive into its own folder.
arguana_corpus_file_path = 'arguana_corpus.zip'
arguana_corpus_output_folder_path = 'arguana-corpus'
os.makedirs(arguana_corpus_output_folder_path, exist_ok=True)

with zipfile.ZipFile(arguana_corpus_file_path) as zip_ref:
  zip_ref.extractall(path=arguana_corpus_output_folder_path)

# Show what ended up in the output folder.
extracted_files = os.listdir(arguana_corpus_output_folder_path)
print(f"Files extracted: {extracted_files}")

Files extracted: ['__MACOSX', 'arguana_corpus']


In [None]:
# Unpack the downloaded path-listing archive into its own folder.
arguana_file_paths_file_path = 'arguana_file_paths.zip'
arguana_file_paths_output_folder_path = 'arguana-file-paths'
os.makedirs(arguana_file_paths_output_folder_path, exist_ok=True)

with zipfile.ZipFile(arguana_file_paths_file_path) as zip_ref:
  zip_ref.extractall(path=arguana_file_paths_output_folder_path)

# Show what ended up in the output folder.
extracted_files = os.listdir(arguana_file_paths_output_folder_path)
print(f"Files extracted: {extracted_files}")

Files extracted: ['arguana_file_paths', '__MACOSX']


In [None]:
# Root folders of the extracted-argument files, one per dataset split.
TRAINING_ARGUANA_FILE_PATH, TEST_ARGUANA_FILE_PATH = (
  'arguana-corpus/arguana_corpus/02-extracted-arguments/' + split
  for split in ('training', 'test')
)

In [None]:
# Listing files for the training split: the folder itself, the list of
# per-category listings, and the list of all debates.
TRAINING_FILE_PATHS_FILE_PATH = 'arguana-file-paths/arguana_file_paths/training'
TRAINING_ALL_CATEGORIES_FILE_PATH = TRAINING_FILE_PATHS_FILE_PATH + '/all_categories.txt'
TRAINING_ALL_DEBATES_FILE_PATH = TRAINING_FILE_PATHS_FILE_PATH + '/list_of_all_debates.txt'

In [None]:
# Listing files for the test split: the folder itself, the list of
# per-category listings, and the list of all debates.
TEST_FILE_PATHS_FILE_PATH = 'arguana-file-paths/arguana_file_paths/test'
TEST_ALL_CATEGORIES_FILE_PATH = '/'.join((TEST_FILE_PATHS_FILE_PATH, 'all_categories.txt'))
TEST_ALL_DEBATES_FILE_PATH = '/'.join((TEST_FILE_PATHS_FILE_PATH, 'list_of_all_debates.txt'))

## Class Declarations

In [None]:
# Enum of debate categories. Each member's value is the category slug; its name
# is the slug upper-cased with '-' replaced by '_'
# (e.g. Category.DIGITAL_FREEDOMS.value == "digital-freedoms").
Category = Enum(
  "Category",
  [
    (slug.upper().replace("-", "_"), slug)
    for slug in (
      "culture",
      "digital-freedoms",
      "economy",
      "education",
      "environment",
      "free-speech-debate",
      "health",
      "international",
      "law",
      "philosophy",
      "politics",
      "religion",
      "science",
      "society",
      "sport",
    )
  ],
)

In [None]:
# Enum of dataset splits; each value is the split's folder name.
Dataset = Enum("Dataset", {"TRAINING": "training", "TEST": "test"})

## Extract Arguments from File

### [Debate] Arguments dict

In [None]:
def _debate_read_file(category: Category, file_path: str, dataset: Dataset) -> Optional[list[str]]:
  """Read a debate's ``full.txt`` and return its lines.

  Args:
    category: Category folder the debate lives in.
    file_path: Debate folder name inside the category folder.
    dataset: Split to read from; selects the training or test corpus root.

  Returns:
    The file contents split on ``'\\n'``, or ``None`` (after printing a
    message) when the file does not exist.
  """
  corpus_root = TRAINING_ARGUANA_FILE_PATH if dataset == Dataset.TRAINING else TEST_ARGUANA_FILE_PATH
  full_path = f'{corpus_root}/{category.value}/{file_path}/full.txt'
  try:
    # Explicit encoding so the result does not depend on the platform default.
    with open(full_path, 'r', encoding='utf-8') as file:
      file_contents = file.read()
  except FileNotFoundError:
    # Report the path that was actually tried.
    print(f"File not found: {full_path}")
    return None

  return file_contents.split('\n')

In [None]:
def debate_extract_arguments(
  category: Category,
  file_path: str,
  start_re: str = r"# PRO",
  end_re: str = r"# LITERATURE",
  pro_point_re: str = r"# PRO\w+-POINT",
  pro_counter_re: str = r"# PRO\w+-COUNTER",
  con_point_re: str = r"# CON\w+-POINT",
  con_counter_re: str = r"# CON\w+-COUNTER",
  dataset: Dataset = Dataset.TRAINING,
  ) -> Optional[dict]:
  """Extract the point/counter pairs of one debate:
  debate_topic.txt -> full.txt

  The debate file is scanned line by line. Everything up to and including the
  first line matching ``start_re`` is skipped, and scanning stops at the first
  line matching ``end_re`` (or at end of file). A POINT header opens a new
  pair, a COUNTER header switches to the counter of the current pair, lines
  starting with ``[`` are skipped, and every other line is added to the
  argument currently being collected after in-text ``[ref]`` markers are
  removed and whitespace is collapsed.

  Args:
    category: Category the debate belongs to.
    file_path: Debate folder name; also used as the key of the result.
    start_re, end_re: Regexes (matched after optional leading whitespace)
      marking where scanning starts and stops.
    pro_point_re, pro_counter_re, con_point_re, con_counter_re: Regexes for
      the four kinds of header lines.
    dataset: Split to read from.

  Returns:
    ``None`` if the debate file could not be read; ``{}`` if the debate does
    not have at least one PRO pair and one CON pair; otherwise
    ``{file_path: {'pro': [pair, ...], 'con': [pair, ...]}}`` where each pair
    is a dict with a ``'point'`` and/or ``'counter'`` text.
  """
  lines = _debate_read_file(category, file_path, dataset)
  if not lines:
    return None

  start_pattern = re.compile(r'\s*' + start_re)
  end_pattern = re.compile(r'\s*' + end_re)
  # (header pattern, section it belongs to, argument type it opens),
  # tried in this order.
  header_patterns = (
    (re.compile(r'\s*' + pro_point_re), 'pro', 'point'),
    (re.compile(r'\s*' + pro_counter_re), 'pro', 'counter'),
    (re.compile(r'\s*' + con_point_re), 'con', 'point'),
    (re.compile(r'\s*' + con_counter_re), 'con', 'counter'),
  )

  # holds the argument pairs data for the debate topic
  arguments = {'pro': [], 'con': []}

  # Parser state. It starts as "PRO point" so that text following the start
  # line is collected even if that line was itself the first point header.
  section = 'pro'
  argument_type = 'point'
  fragments = []   # cleaned text lines of the argument being collected
  cur_pair = {}
  started = False

  for line in lines:
    # skip to start line (the start line itself is not processed)
    if not started:
      started = bool(start_pattern.match(line))
      continue

    # end marker: stop scanning, the last argument is flushed below
    if end_pattern.match(line):
      break

    # skip citations
    if re.match(r'\s*\[', line):
      continue

    header = next(
      ((sec, typ) for pattern, sec, typ in header_patterns if pattern.match(line)),
      None,
    )
    if header:
      # close the argument collected so far
      if fragments:
        cur_pair[argument_type] = ' '.join(fragments)
      fragments = []
      # a new POINT also closes the running pair; it is filed under the
      # section of the last header seen for that pair
      if header[1] == 'point':
        if cur_pair:
          arguments[section].append(cur_pair)
        cur_pair = {}
      section, argument_type = header
      continue

    # remove in-text citations, then collapse whitespace to single spaces so
    # the words around a removed citation are not glued together
    text = re.sub(r'\[\w+\]', '', line)
    text = re.sub(r'\s+', ' ', text).strip()
    if text:
      fragments.append(text)

  # flush whatever was still being collected (end marker or end of file)
  if fragments:
    cur_pair[argument_type] = ' '.join(fragments)
  if cur_pair:
    arguments[section].append(cur_pair)

  # a debate is only usable when both sides have at least one pair
  if arguments['pro'] and arguments['con']:
    return {file_path: arguments}
  return {}

### [Category] Arguments dict

In [None]:
def _write_invalid_debate_to_file(category: Category, file_path: str, dataset: Dataset):
  """Mark a debate that produced no usable arguments.

  Appends ``file_path`` as one line to
  ``current-data-dump/data-valid-tally/<training|test>/<category>.txt``,
  creating the folder if needed.

  Args:
    category: Category of the debate; names the tally file.
    file_path: Debate identifier to record.
    dataset: Split the debate came from; selects the tally folder.
  """
  output_folder = f'current-data-dump/data-valid-tally/{"training" if dataset == Dataset.TRAINING else "test"}'
  # Separator added so the file lands inside the folder created below.
  output_file_path = f'{output_folder}/{category.value}.txt'
  os.makedirs(output_folder, exist_ok=True)
  # Context manager closes the handle; one debate per line.
  with open(output_file_path, "a", encoding="utf-8") as file:
    file.write(file_path + "\n")

In [None]:
def _category_read_paths(category: Category, dataset: Dataset) -> Optional[list[str]]:
  """Read the debate names listed for a category.

  Reads ``list_of_<category>_debates.txt`` (category slug with '-' replaced by
  '_') from the training or test listings folder.

  Returns:
    The non-blank lines of the listing with surrounding whitespace removed,
    or ``None`` (after printing a message) when the listing does not exist.
  """
  category_path = category.value.replace('-', '_')
  listings_root = TRAINING_FILE_PATHS_FILE_PATH if dataset == Dataset.TRAINING else TEST_FILE_PATHS_FILE_PATH
  category_file_path = f'{listings_root}/list_of_{category_path}_debates.txt'
  try:
    with open(category_file_path, 'r', encoding='utf-8') as file:
      file_contents = file.read()
  except FileNotFoundError:
    print(f"File not found: {category_file_path}")
    return None
  # Drop blank lines (e.g. from a trailing newline) so they are not treated
  # as debate names by the caller.
  stripped_lines = (line.strip() for line in file_contents.split('\n'))
  return [line for line in stripped_lines if line]

In [None]:
def category_extract_arguments(category: Category, dataset: Dataset) -> dict:
  """ Extract all debates from a category:
  list_of_<category_path>_debates.txt -> <debate_topic>.txt

  Debates for which extraction returns nothing usable are recorded through
  ``_write_invalid_debate_to_file`` and left out of the result.

  Returns:
    ``{category.value: {debate: {'pro': [...], 'con': [...]}, ...}}``; the
    inner dict is empty when the category listing could not be read.
  """
  # A missing listing yields None; treat it as "no debates".
  debates_file_paths = _category_read_paths(category, dataset=dataset) or []
  category_arguments = {}
  for debate in debates_file_paths:
    # ignore blank entries of the listing
    if not debate.strip():
      continue
    # parse each debate once and reuse the result
    debate_arguments = debate_extract_arguments(category, debate, dataset=dataset)
    if debate_arguments:
      category_arguments.update(debate_arguments)
    else:
      _write_invalid_debate_to_file(category, debate, dataset=dataset)
  return {category.value: category_arguments}

### [Global] Arguments dict

In [None]:
def _global_read_paths(dataset: Dataset) -> list[str]:
  """Return the lines of the split's ``all_categories.txt`` listing.

  Prints a message and returns ``None`` when the listing does not exist.
  """
  if dataset == Dataset.TRAINING:
    global_file_path = TRAINING_ALL_CATEGORIES_FILE_PATH
  else:
    global_file_path = TEST_ALL_CATEGORIES_FILE_PATH

  try:
    with open(global_file_path, 'r') as listing:
      listing_text = listing.read()
  except FileNotFoundError:
    print(f"File not found: {global_file_path}")
    return None

  return listing_text.split('\n')

In [None]:
def global_extract_arguments(dataset: Dataset) -> dict:
  """ Extract all debates across all categories:
  all_categories.txt -> list_of_<category>_debates.txt

  Every listing line containing ``list_of_<name>_debates`` contributes the
  category ``Category[<NAME>]``; names that are not members of ``Category``
  are reported and skipped.

  Returns:
    ``{category value: {topic: {'pro': [{'point':, 'counter':}, ...], 'con': [...]}}}``;
    empty when the global listing could not be read.
  """
  # A missing listing yields None; treat it as "no categories".
  category_file_paths = _global_read_paths(dataset) or []
  category_pattern = re.compile(r'list_of_(\w+)_debates')

  global_arguments = {}
  for line in category_file_paths:
    match = category_pattern.search(line)
    if not match:
      continue
    category_name = match.group(1).upper()

    # Only the enum lookup is guarded, so a KeyError raised during the
    # extraction itself is not misreported as an unknown category.
    try:
      category = Category[category_name]
    except KeyError:
      print(f"Category: {category_name}, Category not found in Category enum and is removed.")
      continue

    global_arguments.update(category_extract_arguments(category, dataset))
  return global_arguments

## Convert to df

#### [Write to File] Arguments df

In [None]:
def _arguments_df_write_to_file(
  arguments_data: pd.DataFrame,
  category: Optional[str] = None,
  topic: Optional[str] = None,
  dataset: Dataset = Dataset.TRAINING,
  ):
  """Pickle an arguments df under ``current-data-dump/arguments-dump/<split>/``.

  The target depends on which identifiers are given:
    - category and topic: ``<category>/<topic with '-' -> '_'>_arguments.pkl``
    - category only:      ``<category>/<category>_arguments.pkl``
    - otherwise:          ``global_arguments.pkl``
  """
  split_name = "training" if dataset == Dataset.TRAINING else "test"
  dump_root = f'current-data-dump/arguments-dump/{split_name}/'

  if category:
    folder_path = f'{dump_root}{category}/'
    # debate case when a topic is given, category case otherwise
    file_stem = topic.replace('-', '_') if topic else category
  else:
    # global case (a topic without a category also ends up here)
    folder_path = dump_root
    file_stem = 'global'
  file_path = f'{folder_path}{file_stem}_arguments.pkl'

  if not os.path.exists(folder_path):
    os.makedirs(folder_path)

  with open(file_path, 'wb') as pickle_file:
    pickle.dump(arguments_data, pickle_file)
    print(f"File uploaded to {file_path}")

#### [Debate] Arguments df

In [None]:
def debate_convert_to_df(debate_arguments: dict, category: str, dataset: Dataset) -> pd.DataFrame:
  """Convert one debate's arguments dict into a df and pickle it.

  Args:
    debate_arguments: ``{topic: {'pro': [pair, ...], 'con': [pair, ...]}}``;
      only the first key is used.
    category: Category name, used for the output location.
    dataset: Split, used for the output location.

  Returns:
    A df with one row per argument and the columns ``argument``, ``pair_id``,
    ``type`` ('point'/'counter'), ``stance`` ('PRO'/'CON') and ``topic``.
    Pairs are numbered across both sections (PRO pairs first) and a counter
    carries the stance opposite to its point.
  """
  debate_topic = next(iter(debate_arguments))
  sections = debate_arguments[debate_topic]

  # (section key, stance of its points, stance of their counters)
  section_stances = (("pro", "PRO", "CON"), ("con", "CON", "PRO"))

  # Collect plain records and build the frame once instead of concatenating
  # one-row frames inside the loop.
  records = []
  pair_id = 0
  for section_key, point_stance, counter_stance in section_stances:
    for pair in sections[section_key]:
      for argument_type, stance in (("point", point_stance), ("counter", counter_stance)):
        # a pair may lack either part; emit only what is there
        if argument_type in pair:
          records.append({
            'argument': pair[argument_type],
            'pair_id': str(pair_id),
            'type': argument_type,
            'stance': stance,
          })
      pair_id += 1

  # Explicit columns keep the schema stable even when there are no records.
  debate_arguments_df = pd.DataFrame(records, columns=['argument', 'pair_id', 'type', 'stance'])
  debate_arguments_df['topic'] = debate_topic
  debate_arguments_df = debate_arguments_df.dropna()
  _arguments_df_write_to_file(debate_arguments_df, category, debate_topic, dataset=dataset)
  return debate_arguments_df

#### [Category] Arguments df

In [None]:
def category_convert_to_df(category_arguments: dict, dataset: Dataset) -> pd.DataFrame:
  """Convert a category's arguments dict into a df and pickle it.

  Args:
    category_arguments: ``{category: {debate: {'pro': [...], 'con': [...]}}}``;
      only the first key is used.
    dataset: Split, used for the output location.

  Returns:
    The debate dfs (each also written by ``debate_convert_to_df``) stacked
    with a fresh 0..n-1 index and an added ``category`` column.
  """
  category = next(iter(category_arguments))

  # Convert every debate, then concatenate once instead of growing the frame
  # inside the loop.
  debate_dfs = [
    debate_convert_to_df({debate: debate_dict}, category, dataset=dataset)
    for debate, debate_dict in category_arguments[category].items()
  ]
  # pd.concat raises on an empty list, so fall back to an empty frame.
  if debate_dfs:
    category_arguments_df = pd.concat(debate_dfs, axis=0, ignore_index=True)
  else:
    category_arguments_df = pd.DataFrame()

  category_arguments_df['category'] = category
  category_arguments_df = category_arguments_df.dropna()
  _arguments_df_write_to_file(category_arguments_df, category, dataset=dataset)
  return category_arguments_df

#### [Global] Arguments df

In [None]:
def global_convert_to_df(global_arguments: dict, dataset: Dataset) -> pd.DataFrame:
  """Convert the global arguments dict into a df and pickle it.

  Args:
    global_arguments: ``{category: {debate: {'pro': [...], 'con': [...]}}}``.
    dataset: Split, used for the output location.

  Returns:
    The category dfs (each also written by ``category_convert_to_df``)
    stacked with a fresh 0..n-1 index.
  """
  # Convert every category, then concatenate once instead of growing the
  # frame inside the loop.
  category_dfs = [
    category_convert_to_df({category: category_dict}, dataset=dataset)
    for category, category_dict in global_arguments.items()
  ]
  # pd.concat raises on an empty list, so fall back to an empty frame.
  if category_dfs:
    global_arguments_df = pd.concat(category_dfs, axis=0, ignore_index=True)
  else:
    global_arguments_df = pd.DataFrame()

  global_arguments_df = global_arguments_df.dropna()
  _arguments_df_write_to_file(global_arguments_df, dataset=dataset)
  return global_arguments_df

## Run Extract Arguments

### [Debate] Run Extract Arguments

In [None]:
# Extract a single economy debate from the training split.
economy_debate_arguments = debate_extract_arguments(
  category=Category.ECONOMY,
  file_path="business-economy-general-house-would-prohibit-retailers-selling-certain-items",
  dataset=Dataset.TRAINING,
)

### [Category] Run Extract Arguments

In [None]:
# Extract every economy debate listed for the training split.
economy_category_arguments = category_extract_arguments(
  category=Category.ECONOMY,
  dataset=Dataset.TRAINING,
)

### [Global] Run Extract Arguments

In [None]:
# Extract all categories of the training split.
global_arguments = global_extract_arguments(dataset=Dataset.TRAINING)

File not found: -business-finance-health-addiction-house-would-introduce-minimum-pricing-alcohol.txt


In [None]:
# Extract all categories of the test split.
global_test_arguments = global_extract_arguments(dataset=Dataset.TEST)

## Run Convert to df

### [Debate] Run Convert to df

In [None]:
# Convert the single extracted economy debate to a df (also writes its pickle).
economy_debate_arguments_df = debate_convert_to_df(
  debate_arguments=economy_debate_arguments,
  category=Category.ECONOMY.value,
  dataset=Dataset.TRAINING,
)

File uploaded to current-data-dump/arguments-dump/training/economy/business_economy_general_house_would_prohibit_retailers_selling_certain_items_arguments.pkl


### [Category] Run Convert to df

In [None]:
# Convert the extracted economy category to a df (also writes the pickles).
economy_category_arguments_df = category_convert_to_df(
  category_arguments=economy_category_arguments,
  dataset=Dataset.TRAINING,
)

File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_economy_general_house_believes_national_minimum_wage_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_employment_eurozone_crisis_house_believes_eu_member_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_international_africa_house_believes_africans_are_worse_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economy_general_house_would_prohibit_retailers_selling_certain_items_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_employment_economy_general_society_house_believes_there_should_be_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_employment_finance_house_would_introduce_mandatory_salary_capping_arguments.pkl
File uploaded to current-data-dump/arguments-dump/t

### [Global] Run Convert to df

In [None]:
# Convert everything extracted from the training split (also writes the pickles).
global_arguments_df = global_convert_to_df(
  global_arguments=global_arguments,
  dataset=Dataset.TRAINING,
)

File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_economy_general_house_believes_national_minimum_wage_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_employment_eurozone_crisis_house_believes_eu_member_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economic_policy_international_africa_house_believes_africans_are_worse_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_economy_general_house_would_prohibit_retailers_selling_certain_items_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_employment_economy_general_society_house_believes_there_should_be_arguments.pkl
File uploaded to current-data-dump/arguments-dump/training/economy/business_employment_finance_house_would_introduce_mandatory_salary_capping_arguments.pkl
File uploaded to current-data-dump/arguments-dump/t

In [None]:
# Convert everything extracted from the test split (also writes the pickles).
global_test_arguments_df = global_convert_to_df(
  global_arguments=global_test_arguments,
  dataset=Dataset.TEST,
)

File uploaded to current-data-dump/arguments-dump/test/economy/business_economic_policy_africa_house_believes_tunisia_should_not_rely_tourism_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/business_economic_policy_international_europe_house_believes_eu_should_abandon_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/business_economic_policy_international_global_house_believes_dictatorship_best_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/business_economic_policy_law_crime_policing_digital_freedoms_freedom_expression_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/business_economy_general_house_would_build_hyperloop_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/business_health_addiction_house_would_ban_smoking_public_spaces_arguments.pkl
File uploaded to current-data-dump/arguments-dump/test/economy/economic_policy_economy_general_internati

## Export arguments dump to OSF

In [None]:
# Zip the arguments dump folder and upload the archive to OSF storage.
arguments_dump_file_path = 'current-data-dump/arguments-dump'
arguments_dump_file_path_zip = 'current-data-dump/arguments-dump'
# make_archive appends ".zip" to the base name and returns the archive's path.
archive_path = shutil.make_archive(arguments_dump_file_path_zip, 'zip', arguments_dump_file_path)
print(f"Zip file created at: {archive_path}")
# Argument list without a shell: nothing is re-parsed by the shell.
result = subprocess.run(
  [
    "osf", "-p", "sakjg", "upload", "--force",
    f"{arguments_dump_file_path_zip}.zip",
    "data-dump/arguments_dump.zip",
  ],
  capture_output=True,
  text=True,
)
print(result.stderr)
# Only report success when the CLI actually exited cleanly.
if result.returncode == 0:
  print(f"File: {arguments_dump_file_path_zip} uploaded at osfstorage")
else:
  print(f"Upload of {arguments_dump_file_path_zip}.zip failed with exit code {result.returncode}")

Zip file created at: current-data-dump/arguments-dump

File: current-data-dump/arguments-dump uploaded at osfstorage
