In [12]:
%pip install requests requests_ntlm xmltodict pytz

Note: you may need to restart the kernel to use updated packages.


In [13]:
# Load variables from a local .env file into the process environment so the
# os.getenv("SHAREPOINT_...") lookups made further down can find them.
from dotenv import load_dotenv
load_dotenv()

True

In [14]:
import requests
from requests_ntlm import HttpNtlmAuth
import warnings
import xmltodict
import json
from datetime import datetime
import pytz
from typing import Optional, List, Dict, Any
import logging
import os


In [15]:
# Configure logging
def setup_logger():
    """
    Set up file logging and return the logger for this module.

    Records are written to ``sharepoint.log`` in the current working
    directory at INFO level and above. ``logging.basicConfig`` only takes
    effect when the root logger has no handlers yet, so calling this
    function again (e.g. re-running the cell) does not add a second handler.

    Returns:
        logging.Logger: Logger named after the current module (``__name__``).
    """
    # Use fixed log file name in the current working directory
    log_file = os.path.join(os.getcwd(), "sharepoint.log")

    # The client logs JSON dumped with ensure_ascii=False, so the file must be
    # able to hold any Unicode text; the platform default encoding may not be.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler(log_file, encoding="utf-8")],
    )
    return logging.getLogger(__name__)


# Initialize the module-level logger; the SharePointClient methods below
# write all of their log messages through this object.
logger = setup_logger()


In [16]:
# Disable SSL warnings (only if needed for development).
# The filter is installed process-wide and matches on the start of the warning
# message. NOTE(review): presumably this targets the warning emitted for
# requests made with verify=False (see verify_ssl on SharePointClient) — confirm.
warnings.filterwarnings("ignore", message="Unverified HTTPS request")

In [17]:
class SharePointClient:
    """
    A client for interacting with SharePoint REST API.
    
    This class provides methods for authenticating with SharePoint,
    browsing folders, listing files, and downloading content.
    """
    
    def __init__(self, 
                 site_url: str, 
                 username: Optional[str] = None, 
                 password: Optional[str] = None,
                 verify_ssl: bool = True):
        """
        Initialize the SharePoint client with authentication credentials and URLs.
        
        Args:
            site_url (str): Base SharePoint site URL (e.g., https://company.sharepoint.com/sites/sitename)
            username (str, optional): Username for NTLM authentication. Defaults to env variable SHAREPOINT_USERNAME.
            password (str, optional): Password for NTLM authentication. Defaults to env variable SHAREPOINT_PASSWORD.
            verify_ssl (bool, optional): Whether to verify SSL certificates. Defaults to True.
        """
        # Remove trailing slash if present
        self.site_url = site_url.rstrip('/')
        
        # Set up API endpoints
        self.api_base_url = f"{self.site_url}/_api"
        self.folders_url = f"{self.api_base_url}/web/folders"
        self.file_download_url = f"{self.api_base_url}/web/GetFileByServerRelativeUrl"
        self.folder_files_url = f"{self.api_base_url}/web/GetFolderByServerRelativeUrl"
        
        # Set up authentication
        domain = os.getenv("SHAREPOINT_DOMAIN", "")
        env_username = os.getenv("SHAREPOINT_USERNAME", "")
        env_password = os.getenv("SHAREPOINT_PASSWORD", "")
        
        # Use provided credentials or fall back to environment variables
        if username:
            if '\\' in username:
                self.username = username
            else:
                self.username = f"{domain}\\{username}"
        else:
            self.username = f"{domain}\\{env_username}"
            
        self.password = password or env_password
        self.verify_ssl = verify_ssl
        
        logger.info(f"SharePoint client initialized for site: {self.site_url}")
        logger.debug(f"API base URL: {self.api_base_url}")
        
    def create_session(self) -> requests.Session:
        """
        Build a fresh requests session that authenticates with NTLM.
        
        The credentials are taken from ``self.username`` and ``self.password``.
        
        Returns:
            requests.Session: New session whose ``auth`` is an HttpNtlmAuth instance
        """
        ntlm_session = requests.Session()
        ntlm_session.auth = HttpNtlmAuth(self.username, self.password)
        return ntlm_session
    
    def get_response(self, url: str) -> requests.Response:
        """
        Make GET request to SharePoint and return response.
        
        Args:
            url (str): The SharePoint API URL to request
            
        Returns:
            requests.Response: Response from SharePoint
            
        Raises:
            Exception: If the request fails
        """
        try:
            logger.info(f"Making request to SharePoint URL: {url}")
            session = self.create_session()
            response = session.get(url, verify=self.verify_ssl)
            
            # Log response status and headers
            logger.info(f"Response status code: {response.status_code}")
            logger.debug(f"Response headers: {dict(response.headers)}")
            
            # Log response content in JSON format
            if response.text:
                try:
                    # Parse XML to JSON
                    json_data = self.parse_xml_to_json(response.text)
                    # Log the full JSON response
                    logger.info("API Response JSON:")
                    logger.info(json.dumps(json_data, indent=2, ensure_ascii=False))
                except Exception as e:
                    logger.error(f"Error parsing response to JSON: {str(e)}")
                    # If JSON parsing fails, log the raw response
                    logger.debug(f"Raw response content: {response.text[:500]}...")
            
            return response
        except Exception as e:
            logger.error(f"Error making request to SharePoint: {str(e)}")
            raise
    
    def get_folder_url(self, folder: Dict[str, Any]) -> str:
        """
        Generate SharePoint folder URL for accessing files.
        
        Args:
            folder (Dict[str, Any]): Folder information dictionary
            
        Returns:
            str: SharePoint folder URL
        """
        folder_url = f"{self.folder_files_url}('{folder['server_relative_url']}')/Files"
        return folder_url
    
    def get_folders(self, url: str = None) -> Optional[List[Dict[str, Any]]]:
        """
        Get folders from a SharePoint site or folder.
        
        Args:
            url (str, optional): The SharePoint site URL. Defaults to folders_url.
            
        Returns:
            Optional[List[Dict[str, Any]]]: List of folder information dictionaries or None if failed
        """
        try:
            sharepoint_url = url if url else self.folders_url
            logger.info(f"Accessing SharePoint site: {sharepoint_url}")
            response = self.get_response(sharepoint_url)
            
            if response.status_code == 200:
                logger.info("Successfully connected to SharePoint!")
                json_data = self.parse_xml_to_json(response.text)
                folders = self.extract_folders(json_data)
                logger.info(f"Found {len(folders)} folders")
                return folders
            else:
                logger.error(f"Failed to access SharePoint site. Status code: {response.status_code}")
                logger.error(f"Response: {response.text}")
                return None
        except Exception as e:
            logger.error(f"Error accessing SharePoint site: {str(e)}")
            return None
    
    def list_folder_files(self, folder: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        """
        List all files in a specific SharePoint folder.
        
        Args:
            folder (Dict[str, Any]): The SharePoint folder information
            
        Returns:
            Optional[List[Dict[str, Any]]]: List of file information dictionaries or None if failed
        """
        try:
            folder_url = self.get_folder_url(folder)
            logger.info(f"Listing files in folder: {folder['name']}")
            response = self.get_response(folder_url)
            
            if response.status_code == 200:
                json_data = self.parse_xml_to_json(response.text)
                files = self.extract_files(json_data)
                logger.info(f"Found {len(files)} files in folder")
                return files
            else:
                logger.error(f"Failed to list files in folder. Status code: {response.status_code}")
                return None
        except Exception as e:
            logger.error(f"Error listing files in folder: {str(e)}")
            return None
    
    def download_file(self, file_url: str) -> Optional[bytes]:
        """
        Download a file from SharePoint.
        
        Args:
            file_url (str): The file's server relative URL
            
        Returns:
            Optional[bytes]: File content if successful, None otherwise
        """
        try:
            logger.info(f"Downloading file: {file_url}")
            clean_url = file_url.replace("'", "''")
            download_url = f"{self.file_download_url}('{clean_url}')/$value"
            logger.debug(f"Generated download URL: {download_url}")
            
            response = self.get_response(download_url)
            
            if response.status_code == 200:
                logger.info(f"Successfully downloaded file: {file_url}")
                return response.content
            else:
                logger.error(f"Failed to download file. Status code: {response.status_code}")
                logger.error(f"Response: {response.text}")
                return None
        except Exception as e:
            logger.error(f"Error downloading file: {str(e)}")
            return None
    
    def download_file_by_info(self, file_info: Dict[str, Any]) -> Optional[bytes]:
        """
        Download a file using its file information dictionary.
        
        Args:
            file_info (Dict[str, Any]): File information dictionary from extract_files
            
        Returns:
            Optional[bytes]: File content if successful, None otherwise
        """
        if "download_url" in file_info:
            return self.get_response(file_info["download_url"]).content
        elif "server_relative_url" in file_info:
            return self.download_file(file_info["server_relative_url"])
        else:
            logger.error("File info does not contain download_url or server_relative_url")
            return None
    
    # Utility methods
    @staticmethod
    def convert_size(size_bytes: str) -> str:
        """
        Convert size in bytes to human readable format (KB, MB, GB).
        
        Args:
            size_bytes (str): Size in bytes as string
            
        Returns:
            str: Human readable size
        """
        try:
            size = int(size_bytes)
            for unit in ["B", "KB", "MB", "GB"]:
                if size < 1024.0:
                    return f"{size:.2f} {unit}"
                size /= 1024.0
            return f"{size:.2f} TB"
        except (ValueError, TypeError):
            return "0 B"
    
    @staticmethod
    def convert_to_hk_time(utc_time_str: str) -> str:
        """
        Convert UTC time string to Hong Kong time.
        
        Args:
            utc_time_str (str): UTC time string in format 'YYYY-MM-DDTHH:MM:SSZ'
            
        Returns:
            str: Hong Kong time string in format 'YYYY-MM-DD HH:MM:SS'
        """
        try:
            utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ")
            utc_time = pytz.UTC.localize(utc_time)
            hk_tz = pytz.timezone("Asia/Hong_Kong")
            hk_time = utc_time.astimezone(hk_tz)
            return hk_time.strftime("%Y-%m-%d %H:%M:%S")
        except Exception as e:
            return utc_time_str
    
    @staticmethod
    def parse_xml_to_json(response_text: str) -> dict:
        """
        Parse XML response to JSON format.
        
        Args:
            response_text (str): XML response text
            
        Returns:
            dict: Parsed JSON data
        """
        xml_dict = xmltodict.parse(response_text)
        return json.loads(json.dumps(xml_dict))
    
    def extract_folders(self, json_data: dict) -> List[Dict[str, Any]]:
        """
        Extract folder information from JSON data.
        
        Args:
            json_data (dict): Parsed JSON data from SharePoint
            
        Returns:
            list: List of folder information dictionaries
        """
        folders = []
        if "feed" in json_data and "entry" in json_data["feed"]:
            entries = json_data["feed"]["entry"]
            # Handle both single entry and multiple entries
            if not isinstance(entries, list):
                entries = [entries]
            
            for entry in entries:
                if entry.get("category", {}).get("@term") == "SP.Folder":
                    folder_info = {
                        "id": entry["id"],
                        "type": "Folder",
                        "name": entry["content"]["m:properties"]["d:Name"],
                        "server_relative_url": entry["content"]["m:properties"]["d:ServerRelativeUrl"],
                        "item_count": entry["content"]["m:properties"]["d:ItemCount"]["#text"],
                        "updated": entry["updated"],
                        "has_subfolders": False,  # Will be updated later
                    }
                    folder_url = self.get_folder_url(folder_info)
                    has_subfolders = self.check_subfolders(folder_url)
                    folder_info["has_subfolders"] = has_subfolders
                    folders.append(folder_info)
        return folders
    
    def check_subfolders(self, folder_url: str) -> bool:
        """
        Check if a folder contains any subfolders.
        
        Args:
            folder_url (str): The SharePoint folder URL
            
        Returns:
            bool: True if folder contains subfolders, False otherwise
        """
        try:
            response = self.get_response(folder_url)
            if response.status_code == 200:
                json_data = self.parse_xml_to_json(response.text)
                if "feed" in json_data and "entry" in json_data["feed"]:
                    entries = json_data["feed"]["entry"]
                    if not isinstance(entries, list):
                        entries = [entries]
                    
                    for entry in entries:
                        if entry.get("category", {}).get("@term") == "SP.Folder":
                            return True
            return False
        except Exception:
            return False
    
    def extract_files(self, json_data: dict) -> List[Dict[str, Any]]:
        """
        Extract file information from JSON data.
        
        Args:
            json_data (dict): Parsed JSON data from SharePoint
            
        Returns:
            list: List of file information dictionaries
        """
        files = []
        if "feed" in json_data and "entry" in json_data["feed"]:
            entries = json_data["feed"]["entry"]
            # Handle both single entry and multiple entries
            if not isinstance(entries, list):
                entries = [entries]
            
            for entry in entries:
                if entry.get("category", {}).get("@term") == "SP.File":
                    server_relative_url = entry["content"]["m:properties"]["d:ServerRelativeUrl"]
                    # Generate download URL
                    clean_url = server_relative_url.replace("'", "''")
                    download_url = f"{self.file_download_url}('{clean_url}')/$value"
                    
                    file_info = {
                        "id": entry["id"],
                        "type": entry["category"]["@term"],
                        "name": entry["content"]["m:properties"]["d:Name"],
                        "server_relative_url": server_relative_url,
                        "download_url": download_url,
                        "length": entry["content"]["m:properties"]["d:Length"]["#text"],
                        "updated": entry["updated"],
                        "size_readable": self.convert_size(entry["content"]["m:properties"]["d:Length"]["#text"]),
                        "updated_hk_time": self.convert_to_hk_time(entry["updated"])
                    }
                    files.append(file_info)
        return files


In [19]:
# Connect with the site URL from the environment and print a redacted
# overview of every folder and the files it contains.
client = SharePointClient(
    site_url=os.getenv("SHAREPOINT_SITE_URL"),
    verify_ssl=False  # Set to True in production
)

folders = client.get_folders()
print("Folders:", len(folders) if folders else 0)
for folder in folders or []:
    # Only the first two characters of each name are printed so that real
    # folder and file names do not end up in the saved notebook output.
    print(folder['name'][:2] + "*"*10)
    print("Has SubFolder: ", folder["has_subfolders"])
    # list_folder_files returns None on failure; treat that as "no files"
    files = client.list_folder_files(folder) or []
    print(f" File counts: {len(files)}")
    for file in files:
        # Single quotes inside the f-string keep this valid before Python 3.12
        print(f"filename: {file['name'][:2]}" + "*"*10)

Folders: 45
Le**********
Has SubFolder:  False
 File counts: 19
filename: TF**********
filename: ch**********
filename: ic**********
filename: TF**********
filename: TF**********
filename: ic**********
filename: ic**********
filename: TF**********
filename: ic**********
filename: TF**********
filename: ic**********
filename: ic**********
filename: TF**********
filename: ic**********
filename: ic**********
filename: ic**********
filename: ic**********
filename: ic**********
filename: ic**********
HS**********
Has SubFolder:  False
 File counts: 0
Si**********
Has SubFolder:  False
 File counts: 0
AI**********
Has SubFolder:  False
 File counts: 0
Pu**********
Has SubFolder:  False
 File counts: 0
Pa**********
Has SubFolder:  False
 File counts: 27
filename: Co**********
filename: Co**********
filename: FA**********
filename: Ed**********
filename: de**********
filename: Se**********
filename: Do**********
filename: Se**********
filename: My**********
filename: Wh**********
filename: My*