In [152]:
import docx
import os
import xml.etree.ElementTree as ET
import re
from fuzzywuzzy import fuzz


In [153]:
# Input RTF transcripts and generated XML files live in two folders that are
# resolved against the current working directory.
rtf_path = os.path.abspath('sample_rtf')
xml_path = os.path.abspath('sample_xml')

# List every file found under the RTF folder (recursively).
for folder, _subfolders, file_names in os.walk(rtf_path):
    for file_name in file_names:
        print(os.path.join(folder, file_name))

e:\BNY Mellon capstone project\BKG\data\sample_rtf\Northern Trust Corporation, Q1 2020 Earnings Call, Apr 21, 2020.rtf
e:\BNY Mellon capstone project\BKG\data\sample_rtf\State Street Corporation, Q4 2019 Earnings Call, Jan 17, 2020.rtf
e:\BNY Mellon capstone project\BKG\data\sample_rtf\The Bank of New York Mellon Corporation, Q2 2023 Earnings Call, Jul 18, 2023 (1).rtf
e:\BNY Mellon capstone project\BKG\data\sample_rtf\The Bank of New York Mellon Corporation, Q3 2020 Earnings Call, Oct 16, 2020.rtf
e:\BNY Mellon capstone project\BKG\data\sample_rtf\The Bank of New York Mellon Corporation, Q3 2023 Earnings Call, Oct 17, 2023 (1).rtf
e:\BNY Mellon capstone project\BKG\data\sample_rtf\The Bank of New York Mellon Corporation, Q4 2023 Earnings Call, Jan 12, 2024 (1).rtf


In [154]:

# Base name (no extension) of the transcript to convert; the ".rtf" and
# ".docx" suffixes are appended where the file is read or written.
filename = "State Street Corporation, Q4 2019 Earnings Call, Jan 17, 2020"

In [155]:
import aspose.words as aw

# Convert the selected RTF transcript to DOCX. The DOCX is written to the
# current working directory under the same base name.
rtf_file = os.path.join(rtf_path, filename + ".rtf")
doc = aw.Document(rtf_file)
doc.save(filename + ".docx")

<aspose.words.saving.SaveOutputParameters object at 0x0000025CB3703B10>

In [156]:

# Re-open the converted file with python-docx. Note that this rebinds `doc`,
# which previously held the aspose document object.
doc = docx.Document(filename+".docx")

In [157]:
# Quick check: the text before the first comma of the file name.
filename.split(",")[0]

'State Street Corporation'

In [158]:
def remove_empty_columns(arr):
    """Drop every column whose cells are all the empty string.

    Args:
        arr: A sequence of rows (each row a sequence of cell values).

    Returns:
        A list of row tuples containing only the surviving columns. Because
        the rows are transposed with ``zip``, rows of unequal length are cut
        to the shortest one. The result is ``[]`` when no column survives.
    """
    kept_columns = []
    for column in zip(*arr):
        # Keep the column as soon as one cell carries content.
        if any(cell != "" for cell in column):
            kept_columns.append(column)

    # Transpose back from columns to rows.
    return list(zip(*kept_columns))

In [None]:
# Module-level registry of call participants.
speaker_list = {}


def compare_entities(name1, name2):
    """Return the ``fuzz.ratio`` similarity score of the two strings."""
    score = fuzz.ratio(name1, name2)
    return score

In [None]:
def deal_ambigity(speaker_list,person_info):
    """Report whether ``person_info`` closely matches an already-known person.

    The combined score is the product of the name similarity and the company
    similarity divided by 10000; a score above 0.65 counts as a match. The
    first match is printed and ends the search.

    Args:
        speaker_list: Known people, either as a dict whose values are
            person-info dicts (the shape used by the rest of this notebook)
            or as a plain iterable of person-info dicts.
        person_info: Dict with at least the keys ``"name"`` and ``"company"``.

    Returns:
        ``True`` when a sufficiently similar person exists, else ``False``.
    """
    # Iterating a dict yields its keys (plain strings), which cannot be
    # indexed with "name"; compare against the stored records instead.
    if isinstance(speaker_list, dict):
        known_people = speaker_list.values()
    else:
        known_people = speaker_list

    exist = False
    for p in known_people:
        similarity_score = compare_entities(p["name"], person_info["name"]) * compare_entities(p["company"], person_info["company"]) / 10000
        if similarity_score > 0.65:
            exist = True
            print(p, person_info, similarity_score)
            break
    return exist

In [159]:
def build_first_table(data):
    """Build the "Earnings Estimates Comparison Table" XML element.

    Expected layout after cleaning: row 0 holds the period labels, row 1 the
    value types, and every later row is a metric whose first cell is the
    metric name. Column 0 of the two header rows is ignored.

    Args:
        data: Table rows as lists of cell strings.

    Returns:
        A ``<table>`` element with a ``<timePeriods>`` child (one ``<period>``
        per column) and a ``<metrics>`` child (one ``<metric>`` per data row,
        each holding one ``<value period=... type=...>`` per column).
    """
    # Drop repeated cell values within a row, then drop all-empty columns.
    # NOTE(review): de-duplicating by value also removes a genuinely repeated
    # figure in a row, which shortens that row — confirm against real tables.
    data = [list(dict.fromkeys(row)) for row in data]
    data = remove_empty_columns(data)
    root = ET.Element("table",attrib={"id":"0", "name":"Earnings Estimates Comparison Table"})

    time_periods = ET.SubElement(root, "timePeriods")
    metrics = ET.SubElement(root, "metrics")
    if not data:
        # Nothing survived cleaning: return the empty table skeleton.
        return root

    # Normalize each period label once so that <period name> and the
    # <value period> attributes that refer to it are always identical.
    period_names = [label.replace("-", "") for label in data[0]]
    for period_name in period_names[1:]:
        ET.SubElement(time_periods, "period", name=period_name)

    value_types = data[1] if len(data) > 1 else ()
    for metric_data in data[2:]:
        metric = ET.SubElement(metrics, "metric", name=metric_data[0])
        for column, value in enumerate(metric_data[1:], start=1):
            ET.SubElement(metric, "value", period=period_names[column], type=value_types[column]).text = value

    return root

def build_second_table(data):
    """Build the "EPS Normalized Comparison Table" XML element.

    Expected layout after cleaning: row 1 holds the column types and every
    row from index 2 on starts with a period label followed by its values.
    Only the CONSENSUS, ACTUAL and SURPRISE columns are exported.

    Args:
        data: Table rows as lists of cell strings.

    Returns:
        A ``<table>`` element with one ``<period>`` per data row under
        ``<timePeriods>`` and the values under a single "EPS Normalized"
        ``<metric>``.
    """
    # Drop repeated cell values within a row, then drop all-empty columns.
    # NOTE(review): de-duplicating by value also removes a genuinely repeated
    # figure in a row, which shortens that row — confirm against real tables.
    deduped_rows = [list(dict.fromkeys(row)) for row in data]
    grid = remove_empty_columns(deduped_rows)

    table = ET.Element("table", attrib={"id": "1", "name": "EPS Normalized Comparison Table"})
    column_types = grid[1]
    exported_types = ("CONSENSUS", "ACTUAL", "SURPRISE")

    periods_node = ET.SubElement(table, "timePeriods")
    metrics_node = ET.SubElement(table, "metrics")
    eps_node = ET.SubElement(metrics_node, "metric", name="EPS Normalized")

    for row in grid[2:]:
        period = row[0]
        ET.SubElement(periods_node, "period", name=period)
        for column in range(1, len(row)):
            column_type = column_types[column]
            if column_type not in exported_types:
                continue
            ET.SubElement(eps_node, "value", period=period, type=column_type).text = row[column]

    return table

def build_third_table(data,company):
    """Build the call-participants element and a speaker lookup table.

    The first row of ``data`` is skipped. In the remaining rows the cells are
    joined and split on the ``'\\n \\n \\n'`` separator; a one-line element is a
    group heading (e.g. "EXECUTIVES") and a multi-line element is a person
    (line 0 = name, line 1 = position or affiliation).

    Args:
        data: Rows of the participants table as lists of cell strings.
        company: Company string assigned to everyone in the "EXECUTIVES" group.

    Returns:
        ``(root, speakers)``: ``root`` is an element with one ``<person>``
        child per participant; ``speakers`` maps the whitespace-normalized
        name to a dict with the keys "company", "name", "id",
        "origin position" and, for executives only, "position".

    The lookup table is a fresh dict on every call, so the function neither
    needs nor modifies a module-level ``speaker_list`` and results from an
    earlier transcript cannot leak into the next one.
    """
    # The tag contains a space, which is not a valid XML name; the visible
    # caller renames it to "section" before the tree is written.
    root = ET.Element("Call Participants")
    speakers = {}
    next_id = 1

    current_group = ''
    for row in data[1:]:
        row_data = '\n \n \n'.join(row).strip()
        for element in row_data.split('\n \n \n'):
            if not element.strip():
                # An empty cell between two people is not a group heading;
                # skipping it keeps the current group intact.
                continue

            lines = element.split('\n')
            if len(lines) == 1:
                current_group = lines[0].strip()
                continue

            name = re.sub(r'\s+', ' ', lines[0].strip())
            origin_position = lines[1].strip()
            person_id = str(next_id)
            person_info = {}

            if current_group == "EXECUTIVES":
                person_element = ET.SubElement(root, "person", company=company, position=origin_position, group=current_group, id=person_id)
                person_info["company"] = company
                person_info["position"] = origin_position
            else:
                # For everyone else the second line is the employer, usually
                # suffixed with ", Research Division".
                affiliation = origin_position.replace("Research Division", "").strip()
                # endswith() is safe when nothing is left after the removal,
                # unlike indexing the last character.
                if affiliation.endswith(","):
                    affiliation = affiliation[:-1].strip()
                person_element = ET.SubElement(root, "person", company=affiliation, group=current_group, id=person_id)
                person_info["company"] = affiliation

            person_element.text = name
            person_info["name"] = name
            person_info["id"] = person_id
            person_info["origin position"] = origin_position

            speakers[name] = person_info
            next_id += 1
    return root, speakers


def process_presentation(dialog,speaker_list, name):
    """Convert the presentation part of a transcript into a ``<section>``.

    The text is scanned line by line. A line whose whitespace-normalized text
    is a key of ``speaker_list`` starts a statement by that speaker; the next
    line is treated as the speaker's position line and the lines after it are
    the statement text. A line containing "Operator" starts an operator
    statement (speaker id "0").

    Args:
        dialog: Full text of the presentation, lines separated by ``"\\n"``.
        speaker_list: Mapping of speaker name to a dict with at least the
            keys "id" and "origin position".
        name: Value of the section's ``name`` attribute.

    Returns:
        A ``<section>`` element with one ``<statement>`` child per turn, each
        holding ``<speaker id=... position=...>`` with a nested ``<text>``.
    """
    paragraph = dialog.split('\n')
    total = len(paragraph)

    conversation = ET.Element("section", attrib={"name": name})
    i = 0
    while i < total:
        speaker_name = re.sub(r'\s+', ' ', paragraph[i].strip())
        if speaker_name in speaker_list:
            speaker = speaker_list[speaker_name]
            origin_position = speaker["origin position"]
            # A speaker name on the very last line has no position line.
            title = paragraph[i + 1].strip() if i + 1 < total else ""

            # When the position line carries more than the position itself,
            # everything after the position is the start of the statement.
            # An empty position is skipped because it cannot be a separator.
            text = ""
            if origin_position and title != origin_position:
                parts = title.split(origin_position, 1)
                if len(parts) > 1 and parts[1] != "":
                    text = parts[1].strip() + "\n"

            statement = ET.SubElement(conversation, "statement")
            speaker_element = ET.SubElement(statement, "speaker", id=speaker["id"], position=origin_position)
            speaker_element.text = speaker_name
            para = ET.SubElement(speaker_element, "text")
            i += 2
            # Collect lines until the next speaker or a bare "Operator" line.
            while i < total and re.sub(r'\s+', ' ', paragraph[i].strip()) not in speaker_list and paragraph[i].strip() != "Operator":
                if len(paragraph[i].strip()) != 0:
                    text += paragraph[i] + "\n"
                i += 1

            para.text = text.strip()

        elif "Operator" in paragraph[i]:
            statement = ET.SubElement(conversation, "statement")
            speaker_element = ET.SubElement(statement, "speaker", id="0", position="Operator")
            speaker_element.text = "Operator"
            text = ""
            para = ET.SubElement(speaker_element, "text")
            # The matching line itself is skipped; only later lines are kept.
            i += 1
            while i < total and re.sub(r'\s+', ' ', paragraph[i].strip()) not in speaker_list:
                if len(paragraph[i].strip()) != 0:
                    text += paragraph[i] + "\n"
                i += 1
            para.text = text.strip()

        else:
            i += 1
    return conversation


def process_dialog(dialog,speaker_list, name):
    """Convert the question-and-answer part of a transcript into a ``<section>``.

    The text is scanned line by line. A line containing "Operator" opens a new
    question block (``<transition>``, or ``<ending>`` once its text contains
    "conclude"). Within a block the first speaker asks the ``<question>``;
    further turns by that same speaker are ``<followQuestion>`` elements and
    turns by anyone else are ``<answer>`` / ``<followAnswer>`` elements. A
    question that is never answered is retagged ``<other>`` and its id is
    released. After the call has ended every turn becomes an ``<ending>``.

    Args:
        dialog: Full text of the Q&A part, lines separated by ``"\\n"``.
        speaker_list: Mapping of speaker name to a dict with at least the
            keys "id" and "origin position".
        name: Value of the section's ``name`` attribute.

    Returns:
        A ``<section>`` element whose children each hold a
        ``<speaker id=... position=...>`` element with a nested ``<text>``.
    """
    question_id = -1
    followup_id = -1
    end = False
    paragraph = dialog.split('\n')
    cur_question = None
    conversation = ET.Element("section", attrib={"name": name})
    i = 0
    hasSub = False
    last_question_element = None
    last_question_answered = True
    while i < len(paragraph):
        speaker_name = re.sub(r'\s+', ' ', paragraph[i].strip())
        if speaker_name in speaker_list:
            speaker_id = speaker_list[speaker_name]["id"]
            origin_position = speaker_list[speaker_name]["origin position"]
            # A speaker name on the very last line has no position line.
            title = paragraph[i+1].strip() if i + 1 < len(paragraph) else ""

            # When the position line carries more than the position itself,
            # everything after the position is the start of the turn.
            # An empty position is skipped because it cannot be a separator.
            text = ""
            if origin_position and title != origin_position:
                parts = title.split(origin_position, 1)
                other_part = parts[1] if len(parts) > 1 else ""
                if other_part != "":
                    text = other_part.strip() + "\n"

            if end:
                context = ET.SubElement(conversation, "ending", id = str(question_id))

            elif cur_question is None:
                # First speaker after an operator transition asks the question.
                if last_question_element is not None and not last_question_answered:
                    if last_question_element.tag =="question":
                        question_id-=1
                    last_question_element.tag = "other"
                followup_id = -1
                context = ET.SubElement(conversation, "question", id = str(question_id))
                cur_question = paragraph[i].strip()
                last_question_element = context
                last_question_answered = False
            elif paragraph[i].strip() == cur_question :
                # The questioner speaks again: a follow-up question. If the
                # previous question got no answer it is demoted to "other".
                if last_question_element is not None and not last_question_answered:
                    if last_question_element.tag =="question":
                        question_id-=1
                    elif last_question_element.tag =="followQuestion":
                        followup_id -=1
                    last_question_element.tag = "other"

                followup_id += 1
                context = ET.SubElement(conversation, "followQuestion", id=str(followup_id),  question_id = str(question_id))
                hasSub = True
                last_question_element = context
                last_question_answered = False
            elif hasSub and paragraph[i].strip()!= cur_question:
                context = ET.SubElement(conversation, "followAnswer", id=str(followup_id),  question_id = str(question_id))
                hasSub = False
                last_question_answered = True
            else:
                context = ET.SubElement(conversation, "answer", id = str(question_id))
                last_question_answered = True
            speaker_element = ET.SubElement(context, "speaker", id=speaker_id, position=origin_position)
            speaker_element.text = speaker_name

            para = ET.SubElement(speaker_element, "text")
            i += 2
            # Collect lines until the next speaker or an operator line.
            while i < len(paragraph) and re.sub(r'\s+', ' ', paragraph[i].strip()) not in speaker_list and paragraph[i].strip()!= "Operator" and not paragraph[i].startswith("Operator"):
                if len(paragraph[i].strip()) != 0:
                    text += paragraph[i] + "\n"
                i += 1
            para.text = text.strip()

        elif "Operator" in paragraph[i]:
            # The operator closes the current block; an unanswered question
            # is demoted to "other" and its id is reused by the next block.
            if last_question_element is not None and not last_question_answered:
                if last_question_element.tag =="question":
                    question_id-=1
                last_question_element.tag = "other"
            last_question_element = None
            last_question_answered = False
            cur_question = None
            hasSub = False
            question_id += 1
            followup_id = -1
            context =ET.SubElement(conversation, "transition")
            speaker_element = ET.SubElement(context, "speaker", id="0", position="Operator")
            speaker_element.text = "Operator"
            text = ""
            para = ET.SubElement(speaker_element, "text")
            # Keep whatever follows the word "Operator" on the same line.
            paragraph[i] = paragraph[i].replace("Operator", "")
            while i < len(paragraph) and re.sub(r'\s+', ' ', paragraph[i].strip())  not in speaker_list:
                if len(paragraph[i].strip()) != 0:
                    text += paragraph[i] + "\n"
                i += 1
            para.text = text.strip()
            if "conclude" in para.text:
                context.tag = "ending"
                end = True

        else:
            i += 1

    return conversation

In [160]:
def prettify(element, indent='    ', level=0):
    """Indent an ElementTree in place so it serializes one element per line.

    Only whitespace-only (or missing) ``text`` and ``tail`` values are
    rewritten; real text content is left untouched.

    Args:
        element: Element to indent; its descendants are processed recursively.
        indent: Indentation string for one nesting level.
        level: Nesting depth of ``element`` (0 for the root).
    """
    # len() counts child elements; the truth value of an Element is deprecated.
    if len(element) > 0:
        if not element.text or not element.text.strip():
            element.text = '\n' + indent * (level + 1)
        if not element.tail or not element.tail.strip():
            element.tail = '\n' + indent * level
    else:
        if level and (not element.tail or not element.tail.strip()):
            element.tail = '\n' + indent * level

    last_child = None
    for subelement in element:
        prettify(subelement, indent, level + 1)
        last_child = subelement

    # The tail of the last child precedes this element's closing tag, so it
    # must use this element's indentation rather than the children's.
    if last_child is not None and (not last_child.tail or not last_child.tail.strip()):
        last_child.tail = '\n' + indent * level

In [161]:
# The third paragraph of the converted document holds the company line
# (e.g. "State Street Corporation NYSE:STT").
paragraphs = doc.paragraphs
company = paragraphs[2].text if len(paragraphs) > 2 else ""

# Collect every table as a list of rows of stripped cell texts, dropping rows
# whose cells are all empty and tables that end up with no rows.
tables = []
for table in doc.tables:
    stripped_rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
    content_rows = [cells for cells in stripped_rows if any(text != "" for text in cells)]
    if content_rows:
        tables.append(content_rows)

# Tables 0 and 1 are the two financial tables; table 3 lists the participants.
t1 = build_first_table(tables[0])
t2 = build_second_table(tables[1])
t3,speaker_list = build_third_table(tables[3],company)

sec1 = ET.Element("section", attrib={"name": "Financial Tables"})
for financial_table in (t1, t2):
    sec1.append(financial_table)

# Turn the participants root into a regular named section.
t3.tag = "section"
t3.set("name", "Call Participants")

In [162]:
import yfinance as yf
from datetime import datetime,timedelta
def get_stock_info(ticker_symbol, time):
    """Look up the opening and closing price on the day of the call.

    Args:
        ticker_symbol: Ticker symbol passed to ``yf.Ticker``.
        time: Call timestamp in the transcript format
            ``"%A, %B %d, %Y %I:%M %p %Z"``, e.g.
            "Friday, January 17, 2020 3:00 PM GMT".

    Returns:
        ``(open, close)`` taken from the first row of the price history that
        starts on the call date. Both values are ``None`` when the timestamp
        cannot be parsed, the lookup fails, or no data is returned; the
        reason is printed rather than raised.
    """
    open_price = None
    close_price = None

    try:
        # Parse the date first so a malformed timestamp is reported before
        # any ticker object is created.
        date_format = "%A, %B %d, %Y %I:%M %p %Z"
        datetime_obj = datetime.strptime(time, date_format)

        formatted_date = datetime_obj.strftime("%Y-%m-%d")
        datetime_obj_plus_one = datetime_obj + timedelta(days=1)
        print(formatted_date)

        ticker = yf.Ticker(ticker_symbol)
        data = ticker.history(start=formatted_date, end=datetime_obj_plus_one)

        if not data.empty:
            # .iloc is explicit positional access; plain [0] on a
            # date-indexed Series is deprecated in recent pandas.
            open_price = data['Open'].iloc[0]
            close_price = data['Close'].iloc[0]
        else:
            print("No data available for the specified date.")
    except Exception as e:
        # Best effort by design: report the problem and return (None, None).
        print("An error occurred:", str(e))

    return open_price, close_price



In [163]:
# Read the header fields and the two transcript sections from the document,
# then build the <header> element including the stock prices of the call day.
body = ET.Element("body")
company = ""
title = ""
time = ""
currency = ""
note = ""
QA = None
presentation = None
# The header fields sit at fixed paragraph positions; the two transcript
# parts are recognized by the text their paragraph starts with.
for i, paragraph in enumerate(doc.paragraphs):

    if i ==2 :
        company = paragraph.text

    elif i == 3:
        title = paragraph.text
    elif i == 4:
        time = paragraph.text
    elif i == 6:
        currency= paragraph.text
    elif i == 7:
        note= paragraph.text

    elif paragraph.text.strip().startswith("Question and Answer"):
        QA = process_dialog(paragraph.text,speaker_list,"Question and Answer")
    elif paragraph.text.strip().startswith("Presentation"):
        # NOTE(review): the section name has a trailing space — confirm
        # whether downstream consumers rely on it before changing it.
        presentation = process_presentation(paragraph.text,speaker_list,"Presentation ")

header = ET.Element("header")

# The ticker is the part after ":" in the company line.
if ":" not in company:
    raise ValueError(f"Company line {company!r} has no ':<ticker>' part")
ticker = company.split(":")[1].strip()

# Fail with a clear message instead of an unpacking error when the title
# does not contain a "Q<n> <yyyy>" period.
match = re.search(r"Q\d \d{4}", title)
if match is None:
    raise ValueError(f"Title {title!r} has no 'Q<n> <yyyy>' period")
q_y = match.group(0).replace(" ", "-")
quarter, year = q_y.split("-")

ET.SubElement(header, "company").text = company
ET.SubElement(header, "quarter").text = quarter
ET.SubElement(header, "year").text = year
ET.SubElement(header, "time").text = time
ET.SubElement(header, "currency").text = currency
ET.SubElement(header, "note").text = note
ET.SubElement(header, "ticker").text = ticker

# NOTE(review): `open` shadows the builtin for the rest of the notebook; the
# names are kept because another cell assigns the same pair.
open, close = get_stock_info(ticker,time)
if open is None or close is None:
    # get_stock_info returns (None, None) on any failure or when no price
    # data is returned; stop here rather than fail inside the float formatting.
    raise RuntimeError(f"No stock prices available for {ticker!r} at {time!r}")
ET.SubElement(header, "stock_price_before").text =  f"{open:.6f}"
ET.SubElement(header, "stock_price_after").text = f"{close:.6f}"

# A move of at most 1 (in price units) in either direction counts as neutral.
if abs(close - open) <=1:
    performance = "neutral"
elif (close - open) < 0:
    performance = "negative"
else:
    performance = "positive"
ET.SubElement(header, "stock_performance").text = performance
print(time)



2020-01-17
Friday, January 17, 2020 3:00 PM GMT


In [164]:
# Repeats the price lookup with the same ticker and timestamp.
# NOTE(review): `open` shadows the builtin of the same name in this notebook.
open, close = get_stock_info(ticker,time)

2020-01-17


In [165]:
# Assemble the document: the body holds the financial tables, the
# participants, the presentation and the Q&A, in that order.
for section in (sec1, t3, presentation, QA):
    body.append(section)

# The transcript root holds the header followed by the body.
root = ET.Element("Transcript")
for part in (header, body):
    root.append(part)

In [166]:
# root = ET.Element("body")
# sec1 = ET.Element("section", attrib={"name": "financial tables"})
# t1 = build_first_table(tables[0])
# t2 = build_second_table(tables[1])
# t3,speaker_list = build_third_table(tables[3])
# sec1.append(t1)
# sec1.append(t2)
# t3.tag = "section"
# t3.set("name", "call participants")
# root.append(header)
# root.append(sec1)
# root.append(t3)


# Indent the tree in place, then write it as "<ticker>-<quarter>-<year>.xml".
prettify(root)


tree = ET.ElementTree(root)


out_file_name = f"{ticker}-{quarter}-{year}"

# Create the output folder if it does not exist yet, so the write cannot
# fail with FileNotFoundError on a fresh checkout.
os.makedirs(xml_path, exist_ok=True)
tree.write(os.path.join(xml_path,out_file_name+".xml"), encoding="utf-8", xml_declaration=True)

In [167]:
# Delete the intermediate DOCX that was written to the working directory.
os.remove(filename+'.docx')

In [168]:
speaker_list

{'Eric Walter Aboaf': {'company': 'State Street Corporation NYSE:STT',
  'position': 'Executive VP & CFO',
  'name': 'Eric Walter Aboaf',
  'id': '1',
  'origin position': 'Executive VP & CFO'},
 'Ilene Fiszel Bieler': {'company': 'State Street Corporation NYSE:STT',
  'position': 'Global Head of Investor Relations',
  'name': 'Ilene Fiszel Bieler',
  'id': '2',
  'origin position': 'Global Head of Investor Relations'},
 'Ronald Philip O’Hanley': {'company': 'State Street Corporation NYSE:STT',
  'position': 'Chairman, President & CEO',
  'name': 'Ronald Philip O’Hanley',
  'id': '3',
  'origin position': 'Chairman, President & CEO'},
 'Alexander Blostein': {'company': 'Goldman Sachs Group Inc.',
  'name': 'Alexander Blostein',
  'id': '4',
  'origin position': 'Goldman Sachs Group Inc., Research Division'},
 'Betsy Lynn Graseck': {'company': 'Morgan Stanley',
  'name': 'Betsy Lynn Graseck',
  'id': '5',
  'origin position': 'Morgan Stanley, Research Division'},
 'Brennan Hawken': {'co

## get images

In [94]:
# import zipfile
# import os
# from PIL import Image
# from io import BytesIO

# def extract_and_open_images(docx_filename):

#     temp_dir = "extracted_images"
#     os.makedirs(temp_dir, exist_ok=True)

#     with zipfile.ZipFile(docx_filename, 'r') as docx:

#         for file in docx.namelist():
#             if file.startswith('word/media/'):
#                 image_data = docx.read(file)
#                 image = Image.open(BytesIO(image_data))
#                 image.show()  


#                 image_filename = os.path.join(temp_dir, os.path.basename(file))
#                 image.save(image_filename)


# docx_filename = filename+'.docx'  
# extract_and_open_images(docx_filename)

ImportError: cannot import name '_imaging' from 'PIL' (c:\Users\xiaomi\anaconda3\envs\BKG\lib\site-packages\PIL\__init__.py)