In [1]:
import pandas as pd
import json
import os
from datetime import datetime

# Function to load CSV data with incremental updates
def load_csv(file_path, last_execution_timestamp):
    data = pd.read_csv(file_path)
    # Assuming there's a 'last_updated' column in CSV indicating record modification time
    new_or_modified_data = data[data['last_updated'] > last_execution_timestamp]
    return new_or_modified_data

# Function to load JSON data with incremental updates
def load_json(file_path, last_execution_timestamp):
    with open(file_path, 'r') as json_file:
        data = json.load(json_file)
        new_or_modified_data = [record for record in data if record['Created At'] > last_execution_timestamp]
    return new_or_modified_data

In [2]:
import pandas as pd
import json
import os
from datetime import datetime

def load_csv(file_path, last_execution_timestamp):
    data = pd.read_csv(file_path)
    if 'last_updated' in data.columns:
        data['last_updated'] = pd.to_datetime(data['last_updated'], utc=True)  # Ensure timezone-aware datetime
        data = data[data['last_updated'] > last_execution_timestamp]
    return data

def parse_iso_timestamp(iso_timestamp):
    # Parse ISO 8601 format timestamp (e.g., '2021-03-15T17:13:19.000Z')
    return datetime.strptime(iso_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

def load_json(file_path, last_execution_timestamp):
    with open(file_path, 'r') as json_file:
        data = json.load(json_file)

        # Convert last_execution_timestamp to datetime object if it's not already
        if not isinstance(last_execution_timestamp, datetime):
            last_execution_timestamp = datetime.strptime(last_execution_timestamp, '%Y-%m-%d %H:%M:%S')

        # Filter records based on the timestamp comparison
        new_or_modified_data = [
            record for record in data
            if parse_iso_timestamp(record['Created At']) > last_execution_timestamp
        ]

        # Convert filtered records to DataFrame
        filtered_data_df = pd.DataFrame(new_or_modified_data)

    return filtered_data_df

def clean_order_ids(data):
    # Remove specific suffixes from the 'Order_ID' column
    if 'Order_ID' in data.columns:
        data['Order_ID'] = data['Order_ID'].astype(str).str.replace('YR-', '').str.replace(',0', '')
    return data

def merge_and_transform(customers_data, orders_data, deliveries_data):
    merged_data = pd.merge(customers_data, orders_data, on='Order_ID', how='outer')
    merged_data = pd.merge(merged_data, deliveries_data, on='Order_ID', how='outer')
    return merged_data

def ingest_data(file_path, last_execution_timestamp):
    file_extension = os.path.splitext(file_path)[1]
    if file_extension == '.csv':
        data = load_csv(file_path, last_execution_timestamp)
    elif file_extension == '.json':
        data = load_json(file_path, last_execution_timestamp)
    else:
        raise ValueError("Unsupported file format")
    return data

def main():
    # Set the last execution timestamp (assuming it's stored persistently)
    last_execution_timestamp = '2024-04-10 00:00:00'

    # Example usage
    customers_data = ingest_data('data_dir/Market 1 Customers.json', last_execution_timestamp)
    orders_data = ingest_data('data_dir/Market 1 Orders.csv', last_execution_timestamp)
    deliveries_data = ingest_data('data_dir/Market 1 Deliveries.csv', last_execution_timestamp)

    # Clean 'Order_ID' columns in the loaded dataframes
    customers_data = clean_order_ids(customers_data)
    orders_data = clean_order_ids(orders_data)
    deliveries_data = clean_order_ids(deliveries_data)

    # Merge and transform the data
    merged_transformed_data = merge_and_transform(customers_data, orders_data, deliveries_data)

    # Update last_execution_timestamp after successful processing (store in persistent storage)
    last_execution_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # Update to current time

    # Store or update last_execution_timestamp in persistent storage for future use
    # Example: write to a file, database, or environment variable

if __name__ == "__main__":
    main()


  data = pd.read_csv(file_path)


KeyError: 'Order_ID'

In [16]:
import pandas as pd
import json
import os
from datetime import datetime

def load_csv(file_path, last_execution_timestamp):
    data = pd.read_csv(file_path)
    if 'last_updated' in data.columns:
        data['last_updated'] = pd.to_datetime(data['last_updated'], utc=True)  # Ensure timezone-aware datetime
        data = data[data['last_updated'] > last_execution_timestamp]
    return data

def load_json(file_path, last_execution_timestamp):
    with open(file_path, 'r') as json_file:
        data = json.load(json_file)
        data_df = pd.DataFrame(data)

        # Convert 'Created At' column to datetime if present
        if 'Created At' in data_df.columns:
            data_df['Created At'] = pd.to_datetime(data_df['Created At'])

        # Clean and rename 'Order ID' to 'Customer ID' if present
        if 'Order ID' in data_df.columns:
            data_df['Order ID'] = data_df['Customer ID'].astype(str).str.replace('YR-', '').str.replace(',0', '')

    return data_df

def ingest_data(file_path, last_execution_timestamp):
    file_extension = os.path.splitext(file_path)[1]
    if file_extension == '.csv':
        data = load_csv(file_path, last_execution_timestamp)
    elif file_extension == '.json':
        data = load_json(file_path, last_execution_timestamp)
    else:
        raise ValueError("Unsupported file format")
    return data

def main():
    # Set the last execution timestamp (assuming it's stored persistently)
    last_execution_timestamp = '2024-04-10 00:00:00'

    # Example usage for loading JSON and CSV data
    customers_data = ingest_data('data_dir/Market 1 Customers.json', last_execution_timestamp)
    orders_data = ingest_data('data_dir/Market 1 Orders.csv', last_execution_timestamp)
    deliveries_data = ingest_data('data_dir/Market 1 Deliveries.csv', last_execution_timestamp)
        
    # Check column names of each DataFrame
    print("Customers Data Columns:", customers_data.columns)
    print("Orders Data Columns:", orders_data.columns)
    print("Deliveries Data Columns:", deliveries_data.columns)


if __name__ == "__main__":
    main()


Customers Data Columns: Index(['Customer ID', 'Last Used Platform', 'Is Blocked', 'Created At',
       'Language', 'Outstanding Amount', 'Loyalty Points',
       'Number of employees'],
      dtype='object')
Orders Data Columns: Index(['Order ID', 'Order Status', 'Category Name', 'SKU',
       'Customization Group', 'Customization Option', 'Quantity', 'Unit Price',
       'Cost Price', 'Total Cost Price', 'Total Price', 'Order Total',
       'Sub Total', 'Tax', 'Delivery Charge', 'Tip', 'Discount',
       'Remaining Balance', 'Payment Method', 'Additional Charge',
       'Taxable Amount', 'Transaction ID', 'Currency Symbol',
       'Transaction Status', 'Promo Code', 'Customer ID', 'Merchant ID',
       'Description', 'Distance (in km)', 'Order Time', 'Pickup Time',
       'Delivery Time', 'Ratings', 'Reviews', 'Merchant Earning',
       'Commission Amount', 'Commission Payout Status',
       'Order Preparation Time', 'Debt Amount', 'Redeemed Loyalty Points',
       'Consumed Loyalty P

  data = pd.read_csv(file_path)


In [12]:
import pandas as pd
import json
import os
from datetime import datetime

def load_csv(file_path, last_execution_timestamp):
    data = pd.read_csv(file_path)
    if 'last_updated' in data.columns:
        data['last_updated'] = pd.to_datetime(data['last_updated'], utc=True)  # Ensure timezone-aware datetime
        data = data[data['last_updated'] > last_execution_timestamp]
    # Rename 'order_ID' column to 'Order ID' if present
    if 'order_ID' in data.columns:
        data.rename(columns={'order_ID': 'Order ID'}, inplace=True)
    return data

def load_json(file_path, last_execution_timestamp):
    with open(file_path, 'r') as json_file:
        data = json.load(json_file)
        data_df = pd.DataFrame(data)

        # Convert 'Created At' column to datetime if present
        if 'Created At' in data_df.columns:
            data_df['Created At'] = pd.to_datetime(data_df['Created At'])

        # Clean and rename 'Order ID' to 'Customer ID' if present
        if 'Order ID' in data_df.columns:
            data_df['Customer ID'] = data_df['Order ID'].astype(str).str.replace('YR-', '').str.replace(',0', '')

    return data_df

def ingest_data(file_path, last_execution_timestamp):
    file_extension = os.path.splitext(file_path)[1]
    if file_extension == '.csv':
        data = load_csv(file_path, last_execution_timestamp)
    elif file_extension == '.json':
        data = load_json(file_path, last_execution_timestamp)
    else:
        raise ValueError("Unsupported file format")
    return data

def main():
    # Set the last execution timestamp (assuming it's stored persistently)
    last_execution_timestamp = '2024-04-10 00:00:00'

    # Example usage for loading JSON and CSV data
    customers_data = ingest_data('data_dir/Market 1 Customers.json', last_execution_timestamp)
    orders_data = ingest_data('data_dir/Market 1 Orders.csv', last_execution_timestamp)
    deliveries_data = ingest_data('data_dir/Market 1 Deliveries.csv', last_execution_timestamp)

    # Check column names of each DataFrame
    print("Customers Data Columns:", customers_data.columns)
    print("Orders Data Columns:", orders_data.columns)
    print("Deliveries Data Columns:", deliveries_data.columns)

    # Merge data based on 'Customer ID' and 'Order ID'
    if 'Customer ID' in customers_data.columns and 'Order ID' in orders_data.columns and 'Order ID' in deliveries_data.columns:
        # Rename 'order_ID' column to 'Order ID' in orders_data DataFrame
        if 'order_ID' in orders_data.columns:
            orders_data.rename(columns={'order_ID': 'Order ID'}, inplace=True)

        merged_data = pd.merge(customers_data, orders_data, on=['Customer ID', 'Order ID'], how='outer')
        merged_data = pd.merge(merged_data, deliveries_data, on=['Customer ID', 'Order ID'], how='outer')

        # Display the merged data
        print("Merged Data:")
        print(merged_data)
    else:
        print("Required columns not found for merging.")

if __name__ == "__main__":
    main()


Customers Data Columns: Index(['Customer ID', 'Last Used Platform', 'Is Blocked', 'Created At',
       'Language', 'Outstanding Amount', 'Loyalty Points',
       'Number of employees'],
      dtype='object')
Orders Data Columns: Index(['Order ID', 'Order Status', 'Category Name', 'SKU',
       'Customization Group', 'Customization Option', 'Quantity', 'Unit Price',
       'Cost Price', 'Total Cost Price', 'Total Price', 'Order Total',
       'Sub Total', 'Tax', 'Delivery Charge', 'Tip', 'Discount',
       'Remaining Balance', 'Payment Method', 'Additional Charge',
       'Taxable Amount', 'Transaction ID', 'Currency Symbol',
       'Transaction Status', 'Promo Code', 'Customer ID', 'Merchant ID',
       'Description', 'Distance (in km)', 'Order Time', 'Pickup Time',
       'Delivery Time', 'Ratings', 'Reviews', 'Merchant Earning',
       'Commission Amount', 'Commission Payout Status',
       'Order Preparation Time', 'Debt Amount', 'Redeemed Loyalty Points',
       'Consumed Loyalty P

  data = pd.read_csv(file_path)


In [2]:
import pandas as pd

In [3]:
customers_json1= pd.read_json('data_dir/Market 1 Customers.json')
Deliveries1= pd.read_csv('data_dir/Market 1 Deliveries.csv')
Orders1= pd.read_csv('data_dir/Market 1 Orders.csv')


  Deliveries1= pd.read_csv('data_dir/Market 1 Deliveries.csv')


In [9]:
customers_json1.columns

Index(['Customer ID', 'Last Used Platform', 'Is Blocked', 'Created At',
       'Language', 'Outstanding Amount', 'Loyalty Points',
       'Number of employees'],
      dtype='object')

In [8]:
Orders1['Customer ID']

0        3755460
1        4541187
2        4541187
3        4541187
4        4541187
          ...   
12237    4313129
12238    4313129
12239    3986302
12240    3986302
12241    3986302
Name: Customer ID, Length: 12242, dtype: int64

In [None]:
Deliveries1.head()

Unnamed: 0,Task_ID,Order_ID,Relationship,Team_Name,Task_Type,Notes,Agent_ID,Agent_Name,Distance(m),Total_Time_Taken(min),...,Tip,Delivery_Charges,Discount,Subtotal,Payment_Type,Task_Category,Earning,Pricing,Unnamed: 30,Unnamed: 31
0,357410575,"YR-10691280,0",3.57e+29,Default Team,Delivery,Deliver tomorrow morning by 8.00a.m note,potatoes should be Medium large enough for Mak...,1280904,Malombe Kimathi,-,...,-,-,-,-,-,-,-,-,-,
1,357410575,"YR-10691280,0",3.57e+29,Default Team,Delivery,Deliver tomorrow morning by 8.00a.m note,potatoes should be Medium large enough for Mak...,1280904,Malombe Kimathi,-,...,Deliver tomorrow morning by 8.00a.m note,potatoes should be Medium large enough for Mak...,KSh 0.00,KSh 0.00,KSh 0.00,5900,CASH,-,-,-
2,303698961,"YR-8221753,0",3.04e+29,Default Team,Delivery,To be delivered today by 12,30pm,1115711,Kennedy Chege,-,...,To be delivered today by 12,30pm,KSh 0.00,KSh 0.00,KSh 0.00,6080,CASH,-,-,-
3,302906575,"YR-8133470,0",3.03e+29,Default Team,Delivery,DELIVER BY 6,30 PLEASE,-,-,-,...,-,-,-,-,-,-,-,-,-,
4,302906575,"YR-8133470,0",3.03e+29,Default Team,Delivery,DELIVER BY 6,30 PLEASE,-,-,-,...,DELIVER BY 6,30 PLEASE,KSh 0.00,KSh 0.00,KSh 0.00,5730,CASH,-,-,-


In [None]:
Orders1.head()

Unnamed: 0,Order ID,Order Status,Category Name,SKU,Customization Group,Customization Option,Quantity,Unit Price,Cost Price,Total Cost Price,...,Commission Amount,Commission Payout Status,Order Preparation Time,Debt Amount,Redeemed Loyalty Points,Consumed Loyalty Points,Cancellation Reason,Flat Discount,Checkout Template Name,Checkout Template Value
0,11265015,ORDERED,Cooking Fat & Oil,KKCO0487,,,1,4400,4250.0,4250.0,...,,,0,-,50,-,,0,-,-
1,11264651,ORDERED,Cleaning & Hygiene,KKPT280100,,,1,180,130.0,130.0,...,,,0,-,-,-,,0,-,-
2,11264651,ORDERED,Flour & Sugar,KKFS0702,,,6,263,247.0,1482.0,...,,,0,-,-,-,,0,-,-
3,11264651,ORDERED,Salt & Seasoning,KKSS0002,,,10,65,58.0,580.0,...,,,0,-,-,-,,0,-,-
4,11264651,ORDERED,Beverages,KKBE0105,,,1,249,208.5,208.5,...,,,0,-,-,-,,0,-,-


In [None]:
customers_json1.head()

Unnamed: 0,Customer ID,Last Used Platform,Is Blocked,Created At,Language,Outstanding Amount,Loyalty Points,Number of employees
0,3144837,WEB,0,2021-03-15T17:13:19.000Z,en,0,0,
1,3174590,WEB,0,2021-03-20T14:15:11.000Z,en,0,0,
2,3181998,WEB,0,2021-03-21T15:36:51.000Z,en,0,0,
3,3191244,WEB,0,2021-03-23T08:54:00.000Z,en,0,367,
4,3274222,WEB,0,2021-04-06T13:52:39.000Z,en,0,0,


In [None]:
Orders1.isna().sum()

Order ID                        0
Order Status                    0
Category Name                   6
SKU                             3
Customization Group         12242
Customization Option        12242
Quantity                        0
Unit Price                      0
Cost Price                      0
Total Cost Price                0
Total Price                     0
Order Total                     0
Sub Total                       0
Tax                             0
Delivery Charge                 0
Tip                             0
Discount                        0
Remaining Balance               0
Payment Method                  0
Additional Charge               0
Taxable Amount                  0
Transaction ID                  0
Currency Symbol                 0
Transaction Status          12242
Promo Code                  12242
Customer ID                     0
Merchant ID                     0
Description                  7073
Distance (in km)                0
Order Time    

In [None]:
customers_json1.head()

Unnamed: 0,Customer ID,Last Used Platform,Is Blocked,Created At,Language,Outstanding Amount,Loyalty Points,Number of employees
0,3144837,WEB,0,2021-03-15T17:13:19.000Z,en,0,0,
1,3174590,WEB,0,2021-03-20T14:15:11.000Z,en,0,0,
2,3181998,WEB,0,2021-03-21T15:36:51.000Z,en,0,0,
3,3191244,WEB,0,2021-03-23T08:54:00.000Z,en,0,367,
4,3274222,WEB,0,2021-04-06T13:52:39.000Z,en,0,0,


In [None]:
customers_json1.columns

Index(['Customer ID', 'Last Used Platform', 'Is Blocked', 'Created At',
       'Language', 'Outstanding Amount', 'Loyalty Points',
       'Number of employees'],
      dtype='object')

In [None]:
import os
import pandas as pd
import json
from datetime import datetime
import time

def load_csv(file_path):
    # Load CSV file into a pandas DataFrame
    try:
        df = pd.read_csv(file_path)
        return df
    except Exception as e:
        print(f"Error loading CSV file '{file_path}': {e}")
        return None

def load_json(file_path):
    # Load JSON file into a Python dictionary
    try:
        with open(file_path, 'r') as json_file:
            data = json.load(json_file)
            return data
    except Exception as e:
        print(f"Error loading JSON file '{file_path}': {e}")
        return None

def load_data_from_folder(folder_path):
    # Get list of files with their modification times
    files = [(f, os.path.getmtime(os.path.join(folder_path, f))) for f in os.listdir(folder_path)]

    # Sort files by modification time in descending order
    files.sort(key=lambda x: x[1], reverse=True)

    # Process only the most recent files (e.g., process the latest 5 files)
    num_files_to_process = 3  # Adjust as needed
    for file_name, _ in files[:num_files_to_process]:
        file_path = os.path.join(folder_path, file_name)

        if file_name.endswith('.csv'):
            # Load CSV file
            csv_data = load_csv(file_path)
            if csv_data is not None:
                print(f"CSV file '{file_name}' loaded successfully.")
                # Process or store the CSV data here

        elif file_name.endswith('.json'):
            # Load JSON file
            json_data = load_json(file_path)
            if json_data is not None:
                print(f"JSON file '{file_name}' loaded successfully.")
                # Process or store the JSON data here

        else:
            print(f"Unsupported file format: '{file_name}'")

def main():
    folder_path = 'C:/Users/kobby/OneDrive/Documents/Data Analysis/trestle_project/data-pipeline-for-a-large-e-commerce-platform/data_dir'  # Specify your folder path here
    if not os.path.isdir(folder_path):
        print(f"Error: Folder '{folder_path}' not found.")
        return

    load_data_from_folder(folder_path)

if __name__ == "__main__":
    main()


CSV file 'Market 1 Orders.csv' loaded successfully.
CSV file 'Market 1 Deliveries.csv' loaded successfully.
JSON file 'Market 1 Customers.json' loaded successfully.


  df = pd.read_csv(file_path)
