In [1]:
from pathlib import Path
from utils import *
import pandas as pd
import pyodbc
import string
import re
import spacy
from pprint import pprint
from tqdm import tqdm

DATA_FOLDER = Path("C:/Users/Jean-BaptistePERNEY/Documents/ECHO/DATA/2022_audio_database/2022_3/raw_transcriptions/")

### Identify and save personnal information from transcripts

In [89]:
def is_valid_name(name):
    """Check if the name is valid for matching."""
    cleaned_name = name.strip()
        # Exclude names that are too short
    if len(cleaned_name) <= 2:
        return False
    
    # Exclude specific non-name words or symbols
    non_name_keywords = ["sos", "oui", "allô", "médecin", "médecins", "bonjour", "covid"]  # Add more keywords as needed
    if cleaned_name.lower() in non_name_keywords:
        return False

    return True

def find_names_with_spacy(transcript):
    nlp = spacy.load('fr_dep_news_trf')
    doc = nlp(transcript)
    names = list(set([w.text for w in doc if w.pos_ == 'PROPN' and is_valid_name(w.text)]))
    
    return names

def month_name_to_number(month_name):
    month_names = ['janvier', 'février', 'mars', 'avril', 'mai', 'juin', 
                   'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre']
    try:
        return month_names.index(month_name.lower()) + 1
    except ValueError:
        return None
    

def extract_full_date(transcript):
    min_year = 1900

    # Pattern for numerical full date
    full_date_pattern = r'\b(0?[1-9]|[12][0-9]|3[01])[-/](0?[1-9]|1[0-2])[-/](\d{2}|\d{4})\b'
    
    # Pattern for full date with month name
    month_names = '|'.join(['janvier', 'février', 'mars', 'avril', 'mai', 'juin', 
                            'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'])
    date_with_month_pattern = r'\b(0?[1-9]|[12][0-9]|3[01])\s+(' + month_names + r')\s+(\d{4})\b'

    # Try to extract full date in numerical format
    full_dates = re.findall(full_date_pattern, transcript)
    for date in full_dates:
        day, month, year = date
        if len(year) == 2:  # Convert two-digit year to four digits
            year = "19" + year if year > "24" else "20" + year
         # Handle transcript erros (ex: 1568 instead of 1968)

        if int(year) < min_year:
            year = int("19" + year[-2:])

        formatted_date = f"{day.zfill(2)}/{month.zfill(2)}/{str(year)}"
        return formatted_date

    # Try to extract full date with month name
    dates_with_month = re.findall(date_with_month_pattern, transcript)
    for date in dates_with_month:
        day, month_name, year = date
        month_number = month_name_to_number(month_name)
        if month_number:
            formatted_date = f"{day.zfill(2)}/{month_number:02d}/{year}"
            return formatted_date

    # Extract just the year
    year_pattern = r'\b(19\d{2}|20\d{2})\b'
    years = re.findall(year_pattern, transcript)
    if years:
        year = int(years[0])
        if year < min_year:
            year = int("19" + year[-2:])
        return f"01/01/{str(year)}"  # Defaulting to '01/01/YYYY' for year-only cases

    return None

def extract_phone_numbers(transcript):
    phone_regex = r'(06|07)(\d{8})'

    transcript = transcript.replace(" ", "")
    transcript = transcript.translate(str.maketrans('', '', string.punctuation))
    match = re.search(phone_regex, transcript)
    match = match.group() if match else match
    return match

def extract_postcodes(transcript):
    # Existing pattern for postcodes like '75015' or '75-015'
    postcode_pattern = r'\b(75|77|78|91|92|93|94|95)(?:[-\s]?)(\d{3})\b'
    raw_postcodes = re.findall(postcode_pattern, transcript)

    # Additional pattern for postcodes written as '19ème'
    arrondissement_pattern = r'\b(\d{1,2})[eè]me\b'
    arrondissements = re.findall(arrondissement_pattern, transcript)

    # Formatting arrondissements to Paris postcodes (e.g., '75019' for '19ème')
    formatted_arrondissements = ['75' + arr.zfill(3) for arr in arrondissements]

    # Combining and returning all found postcodes
    return [''.join(match) for match in raw_postcodes] + formatted_arrondissements

def extract_personal_info(data_folder):
    personal_info = {}

    address_pattern = r'\b(\d+\s+(?:rue|rues|boulevard|avenue)\s+[a-zA-Zéèàêûôâîç\s-]+)\b'
    number_sequence_pattern = r'\b\d+\b'

    for file_path in tqdm(data_folder.rglob('*.txt')):
        with open(file_path, 'r', encoding='utf-8') as file:
            transcript = file.read()

            personal_info[str(Path(file_path.name))] = {
                'date_of_birth': {
                    'full_date': extract_full_date(transcript)
                },
                'addresses': re.findall(address_pattern, transcript),
                'postcodes': extract_postcodes(transcript),
                'phone_numbers': extract_phone_numbers(transcript),
                # 'names' : list(set(find_names_with_spacy(transcript))),
                # 'other_findings': re.findall(number_sequence_pattern, transcript),
            }
        
    return personal_info

personnal_data = extract_personal_info(DATA_FOLDER)

12202it [00:38, 316.55it/s]


In [92]:
count = 0
personnal_data_filtered = {}

for key, item in personnal_data.items():
    if item['addresses']:
        count += 1
        personnal_data_filtered[key] = item
print(count)

4954


In [95]:
count = 0
personnal_data_filtered = {}

for key, item in personnal_data.items():
    if item['date_of_birth']['full_date']:
        count += 1
        personnal_data_filtered[key] = item
print(count)

4998


In [99]:
personnal_data_filtered

{'2022_3_10_0_17_43_ch25_transcription.txt': {'date_of_birth': {'full_date': '02/11/1966'},
  'addresses': ['19 rue de l'],
  'postcodes': [],
  'phone_numbers': None},
 '2022_3_10_0_58_28_ch25_transcription.txt': {'date_of_birth': {'full_date': '13/02/1948'},
  'addresses': [],
  'postcodes': ['75005'],
  'phone_numbers': None},
 '2022_3_10_10_10_15_ch26_transcription.txt': {'date_of_birth': {'full_date': '16/01/2022'},
  'addresses': ['29 rue de Chazelle'],
  'postcodes': [],
  'phone_numbers': None},
 '2022_3_10_10_17_49_ch25_transcription.txt': {'date_of_birth': {'full_date': '29/09/1952'},
  'addresses': [],
  'postcodes': [],
  'phone_numbers': None},
 '2022_3_10_10_22_21_ch25_transcription.txt': {'date_of_birth': {'full_date': '06/10/2019'},
  'addresses': [],
  'postcodes': [],
  'phone_numbers': '0610197140'},
 '2022_3_10_10_2_7_ch25_transcription.txt': {'date_of_birth': {'full_date': '15/05/2010'},
  'addresses': [],
  'postcodes': [],
  'phone_numbers': None},
 '2022_3_10_10

In [100]:
count = 0
personnal_data_filtered = {}

for key, item in personnal_data.items():
    if (item['addresses'] or 
        (item['date_of_birth'] and item['date_of_birth']['full_date']) or 
        item['phone_numbers']):
        count += 1
        personnal_data_filtered[key] = item
print(count)

7400


### Load telecom log database

In [101]:
def load_mdb_data(file_path, table_name="InboundVoiceCalls"):
    try:
        # Connection string for MDB file
        conn_str = f'DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ={file_path};'
        
        # Establishing connection to the MDB file
        conn = pyodbc.connect(conn_str)
        cursor = conn.cursor()
        
        # Reading each table into a pandas DataFrame
        df = pd.read_sql(f'SELECT * FROM [{table_name}]', conn)
        
        cursor.close()
        conn.close()

        return df
        
    except Exception as e:
        print(f"Error occurred: {e}")
        return None

file_path = "/Users/Jean-BaptistePERNEY/Documents/ECHO/DATA/telecom_log_2022/01022022.mdb"
telecom_log_df = load_mdb_data(file_path, table_name="InboundVoiceCalls")

telecom_log_df = telecom_log_df[telecom_log_df['CallAgentCommunicationDuration'] != 0]
telecom_log_df = telecom_log_df[telecom_log_df['CallServiceID'] != 'SOS']

columns_to_keep = ["CallTime", "CallCLID", "AgentID", "CallAgentCommunicationDuration"]
telecom_log_df = telecom_log_df[columns_to_keep]

telecom_log_df.head()

  df = pd.read_sql(f'SELECT * FROM [{table_name}]', conn)


Unnamed: 0,CallTime,CallCLID,AgentID,CallAgentCommunicationDuration
38,2021-12-11 08:24:43,667513808,martynowski2438,118
49,2021-12-11 09:00:07,689634034,martynowski2438,99
72,2021-12-11 10:15:33,614477661,martynowski2438,165
100,2021-12-11 10:53:13,783847741,martynowski2438,224
103,2021-12-11 10:57:41,619819855,martynowski2438,115


### Data matching

In [102]:
import pandas as pd
from datetime import datetime
import re

def find_matching_row(csv_path, data_dict):

    # Read the CSV just once
    df = pd.read_csv(csv_path, sep=',',  encoding='latin-1' )

    # Explicitly convert the 'DateAppel' column to datetime objects
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce', dayfirst=True)

    # Prepare results dictionary
    results = {}

    for file_path, info in data_dict.items():
        # Parse the timestamp from the file_path
        timestamp_parts = file_path.split('_')
        date_str = f"{timestamp_parts[0]}-{timestamp_parts[1]}-{timestamp_parts[2]}"
        search_date = pd.to_datetime(date_str).date()

        # Filter the dataframe by the date
        df['Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y')
        date_filtered_df = df[df['Date'].dt.date == search_date]

        date_filtered_df = date_filtered_df.copy()
        date_filtered_df['DateNaiss'] = pd.to_datetime(date_filtered_df['DateNaiss'], format='%Y-%m-%d', errors='coerce')

        # Attempt to match phone number
        if 'phone_numbers' in info and info['phone_numbers']:   
            transcript_phone_number = info['phone_numbers']
            medical_db_phone_number = date_filtered_df['TelPatient'].str.replace('.', '', regex=False).str.contains(transcript_phone_number, na=False)
            match = date_filtered_df[medical_db_phone_number]
            if not match.empty:
                results[file_path] = (match, "phone_numbers")
                continue  # Found a match with phone number, skip to next entry

        # Attempt to match addresses
        if 'addresses' in info and info['addresses']:
            for address in info['addresses']:
                address = address.lower()
                match = date_filtered_df[date_filtered_df['FullAdresse'].str.contains(address, na=False)]
                
                if not match.empty:
                    results[file_path] = (match, "addresses")
                    break  # Found a match with address, skip to next entry

                # Attempt to match date of birth
        if 'date_of_birth' in info and info['date_of_birth']:
            dob_str = info['date_of_birth']['full_date']
            dob = pd.to_datetime(dob_str, dayfirst=True, errors='coerce')
            if dob is not pd.NaT:
                match = date_filtered_df[date_filtered_df['DateNaiss'] == dob]
                if not match.empty:
                    results[file_path] = (match, "date_of_birth")
                    continue  # Found a match with date of birth, skip to next entry

        # Attempt to match postcodes
        if 'postcodes' in info and info['postcodes']:
            for postcode in info['postcodes']:
                match = date_filtered_df[date_filtered_df['CodePostal'] == str(postcode)]
                if not match.empty:
                    results[file_path] = (match, "postcodes")
                    break  # Found a match with postcode, skip to next entry

        # # Attempt to match names
        # if 'names' in info and info['names']:
        #     for name in info['names']:
        #         match = date_filtered_df[(date_filtered_df['Nom'].str.contains(name, na=False)) | 
        #                                  (date_filtered_df['Prenom'].str.contains(name, na=False))]
        #         if not match.empty:
        #             results[file_path] = (match, "names")
        #             break  # Found a match with name, skip to next entry

    return results

# Example usage:
medical_db_path = "/Users/Jean-BaptistePERNEY/Documents/ECHO/DATA/medical_database_2022/BaseMed2022LR_1.csv"

matches = find_matching_row(medical_db_path, personnal_data)
print(len(matches))
# for file_path, matched_rows in matches.items():
#     print(f"File: {file_path}")
#     print(matched_rows[0]["IdAppel"], matched_rows[1])

  df = pd.read_csv(csv_path, sep=',',  encoding='latin-1' )


KeyboardInterrupt: 