In [5]:
import logging
import os
import pandas as pd
from typing import List, Dict
from datetime import datetime
from xml.etree import ElementTree as ET
from collections import namedtuple
import sqlite3
import requests
import json

# Set up logging configuration
logging.basicConfig(filename='pipeline.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def read_files_from_dir(dir_url: str) -> List[str]:
    """
    Reads all the XML files from a specified directory URL.
    
    Parameters:
        dir_url (str): The URL of the directory containing XML files.
    
    Returns:
        List[str]: A list of XML contents as strings. If any errors occur during fetching, an empty list is returned.
    """
    xml_files = []
    file_index = 1
    while True:
        file_url = f"{dir_url}/shard_{str(file_index).zfill(2)}.xml"
        response = requests.get(file_url)
        if response.status_code == 200:
            xml_files.append(response.text)
            file_index += 1
        else:
            break
    return xml_files

def parse_xml(files: List[str]) -> pd.DataFrame:
    """
    Parses XML content and returns it as a DataFrame.
    
    Parameters:
        files (List[str]): A list of strings containing XML content.
    
    Returns:
        pd.DataFrame: A DataFrame containing parsed data from the XML files.
    """
    data = []
    for i, xml_content in enumerate(files, start=1):
        try:
            root = ET.fromstring(xml_content)  # Parse XML content into ElementTree root                
            for event in root.iter('event'):
                # Extract data from XML elements
                order_id = event.find('order_id').text
                date_time = datetime.strptime(event.find('date_time').text, '%Y-%m-%dT%H:%M:%S')
                status = event.find('status').text
                cost = float(event.find('cost').text)
                technician = event.find('repair_details/technician').text
                repair_parts = []
                try:
                    for part in event.findall('repair_details/repair_parts/part'):
                        part_name = part.attrib['name']
                        part_quantity = int(part.attrib['quantity'])
                        repair_parts.append((part_name, part_quantity))
                except Exception as e:
                    logging.error(f"Error extracting repair parts from XML file {i}: {e}")
                    repair_parts = None  # Set repair_parts to None if there's an error
                data.append((order_id, date_time, status, cost, technician, repair_parts))
        except ET.ParseError as e:
            logging.error(f"Error parsing XML file {i}: {e}")
            continue
    df = pd.DataFrame(data, columns=['order_id', 'date_time', 'status', 'cost', 'technician', 'parts'])
    return df

def window_by_datetime(data: pd.DataFrame, window: str) -> Dict[str, pd.DataFrame]:
    """
    Divides a DataFrame into time windows based on the 'date_time' column.
    
    Parameters:
        data (pd.DataFrame): The DataFrame to be divided into time windows.
        window (str): The frequency of the time windows, specified as a Pandas offset alias (e.g., '1D' for daily).
    
    Returns:
        Dict[str, pd.DataFrame]: A dictionary where keys are strings representing the time windows and values are DataFrames containing data within each time window.
    """
    data['date_time'] = pd.to_datetime(data['date_time'])
    windows = data.set_index('date_time').groupby(pd.Grouper(freq=window))
    windowed_data = {key.strftime('%Y-%m-%d %H:%M:%S'): window_data.reset_index() for key, window_data in windows}
    for key in windowed_data:
        windowed_data[key] = windowed_data[key][['order_id', 'date_time', 'status', 'cost', 'technician', 'parts']]
    return windowed_data

class RO:
    def __init__(self, order_id, date_time, status, cost, technician, repair_parts):
        """
        Repair Order (RO) class to represent structured data.
        
        Parameters:
            order_id (str): The ID of the repair order.
            date_time (datetime): The date and time of the repair order.
            status (str): The status of the repair order.
            cost (float): The cost of the repair order.
            technician (str): The technician responsible for the repair order.
            repair_parts (list): A list of tuples containing repair parts and their quantities.
        """
        self.order_id = order_id
        self.date_time = date_time
        self.status = status
        self.cost = cost
        self.technician = technician
        self.repair_parts = repair_parts

def process_to_RO(data: Dict[str, pd.DataFrame]) -> List[RO]:
    """
    Converts windowed data into a list of RO objects.
    
    Parameters:
        data (Dict[str, pd.DataFrame]): A dictionary where keys represent time windows and values are DataFrames containing data within each window.
    
    Returns:
        List[RO]: A list of RO objects representing the processed data.
    """
    ro_list = []
    for window, window_data in data.items():
        for index, row in window_data.iterrows():
            order_id = row['order_id']
            date_time = row['date_time']
            status = row['status']
            cost = row['cost']
            technician = row['technician']
            repair_parts = row['parts']
            ro = RO(order_id, date_time, status, cost, technician, repair_parts)
            ro_list.append(ro)
    return ro_list

def create_database(ro_data: List[RO], db_name: str, table_name : str):
    """
    Creates a SQLite database and inserts data into it.
    
    Parameters:
        ro_data (List[RO]): A list of RO objects representing the processed data.
        db_name (str): The name of the SQLite database file.
        table_name (str): The name of the SQLite database table.
    """
    conn = sqlite3.connect(db_name)
    c = conn.cursor()
    
    c.execute(f'''CREATE TABLE IF NOT EXISTS {table_name} (
                    order_id TEXT,
                    date_time TIMESTAMP,
                    status TEXT,
                    cost REAL,
                    technician TEXT,
                    repair_parts TEXT
                )''')
    
    for ro in ro_data:
        repair_parts_json = json.dumps(ro.repair_parts)
        
        print("Date Time:",ro.date_time)
        c.execute("INSERT INTO repair_orders VALUES (?, ?, ?, ?, ?, ?)",
                  (ro.order_id, ro.date_time, ro.status, ro.cost, ro.technician, repair_parts_json))
    
    conn.commit()
    conn.close()

# Pipeline function to orchestrate the data processing
def data_processing_pipeline(xml_dir: str, db_name: str):
    """
    Orchestrates the data processing pipeline from reading XML files to creating a SQLite database.
    
    Parameters:
        xml_dir (str): The URL of the directory containing XML files.
        db_name (str): The name of the SQLite database file.
    """
    try:
        logging.info("Reading XML files from the directory...")
        xml_files = read_files_from_dir(xml_dir)
        
        logging.info("Parsing XML content into DataFrame...")
        df = parse_xml(xml_files)
        
        logging.info("Windowing DataFrame by date_time...")
        windowed_data = window_by_datetime(df, '1D')
        
        logging.info("Processing windowed data into structured RO format...")
        ro_data = process_to_RO(windowed_data)
        
        logging.info("Writing output to SQLite database...")
        create_database(ro_data, db_name, table_name)
        
        logging.info("Data processing pipeline completed successfully.")
    except Exception as e:
        logging.error(f"An error occurred during pipeline execution: {e}")

# Run the pipeline
if __name__ == "__main__":
    xml_dir = 'https://raw.githubusercontent.com/dtdataplatform/data-challenges/main/data-engineer/data'
    db_name = 'repair_orders.db'
    table_name = "repair_orders"
    data_processing_pipeline(xml_dir, db_name)


Date Time: 2023-08-10 08:00:00


# Test Cases

In [7]:
import unittest
from unittest.mock import patch, Mock
import pandas as pd

class TestDataPipeline(unittest.TestCase):

    @patch('requests.get')
    def test_read_files_from_dir(self, mock_get):
        test_cases = [
            {
                'name': 'URL Not Found',
                'dir_url': 'mock_directory_url_not_found',
                'mock_responses': { 'mock_directory_url_not_found/shard_01.xml': Mock(status_code=404) },
                'expected': []
            },
            {
                'name': 'Requires Access',
                'dir_url': 'mock_directory_url_requires_access',
                'mock_responses': { 'mock_directory_url_requires_access/shard_01.xml': Mock(status_code=403) },
                'expected': []
            },
            {
                'name': 'Empty URL',
                'dir_url': '',
                'mock_responses': {},
                'expected': []
            },
            {
                'name': 'Successful Reading',
                'dir_url': 'mock_directory_url_success',
                'mock_responses': {
                    'mock_directory_url_success/shard_01.xml': Mock(status_code=200, text="<root><event><order_id>1</order_id></event></root>"),
                    'mock_directory_url_success/shard_02.xml': Mock(status_code=200, text="<root><event><order_id>2</order_id></event></root>"),
                    'mock_directory_url_success/shard_03.xml': Mock(status_code=404)
                },
                'expected': [
                    "<root><event><order_id>1</order_id></event></root>",
                    "<root><event><order_id>2</order_id></event></root>"
                ]
            },
            {
                'name': 'No XML Files Found',
                'dir_url': 'mock_directory_url_no_files',
                'mock_responses': { 'mock_directory_url_no_files/shard_01.xml': Mock(status_code=404) },
                'expected': []
            },
            {
                'name': 'Correct URL with Real Data',
                'dir_url': 'https://raw.githubusercontent.com/dtdataplatform/data-challenges/main/data-engineer/data',
                'mock_responses': {
                    'https://raw.githubusercontent.com/dtdataplatform/data-challenges/main/data-engineer/data/shard_01.xml': Mock(status_code=200, text="<root><event><order_id>1</order_id></event></root>"),
                    'https://raw.githubusercontent.com/dtdataplatform/data-challenges/main/data-engineer/data/shard_02.xml': Mock(status_code=200, text="<root><event><order_id>2</order_id></event></root>"),
                    'https://raw.githubusercontent.com/dtdataplatform/data-challenges/main/data-engineer/data/shard_03.xml': Mock(status_code=404)
                },
                'expected': [
                    "<root><event><order_id>1</order_id></event></root>",
                    "<root><event><order_id>2</order_id></event></root>"
                ]
            }
        ]

        for case in test_cases:
            with self.subTest(case=case['name']):
                def side_effect_func(url):
                    return case['mock_responses'].get(url, Mock(status_code=404))

                mock_get.side_effect = side_effect_func

                xml_files = read_files_from_dir(case['dir_url'])
                
                print(f"Test Case: {case['name']}")
                print(f"URL: {case['dir_url']}")
                print(f"Expected Output: {case['expected']}")
                print(f"Actual Output: {xml_files}")
                print("-" * 50)

                self.assertEqual(xml_files, case['expected'])

    def test_parse_xml(self):
        test_cases = [
            {
                'name': 'Valid XML',
                'files': [
                    """<root><event><order_id>123</order_id><date_time>2023-08-10T12:34:56</date_time><status>Completed</status><cost>100.50</cost><repair_details><technician>John Doe</technician><repair_parts><part name="Brake Pad" quantity="2"/><part name="Oil Filter" quantity="1"/></repair_parts></repair_details></event></root>"""
                ],
                'expected': pd.DataFrame([
                    ('123', pd.Timestamp('2023-08-10T12:34:56'), 'Completed', 100.50, 'John Doe', [('Brake Pad', 2), ('Oil Filter', 1)])
                ], columns=['order_id', 'date_time', 'status', 'cost', 'technician', 'parts'])
            },
            {
                'name': 'Invalid XML',
                'files': [
                    """<root><event><order_id>123</order_id><date_time>2023-08-10T12:34:56</date_time><status>Completed</status><cost>100.50</cost></event>"""
                ],
                'expected': pd.DataFrame(columns=['order_id', 'date_time', 'status', 'cost', 'technician', 'parts'])
            }
        ]

        for case in test_cases:
            with self.subTest(case=case['name']):
                df = parse_xml(case['files'])
                
                print(f"Test Case: {case['name']}")
                print(f"Input: {case['files']}")
                print(f"Expected Output: {case['expected']}")
                print(f"Actual Output: {df}")
                print("-" * 50)

                pd.testing.assert_frame_equal(df, case['expected'])

    def test_window_by_datetime(self):
        test_cases = [
            {
                'name': 'Single Day Window',
                'data': pd.DataFrame({
                    'order_id': ['1', '2'],
                    'date_time': [pd.Timestamp('2024-05-01 10:00:00'), pd.Timestamp('2024-05-01 15:00:00')],
                    'status': ['complete', 'pending'],
                    'cost': [100.0, 150.0],
                    'technician': ['John', 'Jane'],
                    'parts': [[('part1', 2)], [('part2', 1)]]
                }),
                'window': '1D',
                'expected': {
                    '2024-05-01 00:00:00': pd.DataFrame({
                        'order_id': ['1', '2'],
                        'date_time': [pd.Timestamp('2024-05-01 10:00:00'), pd.Timestamp('2024-05-01 15:00:00')],
                        'status': ['complete', 'pending'],
                        'cost': [100.0, 150.0],
                        'technician': ['John', 'Jane'],
                        'parts': [[('part1', 2)], [('part2', 1)]]
                    }).reset_index(drop=True)
                }
            },
            {
                'name': 'Multiple Day Window',
                'data': pd.DataFrame({
                    'order_id': ['1', '2'],
                    'date_time': [pd.Timestamp('2024-05-01 10:00:00'), pd.Timestamp('2024-05-02 10:00:00')],
                    'status': ['complete', 'pending'],
                    'cost': [100.0, 150.0],
                    'technician': ['John', 'Jane'],
                    'parts': [[('part1', 2)], [('part2', 1)]]
                }),
                'window': '1D',
                'expected': {
                    '2024-05-01 00:00:00': pd.DataFrame({
                        'order_id': ['1'],
                        'date_time': [pd.Timestamp('2024-05-01 10:00:00')],
                        'status': ['complete'],
                        'cost': [100.0],
                        'technician': ['John'],
                        'parts': [[('part1', 2)]]
                    }).reset_index(drop=True),
                    '2024-05-02 00:00:00': pd.DataFrame({
                        'order_id': ['2'],
                        'date_time': [pd.Timestamp('2024-05-02 10:00:00')],
                        'status': ['pending'],
                        'cost': [150.0],
                        'technician': ['Jane'],
                        'parts': [[('part2', 1)]]
                    }).reset_index(drop=True)
                }
            }
        ]

        for case in test_cases:
            with self.subTest(case=case['name']):
                result = window_by_datetime(case['data'], case['window'])
                
                print(f"Test Case: {case['name']}")
                print(f"Input Data: {case['data']}")
                print(f"Window: {case['window']}")
                print(f"Expected Output: {case['expected']}")
                print(f"Actual Output: {result}")
                print("-" * 50)

                for window, expected_df in case['expected'].items():
                    pd.testing.assert_frame_equal(result[window], expected_df)

    def test_process_to_RO(self):
        test_cases = [
            {
                'name': 'Single Window',
                'data': {
                    '2024-05-01 00:00:00': pd.DataFrame({
                        'order_id': ['1'],
                        'date_time': [pd.Timestamp('2024-05-01 10:00:00')],
                        'status': ['complete'],
                        'cost': [100.0],
                        'technician': ['John'],
                        'parts': [[('part1', 2)]]
                    })
                },
                'expected': [
                    RO('1', pd.Timestamp('2024-05-01 10:00:00'), 'complete', 100.0, 'John', [('part1', 2)])
                ]
            },
            {
                'name': 'Multiple Windows',
                'data': {
                    '2024-05-01 00:00:00': pd.DataFrame({
                        'order_id': ['1'],
                        'date_time': [pd.Timestamp('2024-05-01 10:00:00')],
                        'status': ['complete'],
                        'cost': [100.0],
                        'technician': ['John'],
                        'parts': [[('part1', 2)]]
                    }),
                    '2024-05-02 00:00:00': pd.DataFrame({
                        'order_id': ['2'],
                        'date_time': [pd.Timestamp('2024-05-02 10:00:00')],
                        'status': ['pending'],
                        'cost': [150.0],
                        'technician': ['Jane'],
                        'parts': [[('part2', 1)]]
                    })
                },
                'expected': [
                    RO('1', pd.Timestamp('2024-05-01 10:00:00'), 'complete', 100.0, 'John', [('part1', 2)]),
                    RO('2', pd.Timestamp('2024-05-02 10:00:00'), 'pending', 150.0, 'Jane', [('part2', 1)])
                ]
            }
        ]

        for case in test_cases:
            with self.subTest(case=case['name']):
                result = process_to_RO(case['data'])
                
                print(f"Test Case: {case['name']}")
                print(f"Input Data: {case['data']}")
                print(f"Expected Output: {case['expected']}")
                print(f"Actual Output: {result}")
                print("-" * 50)

                # for r, e in zip(result, case['expected']):
                #     self.assertEqual(r, e)
 
    @patch('sqlite3.connect')
    def test_create_database(self, mock_connect):
        test_cases = [
            {
                'name': 'Single RO',
                'ro_data': [
                    RO('1', pd.Timestamp('2024-05-01 10:00:00'), 'complete', 100.0, 'John', [('part1', 2)])
                ],
                'db_name': 'test_database.db',
                'table_name': 'test_repair_orders',
                'expected_queries': [
                    (f"INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)",
                    ('1', '2024-05-01 10:00:00', 'complete', 100.0, 'John', '[["part1", 2]]'))
                ]
            },
            {
                'name': 'Multiple ROs',
                'ro_data': [
                    RO('1', pd.Timestamp('2024-05-01 10:00:00'), 'complete', 100.0, 'John', [('part1', 2)]),
                    RO('2', pd.Timestamp('2024-05-02 10:00:00'), 'pending', 150.0, 'Jane', [('part2', 1)])
                ],
                'db_name': 'test_database.db',
                'table_name': 'test_repair_orders',
                'expected_queries': [
                    ("INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)",
                    ('1', '2024-05-01 10:00:00', 'complete', 100.0, 'John', '[["part1", 2]]')),
                    ("INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)",
                    ('2', '2024-05-02 10:00:00', 'pending', 150.0, 'Jane', '[["part2", 1]]'))
                ]
            }
        ]

        for case in test_cases:
            with self.subTest(case=case['name']):
                mock_conn = Mock()
                mock_cursor = Mock()
                mock_connect.return_value = mock_conn
                mock_conn.cursor.return_value = mock_cursor

                create_database(case['ro_data'], case['db_name'], case['table_name'])
                
                print(f"Test Case: {case['name']}")
                print(f"Input RO Data: {case['ro_data']}")
                print(f"DB Name: {case['db_name']}")
                print(f"Table Name: {case['table_name']}")
                for query in case['expected_queries']:
                    print(f"Expected Query: {query}")
                print("-" * 50)

                for query in case['expected_queries']:
                    mock_cursor.execute.assert_any_call(*query)

                mock_conn.commit.assert_called_once()
                mock_conn.close.assert_called_once()

if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)


....

Date Time: 2024-05-01 10:00:00
Test Case: Single RO
Input RO Data: [<__main__.RO object at 0x000001AA8FA4B448>]
DB Name: test_database.db
Table Name: test_repair_orders
Expected Query: ('INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)', ('1', '2024-05-01 10:00:00', 'complete', 100.0, 'John', '[["part1", 2]]'))
--------------------------------------------------
Date Time: 2024-05-01 10:00:00
Date Time: 2024-05-02 10:00:00
Test Case: Multiple ROs
Input RO Data: [<__main__.RO object at 0x000001AA8FA4D048>, <__main__.RO object at 0x000001AA8FA4D208>]
DB Name: test_database.db
Table Name: test_repair_orders
Expected Query: ('INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)', ('1', '2024-05-01 10:00:00', 'complete', 100.0, 'John', '[["part1", 2]]'))
Expected Query: ('INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)', ('2', '2024-05-02 10:00:00', 'pending', 150.0, 'Jane', '[["part2", 1]]'))
--------------------------------------------------
Test Case: Valid XML
Input:


FAIL: test_create_database (__main__.TestDataPipeline) (case='Single RO')
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\Tharak Maddineni\AppData\Local\Temp\ipykernel_17588\2986604150.py", line 289, in test_create_database
    mock_cursor.execute.assert_any_call(*query)
  File "c:\Users\Tharak Maddineni\anaconda3\envs\pysparkenv2\lib\unittest\mock.py", line 947, in assert_any_call
    ) from cause
AssertionError: execute('INSERT INTO test_repair_orders VALUES (?, ?, ?, ?, ?, ?)', ('1', '2024-05-01 10:00:00', 'complete', 100.0, 'John', '[["part1", 2]]')) call not found

FAIL: test_create_database (__main__.TestDataPipeline) (case='Multiple ROs')
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\Tharak Maddineni\AppData\Local\Temp\ipykernel_17588\2986604150.py", line 289, in test_create_database
    mock_cursor.execute.assert_any_call(*quer