In [0]:
# Uncomment the cell below for a manual run; comment it out for job execution.

In [0]:
%run ./01-SharepointAccess_final

In [0]:
%run ../Main_Config

In [0]:
%run ../sql_query

In [0]:
# Job execution only: keep the lines below commented out for a manual run.
# %pip install -U -q -r ../requirements.txt
# dbutils.library.restartPython()

In [0]:
%pip install -q "camelot-py[all]" --upgrade

In [0]:
# Job execution only: keep the lines below commented out for a manual run.
# SRC_TASK_KEY = "Getting_Sharepoint_Data"

# try:
#     token = dbutils.jobs.taskValues.get(taskKey=SRC_TASK_KEY, key="access_token", debugValue=None)
#     site_id = dbutils.jobs.taskValues.get(taskKey=SRC_TASK_KEY, key="site_id", debugValue=None)
#     drive_id = dbutils.jobs.taskValues.get(taskKey=SRC_TASK_KEY, key="drive_id", debugValue=None)
# except Exception as e:
#     print("Failed to fetch values from previous task:", e)
#     token = site_id = drive_id = None


In [0]:
# Helper functions to save a downloaded file and to read content from the supported file formats.

import camelot
def save_file(content_response, volume_path, file_name):
    """Write the body of a download response to a file inside a volume directory.

    Args:
        content_response: HTTP response object exposing ``status_code`` and
            ``content`` (the raw bytes of the downloaded file).
        volume_path: Directory to write into; a trailing path separator is
            optional.
        file_name: Name of the file to create inside ``volume_path``.

    Returns:
        The full path of the written file, or ``False`` when the response
        status code is not 200 (nothing is written in that case).
    """
    import os  # local import so the helper does not depend on cell execution order

    if content_response.status_code != 200:
        print(f"Failed to download PDF. Status code: {content_response.status_code}")
        return False

    # os.path.join only inserts a separator when one is missing, so existing
    # callers passing "dir/" get the same path as before, while "dir" no longer
    # produces "dirfile.ext".
    volume_save_path = os.path.join(volume_path, file_name)

    with open(volume_save_path, "wb") as f:
        f.write(content_response.content)

    return volume_save_path

def read_pdf_tables(file_path):
    """Extract tables from a PDF with Camelot (lattice flavor) and render them as text.

    Every detected table is cleaned before rendering: whitespace-only cells are
    treated as missing, rows/columns that are entirely missing are dropped, and
    line breaks inside cells are collapsed to single spaces. Tables left with
    fewer than 2 rows or fewer than 2 columns are skipped.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        ``""`` when no usable table was found; otherwise a single string of the
        form ``"[<table>, <table>, ...]"`` containing the full text rendering
        of each cleaned table.
    """
    tables = camelot.read_pdf(file_path, pages="all", flavor="lattice")

    def _clean_cell(value):
        # Only strings need cleaning; missing values pass through untouched.
        if isinstance(value, str):
            return value.replace("\n", " ").strip()
        return value

    cleaned_tables = []
    for table in tables:
        # Blank out whitespace-only cells, then drop fully empty rows/columns.
        df = (
            table.df
            .replace(r'^\s*$', None, regex=True)
            .dropna(axis=0, how='all')
            .dropna(axis=1, how='all')
        )

        # Skip tiny or invalid tables.
        if df.shape[0] < 2 or df.shape[1] < 2:
            continue

        # DataFrame.applymap is deprecated since pandas 2.1 in favour of
        # DataFrame.map; look the method up on the class so older pandas
        # versions (which only have applymap) keep working.
        frame_cls = type(df)
        apply_cellwise = getattr(frame_cls, "map", None) or frame_cls.applymap
        cleaned_tables.append(apply_cellwise(df, _clean_cell))

    if not cleaned_tables:
        return ""

    # str(list_of_frames) would go through DataFrame.__repr__, which obeys the
    # pandas display options and silently truncates long cells and large
    # tables. to_string() renders every row, column and character while
    # keeping the same bracketed, comma-separated layout.
    return "[" + ", ".join(df.to_string() for df in cleaned_tables) + "]"

def read_docx_tables(file_path):
    """Read the tables of a Word document (python-docx) and render them as text.

    Args:
        file_path: Path of the .docx file to read.

    Returns:
        ``""`` when the document has no table with any non-blank cell;
        otherwise a single string of the form ``"[<table>, <table>, ...]"``
        containing the full text rendering of each table.
    """
    doc = Document(file_path)
    all_tables = []

    for table in doc.tables:
        data = [[cell.text.strip() for cell in row.cells] for row in table.rows]

        # Cell text is always a str (never NaN), so a dropna()-based check can
        # never detect a blank table; look for at least one non-empty cell.
        if not any(text for row in data for text in row):
            continue

        all_tables.append(pd.DataFrame(data))

    if not all_tables:
        return ""

    # to_string() avoids the truncation that DataFrame.__repr__ (used by
    # str(list_of_frames)) applies to long cells and large tables.
    return "[" + ", ".join(df.to_string() for df in all_tables) + "]"
    
def read_odt_tables(file_path):
    """Read the tables of an ODT file (odfpy) and render them as text.

    Rows whose cells are all blank are skipped, and tables left without any
    row are omitted.

    Args:
        file_path: Path of the .odt file to read.

    Returns:
        ``""`` when no table with content was found; otherwise a single string
        of the form ``"[<table>, <table>, ...]"`` containing the full text
        rendering of each table.
    """
    # Imported locally: the visible import cell only brings `load` and `P` in
    # from odfpy, so the table element classes are not otherwise guaranteed to
    # be in scope.
    from odf import teletype
    from odf.table import Table, TableRow, TableCell

    odt_doc = load(file_path)

    table_dfs = []
    for table in odt_doc.getElementsByType(Table):
        data = []
        for row in table.getElementsByType(TableRow):
            # extractText walks every child node of the paragraph, so text held
            # in nested elements (e.g. spans) is kept and an element first
            # child no longer fails on a missing `.data` attribute.
            row_data = [
                " ".join(
                    teletype.extractText(paragraph)
                    for paragraph in cell.getElementsByType(P)
                ).strip()
                for cell in row.getElementsByType(TableCell)
            ]
            if any(row_data):  # Skip rows where every cell is blank
                data.append(row_data)

        if data:  # Table has content
            table_dfs.append(pd.DataFrame(data))

    if not table_dfs:
        return ""

    # to_string() avoids the truncation that DataFrame.__repr__ (used by
    # str(list_of_frames)) applies to long cells and large tables.
    return "[" + ", ".join(df.to_string() for df in table_dfs) + "]"

# Helper function to extract text from PDF
def extract_text_from_pdf(pdf_content, file_name,volume_save_path):
    """Extract the page text and the table text of a PDF.

    Args:
        pdf_content: Raw bytes of the PDF (used for page-text extraction).
        file_name: Name used in log and error messages.
        volume_save_path: Path of the saved copy of the PDF, passed to
            ``read_pdf_tables`` for table extraction.

    Returns:
        The text of all pages that yielded text, joined with newlines,
        immediately followed by the string returned by ``read_pdf_tables``.

    Raises:
        RuntimeError: If any step fails; the original exception is chained.
    """
    try:
        with pdfplumber.open(BytesIO(pdf_content)) as pdf:
            # Extract each page exactly once; the previous comprehension called
            # extract_text() twice per page (once to filter, once to collect).
            page_texts = [page.extract_text() for page in pdf.pages]
        text = "\n".join(page_text for page_text in page_texts if page_text)
        pdf_tables = read_pdf_tables(volume_save_path)
        return text + pdf_tables
    except Exception as e:
        logger.error(f"Error extracting text from PDF {file_name}: {e}", exc_info=True)
        raise RuntimeError(f"Error extracting text from PDF {file_name}: {e}") from e

# Helper function to extract text from DOCX
def extract_text_from_docx(docx_content, file_name,volume_save_path):
    """Extract the paragraph text and the table text of a DOCX document.

    Args:
        docx_content: Raw bytes of the document (used for paragraph text).
        file_name: Name used in log and error messages.
        volume_save_path: Path of the saved copy of the document, passed to
            ``read_docx_tables`` for table extraction.

    Returns:
        All paragraph texts joined with newlines, immediately followed by the
        string returned by ``read_docx_tables``.

    Raises:
        RuntimeError: If any step fails; the original exception is chained.
    """
    try:
        document = Document(BytesIO(docx_content))

        paragraph_lines = []
        for paragraph in document.paragraphs:
            paragraph_lines.append(paragraph.text)
        body_text = "\n".join(paragraph_lines)

        table_text = read_docx_tables(volume_save_path)
        return body_text + table_text
    except Exception as err:
        logger.error(f"Error extracting text from DOCX {file_name}: {err}", exc_info=True)
        raise RuntimeError(f"Error extracting text from DOCX {file_name}: {err}") from err

# Helper function to extract text from XLSX
def extract_text_from_xlsx(xlsx_content, file_name):
    """Load the first worksheet of an XLSX payload.

    Despite the name, the result is a pandas DataFrame, not a string.

    Args:
        xlsx_content: Raw bytes of the workbook.
        file_name: Name used in log and error messages.

    Returns:
        The DataFrame produced by ``pd.read_excel`` for sheet index 0.

    Raises:
        RuntimeError: If the workbook cannot be read; the original exception
            is chained.
    """
    try:
        workbook_stream = BytesIO(xlsx_content)
        first_sheet = pd.read_excel(workbook_stream, sheet_name=0)
    except Exception as err:
        logger.error(f"Error extracting text from XLSX {file_name}: {err}", exc_info=True)
        raise RuntimeError(f"Error extracting text from XLSX {file_name}: {err}") from err
    else:
        return first_sheet  # Returned as a DataFrame

# Helper function to extract text from ODT
def extract_text_from_odt(odt_bytes, file_name,volume_save_path):
    """Extract the paragraph text and the table text of an ODT document.

    Args:
        odt_bytes: Raw bytes of the document (used for paragraph text).
        file_name: Name used in log and error messages.
        volume_save_path: Path of the saved copy of the document, passed to
            ``read_odt_tables`` for table extraction.

    Returns:
        The text of every non-empty paragraph joined with newlines, immediately
        followed by the string returned by ``read_odt_tables``.

    Raises:
        RuntimeError: If any step fails; the original exception is chained.
    """
    try:
        from odf import teletype  # odfpy helper that flattens an element to text

        textdoc = load(BytesIO(odt_bytes))

        # extractText walks all child nodes, so paragraphs whose first child is
        # an element (e.g. a span) no longer fail on a missing `.data`
        # attribute, and text after the first child is no longer dropped.
        all_text = [
            teletype.extractText(elem)
            for elem in textdoc.getElementsByType(P)
            if elem.firstChild
        ]

        odt_tables = read_odt_tables(volume_save_path)
        return "\n".join(all_text) + odt_tables

    except Exception as e:
        # Log before re-raising, like the PDF/DOCX/XLSX extractors do.
        logger.error(f"Error extracting text from ODT {file_name}: {e}", exc_info=True)
        raise RuntimeError(f"Error extracting text from ODT {file_name}: {e}") from e

In [0]:
import requests
import pandas as pd
import pdfplumber
from io import BytesIO
from docx import Document
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from odf.opendocument import load
from odf.text import P 
from datetime import datetime

def get_all_files():
    """Crawl the SharePoint drive and return new or changed files as a Spark DataFrame.

    The drive is walked from its root: folders are processed recursively and
    ``@odata.nextLink`` pagination is followed. A file is downloaded when the
    extension derived from its ``webUrl`` is in ``FILE_LIST`` and its
    ``lastModifiedDateTime`` is newer than the value returned by
    ``get_last_modified_datetime(source_table_name)`` (1900-01-01 when that
    returns ``None``). Each downloaded file is saved to the temp volume and its
    content extracted according to its extension.

    Returns:
        A Spark DataFrame with one row per selected file (id, location, name,
        content, created/modified/load timestamps). Rows whose download failed
        carry ``None`` content. Returns ``None`` when a request outside the
        per-file download fails (e.g. listing a folder).

    Raises:
        RuntimeError: Propagated from the ``extract_text_from_*`` helpers when
            a downloaded file cannot be parsed.
    """
    # (connect, read) timeouts so a stalled HTTP call cannot hang the run forever.
    request_timeout = (10, 120)

    def parse_graph_datetime(value):
        # Accept the plain "...:SSZ" form and, as a fallback, the same form
        # carrying fractional seconds.
        try:
            return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")

    def build_record(file_id, file_location, file_name, content, created, modified):
        # Single place that defines the row layout used for the Spark schema.
        return {
            COL_FILE_ID: file_id,
            COL_FILE_LOCATION: file_location,
            COL_FILE_NAME: file_name,
            COL_CONTENT: content,
            COL_CREATED_DATETIME: created,
            COL_LAST_MODIFIED_DATETIME: modified,
            COL_LOAD_DATETIME: datetime.utcnow()
        }

    try:
        #uncomment for manual run
        token = get_access_token()
        site_id = get_site_id()
        drive_id = get_drive_id()

        volume_path = "/Volumes/spire_catalog/default/temp_storage/"
        headers = {"Authorization": f"Bearer {token}"}
        all_files = []

        # The watermark does not change during the crawl, so look it up once
        # instead of once per file.
        last_update_time = get_last_modified_datetime(source_table_name)
        if last_update_time is None:
            last_update_time = datetime(1900, 1, 1)

        drive_base_url = microsoft_graph_url + f"{site_id}" + drive_path + f"{drive_id}"

        def process_folder(folder_id=None):
            # Root listing when no folder id is given, children listing otherwise.
            if folder_id is None:
                url = drive_base_url + root_path
            else:
                url = drive_base_url + item_path + f"{folder_id}" + children_path

            while url:
                response = requests.get(url, headers=headers, timeout=request_timeout)
                response.raise_for_status()
                data = response.json()

                for item in data.get("value", []):
                    if 'folder' in item:
                        process_folder(item['id'])  # Recursively process subfolders
                        continue
                    if 'file' not in item:
                        continue

                    file_location = item['webUrl']
                    # Extension is taken from the URL and cut to 4 characters,
                    # so an ".odt" followed by more URL text shows up as "odt&".
                    # NOTE(review): presumably viewer-style URLs such as
                    # "...file=x.odt&action=..." — confirm.
                    file_extension = file_location.lower().split('.')[-1][:4]
                    file_id = item.get('id')
                    file_name = item.get('name')
                    created_datetime = parse_graph_datetime(item.get('createdDateTime'))
                    last_modified_datetime = parse_graph_datetime(item.get('lastModifiedDateTime'))

                    if file_extension not in FILE_LIST or last_modified_datetime <= last_update_time:
                        continue

                    try:
                        content_url = drive_base_url + item_path + f"{item['id']}" + content_path

                        content_response = requests.get(content_url, headers=headers, timeout=request_timeout)
                        content_response.raise_for_status()

                        # Saving file to volume for reading tables
                        volume_save_path = save_file(content_response, volume_path, file_name)
                        file_content = None

                        # NOTE(review): RuntimeError raised by the extractors is
                        # not caught here and aborts the whole crawl — confirm
                        # that this is intended.
                        if file_extension == "txt":
                            file_content = content_response.text

                        elif file_extension == "pdf":
                            file_content = extract_text_from_pdf(content_response.content, file_name, volume_save_path)

                        elif file_extension == "docx":
                            file_content = extract_text_from_docx(content_response.content, file_name, volume_save_path)

                        elif file_extension == "xlsx":
                            file_content = extract_text_from_xlsx(content_response.content, file_name)

                        elif file_extension in ("odt", "odt&"):
                            file_content = extract_text_from_odt(content_response.content, file_name, volume_save_path)

                        # The content column is a StringType; the XLSX helper
                        # returns a pandas DataFrame, so serialise it to text
                        # before it reaches spark.createDataFrame.
                        if isinstance(file_content, pd.DataFrame):
                            file_content = file_content.to_csv(index=False)

                        all_files.append(build_record(
                            file_id, file_location, file_name, file_content,
                            created_datetime, last_modified_datetime
                        ))

                    except requests.exceptions.RequestException as e:
                        logger.error(f"Error reading file {file_location}: {e}", exc_info=True)
                        all_files.append(build_record(
                            file_id, file_location, file_name, None,
                            created_datetime, last_modified_datetime
                        ))

                # Handle pagination
                url = data.get('@odata.nextLink', None)

        # Start processing from the root folder
        process_folder()

        # Convert to Spark DataFrame
        schema = StructType([
            StructField(COL_FILE_ID, StringType(), True),
            StructField(COL_FILE_LOCATION, StringType(), True),
            StructField(COL_FILE_NAME, StringType(), True),
            StructField(COL_CONTENT, StringType(), True),
            StructField(COL_CREATED_DATETIME, TimestampType(), True),
            StructField(COL_LAST_MODIFIED_DATETIME, TimestampType(), True),
            StructField(COL_LOAD_DATETIME, TimestampType(), True)
        ])

        df = spark.createDataFrame(all_files, schema=schema)
        return df

    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching files: {e}", exc_info=True)
        return None





In [0]:
def save_to_table(df, database_name:str, table_name:str):
    """Save a DataFrame as a Delta table, overwriting any existing table.

    The target is ``{default_catalog}.{database_name}.{table_name}``. Failures
    are logged and swallowed (best effort); nothing is raised to the caller.

    Args:
        df: DataFrame exposing the Spark ``write`` API.
        database_name: Schema/database part of the table path.
        table_name: Table part of the table path.
    """
    table_path = f"{default_catalog}.{database_name}.{table_name}"

    try:
        df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(table_path)

        logger.info("Table successfully saved.")

    except Exception as e:
        # Name the target table and keep the quotes balanced so the log line
        # is unambiguous.
        logger.error(f"Error saving table '{table_path}': {e}", exc_info=True)
 

# Crawl SharePoint and collect the new/changed files.
data_source = get_all_files()

if data_source is None:
    # get_all_files() returns None when a request fails; report that directly
    # instead of letting it surface later as an AttributeError on None.
    logger.error("No data returned from SharePoint; skipping table update.")
elif spark.catalog.tableExists(f"{default_catalog}.{default_schema}.{source_table_name}"):
    # Target table already exists: merge the new rows into it.
    try:
        data_source.createOrReplaceTempView(TEMP_SOURCE_DATA)

        merge_with_table(source_table_name, TEMP_SOURCE_DATA)
        # spark.sql(source_merge_stmt)

    except Exception as e:
        logger.error(f"Error merging data: {str(e)}", exc_info=True)
else:
    # First load: create the table from the DataFrame.
    save_to_table(data_source, database_name = default_schema, table_name = source_table_name)