Further information can be obtained on [Wharton's website](https://wrds-www.wharton.upenn.edu/pages/support/manuals-and-overviews/compustat/capital-iq/transcripts/wrds-overview-capitaliq-transcripts-data/#general-description).

In [None]:
import wrds
import logging
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from dataclasses import dataclass
from typing import Optional
from pathlib import Path


# Fetch the NLTK resources used by this notebook.
# `punkt_tab` is the tokenizer data that `word_tokenize` loads on NLTK >= 3.9;
# `punkt` is kept so that older NLTK releases keep working.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

In [None]:
# Show INFO-level messages; force=True replaces handlers that are already
# attached to the root logger, so re-running this cell still takes effect.
logging.basicConfig(level=logging.INFO, force=True)

In [None]:
# WRDS session shared by every query in this notebook.
# NOTE(review): presumably asks for (or reads stored) WRDS credentials - confirm.
db: wrds.Connection = wrds.Connection()

Run the query
Using our WRDS connection, db, we can run a query with some joins and filters. This query retrieves transcript component text as well as transcript and speaker metadata.

Three tables are used:

- wrds_transcript_detail - transcript metadata
- wrds_transcript_person - speaker metadata
- ciqtranscriptcomponent - full transcript text
The transcript data is filtered to companies with CIQ CompanyId.

In [None]:
# Distinct (companyid, companyname) pairs that have at least one transcript
# dated in 2023 or 2024.
select_companies_with_id = '''
        SELECT DISTINCT d.companyid, d.companyname
        FROM ciq.wrds_transcript_detail as d
        WHERE date_part('year', mostimportantdateutc) BETWEEN 2023 AND 2024
'''

# Run the query and store the ids as integers in one step.
companies: pd.DataFrame = db.raw_sql(select_companies_with_id).astype({"companyid": int})

companies.head()

In [None]:
# Inspect the column dtypes of the lookup table.
companies.dtypes

In [None]:
# Persist the lookup table as a tab-separated file.
companies_path = Path("..") / "data" / "companies.csv"
# to_csv does not create missing directories, so make sure ../data exists.
companies_path.parent.mkdir(parents=True, exist_ok=True)
companies.to_csv(companies_path, sep="\t", index=False)

In [None]:
def get_companies(name: str) -> pd.DataFrame:
    """Search the already downloaded `companies` table by company name.

    The lookup runs on the local dataframe, so no request is sent to the
    Wharton API for each search.

    Args:
        name (str): Pattern searched for anywhere in `companyname`, ignoring
            case. It is passed to `Series.str.contains` unchanged, so regular
            expression syntax is honoured. Rows without a name never match.

    Returns:
        pd.DataFrame: The matching rows of `companies` (names with their ids).
    """
    name_hits = companies["companyname"].str.contains(name, case=False, na=False)
    return companies.loc[name_hits]

In [None]:
def get_company_id(company: str) -> int | None:
    """Filtering based on company name and returning the one and only company's id

    Args:
        company (str): Company name, expected full match

    Returns:
        int: returned id, None if there's no such company
    """
    filtered: pd.DataFrame = companies[companies.companyname.str.fullmatch(company)]
    return filtered.companyid.item() if filtered.shape[0] == 1 else None

In [None]:
def get_company_names(ids: list[str]) -> pd.DataFrame:
    """Look up companies in the already downloaded `companies` table by id.

    The lookup runs on the local dataframe, so no request is sent to the
    Wharton API for each search.

    Args:
        ids (list[str]): Company ids (strings or integers); all matching
            companies are returned. `None` entries are ignored.

    Returns:
        pd.DataFrame: list of companies with their id-s

    Raises:
        ValueError: If an id cannot be converted to an integer.
    """
    # Match on the id column and compare as integers so that "123" and 123
    # select the same row.
    wanted_ids = [int(company_id) for company_id in ids if company_id is not None]
    return companies[companies.companyid.isin(wanted_ids)]

In [None]:
# Case-insensitive search used to discover the exact spelling of a company name.
company_name = 'goog'

filtered = get_companies(company_name)
filtered

In [None]:
# Resolve the company name to its id; the result is None unless exactly one
# company matches.
goog: str = 'Google LLC'

google_id = get_company_id(goog)
google_id

In [None]:
# Same lookup for Chipotle; None unless exactly one company matches.
chipotle = 'Chipotle Mexican Grill, Inc.'
chipotle_id = get_company_id(chipotle)

chipotle_id

In [None]:
# Same lookup for ASML; None unless exactly one company matches.
asml = 'ASML Holding N.V.'
asml_id = get_company_id(asml)

asml_id

In [None]:
def get_id_string(ids: list[str]) -> str:
    """Join company ids into a comma-separated string for a SQL `IN (...)` list.

    Args:
        ids (list[str]): Company ids; each one is converted with `str`.
            `None` entries (a lookup that found no company) are skipped so
            that the literal text "None" never ends up in a query.

    Returns:
        str: The ids separated by commas without spaces, e.g. "1,2,3";
        an empty string when there is nothing to join.
    """
    return ",".join(str(company_id) for company_id in ids if company_id is not None)

In [None]:
# Collect the looked-up ids and render them as one comma-separated string.
company_id_list: list = [google_id, chipotle_id, asml_id]

company_id_string = get_id_string(company_id_list)
company_id_string

In [None]:
# Fetch every transcript component, together with transcript and speaker
# metadata, for the selected companies and the years 2000-2025. Rows are ordered
# by transcript and then by component order.
# The IN list is filled from company_id_string (all looked-up companies) instead
# of google_id alone, so the id string built earlier is actually used and the
# per-company export later in the notebook produces one file per company.
sql_query = f'''
            SELECT a.*, b.*, c.componenttext
            FROM (
                  SELECT * 
                  FROM ciq.wrds_transcript_detail
                  WHERE companyid IN ({company_id_string})
                    AND date_part('year', mostimportantdateutc) BETWEEN 2000 AND 2025
                 ) AS a
            JOIN ciq.wrds_transcript_person AS b
              ON a.transcriptid = b.transcriptid
            JOIN ciq.ciqtranscriptcomponent AS c
              ON b.transcriptcomponentid = c.transcriptcomponentid
            ORDER BY a.transcriptid, b.componentorder;
            '''

df = db.raw_sql(sql_query)

In [None]:
# Preview the first joined rows.
df.head()

In [None]:
# The speaker's name is not used further on; drop that column.
df = df.drop(columns=['transcriptpersonname'])

In [None]:
# Summary of the remaining columns.
df.info()

In [None]:
def _render_transcript(group: pd.DataFrame) -> str:
    """Render the component rows of one transcript as "<speaker type>: <text>" lines."""
    rendered_lines = []
    for _, row in group.iterrows():
        rendered_lines.append(f"{row['speakertypename']}: {row['componenttext']}")
    return "\n".join(rendered_lines)


# One output row per transcript: the key columns plus the concatenated text.
transcript_keys = ["companyid", "mostimportantdateutc", "mostimportanttimeutc", "headline"]
joined_text = df.groupby(transcript_keys).apply(_render_transcript, include_groups=False)
transcripts: pd.DataFrame = joined_text.reset_index(name="full_text")

In [None]:
# Store the company id as an integer, then preview the assembled transcripts.
transcripts.companyid = transcripts.companyid.astype(int)
transcripts.head()

In [None]:
# Whitespace-delimited word count of every transcript.
transcripts['word_count'] = transcripts['full_text'].map(lambda text: len(str(text).split()))

In [None]:
# Token count of every transcript according to NLTK's word_tokenize.
# NOTE(review): the tokenizer presumably counts punctuation marks as separate
# tokens, so this is expected to exceed word_count - confirm.
transcripts['word_count_nltk'] = transcripts['full_text'].apply(
    lambda x: len(word_tokenize(str(x)))
)

In [None]:
# Write one tab-separated file per company: ../data/<companyid>.csv
grouped_transcripts = transcripts.groupby('companyid')

# Dialect shared by every export; quoting=1 is csv.QUOTE_ALL (quote every field).
export_options = {
    'sep': '\t',
    'index': False,
    'quoting': 1,
    'escapechar': '\\',
    'doublequote': True,
    'quotechar': '"',
    'lineterminator': '\n',
}

for company_id, group in grouped_transcripts:
    target = Path("..") / "data" / f"{company_id}.csv"
    group.to_csv(target, **export_options)

In [None]:
# Read the export for google_id back, using the same separator, quoting and
# escape settings that were used when writing it.
df_google = pd.read_csv(
    Path("..") / "data" / f"{google_id}.csv",
    sep='\t',
    quoting=1,
    escapechar='\\',
    doublequote=True,
    quotechar='"',
)

df_google.head()

In [None]:
# Display the re-loaded frame.
df_google

In [None]:
import logging


@dataclass
class WhartonCompanyIdSearchCache:
    """Result of the most recent company lookup, plus its transcripts once fetched."""

    id: int  # companyid the lookup was made for
    name: str  # companyname returned for that id
    df: pd.DataFrame  # the one-row lookup result as returned by the query
    transcripts: Optional[pd.DataFrame] = None  # set by get_company_transcripts


class WhartonScraper:
    """
    Wrapper class for scraping the Wharton transcripts database for a single
    company, selected by its `companyid`.

    The last successful lookup is kept in `company_search_cache`, so repeated
    calls for the same company do not query the database again.
    """

    def __init__(
        self,
        connection: "wrds.Connection",
    ):
        self.connection: wrds.Connection = connection
        # None until a lookup succeeds, and again after a failed lookup.
        self.company_search_cache: Optional[WhartonCompanyIdSearchCache] = None

    def __repr__(self):
        cache = self.company_search_cache
        if cache is None:
            return "WhartonScraper(id=None, name=None)"
        return f"WhartonScraper(id={cache.id}, name={cache.name})"

    def __str__(self):
        cache = self.company_search_cache
        if cache is None:
            return "WhartonScraper (no company selected)"
        return f"WhartonScraper for company ({cache.name})"

    def pipeline(self, company_id: str) -> None:
        """Full Pipeline for transcript acquisition from Wharton database, based on `companyid`

        Looks the company up, downloads its transcripts and writes them to a
        CSV file. Stops after the lookup when the company cannot be resolved.

        Args:
            company_id (str): `companyid` to filter by (string or integer)
        """
        if self.get_company_by_id(company_id) is None:
            logging.warning("pipeline aborted, company not resolved: %s", company_id)
            return
        self.get_company_transcripts()
        self.transcripts_to_csv()

    def get_company_by_id(self, company_id: str) -> Optional[pd.DataFrame]:
        """
        Reaching out to Wharton database to see if `companyid` is present

        Args:
            company_id (str): `companyid` to look up (string or integer)

        Returns:
            Optional[pd.DataFrame]: The one-row lookup result, or None (and an
            emptied cache) when the query does not return exactly one row.

        Raises:
            ValueError: If `company_id` cannot be converted to an integer.
        """
        # Normalising to int lets "123" and 123 hit the same cache entry and
        # keeps arbitrary text out of the SQL string built below.
        company_id = int(company_id)

        cache = self.company_search_cache
        if cache is not None and cache.id == company_id:
            logging.debug("using cache on company: %s", company_id)
            return cache.df

        select_company = f"""
            SELECT DISTINCT d.companyid, d.companyname
            FROM ciq.wrds_transcript_detail as d
            WHERE d.companyid = {company_id}
        """
        df: pd.DataFrame = self.connection.raw_sql(select_company)

        # Zero rows means an unknown id, several rows an ambiguous one; in both
        # cases there is no single company to cache.
        if df.shape[0] != 1:
            logging.warning(
                "expected exactly one company for id %s, got %d", company_id, df.shape[0]
            )
            self.company_search_cache = None
            return None

        self.company_search_cache = WhartonCompanyIdSearchCache(
            id=company_id, name=df.companyname.iloc[0], df=df, transcripts=None
        )
        logging.info("information acquired for company: %s", company_id)
        return df

    def get_company_transcripts(self) -> Optional[pd.DataFrame]:
        """
        Acquiring company transcripts based on the cached `companyid`

        Returns:
            Optional[pd.DataFrame]: One row per transcript with `full_text`,
            `word_count` and `word_count_nltk` columns; None when no company
            is cached or the query returns no rows.
        """
        cache = self.company_search_cache
        if cache is None:
            logging.debug("no company cache")
            return None
        # Explicit None check: the truth value of a DataFrame is ambiguous.
        if cache.transcripts is not None:
            logging.debug("transcripts already cached")
            return cache.transcripts

        query = f"""
            SELECT a.*, b.*, c.componenttext
            FROM (
                  SELECT * 
                  FROM ciq.wrds_transcript_detail
                  WHERE companyid = {cache.id}
                    AND date_part('year', mostimportantdateutc) BETWEEN 2023 AND 2025
                 ) AS a
            JOIN ciq.wrds_transcript_person AS b
              ON a.transcriptid = b.transcriptid
            JOIN ciq.ciqtranscriptcomponent AS c
              ON b.transcriptcomponentid = c.transcriptcomponentid
            ORDER BY a.transcriptid, b.componentorder;
            """
        df = self.connection.raw_sql(query)
        if df.empty:
            # Nothing to group; leave the cache without transcripts.
            logging.info("no transcripts found for company: %s", cache.id)
            return None

        transcripts = self._assemble_transcripts(df)
        cache.transcripts = transcripts
        logging.info(
            "transcripts acquired for company: %s with a shape: %s",
            cache.id,
            transcripts.shape,
        )
        return transcripts

    @staticmethod
    def _assemble_transcripts(df: pd.DataFrame) -> pd.DataFrame:
        """Collapse component rows into one row per transcript and add word counts."""
        df = df.drop(columns=["transcriptpersonname"])
        transcripts: pd.DataFrame = (
            df.groupby(
                [
                    "companyid",
                    "mostimportantdateutc",
                    "mostimportanttimeutc",
                    "headline",
                ]
            )
            .apply(
                lambda group: "\n".join(
                    f"{row['speakertypename']}: {row['componenttext']}"
                    for _, row in group.iterrows()
                ),
                include_groups=False,
            )
            .reset_index(name="full_text")
        )
        transcripts["companyid"] = transcripts["companyid"].astype(int)
        # Two counts: whitespace-delimited words and NLTK tokens.
        transcripts["word_count"] = transcripts["full_text"].apply(lambda x: len(str(x).split()))
        transcripts["word_count_nltk"] = transcripts["full_text"].apply(
            lambda x: len(word_tokenize(str(x)))
        )
        return transcripts

    def transcripts_to_csv(self) -> None:
        """
        Writing transcript dataset to file if it is present

        The file is written to ../data/<companyid>.csv as tab-separated text
        with every field quoted. Nothing is written when no company or no
        transcripts are cached.
        """
        cache = self.company_search_cache
        if cache is None or cache.transcripts is None:
            logging.debug("no transcript records.")
            return

        target = Path("..") / "data" / f"{cache.id}.csv"
        # to_csv does not create missing directories.
        target.parent.mkdir(parents=True, exist_ok=True)
        cache.transcripts.to_csv(
            target,
            sep="\t",
            index=False,
            quoting=1,  # csv.QUOTE_ALL
            escapechar="\\",
            doublequote=True,
            quotechar='"',
            lineterminator="\n",
        )
        logging.info("transcripts successfully written to %s", target.name)

In [None]:
# Look Google up through the scraper; evaluating `scraper` shows its repr.
# Note that `df` is rebound here to the lookup result (or None).
scraper = WhartonScraper(connection=db)

df = scraper.get_company_by_id(google_id)
scraper

In [None]:
# Run lookup, transcript download and CSV export for another company id.
scraper.pipeline('31293209')
scraper

In [None]:
# English stop-word list from the NLTK corpus.
stop_words = stopwords.words('english')
stop_words

In [None]:
import re
from collections import Counter
import matplotlib.pyplot as plt


class WordCounter:
    """Count how often each word occurs in a text, optionally ignoring stop words.

    Words are maximal runs of word characters in the lower-cased text.

    Attributes:
        text: The text that was counted.
        stop_words: Lower-cased set of ignored words, or None when no stop
            words were given.
        top_n: Number of most frequent words kept in `word_counts`.
        word_counts: List of `(word, count)` pairs for the `top_n` most
            frequent words, most frequent first.
    """

    def __init__(self, text: str, stop_words=None, top_n: int = 36):
        """Count the words of `text`.

        Args:
            text: Text to analyse.
            stop_words: Optional iterable of words to leave out of the counts.
            top_n: How many of the most frequent words `word_counts` (and
                therefore the plot) contains.
        """
        self.text = text
        # Words are lower-cased before filtering, so the stop words must be too.
        self.stop_words = {word.lower() for word in stop_words} if stop_words else None
        self.top_n = top_n
        self._counter = self._tally()
        self.word_counts = self._count_words()

    def _tally(self) -> Counter:
        """Return the frequency of every word that is not a stop word."""
        words = re.findall(r"\b\w+\b", self.text.lower())
        if self.stop_words:
            words = [word for word in words if word not in self.stop_words]
        return Counter(words)

    def _count_words(self):
        """Return the `top_n` most frequent words as `(word, count)` pairs."""
        return self._counter.most_common(self.top_n)

    def get_count(self, word: str) -> int:
        """Return how often `word` occurs (case-insensitive); 0 if it does not."""
        return self._counter[word.lower()]

    def get_all_counts(self) -> dict:
        """Return the count of every counted word, most frequent first."""
        return dict(self._counter.most_common())

    def plot_counts(self):
        """Draw a bar chart of the words in `word_counts`.

        Raises:
            ValueError: If there are no words to plot.
        """
        if not self.word_counts:
            raise ValueError("no words to plot")
        words, counts = zip(*self.word_counts)
        plt.figure(figsize=(10, 5))
        plt.bar(words, counts, color='skyblue')
        plt.xlabel("Words")
        plt.ylabel("Count")
        plt.title("Word Frequency")
        plt.xticks(rotation=45)
        plt.show()

In [None]:
# Use the first cached transcript of the scraper's current company as sample text.
# NOTE(review): `transcripts` is None when nothing was fetched, and
# `full_text[0]` is a label lookup - this cell assumes the preceding pipeline
# call produced at least one transcript.
example_text = scraper.company_search_cache.transcripts.full_text[0]

len(example_text)  # length in characters

In [None]:
# Count the words of the sample transcript, passing the NLTK stop-word list,
# and plot the resulting frequencies.
counter = WordCounter(example_text, stop_words=stop_words)

counter.plot_counts()

In [None]:
# NOTE(review): these two resources are also downloaded in the import cell at
# the top of the notebook; this cell repeats those calls.
nltk.download('punkt')
nltk.download('stopwords')

In [None]:
# Lower-case the sample transcript and split it into NLTK tokens.
lower_text = example_text.lower()

words = word_tokenize(lower_text)

# Keep purely alphanumeric tokens only (drops punctuation tokens).
words = [word for word in words if word.isalnum()]

# Note: this rebinds `stop_words` from a list to a set.
stop_words = set(stopwords.words('english'))
filtered_words = [word for word in words if word not in stop_words]

# Token counts before and after removing stop words.
print(f'original: {len(words)}, filtered: {len(filtered_words)}')

In [None]:
# Whitespace-delimited word count of the raw transcript, for comparison.
len(example_text.split())

In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import evaluate

# Summarisation input: the stop-word-free tokens joined by single spaces.
filtered_text = " ".join(filtered_words)

# Load the tokenizer and the seq2seq model for the chosen checkpoint.
model_name = "facebook/bart-large-cnn"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

In [None]:
# Number of tokens left after stop-word removal.
len(filtered_words)

In [None]:
source_word_count = len(filtered_words)

# The input is cut off at 1024 tokens, so the model may see only the start of
# the transcript.
inputs = tokenizer(filtered_text, return_tensors="pt", truncation=True, max_length=1024)

# generate() measures min_length/max_length in tokens. Size the summary from the
# tokens actually fed to the model (10%-30% of them) instead of from the word
# count of the whole text, which can demand more tokens than the truncated
# input contains.
input_token_count = inputs["input_ids"].shape[1]
target_min = int(input_token_count * 0.10)
# Keep max_length strictly above min_length even for very short inputs.
target_max = max(int(input_token_count * 0.30), target_min + 1)

summary_ids = model.generate(
    inputs["input_ids"],
    attention_mask=inputs["attention_mask"],
    num_beams=4,
    max_length=target_max,
    min_length=target_min,
    early_stopping=True,
)

summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)

In [None]:
# Display the generated summary.
summary

In [None]:
def compression_ratio(summary, source):
    """Return the summary's length as a fraction of the source's length.

    Both lengths are whitespace-delimited word counts.

    Args:
        summary: Summary text.
        source: Text the summary is compared against.

    Returns:
        float: Words in `summary` divided by words in `source`.

    Raises:
        ZeroDivisionError: If `source` contains no words.
    """
    word_totals = [len(text.split()) for text in (summary, source)]
    return word_totals[0] / word_totals[1]


# Summary length relative to the stop-word-free text, then relative to the raw
# transcript. The first label previously read "filted"; it now says "filtered".
ratio = compression_ratio(summary, filtered_text)
print(f"\nCompression Ratio related to filtered text: {ratio:.2%}")
ratio = compression_ratio(summary, example_text)
print(f"\nCompression Ratio related to original text: {ratio:.2%}")

In [None]:
# Load the ROUGE metric and score the summary against the raw transcript.
rouge = evaluate.load("rouge")

rouge_scores = rouge.compute(predictions=[summary], references=[example_text])

# Print every returned score with four decimals.
print("\nROUGE Scores:")
for key, value in rouge_scores.items():
    print(f"{key}: {value:.4f}")

In [None]:
# Same scoring, this time against the stop-word-free text.
rouge_scores = rouge.compute(predictions=[summary], references=[filtered_text])

# Print every returned score with four decimals.
print("\nROUGE Scores:")
for key, value in rouge_scores.items():
    print(f"{key}: {value:.4f}")