# ***DATA INGESTION***

## **1. Data Storage**
- SQLite for Dev
- Supabase for deployment

| Option                             | Pros                            | Cons                  | Recommendation                           |
| ---------------------------------- | ------------------------------- | --------------------- | ---------------------------------------- |
| ✅ **Local CSV**                    | Easy, quick to debug            | Not scalable          | Good for dev/testing                     |
| ✅ **SQLite (Local SQL)**           | Table-based, schema control     | File-based            | Perfect for local DB, supports SQL       |
| 🆓 **Supabase (PostgreSQL)**       | Free 500MB, SQL, REST API, auth | Slight learning curve | **Highly recommended** for free cloud DB |
| 🆓 **Google Cloud SQL (Postgres)** | Scalable                        | Not free beyond trial | Avoid for now                            |
| ❌ **MongoDB Atlas**                | Free 512MB, NoSQL               | No relational joins   | Not ideal for interaction tables         |


In [36]:
import pandas as pd
import sqlite3
import yaml
import os
from tqdm import tqdm

In [37]:
# config.yaml
with open('../config.yaml', 'r') as f:
    config = yaml.safe_load(f)

In [38]:
csv_path = config['data']['raw_csv_path']
sqlite_path = config['data']['sqlite_db_path']
raw_table = config['sqlite']['raw_table']
interaction_table = config['sqlite']['interaction_table']
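The cell above assumes `config.yaml` has a `data` section and a `sqlite` section containing the four keys it reads. A minimal sketch of a fail-fast check on the loaded dict follows. The helper name `missing_config_keys` and the example paths are hypothetical; only the key names and the `raw_tweets` table name come from this notebook.

```python
# Keys this notebook reads from config.yaml, grouped by section.
REQUIRED_KEYS = {
    "data": ("raw_csv_path", "sqlite_db_path"),
    "sqlite": ("raw_table", "interaction_table"),
}

def missing_config_keys(config: dict) -> list[str]:
    """Return dotted names of required keys that are absent from the config."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        section_values = config.get(section) or {}
        for key in keys:
            if key not in section_values:
                missing.append(f"{section}.{key}")
    return missing

# Hypothetical config; the paths are placeholders, not the project's real ones.
example_config = {
    "data": {"raw_csv_path": "data/raw/tweets.csv", "sqlite_db_path": "data/tweets.db"},
    "sqlite": {"raw_table": "raw_tweets"},
}
print(missing_config_keys(example_config))  # ['sqlite.interaction_table']
```

Running a check like this right after `yaml.safe_load` turns a later `KeyError` into a single readable list of what is missing.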

In [None]:
chunk_size = 1000 
MAX_ROWS = 2000

reader = pd.read_csv(csv_path, chunksize=chunk_size)

In [None]:
# SQLite
conn = sqlite3.connect(sqlite_path)
cursor = conn.cursor()

In [11]:
# Create table if not exists
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {raw_table} (
    tweet_id TEXT PRIMARY KEY,
    author_id TEXT,
    inbound BOOLEAN,
    created_at TEXT,
    text TEXT,
    response_tweet_id TEXT,
    in_response_to_tweet_id TEXT
);
""")
conn.commit()

In [12]:
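def insert_chunk(df_chunk):
    """Insert one DataFrame chunk into the raw table; rows whose tweet_id already exists are skipped."""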
    rows = [
        (
            str(row['tweet_id']),
            str(row['author_id']),
            bool(row['inbound']),
            str(row['created_at']),
            str(row['text']),
            str(row['response_tweet_id']) if not pd.isna(row['response_tweet_id']) else None,
            str(row['in_response_to_tweet_id']) if not pd.isna(row['in_response_to_tweet_id']) else None
        )
        for _, row in df_chunk.iterrows()
    ]
    
    cursor.executemany(f"""
        INSERT OR IGNORE INTO {raw_table} 
        (tweet_id, author_id, inbound, created_at, text, response_tweet_id, in_response_to_tweet_id)
        VALUES (?, ?, ?, ?, ?, ?, ?);
    """, rows)
    conn.commit()

In [16]:
current_rows = 0

for chunk in tqdm(reader, desc="Ingesting chunks"):
    insert_chunk(chunk)
    current_rows += len(chunk)
    if current_rows >= MAX_ROWS:
        break

Ingesting chunks: 1it [00:00,  7.37it/s]
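The loop above reads the CSV in chunks, stops once `MAX_ROWS` is reached, and relies on `INSERT OR IGNORE` with the `tweet_id` primary key so that reruns do not duplicate rows. The sketch below shows that behaviour on an in-memory database. It swaps pandas for the standard library's `csv` module to stay self-contained, and the table name `raw_tweets_demo`, the reduced column set, and the sample rows are all made up for illustration.

```python
import csv
import io
import sqlite3
from itertools import islice

def ingest(conn, csv_file, chunk_size=2, max_rows=4):
    """Insert rows in chunks, stopping once max_rows have been read."""
    reader = csv.DictReader(csv_file)
    rows_read = 0
    while rows_read < max_rows:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            break
        conn.executemany(
            "INSERT OR IGNORE INTO raw_tweets_demo (tweet_id, author_id, text) "
            "VALUES (:tweet_id, :author_id, :text)",
            chunk,
        )
        conn.commit()
        rows_read += len(chunk)
    return rows_read

# Made-up sample data in the same spirit as the raw CSV.
SAMPLE = "tweet_id,author_id,text\n1,sprintcare,hello\n2,115712,hi\n3,115712,again\n"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE raw_tweets_demo (tweet_id TEXT PRIMARY KEY, author_id TEXT, text TEXT)"
)
ingest(conn, io.StringIO(SAMPLE))
ingest(conn, io.StringIO(SAMPLE))  # second run: every tweet_id already exists
total = conn.execute("SELECT COUNT(*) FROM raw_tweets_demo").fetchone()[0]
print(total)  # 3
```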


In [64]:
pd.read_sql(f"SELECT COUNT(*) as total FROM {raw_table};", conn)
pd.read_sql(f"SELECT * FROM {raw_table} LIMIT 10;", conn)

|   | tweet_id | author_id | inbound | created_at | text | response_tweet_id | in_response_to_tweet_id |
| - | -------- | --------- | ------- | ---------- | ---- | ----------------- | ----------------------- |
| 0 | 1 | sprintcare | 0 | Tue Oct 31 22:10:47 +0000 2017 | @115712 I understand. I would like to assist y... | 2.0 | 3.0 |
| 1 | 2 | 115712 | 1 | Tue Oct 31 22:11:45 +0000 2017 | @sprintcare and how do you propose we do that |  | 1.0 |
| 2 | 3 | 115712 | 1 | Tue Oct 31 22:08:27 +0000 2017 | @sprintcare I have sent several private messag... | 1.0 | 4.0 |
| 3 | 4 | sprintcare | 0 | Tue Oct 31 21:54:49 +0000 2017 | @115712 Please send us a Private Message so th... | 3.0 | 5.0 |
| 4 | 5 | 115712 | 1 | Tue Oct 31 21:49:35 +0000 2017 | @sprintcare I did. | 4.0 | 6.0 |
| 5 | 6 | sprintcare | 0 | Tue Oct 31 21:46:24 +0000 2017 | @115712 Can you please send us a private messa... | 57.0 | 8.0 |
| 6 | 8 | 115712 | 1 | Tue Oct 31 21:45:10 +0000 2017 | @sprintcare is the worst customer service | 9610.0 |  |
| 7 | 11 | sprintcare | 0 | Tue Oct 31 22:10:35 +0000 2017 | @115713 This is saddening to hear. Please shoo... |  | 12.0 |
| 8 | 12 | 115713 | 1 | Tue Oct 31 22:04:47 +0000 2017 | @sprintcare You gonna magically change your co... | 111314.0 | 15.0 |
| 9 | 15 | sprintcare | 0 | Tue Oct 31 20:03:31 +0000 2017 | @115713 We understand your concerns and we'd l... | 12.0 | 16.0 |


In [41]:
cursor.execute(f"SELECT COUNT(*) FROM {raw_table};")
row_count = cursor.fetchone()[0]

print(f"Total rows in `{raw_table}`: {row_count}")
conn.close()

Total rows in `raw_tweets`: 1745000


In [19]:
import pandas as pd

conn = sqlite3.connect(sqlite_path)
df_preview = pd.read_sql(f"SELECT * FROM {raw_table} LIMIT 5;", conn)
conn.close()

df_preview.head()

|   | tweet_id | author_id | inbound | created_at | text | response_tweet_id | in_response_to_tweet_id |
| - | -------- | --------- | ------- | ---------- | ---- | ----------------- | ----------------------- |
| 0 | 1 | sprintcare | 0 | Tue Oct 31 22:10:47 +0000 2017 | @115712 I understand. I would like to assist y... | 2.0 | 3.0 |
| 1 | 2 | 115712 | 1 | Tue Oct 31 22:11:45 +0000 2017 | @sprintcare and how do you propose we do that |  | 1.0 |
| 2 | 3 | 115712 | 1 | Tue Oct 31 22:08:27 +0000 2017 | @sprintcare I have sent several private messag... | 1.0 | 4.0 |
| 3 | 4 | sprintcare | 0 | Tue Oct 31 21:54:49 +0000 2017 | @115712 Please send us a Private Message so th... | 3.0 | 5.0 |
| 4 | 5 | 115712 | 1 | Tue Oct 31 21:49:35 +0000 2017 | @sprintcare I did. | 4.0 | 6.0 |


In [None]:
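# The earlier connection was closed, so reopen it before listing tables
conn = sqlite3.connect(sqlite_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print("📦 Tables in DB:", cursor.fetchall())

conn.close()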

📦 Tables in DB: [('raw_tweets',), ('interaction_table',)]


In [None]:
conn = sqlite3.connect(sqlite_path)

df = pd.read_sql(f"SELECT * FROM {raw_table}", conn)
conn.close()

df['tweet_id'] = df['tweet_id'].astype(str).str.strip()
df['in_response_to_tweet_id'] = df['in_response_to_tweet_id'].astype(str).str.strip()

inbound_df = df[df['inbound'] == 1].copy()
outbound_df = df[df['inbound'] == 0].copy()

matches = outbound_df['in_response_to_tweet_id'].isin(inbound_df['tweet_id'])

print("✅ Outbound replies that match inbound tweets:", matches.sum())

print(outbound_df[matches][['tweet_id', 'in_response_to_tweet_id']].head())

✅ Outbound replies that match inbound tweets: 0
Empty DataFrame
Columns: [tweet_id, in_response_to_tweet_id]
Index: []
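
The zero matches above are consistent with a formatting mismatch rather than missing data. In the previews, `tweet_id` appears as `1` while `in_response_to_tweet_id` appears as `3.0`, and `astype(str)` preserves that difference (`'3.0' != '3'`). A minimal sketch of normalizing both sides to plain integer strings before comparing follows. `normalize_id` is a hypothetical helper, and the sample values are taken from the preview rows.

```python
import math

def normalize_id(value):
    """Return an ID as a plain integer string ('3.0' -> '3'), or None if missing."""
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    text = str(value).strip()
    if text in ("", "nan", "None"):
        return None
    # Values such as '3.0' come from a float column; drop the fractional part.
    return str(int(float(text)))

inbound_ids = {normalize_id(v) for v in ["2", "3", "5", "8"]}
outbound_reply_targets = ["3.0", "5.0", "8.0", None]

raw_matches = [t for t in outbound_reply_targets if t in inbound_ids]
normalized_matches = [
    normalize_id(t) for t in outbound_reply_targets if normalize_id(t) in inbound_ids
]
print(raw_matches)         # []
print(normalized_matches)  # ['3', '5', '8']
```

Going through `float` is safe for the small anonymized IDs in this dataset, but it would lose precision for IDs above 2**53, so full-size tweet IDs should be read as strings from the start.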


## **2. Preprocessing**

| Task                                             | Tool                                        |
| ------------------------------------------------ | ------------------------------------------- |
| Handle missing `text`, `in_response_to_tweet_id` | Pandas `.dropna()` or `.fillna()`           |
| Convert `created_at` to datetime                 | `pd.to_datetime()`                          |
| Standardize casing and punctuation               | `text_cleaner.py` (utils)                   |
| Remove/flag masked tokens (`__email__`, etc.)    | Regex                                       |
| Link response chains into pairs/threads          | Custom logic in `normalizer.py`             |
| Tokenize (optional later)                        | `nltk` or `spacy`                           |
| Idempotency                                      | Composite key of `conversation_id`, `customer_id`, `agent_id`, `created_at` for deduplication |
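
The `created_at` values in this dataset look like `Tue Oct 31 22:10:47 +0000 2017`. The sketch below shows how a format string maps onto that value, using the standard library's `datetime.strptime` with the same directives that `normalize_tweets` passes to `pd.to_datetime`. `parse_created_at` is a hypothetical helper that returns `None` on bad input, roughly mirroring `errors='coerce'`.

```python
from datetime import datetime
from typing import Optional

# %a weekday, %b month, %d day, %H:%M:%S time, %z UTC offset, %Y year
CREATED_AT_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def parse_created_at(value: str) -> Optional[datetime]:
    """Parse a raw created_at string; return None if it does not match the format."""
    try:
        return datetime.strptime(value, CREATED_AT_FORMAT)
    except (TypeError, ValueError):
        return None

parsed = parse_created_at("Tue Oct 31 22:10:47 +0000 2017")
print(parsed.isoformat())              # 2017-10-31T22:10:47+00:00
print(parse_created_at("not a date"))  # None
```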


In [31]:
#src/utils/text_cleaner.py

import re
import string

def clean_text(text: str) -> str:
    if not isinstance(text, str):
        return ""
    text = text.lower()
    text = re.sub(r'__email__|__phone__|__url__', '', text)
    text = re.sub(r"http\S+|www\S+", "", text)  # remove links
    text = text.translate(str.maketrans('', '', string.punctuation))
    text = re.sub(r"\s+", " ", text).strip()
    return text
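
`clean_text` removes the masked placeholders outright. Since the task table also mentions flagging them, here is a minimal sketch of counting placeholders before they are stripped. `count_masked_tokens` is a hypothetical helper, and it assumes the same three placeholder names used in the regex above.

```python
import re
from collections import Counter

MASKED_TOKEN_RE = re.compile(r"__(email|phone|url)__", flags=re.IGNORECASE)

def count_masked_tokens(text: str) -> Counter:
    """Count masked placeholders by kind, e.g. Counter({'email': 1})."""
    if not isinstance(text, str):
        return Counter()
    return Counter(match.lower() for match in MASKED_TOKEN_RE.findall(text))

counts = count_masked_tokens("Reach me at __email__ or __PHONE__, not __email__")
print(counts["email"], counts["phone"], counts["url"])  # 2 1 0
```

The counts could be stored as extra columns next to `text_cleaned`, so downstream steps can tell that a message originally contained contact details.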

In [None]:
# src/components/normalizer.py

import sqlite3
import pandas as pd

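def normalize_tweets(db_path: str, raw_table: str, output_table: str):
    """Pair each outbound (agent) tweet with the inbound tweet it replies to and append new pairs to output_table."""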
    conn = sqlite3.connect(db_path)

    df = pd.read_sql(f"SELECT * FROM {raw_table}", conn)
    df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce', format='%a %b %d %H:%M:%S %z %Y')
    df['text_cleaned'] = df['text'].apply(clean_text)

    df = df[df['tweet_id'].notnull()]
    df['tweet_id'] = df['tweet_id'].astype(float).astype(int).astype(str)
    df['in_response_to_tweet_id'] = df['in_response_to_tweet_id'].fillna(-1)
    df['in_response_to_tweet_id'] = df['in_response_to_tweet_id'].astype(float).astype(int).astype(str)

    inbound_df = df[df['inbound'] == 1].copy()
    outbound_df = df[df['inbound'] == 0].copy()

    merged = outbound_df.merge(
        inbound_df,
        left_on='in_response_to_tweet_id',
        right_on='tweet_id',
        suffixes=('_agent', '_cust')
    )

    merged['conversation_id'] = merged['tweet_id_cust']

    interaction_df = merged[[
        'conversation_id',
        'author_id_cust', 'author_id_agent',
        'text_cleaned_cust', 'text_cleaned_agent',
        'created_at_cust'
    ]].rename(columns={
        'author_id_cust': 'customer_id',
        'author_id_agent': 'agent_id',
        'text_cleaned_cust': 'customer_text',
        'text_cleaned_agent': 'agent_response',
        'created_at_cust': 'created_at'
    })

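    # Idempotency: drop rows whose composite key is already in the output table
    try:
        existing = pd.read_sql(
            f"SELECT conversation_id, customer_id, agent_id, created_at FROM {output_table}",
            conn
        )
    except (pd.errors.DatabaseError, sqlite3.OperationalError):
        existing = None  # output table does not exist yet (first run)

    if existing is not None:
        # SQLite returns created_at as text; parse it so the key dtypes match
        existing['created_at'] = pd.to_datetime(existing['created_at'], utc=True)
        interaction_df = interaction_df.merge(
            existing.drop_duplicates(),
            on=['conversation_id', 'customer_id', 'agent_id', 'created_at'],
            how='left',
            indicator=True
        ).query('_merge == "left_only"').drop(columns=['_merge'])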

    if not interaction_df.empty:
        interaction_df.to_sql(output_table, conn, index=False, if_exists='append')

    print(f"Created interaction table with {len(interaction_df)} new rows.")
    conn.close()

In [66]:
normalize_tweets(
    db_path=sqlite_path,
    raw_table=raw_table,
    output_table=interaction_table
)

Created interaction table with 780225 new rows.


In [None]:
conn = sqlite3.connect(sqlite_path)

df_interactions = pd.read_sql(f"SELECT * FROM {interaction_table} LIMIT 20;", conn)
display(df_interactions)

row_count = pd.read_sql(f"SELECT COUNT(*) as count FROM {interaction_table};", conn)
print(f"Total interactions: {row_count['count'].iloc[0]}")

conn.close()


|   | conversation_id | customer_id | agent_id | customer_text | agent_response | created_at |
| - | --------------- | ----------- | -------- | ------------- | -------------- | ---------- |
| 0 | 3 | 115712 | sprintcare | sprintcare i have sent several private message... | 115712 i understand i would like to assist you... | 2017-10-31 22:08:27+00:00 |
| 1 | 5 | 115712 | sprintcare | sprintcare i did | 115712 please send us a private message so tha... | 2017-10-31 21:49:35+00:00 |
| 2 | 8 | 115712 | sprintcare | sprintcare is the worst customer service | 115712 can you please send us a private messag... | 2017-10-31 21:45:10+00:00 |
| 3 | 12 | 115713 | sprintcare | sprintcare you gonna magically change your con... | 115713 this is saddening to hear please shoot ... | 2017-10-31 22:04:47+00:00 |
| 4 | 16 | 115713 | sprintcare | sprintcare since i signed up with yousince day 1 | 115713 we understand your concerns and wed lik... | 2017-10-31 20:00:43+00:00 |
| 5 | 18 | 115713 | sprintcare | 115714 y’all lie about your “great” connection... | 115713 h there wed definitely like to work wit... | 2017-10-31 19:56:01+00:00 |
| 6 | 20 | 115715 | sprintcare | 115714 whenever i contact customer support the... | 115715 please send me a private message so tha... | 2017-10-31 22:03:34+00:00 |
| 7 | 24 | 115716 | Ask_Spectrum | askspectrum that is incorrect information i ha... | 115716 what information is incorrect jk | 2017-10-31 22:13:02+00:00 |
| 8 | 22 | 115716 | Ask_Spectrum | askspectrum would you like me to email you a c... | 115716 our department is part of the corporate... | 2017-10-31 22:16:48+00:00 |
| 9 | 26 | 115716 | Ask_Spectrum | askspectrum i received this from your corporat... | 115716 no thank you jk | 2017-10-31 22:19:56+00:00 |


Total interactions: 1560450
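
The total above (1,560,450) is exactly twice the 780,225 rows reported by the `normalize_tweets` run, which suggests the same pairs were appended twice without being deduplicated. One way to enforce the composite key at the database level is a unique index combined with `INSERT OR IGNORE`. This is a sketch on an in-memory database, not the notebook's method; the table name `interactions_demo` and the index name are hypothetical.

```python
import sqlite3

KEY_COLUMNS = ("conversation_id", "customer_id", "agent_id", "created_at")

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE interactions_demo (
        conversation_id TEXT,
        customer_id TEXT,
        agent_id TEXT,
        customer_text TEXT,
        agent_response TEXT,
        created_at TEXT
    )
    """
)
# The unique index makes SQLite reject rows whose composite key already exists.
conn.execute(
    f"CREATE UNIQUE INDEX idx_interactions_demo_key "
    f"ON interactions_demo ({', '.join(KEY_COLUMNS)})"
)

def append_interactions(conn, rows):
    """Insert rows, silently skipping any whose composite key is already stored."""
    conn.executemany(
        "INSERT OR IGNORE INTO interactions_demo VALUES (?, ?, ?, ?, ?, ?)", rows
    )
    conn.commit()

rows = [
    ("3", "115712", "sprintcare", "customer text", "agent text", "2017-10-31 22:08:27+00:00"),
    ("5", "115712", "sprintcare", "customer text", "agent text", "2017-10-31 21:49:35+00:00"),
]
append_interactions(conn, rows)
append_interactions(conn, rows)  # rerun with the same data
count = conn.execute("SELECT COUNT(*) FROM interactions_demo").fetchone()[0]
print(count)  # 2
```

SQLite treats `NULL` values as distinct in unique indexes, so rows with a missing `created_at` would still be inserted again on every run and need separate handling.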


In [69]:
# Note: df_interactions holds only the rows loaded with LIMIT 20 above, not the full table
processed_csv_path = r"D:\IIT BBS\Job Resources\Riverline\Riverline_NBA_system\data\processed\interaction_table.csv"
df_interactions.to_csv(processed_csv_path, index=False)
pd.read_csv(processed_csv_path).head()

|   | conversation_id | customer_id | agent_id | customer_text | agent_response | created_at |
| - | --------------- | ----------- | -------- | ------------- | -------------- | ---------- |
| 0 | 3 | 115712 | sprintcare | sprintcare i have sent several private message... | 115712 i understand i would like to assist you... | 2017-10-31 22:08:27+00:00 |
| 1 | 5 | 115712 | sprintcare | sprintcare i did | 115712 please send us a private message so tha... | 2017-10-31 21:49:35+00:00 |
| 2 | 8 | 115712 | sprintcare | sprintcare is the worst customer service | 115712 can you please send us a private messag... | 2017-10-31 21:45:10+00:00 |
| 3 | 12 | 115713 | sprintcare | sprintcare you gonna magically change your con... | 115713 this is saddening to hear please shoot ... | 2017-10-31 22:04:47+00:00 |
| 4 | 16 | 115713 | sprintcare | sprintcare since i signed up with yousince day 1 | 115713 we understand your concerns and wed lik... | 2017-10-31 20:00:43+00:00 |


## **Delivery Format**

| Field             | Description                                            |
| ----------------- | ------------------------------------------------------ |
| `conversation_id` | Group ID for each conversation (via thread/root tweet) |
| `customer_id`     | From `author_id` of inbound tweets                     |
| `agent_id`        | From response tweet                                    |
| `customer_text`   | Text of the inbound tweet                              |
| `agent_response`  | Text of the matching outbound tweet                    |
| `created_at`      | Timestamp of first message                             |
| `resolved_status` | Initial label for downstream filtering                 |
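
The fields above map naturally onto a typed record. A minimal sketch follows, assuming all IDs are kept as strings and that `resolved_status` is an optional free-form label. The table does not specify its values, and `normalize_tweets` does not produce that column, so its type here is a guess. `InteractionRecord` is a hypothetical name.

```python
from dataclasses import asdict, dataclass
from typing import Optional

@dataclass(frozen=True)
class InteractionRecord:
    conversation_id: str
    customer_id: str
    agent_id: str
    customer_text: str
    agent_response: str
    created_at: str                        # timestamp string as stored in SQLite
    resolved_status: Optional[str] = None  # assumption: filled in by a later step

# Values copied from one row of the interaction table preview.
record = InteractionRecord(
    conversation_id="8",
    customer_id="115712",
    agent_id="sprintcare",
    customer_text="sprintcare is the worst customer service",
    agent_response="115712 can you please send us a private messag...",
    created_at="2017-10-31 21:45:10+00:00",
)
print(list(asdict(record)))
```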


## **Automation and Scheduling (CI)**

| Tool                       | Cost                      | Best Use                    | Recommendation                        |
| -------------------------- | ------------------------- | --------------------------- | ------------------------------------- |
| ✅ `cron + bash/python`     | Free                      | Lightweight automation      | Best for dev/local runs               |
| ✅ GitHub Actions           | Free tier                 | Workflows triggered on push or on a cron schedule | Great for cloud-native jobs           |
| 🆓 **Airflow + Astro CLI** | Free dev usage            | DAG + monitoring            | Good if you want production-grade ETL |
| 🆓 Prefect                 | Free for small scale      | Python-native workflows     | Alternative to Airflow                |
| ❌ AWS Lambda               | Free only for light loads | Event-driven functions (drawbacks: cold starts, setup effort) | Overkill for now                      |
