In [1]:
!pip install jsonpath_ng



In [2]:
import os
import csv
import json
import numpy as np
import pandas as pd
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline


Data

In [3]:
# Read the sales extract into a list of dicts, one dict per CSV row.
# "utf-8-sig" drops a leading byte-order mark if the file has one, and
# newline="" is how the csv module asks files to be opened so that line
# breaks inside quoted fields are parsed correctly.
with open("Sales by Store.csv", "r", encoding="utf-8-sig", newline="") as file:
    data = list(csv.DictReader(file))

# The file is no longer needed here, so the clean-up runs outside the `with`.
# The str() call keeps transaction_id a string, matching the "str" source type
# declared for it in the mapping metadata.
for item in data:
    item["transaction_id"] = str(item["transaction_id"])
data[:10]

[{'transaction_id': '1338',
  'transaction_date': '42736',
  'transaction_time': '0.295960648',
  'store_city': 'New York',
  'staff_id': '30',
  'staff_Name': 'Amela Chadwick',
  'customer_id': '5465',
  'instore_yn': 'Y',
  'order': '1',
  'line_item_id': '1',
  'product_id': '32',
  'product': 'Ethiopia Rg',
  'product_category': 'Coffee',
  'quantity_sold': '2',
  'unit_price': '3',
  'promo_item_yn': 'N'},
 {'transaction_id': '399',
  'transaction_date': '42736',
  'transaction_time': '0.29787037',
  'store_city': 'New York',
  'staff_id': '30',
  'staff_Name': 'Amela Chadwick',
  'customer_id': '5895',
  'instore_yn': 'Y',
  'order': '1',
  'line_item_id': '1',
  'product_id': '57',
  'product': 'Spicy Eye Opener Chai Lg',
  'product_category': 'Tea',
  'quantity_sold': '2',
  'unit_price': '3.1',
  'promo_item_yn': 'N'},
 {'transaction_id': '451',
  'transaction_date': '42736',
  'transaction_time': '0.301435185',
  'store_city': 'New York',
  'staff_id': '26',
  'staff_Name': '

Abstract Base Class (ABC) for shared methods

In [4]:
class Interface(ABC):
    """Contract every metadata accessor in this notebook has to fulfil.

    Implementers expose the full record list through the ``get`` property and
    offer two look-ups on top of it. ``get`` is declared as an abstract
    *property* because that is how the concrete classes in this notebook
    (Source, Target, Transform, Mappings) implement and read it.
    """

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by the given field name."""

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by the given ID."""

    @property
    @abstractmethod
    def get(self):
        """Fetch all data."""

Transform Operations

In [5]:
class TransformMask(Enum):
    """Catalogue of the clean-up masks a mapping can refer to by name.

    Each member's value is a fragment of Python method-call syntax. STTM
    appends the fragment to the converted value expression and evaluates the
    result, so an empty string leaves the value as it is.
    """

    # Register further masks below; the member name is what mappings reference.
    CLEAN_STRING = ".strip().lower()"            # trim and lower-case
    CAPITAL_LETTER = ".strip().lower().title()"  # trim and title-case
    CLEAN_NUM = ""                               # no extra processing

Database Class - Define Common Properties for Source, Target, Mapping

In [6]:
class Database:
    """In-memory store for the source-to-target mapping metadata.

    ``self.db`` is a dict with four lists of records:

    * ``"source"``      - incoming fields (name, type name, required flag)
    * ``"destination"`` - outgoing fields (name, type name, default, table)
    * ``"transform"``   - transform ids and the mask name each stands for
    * ``"mapping"``     - rows tying a source, a destination and a transform
      together for one destination table

    The default schema is declared once in the class-level tables below and
    loaded by ``__init__`` through the public ``add_*`` methods. Each
    transform is registered exactly once; look-ups by id only ever returned
    the first record with a given id, so the earlier repeated registrations
    added nothing but duplicates.
    """

    # (id, field name, type name)
    _SOURCES = (
        (1, "transaction_id", "str"),
        (2, "transaction_date", "str"),
        (3, "store_city", "str"),
        (4, "staff_id", "str"),
        (5, "staff_Name", "str"),
        (6, "customer_id", "str"),
        (7, "product_id", "str"),
        (8, "product", "str"),
        (9, "product_category", "str"),
        (10, "quantity_sold", "str"),
        (11, "unit_price", "str"),
    )

    # (id, field name, type name, destination table)
    _DESTINATIONS = (
        (1, "Transaction_Id", "int", "Fact"),
        (2, "Transaction_Date", "str", "Fact"),
        (3, "Store_City", "str", "Fact"),
        (4, "Staff_Id", "int", "Fact"),
        (5, "Staff_Id", "int", "Dim_Staff"),
        (6, "Staff_Name", "str", "Dim_Staff"),
        (7, "Customer_Id", "int", "Fact"),
        (8, "Product_Id", "int", "Fact"),
        (9, "Product_Id", "int", "Dim_Product"),
        (10, "Product", "str", "Dim_Product"),
        (11, "Product_Category", "str", "Dim_Product"),
        (12, "Quantity", "int", "Fact"),
        (13, "Unit_Price", "float", "Fact"),
    )

    # (id, mask name)
    _TRANSFORMS = (
        (1, "CLEAN_NUM"),
        (2, "CLEAN_STRING"),
        (3, "CAPITAL_LETTER"),
    )

    # (id, source id, destination id, transform id, destination table)
    _MAPPINGS = (
        (1, 1, 1, 1, "Fact"),
        (2, 2, 2, 2, "Fact"),
        (3, 3, 3, 3, "Fact"),
        (4, 4, 4, 1, "Fact"),
        (5, 4, 5, 1, "Dim_Staff"),
        (6, 5, 6, 3, "Dim_Staff"),
        (7, 6, 7, 1, "Fact"),
        (8, 7, 8, 1, "Fact"),
        (9, 7, 9, 1, "Dim_Product"),
        (10, 8, 10, 3, "Dim_Product"),
        (11, 9, 11, 3, "Dim_Product"),
        (12, 10, 12, 1, "Fact"),
        (13, 11, 13, 1, "Fact"),
    )

    def __init__(self):
        """Create the four empty record lists and load the default schema."""
        self.db = {
            "source": [],
            "destination": [],
            "transform": [],
            "mapping": []
        }
        for row in self._SOURCES:
            self.add_source(*row)
        for row in self._DESTINATIONS:
            self.add_destination(*row)
        for row in self._TRANSFORMS:
            self.add_transform(*row)
        for row in self._MAPPINGS:
            self.add_mapping(*row)

    def add_source(self, id, field, typ):
        """Register an incoming field.

        ``field`` is stored both as the field name and as its lookup path;
        ``typ`` is the expected Python type name of the raw value. Every
        source is recorded as required.
        """
        self.db["source"].append({
            "id": id,
            "source_field_name": field,
            "source_field_mapping": field,
            "source_field_type": typ,
            "is_required": True,
        })

    def add_destination(self, id, field, typ, table):
        """Register an outgoing field of ``table``.

        ``typ`` is the type name the value is converted to. The default
        value is always recorded as the string "0".
        """
        self.db["destination"].append({
            "id": id,
            "destination_field_name": field,
            "destination_field_mapping": field,
            "destination_field_type": typ,
            "default_value": "0",
            "destination_table": table,
        })

    def add_transform(self, id, mask):
        """Register transform ``id`` as standing for the mask named ``mask``."""
        self.db["transform"].append({
            "id": id,
            "transform_mask": mask,
        })

    def add_mapping(self, id, source, dest, transform, table):
        """Tie a source id, destination id and transform id together for ``table``."""
        self.db["mapping"].append({
            "id": id,
            "mapping_source": source,
            "mapping_destination": dest,
            "mapping_transform": transform,
            "destination_table": table,
        })

    @property
    def get_data_source_target_mapping(self):
        """The complete metadata dict (the live object, not a copy)."""
        return self.db


Source Class

In [7]:
class Source(Interface, Database):
    """Read access to the "source" records of the mapping metadata."""

    def __init__(self):
        Database.__init__(self)

    def get_data_by_field(self, field_name):
        """Return the source record whose ``source_field_name`` is ``field_name``.

        Returns None when no record carries that name. The earlier
        implementation compared ``field_name`` with the record's dict *keys*,
        so a real field name such as "transaction_id" could never be found.
        """
        for record in self.get:
            if record.get("source_field_name") == field_name:
                return record
        return None

    @property
    def get(self):
        """All source records, in registration order."""
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        """Return the first source record with the given id, or None.

        Ids are compared by their string form, as the Target, Transform and
        Mappings look-ups do, so 4 and "4" refer to the same record.
        """
        self.id = id  # kept: earlier versions exposed the last requested id
        wanted = str(id)
        for record in self.get:
            if str(record.get("id")) == wanted:
                return record
        return None

Target Class

In [8]:
class Target(Interface, Database):
    """Read access to the "destination" records of the mapping metadata."""

    def __init__(self):
        Database.__init__(self)

    def get_data_by_field(self, field_name):
        """Return the first destination record named ``field_name``.

        The match is made on ``destination_field_name``. Several records can
        share a name (one per destination table); the first registered one is
        returned. Returns None when nothing matches. The earlier
        implementation compared ``field_name`` with the record's dict *keys*,
        so a real field name such as "Staff_Id" could never be found.
        """
        for record in self.get:
            if record.get("destination_field_name") == field_name:
                return record
        return None

    @property
    def get(self):
        """All destination records, in registration order."""
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        """Return the first destination record with the given id, or None.

        Ids are compared by their string form, so 4 and "4" are equivalent.
        """
        self.id = id  # kept: earlier versions exposed the last requested id
        wanted = str(id)
        for record in self.get:
            if str(record.get("id")) == wanted:
                return record
        return None

Transform Class

In [9]:
class Transform(Interface, Database):
    """Read access to the "transform" records of the mapping metadata."""

    def __init__(self):
        Database.__init__(self)

    def get_data_by_field(self, field_name):
        """Return the first transform record that has a key equal to ``field_name``.

        NOTE(review): the comparison is against the record's keys, not its
        values - confirm whether a look-up by mask name was intended.
        """
        for record in self.get:
            if any(key == field_name for key in record):
                return record
        return None

    @property
    def get(self):
        """All transform records; an empty list if the section is absent."""
        store = self.get_data_source_target_mapping
        return store.get("transform", [])

    def get_data_by_id(self, id):
        """Return the first transform record whose id matches ``id`` by string form, else None."""
        self.id = id
        candidates = (row for row in self.get if str(row.get("id")) == str(self.id))
        return next(candidates, None)

Mapping Class

In [10]:
class Mappings(Interface, Database):
    """Read access to the "mapping" records of the mapping metadata."""

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        """All mapping records, in registration order."""
        store = self.get_data_source_target_mapping
        return store.get("mapping")

    def get_data_by_id(self, id):
        """Return the first mapping record whose id matches ``id`` by string form, else None."""
        self.id = id
        candidates = (row for row in self.get if str(row.get("id")) == str(self.id))
        return next(candidates, None)

    def get_data_by_field(self, field_name):
        """Look-up by field is not offered for mappings; the result is always None."""
        return None

Format Class - JSON

In [11]:
class JsonQuery:
    """Evaluate one JSONPath expression against one JSON-like object."""

    def __init__(self, json_path, json_data):
        """Store the JSONPath string and the data it will be run against."""
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        """Return the value of the first match, or None when nothing matches.

        Returning None, rather than letting ``match[0]`` raise IndexError,
        lets the caller fall back to the destination's default value, which
        is how STTM already treats a None source value.
        """
        matches = parse(self.json_path).find(self.json_data)
        if not matches:
            return None
        return matches[0].value

Combine it All - STTM

In [12]:
class STTM:
    """Apply the source-to-target mapping metadata to one input record.

    For every mapping row the matching source value is read from
    ``input_json``, checked against the declared source type, converted to
    the destination type, optionally post-processed with a transform mask and
    stored under the destination field name.
    """

    # Destination type names that can be produced, keyed to their constructors.
    _DTYPES = {t.__name__: t for t in (str, float, list, int, set, dict)}

    def __init__(self, input_json):
        """Keep the input record and build the metadata accessors.

        ``input_json`` is the record to transform; it is queried by key.
        """
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        # mask name -> code fragment, e.g. "CLEAN_STRING" -> ".strip().lower()"
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}
        self.to_table = {}

    def _get_mapping_data(self):
        """All mapping rows."""
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        """All source records."""
        return self.source_instance.get

    def _convert(self, source_data_value, dtype, destination, transform_data):
        """Convert one raw value to ``dtype`` and apply its mask, if any.

        A None value is replaced by the destination's default (converted to
        ``dtype``). Raises Exception when the transform record names a mask
        that is not in ``look_up_mask``.
        """
        if source_data_value is None:
            return dtype(destination.get("default_value"))
        if transform_data is None:
            return dtype(source_data_value)

        mask_name = transform_data.get("transform_mask")
        if mask_name not in self.look_up_mask:
            raise Exception(
                f"Specified Transform {mask_name} is not available please select from following Options :{list(self.look_up_mask.keys())}")

        converted_dtype = dtype(source_data_value)
        # SECURITY NOTE: the mask is evaluated as Python code. Here it comes
        # from the TransformMask enum; never feed look_up_mask from untrusted
        # input. The local name `converted_dtype` is referenced by the string.
        return eval(f"converted_dtype{self.look_up_mask.get(mask_name)}")

    def get_transformed_data(self):
        """Transform ``input_json`` according to all mapping rows.

        Returns a tuple ``(record, tables)``: ``record`` maps destination
        field names to converted values and ``tables`` maps destination field
        names to the list of destination tables they belong to.

        Raises:
            LookupError: a mapping row points at a source or destination id
                that is not registered.
            Exception: a required field is missing from the input, the raw
                value's type differs from the declared source type, or an
                unknown transform mask is referenced.
        """
        # Start clean so a second call on the same instance does not append
        # every table label to `to_table` again.
        self.json_data_transformed = {}
        self.to_table = {}

        for mappings in self._get_mapping_data():
            mapping_table = mappings.get("destination_table")

            mapping_source_data = self.source_instance.get_data_by_id(id=mappings.get("mapping_source"))
            transform_data = self.transform_instance.get_data_by_id(id=mappings.get("mapping_transform"))
            if mapping_source_data is None:
                raise LookupError(
                    "Mapping {} refers to unknown source id {}".format(
                        mappings.get("id"), mappings.get("mapping_source")))

            source_field_name = mapping_source_data.get("source_field_name")

            # Field absent from the incoming record: fatal if required, else skip.
            if source_field_name not in self.input_json:
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                continue

            source_data_value = JsonQuery(
                json_path=mapping_source_data.get("source_field_mapping"),
                json_data=self.input_json
            ).get()

            # The raw value must still have the declared source type (None is
            # tolerated and replaced by the default value later on).
            expected_type = mapping_source_data.get("source_field_type")
            actual_type = type(source_data_value).__name__
            if source_data_value is not None and expected_type != actual_type:
                _message = (
                    "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(
                        source_field_name, expected_type, actual_type))
                print(_message)
                raise Exception(_message)

            destination = self.destination_instance.get_data_by_id(
                id=mappings.get("mapping_destination"))
            if destination is None:
                raise LookupError(
                    "Mapping {} refers to unknown destination id {}".format(
                        mappings.get("id"), mappings.get("mapping_destination")))

            destination_field_name = destination.get("destination_field_name")
            destination_field_type = destination.get("destination_field_type")

            # Remember every table this destination field is written to.
            self.to_table.setdefault(destination_field_name, []).append(mapping_table)

            dtype = self._DTYPES.get(destination_field_type)
            if dtype is None:
                # Unsupported destination type name: no value is emitted for
                # this field, as before.
                continue

            self.json_data_transformed[destination_field_name] = self._convert(
                source_data_value, dtype, destination, transform_data)

        return self.json_data_transformed, self.to_table

In [13]:
# Run every CSV row through the mapping engine and collect the results.
transformed_data = []
# Defined up front so the final print works even when `data` is empty.
mapping = {}
for item in data:
    helper = STTM(input_json=item)
    # `mapping` (destination field -> tables) is the same for every row,
    # so keeping the value from the last row is sufficient.
    response, mapping = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)
print(mapping)

{'Transaction_Id': 1338, 'Transaction_Date': '42736', 'Store_City': 'New York', 'Staff_Id': 30, 'Staff_Name': 'Amela Chadwick', 'Customer_Id': 5465, 'Product_Id': 32, 'Product': 'Ethiopia Rg', 'Product_Category': 'Coffee', 'Quantity': 2, 'Unit_Price': 3.0}
{'Transaction_Id': 399, 'Transaction_Date': '42736', 'Store_City': 'New York', 'Staff_Id': 30, 'Staff_Name': 'Amela Chadwick', 'Customer_Id': 5895, 'Product_Id': 57, 'Product': 'Spicy Eye Opener Chai Lg', 'Product_Category': 'Tea', 'Quantity': 2, 'Unit_Price': 3.1}
{'Transaction_Id': 451, 'Transaction_Date': '42736', 'Store_City': 'New York', 'Staff_Id': 26, 'Staff_Name': 'Joelle Christen', 'Customer_Id': 5412, 'Product_Id': 59, 'Product': 'Dark Chocolate Lg', 'Product_Category': 'Drinking Chocolate', 'Quantity': 2, 'Unit_Price': 4.5}
{'Transaction_Id': 1170, 'Transaction_Date': '42736', 'Store_City': 'New York', 'Staff_Id': 12, 'Staff_Name': 'Britanni Jorden', 'Customer_Id': 5676, 'Product_Id': 22, 'Product': 'Our Old Time Diner Ble

In [14]:
# Show all transformed rows as one wide table (one column per destination field).
pd.DataFrame(transformed_data)

Unnamed: 0,Transaction_Id,Transaction_Date,Store_City,Staff_Id,Staff_Name,Customer_Id,Product_Id,Product,Product_Category,Quantity,Unit_Price
0,1338,42736,New York,30,Amela Chadwick,5465,32,Ethiopia Rg,Coffee,2,3.00
1,399,42736,New York,30,Amela Chadwick,5895,57,Spicy Eye Opener Chai Lg,Tea,2,3.10
2,451,42736,New York,26,Joelle Christen,5412,59,Dark Chocolate Lg,Drinking Chocolate,2,4.50
3,1170,42736,New York,12,Britanni Jorden,5676,22,Our Old Time Diner Blend Sm,Coffee,1,2.00
4,248,42736,New York,30,Amela Chadwick,5479,57,Spicy Eye Opener Chai Lg,Tea,2,3.10
...,...,...,...,...,...,...,...,...,...,...,...
95,1872,42736,New York,45,Pandora Neville,8059,60,Sustainably Grown Organic Rg,Drinking Chocolate,1,3.75
96,842,42736,New York,12,Britanni Jorden,5392,59,Dark Chocolate Lg,Drinking Chocolate,1,4.50
97,550,42736,New York,30,Amela Chadwick,5531,36,Jamaican Coffee River Lg,Coffee,2,3.75
98,589,42736,New York,15,Remedios Mari,8036,51,Earl Grey Lg,Tea,2,3.00


In [15]:
# Destination field -> list of tables that field belongs to.
mapping

{'Transaction_Id': ['Fact'],
 'Transaction_Date': ['Fact'],
 'Store_City': ['Fact'],
 'Staff_Id': ['Fact', 'Dim_Staff'],
 'Staff_Name': ['Dim_Staff'],
 'Customer_Id': ['Fact'],
 'Product_Id': ['Fact', 'Dim_Product'],
 'Product': ['Dim_Product'],
 'Product_Category': ['Dim_Product'],
 'Quantity': ['Fact'],
 'Unit_Price': ['Fact']}

Split into tables and export as CSV

In [16]:
# Fact table: every transformed row, restricted to the fields labelled 'Fact'.
fact_table = pd.DataFrame(transformed_data)
fact_columns = [name for name in mapping if 'Fact' in mapping[name]]
fact_subset = fact_table[fact_columns]
fact = pd.DataFrame(fact_subset)
fact

Unnamed: 0,Transaction_Id,Transaction_Date,Store_City,Staff_Id,Customer_Id,Product_Id,Quantity,Unit_Price
0,1338,42736,New York,30,5465,32,2,3.00
1,399,42736,New York,30,5895,57,2,3.10
2,451,42736,New York,26,5412,59,2,4.50
3,1170,42736,New York,12,5676,22,1,2.00
4,248,42736,New York,30,5479,57,2,3.10
...,...,...,...,...,...,...,...,...
95,1872,42736,New York,45,8059,60,1,3.75
96,842,42736,New York,12,5392,59,1,4.50
97,550,42736,New York,30,5531,36,2,3.75
98,589,42736,New York,15,8036,51,2,3.00


In [17]:
# Write the fact table to disk; the positional row index is not exported.
fact.to_csv('Fact.csv', index=False)

In [18]:
# Staff dimension: the fields labelled 'Dim_Staff', one row per distinct combination.
staff = pd.DataFrame(transformed_data)
staff_columns = [name for name in mapping if 'Dim_Staff' in mapping[name]]
Dim_Staff = pd.DataFrame(staff[staff_columns]).drop_duplicates()
Dim_Staff

Unnamed: 0,Staff_Id,Staff_Name
0,30,Amela Chadwick
2,26,Joelle Christen
3,12,Britanni Jorden
17,45,Pandora Neville
20,42,Kylie Candace
38,15,Remedios Mari


In [19]:
# Write the staff dimension to disk; the positional row index is not exported.
Dim_Staff.to_csv('Dim_Staff.csv', index=False)

In [20]:
# Product dimension: the fields labelled 'Dim_Product', one row per distinct combination.
product = pd.DataFrame(transformed_data)
product_columns = [name for name in mapping if 'Dim_Product' in mapping[name]]
Dim_Product = pd.DataFrame(product[product_columns]).drop_duplicates()
Dim_Product

Unnamed: 0,Product_Id,Product,Product_Category
0,32,Ethiopia Rg,Coffee
1,57,Spicy Eye Opener Chai Lg,Tea
2,59,Dark Chocolate Lg,Drinking Chocolate
3,22,Our Old Time Diner Blend Sm,Coffee
5,77,Oatmeal Scone,Bakery
7,28,Columbian Medium Roast Sm,Coffee
8,39,Latte Rg,Coffee
9,58,Dark Chocolate Rg,Drinking Chocolate
10,56,Spicy Eye Opener Chai Rg,Tea
11,33,Ethiopia Lg,Coffee


In [21]:
# Write the product dimension to disk; the positional row index is not exported.
Dim_Product.to_csv('Dim_Product.csv', index=False)