Official docs <br >
https://www.twilio.com/docs/usage/secure-credentials: Everything about setting up environmental creds <br >
https://www.twilio.com/docs/messaging/tutorials/how-to-retrieve-and-modify-message-history: Everything about getting messages

In [None]:
# Alternative method of saving enviromental variables. Only works for 1 session
# Never share or save your tokens!

# imports
import os
import pandas as pd
from twilio.rest import Client
from datetime import datetime, timedelta
from tqdm import tqdm
import numpy as np


# Find your credentials at twilio.com/console. Keeps your credentials safe
os.environ['TWILIO_ACCOUNT_SID'] = '' # insert your Account SID here
os.environ['TWILIO_ACCOUNT_TOKEN'] = '' # insert your token here

### Read credentials

In [None]:
# Load your credentials. In this example we already saved them in local environment
account_sid = os.environ['TWILIO_ACCOUNT_SID']
auth_token = os.environ['TWILIO_ACCOUNT_TOKEN']

# Initialise
client = Client(account_sid, auth_token)

### Test cell

In [1]:

# Repo and installation https://github.com/twilio/twilio-python
import os
from twilio.rest import Client

# Limit, how many messages do we want to get as a response
messages = client.messages.list(limit=100)

# Test that we recieve a cpecific data from a specific column
for record in messages:
    print(record.sid) # specify a column

### Functions

In [None]:
def create_dataframe(records):
    df_data = []
    for record in records:
        df_data.append({
            'phone_from': record.from_,
            'phone_to': record.to,
            'body': record.body,
            'status': record.status,
            'date_sent_format': record.date_sent.strftime("%Y-%m-%d"), #datetime.strptime(record.date_sent, "%a, %d %b %Y %H:%M:%S %z").strftime("%Y-%m-%d"),
            'date_sent': record.date_sent,
            'api_version': record.api_version,
            'num_segments': record.num_segments,
            'error_message': record.error_message,
            'account_sid': record.account_sid,
            'sid': record.sid,
            'direction': record.direction,
            'price': record.price,
            'price_unit': record.price_unit,
        })

    return pd.DataFrame(df_data)

def limiter(limit=None, date_range=None):
    if date_range:
        start_date, end_date = date_range
    else:
        start_date, end_date = None, None

    all_records = []
    page_size = 200  # Twilio default page size

    # Get total number of messages
    total_messages = len(list(client.messages.list(
        date_sent_after=start_date,
        date_sent_before=end_date,
    )))

    # Calculate the number of batches needed
    total_batches = total_messages // page_size + (total_messages % page_size > 0)

    # Initialize tqdm to reflect the number of batches
    pbar = tqdm(total=total_batches, desc="Loading batches", unit="batch")

    # Paginate through the records using the stream method
    for page in client.messages.stream(
            date_sent_after=start_date,
            date_sent_before=end_date
    ):
        all_records.append(page)

        # Update the progress bar description
        pbar.set_description(f"Loading batches {len(all_records)//page_size}/{total_batches} - Time: {pbar.format_dict['elapsed']}")

        # If the limit is set, break after loading the required number of records
        if limit and len(all_records) >= limit:
            break

    pbar.close()

    # Create the DataFrame from all records
    df = create_dataframe(all_records[:limit])
    return df

# Date range mode - load messages within a specific date range
start_date = datetime(2023, 9, 1)
end_date = datetime(2023, 11, 28)
date_range = (start_date, end_date)


### Create table and first run

In [None]:
table_create = '''
CREATE OR REPLACE TABLE your_project.dataset.twilio

(
    phone_from       INT64,
    phone_to         INT64,
    body             STRING,
    status           STRING,
    date_sent_format DATE,
    date_sent        TIMESTAMP,
    api_version      STRING,
    num_segments     STRING,
    error_message    STRING,
    account_sid      STRING,
    sid              STRING,
    direction        STRING,
    price            FLOAT64,
    price_unit       STRING
)
PARTITION BY date_sent_format
CLUSTER BY date_sent, phone_to, sid
OPTIONS (
    DESCRIPTION = 'Contains whatsapp messages from twilio service'
    )
'''

In [None]:

# first run
# ------------------------- Commented. First use only - get initial chunk of data and load it into an empty table -------------------------
# df_records_date_range = limiter(date_range=date_range)

## remove "whatsapp +" from number and price to float64
# df_records_date_range['phone_from'] = df_records_date_range['phone_from'].str.extract('(\d+)').replace(np.nan, 0).astype(int)
# df_records_date_range['phone_to'] = df_records_date_range['phone_to'].str.extract('(\d+)').replace(np.nan, 0).astype(int)
# df_records_date_range['price'] = df_records_date_range['price'].replace(np.nan,0).astype(float).abs()
# df_records_date_range

# initial load
# df_records_date_range.to_gbq(destination_table='dataset.twilio', project_id='your_project', if_exists='replace')
# print(f'Loaded! At {datetime.now()}')
# df_records_date_range.info()

# ------------------------- Commented. First use only ------------------------

### Main ETL

In [None]:
# Load data to BigQuery

# -30 days from yesterday. You can set any date interval for your particular case
start_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y-%m-%d')
end_date = (datetime.now().date()).strftime('%Y-%m-%d')
date_range = (start_date, end_date)

# Run job and filter only specific dates
df_to_load = limiter(date_range=date_range)

# Remove "whatsapp +" from number and price to float64
df_to_load['phone_from'] = df_to_load['phone_from'].str.extract('(\d+)').replace(np.nan, 0).astype(int)
df_to_load['phone_to'] = df_to_load['phone_to'].str.extract('(\d+)').replace(np.nan, 0).astype(int)
df_to_load['price'] = df_to_load['price'].replace(np.nan,0).astype(float).abs()

In [None]:
# Delete last 31 days in BigQuery. Not essential. We use replace when uploading data
# pd.read_gbq('''DELETE FROM indrive-inlocal.supermasters.twilio
# WHERE date_sent_format > DATE_SUB(CURRENT_DATE(), INTERVAL 31 DAY);''')
import pandas_gbq

# Set the amount of dates you are going to load later
sql = '''
DELETE FROM your_project.dataset.twilio
WHERE date_sent_format > DATE_SUB(CURRENT_DATE(), INTERVAL 31 DAY);
'''

# Execute the query
pandas_gbq.read_gbq(sql, project_id='your_project')

In [None]:
# Load 30 days of fresh data from Twilio
df_to_load.to_gbq(destination_table='twilio', project_id='your_project', if_exists='append')
print(f'Loaded! At {datetime.now()}')

In [None]:
# PS. Check that process if finished successfully. Check loaded data first time around