In [1]:
import subprocess
def c_import(library, elements=None, name=None, always_reimport= True, always_reinstall = True):
  """Import ``library`` into this module's globals, pip-installing it when asked or needed.

  Args:
    library: Dotted module path to import, e.g. ``'pandas'`` or ``'tqdm.notebook'``.
    elements: Optional list of names; produces ``from library import a, b``.
    name: Optional alias appended as ``as name``. Combined with ``elements``
      the alias applies to the last element only, as in plain Python syntax.
    always_reimport: When true, execute the import statement even if every
      name it would bind already exists in ``globals()``.
    always_reinstall: When true, run ``pip install`` before importing,
      regardless of whether the library is already importable.

  Returns:
    None. Progress is reported with ``print``. ``subprocess.CalledProcessError``
    and ``ImportError`` are caught and reported; other exceptions propagate.
  """
  import importlib
  import sys

  # pip installs distributions, not submodules: 'tqdm.notebook' ships in 'tqdm'.
  # NOTE(review): the PyPI distribution name can still differ from the module name.
  package = library.split('.')[0]

  if elements:
    import_str = f'from {library} import {", ".join(elements)}'
    tested_install_var = ", ".join(elements)
    bound_names = list(elements)
  else:
    import_str = f'import {library}'
    tested_install_var = library
    # `import a.b` binds only the top-level name `a`.
    bound_names = [package]
  if name:
    import_str = f'{import_str} as {name}'
    tested_install_var = name
    # The alias replaces the last (or only) name the statement binds.
    bound_names = bound_names[:-1] + [name]
  any_not_installed = any(bound not in globals() for bound in bound_names)

  def sub_install():
    # Use the running interpreter's pip so the package lands in the environment
    # that will import it; the argument list avoids shell interpretation.
    subprocess.run([sys.executable, '-m', 'pip', 'install', package], check=True)
    # Let the import system notice files created after interpreter start-up.
    importlib.invalidate_caches()
    print(f'Library {library} installed successfully.')

  def sub_import():
    # NOTE(review): exec runs a statement built from the arguments; only pass trusted names.
    exec(import_str, globals())
    print(f'Library {library} imported successfully. As: \n {import_str}')

  if always_reinstall:
    try:
      sub_install()
      sub_import()
    except subprocess.CalledProcessError:
      print(f'Failed to install {library}.')
    except ImportError as err:
      print(f'After Install. Import error: {err}')
    return

  if not (always_reimport or any_not_installed):
    print(f'"{tested_install_var}" already installed and imported')
    return

  try:
    sub_import()
  except ImportError as err:
    print(f'Import error: {err}')
    # Install only when the library itself is missing, not when one of its
    # dependencies or one of the requested names is.
    missing = getattr(err, 'name', None) or ''
    if isinstance(err, ModuleNotFoundError) and missing.split('.')[0] == package:
      try:
        sub_install()
        sub_import()
      except subprocess.CalledProcessError:
        print(f'Failed to install {library}.')
      except ImportError as retry_err:
        print(f'After Install. Import error: {retry_err}')

In [2]:
# Shared flags for every call below: never pip-install up front, and skip the
# import when the target name is already bound.
import_config = {'always_reimport': False, 'always_reinstall': False}

# One (library, elements, alias) triple per import, in execution order.
# 'os' is listed twice, matching the original sequence of calls.
_import_specs = (
  ('pandas', None, 'pd'),
  ('csv', None, None),
  ('json', None, None),
  ('os', None, None),
  ('subprocess', None, None),
  ('tqdm.notebook', ['tqdm'], None),
  ('concurrent.futures', ['ProcessPoolExecutor'], None),
  ('datetime', None, None),
  ('argparse', None, None),
  ('codecs', None, None),
  ('os', None, None),
  ('sys', None, None),
  ('numpy', None, 'np'),
)
for _library, _elements, _alias in _import_specs:
  c_import(_library, elements=_elements, name=_alias, **import_config)

Library pandas imported successfully. As: 
 import pandas as pd
Library csv imported successfully. As: 
 import csv
Library json imported successfully. As: 
 import json
Library os imported successfully. As: 
 import os
"subprocess" already installed and imported
Library tqdm.notebook imported successfully. As: 
 from tqdm.notebook import tqdm
Library concurrent.futures imported successfully. As: 
 from concurrent.futures import ProcessPoolExecutor
Library datetime imported successfully. As: 
 import datetime
Library argparse imported successfully. As: 
 import argparse
Library codecs imported successfully. As: 
 import codecs
"os" already installed and imported
Library sys imported successfully. As: 
 import sys
Library numpy imported successfully. As: 
 import numpy as np


In [3]:
def remove_invalids(col='', df=''):
  """Return the rows of ``df`` whose ``col`` value is present and not a placeholder.

  Args:
    col: Name of the column to check.
    df: pandas DataFrame to filter; it is not modified.

  Returns:
    A DataFrame without the rows where ``col`` is missing or equals one of
    the placeholder values listed below.
  """
  placeholders = [np.nan, 'nan', None, 0, '0', 'NaN', '[deleted]', '[removed]']
  # First discard real missing values, then the literal placeholder markers.
  present = df.dropna(subset=[col], how='all')
  is_placeholder = present[col].isin(placeholders)
  return present[~is_placeholder]

# for source in reddit_files_titles:
source = 'argentina_comments'
source_dir = f'../output/reddit_output/{source}-output_table.csv'
source_output_dir = f'../output/reddit_output/filtered_tables_LIWC_count/{source}-liwc_output.csv'

df_output_table = pd.read_csv(source_dir, sep=',')

# Drop rows whose 'text' is missing or a placeholder value.
df_output_table = remove_invalids('text', df_output_table)

if 'title' in df_output_table.columns:  # a submission table (has a title column)
    # Remove invalid/incomplete observations on the title as well.
    df_output_table = remove_invalids('title', df_output_table)
    # Join title and text into 'text', then drop 'title'. The fillna calls are a
    # safety net in case something slipped through the cleanup above.
    # assign() builds a new frame rather than writing a column into the filtered
    # result of remove_invalids, which avoids pandas' SettingWithCopyWarning.
    df_output_table = df_output_table.assign(
        text=df_output_table['title'].fillna('') + '\n' + df_output_table['text'].fillna('')
    ).drop(columns=['title'])

In [None]:
import dask.dataframe as dd
import dask.bag as db
from dask.multiprocessing import get

# Read the large CSV lazily using Dask.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
ddf = dd.read_csv('/Volumes/Drakôn Kholkikos - 2TB/Tesis-Grado/programs/liwc-tmp.csv')

# Apply the LIWC counter to every value of the 'text' column. Series.map keeps
# the result as a column of ddf; a Dask Bag cannot be assigned as a DataFrame
# column. `meta` declares the output as an object-dtype column named 'liwc_counts'.
# NOTE(review): `liwc` is not defined in the visible cells; confirm it is imported.
ddf['liwc_counts'] = ddf['text'].map(liwc().getLIWCCount, meta=('liwc_counts', 'object'))

# Materialise the result as a pandas DataFrame with the multiprocessing
# scheduler. The `get=` keyword is no longer accepted by compute(); `scheduler=`
# replaces it. A persist() right before compute() added nothing and is omitted.
ddf = ddf.compute(scheduler='processes')

# Build a DataFrame from the per-row results.
# NOTE(review): presumably each result is a dict of category -> count; verify.
liwc_df = pd.DataFrame(ddf['liwc_counts'].to_list())

# Add 'text_' as a prefix to all column names
liwc_df = liwc_df.add_prefix('text_')

# Concatenate the new DataFrame with the original DataFrame.
# NOTE(review): concat(axis=1) aligns on index labels. liwc_df has a fresh
# 0..n-1 index while df_output_table keeps the labels left after filtering, and
# the two frames come from different files; confirm the rows really correspond.
df_output_table = pd.concat([df_output_table, liwc_df], axis=1)
