**The Task:** Build a Python script that ingests raw text data, cleans it, and loads it into PostgreSQL.

In [15]:
import concurrent.futures
import inspect
import os
import re
from collections import defaultdict
from itertools import islice
from urllib.parse import quote

import pandas as pd
import psycopg2
from datasets import load_dataset
from dotenv import load_dotenv
from sqlalchemy import create_engine
from tqdm import tqdm

import worker_utils

# Load connection settings from a .env file; override=True lets the .env values
# replace variables that are already set in the process environment.
load_dotenv(override=True)

db_user = os.getenv('DB_USER')
db_superuser = os.getenv('DB_SUPERUSER')
db_pass = os.getenv('DB_PASS')
db_name = os.getenv('DB_NAME')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')

# Percent-encode the credentials before placing them in the URL: a raw '@', ':'
# or '/' inside the user name or password would otherwise be read as a URL
# delimiter and produce a wrong (or unparsable) connection string.
db_engine = create_engine(
    f'postgresql://{quote(db_user or "", safe="")}:{quote(db_pass or "", safe="")}'
    f'@{db_host}:{db_port}/{db_name}'
)

def fetch_multiprocess_dataset(repo: str, target_rows: int = 500000, 
                        batch_size: int = 50000, max_workers: int = 8):
    """Stream a dataset in batches and convert the batches in worker processes.

    The main process pulls raw records from the streaming ``train`` split while
    ``worker_utils.process_batch`` turns each batch into a DataFrame inside a
    process pool. The per-batch frames are concatenated in the order the
    batches were read, so the result is deterministic for a given stream.

    Args:
        repo: dataset identifier passed to ``load_dataset``.
        target_rows: maximum number of raw records to pull from the stream.
        batch_size: number of raw records handed to one worker call.
        max_workers: size of the process pool.

    Returns:
        A DataFrame with at most ``target_rows`` rows; an empty DataFrame when
        ``target_rows`` is not positive or the stream yields nothing.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    if target_rows <= 0:
        return pd.DataFrame()

    # Load the dataset stream (doesn't download everything at once)
    dataset = load_dataset(repo, split="train", streaming=True)
    iter_ds = iter(dataset)

    # "Future" placeholders, kept in submission order
    futures = []
    final_dfs = []

    # Ceiling division: a trailing partial batch must still be fetched,
    # otherwise fewer than target_rows rows are returned.
    total_batches = -(-target_rows // batch_size)
    remaining = target_rows

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for _ in tqdm(range(total_batches), desc="Processing batches"):
            # Main process grabs the raw data (Network Bound); the last batch
            # is shortened so the stream is never read past target_rows.
            batch = list(islice(iter_ds, min(batch_size, remaining)))

            if not batch:
                break
            remaining -= len(batch)

            # Submit the data to a worker to convert (CPU Bound)
            # This returns immediately with a "Future" object
            futures.append(executor.submit(worker_utils.process_batch, batch))

        # Collect in submission order (not completion order) so row order does
        # not depend on which worker happens to finish first.
        for future in tqdm(futures, total=len(futures), desc="Processing..."):
            final_dfs.append(future.result())

    if not final_dfs:
        # Empty stream: pd.concat([]) would raise, so return an empty frame.
        return pd.DataFrame()

    full_df = pd.concat(final_dfs, ignore_index=True)

    # Safety trim in case a worker returned more rows than it was given
    return full_df.head(target_rows)

def read_csv_as_df(src, chunk_size: int = 10000):
    """Read CSV files and combine them into one DataFrame per parent directory.

    Files are grouped by the name of the directory that directly contains them
    (both ``/`` and ``\\`` are accepted as separators). A file given without
    any directory component is grouped under its own name minus the extension.

    Args:
        src: a single path or an iterable of paths to CSV files.
        chunk_size: number of rows read per chunk while loading each file.

    Returns:
        dict mapping group name -> DataFrame holding the rows of every file in
        that group, in input order, with a fresh 0..n-1 index.
    """
    # A lone path would otherwise be iterated character by character.
    if isinstance(src, (str, os.PathLike)):
        src = [src]

    grouped = defaultdict(list)
    for file in src:
        parts = [p for p in re.split(r'[/\\]', os.fspath(file)) if p not in ('', '.')]
        if len(parts) >= 2:
            subdir = parts[-2]
        else:
            subdir = os.path.splitext(parts[-1])[0] if parts else ''
        grouped[subdir].append(file)

    dfs = {}
    for subdir_name, files in grouped.items():
        table_chunks = []

        for file in files:
            # chunksize makes read_csv return an iterator of DataFrames
            table_chunks.extend(pd.read_csv(file, chunksize=chunk_size))

        # pd.concat raises on an empty list, so guard the no-chunks case.
        dfs[subdir_name] = (
            pd.concat(table_chunks, ignore_index=True) if table_chunks else pd.DataFrame()
        )

    return dfs
    
def _frame_write_options(frame, file_format, kwargs):
    """Build the keyword arguments for writing one whole frame in ``file_format``."""
    if file_format == "json":
        if "orient" not in kwargs:
            # Same layout as the chunked JSON path: one record per line.
            return {"orient": "records", "lines": True, **kwargs}
        if kwargs["orient"] in ("split", "table"):
            return {"index": False, **kwargs}
        return dict(kwargs)

    # Only inject index=False for writers that actually take an ``index``
    # argument (e.g. to_csv, to_parquet); others would raise TypeError.
    writer = getattr(frame, f"to_{file_format}")
    try:
        accepts_index = "index" in inspect.signature(writer).parameters
    except (TypeError, ValueError):
        accepts_index = False
    return {"index": False, **kwargs} if accepts_index else dict(kwargs)


def _write_whole_frame(frame, output_path, file_format, kwargs):
    """Write ``frame`` to ``output_path`` in one call via its ``to_<format>`` method."""
    if getattr(frame, f"to_{file_format}", None) is None:
        raise ValueError(f"Unsupported format: {file_format}")
    if file_format == "feather":
        # Feather cannot serialize a non-default index.
        frame = frame.reset_index(drop=True)
    writer = getattr(frame, f"to_{file_format}")
    writer(output_path, **_frame_write_options(frame, file_format, kwargs))


def _append_chunks(chunks, output_path, file_format, kwargs):
    """Stream chunks into a single CSV or JSON-lines file, appending after the first."""
    if file_format == "csv":
        options = {"index": False, **kwargs}
        header = options.pop("header", True)
    else:
        options = {"orient": "records", "lines": True, **kwargs}
    # The write mode is driven by the chunk position, not by the caller.
    options.pop("mode", None)

    is_first = True
    for chunk in chunks:
        writer = getattr(chunk, f"to_{file_format}", None)
        if writer is None:
            raise ValueError(f"Unsupported file format for chunks: {file_format}")

        per_chunk = dict(options, mode="w" if is_first else "a")
        if file_format == "csv":
            # Header only once, at the top of the file.
            per_chunk["header"] = header if is_first else False
        writer(output_path, **per_chunk)
        is_first = False


def write_dfs_to_files_iter(dfs, base_path: str, file_format="csv", attr_name="name", **kwargs):
    """Write DataFrames, or iterables of DataFrame chunks, to one file per table.

    Files are written to ``<base_path>_<file_format>/<table_name>.<file_format>``.
    A whole DataFrame and the same data supplied as chunks produce the same
    file: the index is omitted by default in both cases (pass ``index=True`` in
    ``kwargs`` to keep it, for writers that support it).

    Args:
        dfs: dict of table name -> DataFrame or chunk iterable, or a single
            DataFrame / chunk iterable.
        base_path: prefix of the output directory.
        file_format: suffix of the DataFrame ``to_<format>`` writer to use.
        attr_name: for a non-dict ``dfs``, the attribute holding the table
            name; the literal ``attr_name`` is used when that attribute is
            missing or is not a string.
        **kwargs: extra arguments forwarded to the pandas writer; they take
            precedence over the defaults chosen here.

    Raises:
        ValueError: if pandas has no ``to_<file_format>`` writer.
    """
    output_dir = f'{base_path}_{file_format}'
    os.makedirs(output_dir, exist_ok=True)

    if isinstance(dfs, dict):
        items = dfs.items() # Get table_name and values
    else:
        table_name = getattr(dfs, attr_name, attr_name)
        if not isinstance(table_name, str):
            # e.g. a DataFrame column or method that happens to share the name
            table_name = attr_name
        items = [(table_name, dfs)]

    for table_name, df_or_reader in items:
        output_path = f'{output_dir}/{table_name}.{file_format}'

        # Case 1 — direct DataFrame
        if isinstance(df_or_reader, pd.DataFrame):
            _write_whole_frame(df_or_reader, output_path, file_format, kwargs)
            continue

        # Case 2 — chunked reader / generator of DataFrames
        if file_format in ("csv", "json"):
            # Text formats can be appended to, so chunks are streamed to disk.
            # Chunks are written as-is; they are never mutated.
            _append_chunks(df_or_reader, output_path, file_format, kwargs)
        else:
            if not hasattr(pd.DataFrame, f"to_{file_format}"):
                raise ValueError(f"Unsupported file format for chunks: {file_format}")
            # Parquet, feather and other single-shot writers cannot append,
            # so the chunks are collected in memory and written once.
            # For data that does not fit in memory, write a partitioned
            # dataset with the storage library directly instead.
            chunks = list(df_or_reader)
            if chunks:
                _write_whole_frame(pd.concat(chunks), output_path, file_format, kwargs)


def chunk_each_dataframe(dataframes: dict, chunk_size: int = 10000) -> dict:
    """
    Map every entry of ``dataframes`` to an iterable of DataFrame chunks.

    Args:
        dataframes: dict whose values are DataFrames or TextFileReaders
        chunk_size: maximum number of rows in each yielded chunk

    Returns:
        dict with the same keys; a TextFileReader is returned unchanged,
        anything else is wrapped in a single-use generator of row slices
    """

    def _row_slices(frame, rows_per_chunk):
        # Lazily yield consecutive positional slices of at most rows_per_chunk rows.
        for start in range(0, len(frame), rows_per_chunk):
            yield frame.iloc[start:start + rows_per_chunk]

    return {
        key: (
            value
            if isinstance(value, pd.io.parsers.TextFileReader)  # already chunked
            else _row_slices(value, chunk_size)
        )
        for key, value in dataframes.items()
    }

def write_dfs_to_sql_iter(
    dfs, 
    connection_string: str,
    schema: str = None,
    if_exists: str = 'replace',
    chunksize: int = 1000,
    **kwargs
):
    """
    Load each entry of ``dfs`` into a SQL table named after its key.

    A plain DataFrame is written with one ``to_sql`` call. Any other value is
    iterated as a sequence of DataFrame chunks: the first chunk is written with
    ``if_exists`` and every later chunk is appended to the same table.

    Args:
        dfs: dict mapping table name -> DataFrame or iterable of DataFrames
        connection_string: SQLAlchemy connection string
        schema: database schema name (optional)
        if_exists: 'fail', 'replace', or 'append'
        chunksize: rows per write batch, forwarded to to_sql
        **kwargs: additional arguments forwarded to to_sql

    Raises:
        TypeError: if dfs is not a dict
        ValueError: if an iterated chunk is not a DataFrame
    """
    from sqlalchemy import create_engine

    if not isinstance(dfs, dict):
        raise TypeError(
            f"dfs must be a dict, got {type(dfs).__name__}. "
            f"Wrap single DataFrames like: {{'table_name': df}}"
        )

    engine = create_engine(connection_string)

    try:
        for table_name, payload in dfs.items():
            print(f"Writing {table_name} to database...")

            # Treat a lone DataFrame as a one-chunk sequence so both kinds of
            # input share the same write loop.
            frames = [payload] if isinstance(payload, pd.DataFrame) else payload

            rows_written = 0
            for position, frame in enumerate(frames):
                if not isinstance(frame, pd.DataFrame):
                    raise ValueError(
                        f"Expected DataFrame chunk, got {type(frame)}. "
                        "Reader may be exhausted or invalid."
                    )

                frame.to_sql(
                    name=table_name,
                    con=engine,
                    schema=schema,
                    # Only the first write honours if_exists; later ones append.
                    if_exists=if_exists if position == 0 else 'append',
                    index=False,
                    chunksize=chunksize,
                    **kwargs
                )
                rows_written += len(frame)

            print(f"✓ {table_name}: {rows_written} rows written")

    finally:
        # Release pooled connections whether or not the load succeeded.
        engine.dispose()

# Disabled script entry point: it is wrapped in a bare string literal, so the
# guarded fetch call inside it is never executed.
"""if __name__ == "__main__":
    df = fetch_multiprocess_dataset("google/civil_comments", target_rows=1000000)"""

'if __name__ == "__main__":\n    df = fetch_multiprocess_dataset("google/civil_comments", target_rows=1000000)'

In [3]:
# Pull up to 1,000,000 rows of google/civil_comments using the function's
# default batch size.
df = fetch_multiprocess_dataset("google/civil_comments", target_rows=1000000)

Processing batches: 100%|██████████| 20/20 [05:08<00:00, 15.43s/it]
Processing...: 100%|██████████| 20/20 [00:00<00:00, 46.33it/s]


**Workflow:**

1. The Python script reads the CSV data.
2. Pandas is used to flag comments containing toxicity.
3. The data is separated into two SQL tables: `clean_training_data` and `quarantined_toxic_data`.

In [4]:
# Make sure the fetched result is a DataFrame before column-wise operations.
df = pd.DataFrame(df)

# Every column except the comment text is treated as a toxicity score.
toxicity_cols = [col for col in df.columns if col != 'text']
toxicity_query_string = ' or '.join(f"{col} > 0" for col in toxicity_cols)
safe_query_string = ' and '.join(f"{col} <= 0" for col in toxicity_cols)

# A row is flagged when any score is positive. Despite its name,
# quarantine_mask selects the rows with no positive score (the clean set).
toxicity_mask = df[toxicity_cols].gt(0).any(axis=1)
quarantine_mask = df[toxicity_cols].le(0).all(axis=1)

# Both tables are ordered by the score columns, highest first.
quarantined_toxic_data = df.loc[toxicity_mask].sort_values(
    by=toxicity_cols, ascending=False
)
clean_training_data = df.loc[quarantine_mask].sort_values(
    by=toxicity_cols, ascending=False
)

In [5]:
# Display the flagged rows (rows with at least one positive score).
quarantined_toxic_data

Unnamed: 0,text,toxicity,severe_toxicity,obscene,threat,insult,identity_attack,sexual_explicit
813887,"Sorry, bucko, but I'll stay in Alaska - as I h...",1.0,0.4,0.8,0.0,0.9,0.0,0.600000
618250,"Dang folks, break out your happy lights or go ...",1.0,0.3,1.0,0.0,0.6,0.0,0.000000
791235,"Nope, not a damn thing.",1.0,0.3,1.0,0.0,0.6,0.0,0.000000
156196,"As a primitive Native heathen, I do not share ...",1.0,0.3,0.1,0.8,0.4,0.2,0.000000
579515,Enough already. Cut the head off this snake.,1.0,0.3,0.0,1.0,0.4,0.0,0.000000
...,...,...,...,...,...,...,...,...
992498,http://www.secondenlightenment.org/ALL%20ABOAR...,0.0,0.0,0.0,0.0,0.0,0.0,0.100000
997545,Boeing could care less what Sajan and trudeau ...,0.0,0.0,0.0,0.0,0.0,0.0,0.100000
125976,"Excellent, glad that you agree that a prospero...",0.0,0.0,0.0,0.0,0.0,0.0,0.090909
318183,Allan can't read with his right eye tightly cl...,0.0,0.0,0.0,0.0,0.0,0.0,0.034483


In [6]:
# Display the rows that have no positive score.
clean_training_data

Unnamed: 0,text,toxicity,severe_toxicity,obscene,threat,insult,identity_attack,sexual_explicit
0,Good...to protect the environment the EPA peop...,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,Interesting,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,You know what I really don't care what people ...,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,If port Townsend was smart it would lower its ...,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,"Abigail, the rightful Queen of Hawaii. Well a...",0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...
999995,yeah ... and that's still a couple hundred not...,0.0,0.0,0.0,0.0,0.0,0.0,0.0
999996,There are at least 2 points that Goldberg igno...,0.0,0.0,0.0,0.0,0.0,0.0,0.0
999997,"What is ""cofveve""?",0.0,0.0,0.0,0.0,0.0,0.0,0.0
999998,Thank you GM for alerting the public about the...,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [7]:
# Wrap each table in a generator of 50,000-row chunks for the CSV export.
chunked_quarantine = chunk_each_dataframe(
    {"quarantined_toxic_data": quarantined_toxic_data}, chunk_size=50000
)
chunked_clean = chunk_each_dataframe(
    {'clean_training_data': clean_training_data}, chunk_size=50000
)

# The dict keys become the file names under ./output_csv.
for chunked_tables in (chunked_quarantine, chunked_clean):
    write_dfs_to_files_iter(chunked_tables, './output', 'csv')

In [8]:
# Connection string for the load step, read from the environment.
db_url = os.getenv('DB_URL')

# The chunk generators are single-use, so they are rebuilt for the SQL load.
chunked_quarantine = chunk_each_dataframe(
    {"quarantined_toxic_data": quarantined_toxic_data}, chunk_size=50000
)
chunked_clean = chunk_each_dataframe(
    {'clean_training_data': clean_training_data}, chunk_size=50000
)

# Clean table first, then the quarantined one; each table is replaced if it exists.
for chunked_tables in (chunked_clean, chunked_quarantine):
    write_dfs_to_sql_iter(
        chunked_tables,
        db_url,
        if_exists='replace',
        chunksize=50000,
        method='multi'
    )

Writing clean_training_data to database...
✓ clean_training_data: 693877 rows written
Writing quarantined_toxic_data to database...
✓ quarantined_toxic_data: 306123 rows written
