In [17]:
!pip install jsonpath_ng


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [18]:
import os
import csv
import json
import numpy as np 
import pandas as pd 
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline 

### Source to Target Mapping - OOP Approach for Single Table


Data

In [19]:
with open("factTableShorter.csv", "r", encoding="utf-8-sig") as file:
    data = list(csv.DictReader(file))
    for item in data:
        item["TotalPrice"] =str(item["TotalPrice"])
data[:10]

[{'TransactionNo': '581482', 'CustomerNo': '17490', 'TotalPrice': '257.64'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '383.4'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '138.36'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '127.8'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '71.64'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '255.6'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '207.54'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '147'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '127.8'},
 {'TransactionNo': '581475', 'CustomerNo': '13069', 'TotalPrice': '253.2'}]

###  Abstract Base Calss (ABC) for mutual methods

In [20]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [21]:
class TransformMask(Enum):
    # add here any masks you want 
    CLEAN_STRING = ".strip().lower()" 
    CAPITAL_LETTER = ".strip().lower().title()"
    

Database Class - Define Common Properties for Source, Target, Mapping

In [22]:
class Database:
    def __init__(self):
        self.db = {
            "source": [],
            "destination": [],
            "transform": [],
            "mapping": []
        }
        self.add_source(1,"TransactionNo","$.TransactionNo","str",True)
        self.add_source(2,"CustomerNo","$.CustomerNo","str",True)
        self.add_source(3,"TotalPrice","$.TotalPrice","str",True)
        self.add_destination(1,"TransactionNo","TransactionNo","str","n/a")
        self.add_destination(2,"CustomerNo","CustomerNo","str","n/a")
        self.add_destination(3,"TotalPrice","TotalPrice","float","0")
        self.add_transform(1,'CLEAN_STRING')
        self.add_mapping(1,1,1,1)
        self.add_mapping(2,2,2,1)
        self.add_mapping(3,3,3)

    def add_source(self,id,source_field_name,source_field_mapping,source_field_type,is_required):
      if source_field_type not in ["str", "float", "list", "int", "set", "dict"]:
        print("Error")
      if not isinstance(is_required, bool):
        print("Error")
      else:
        source = {
            "id": id,
            "source_field_name": source_field_name,
            "source_field_mapping": source_field_mapping,
            "source_field_type": source_field_type,
            "is_required": is_required
        }
        self.db["source"].append(source)
        
    def add_destination(self,id,destination_field_name,destination_field_mapping,destination_field_type,default_value):
      if destination_field_type not in ["str", "float", "list", "int", "set", "dict"]:
        print("Error")
      else:
        destination = {
              "id": id,
              "destination_field_name": destination_field_name,
              "destination_field_mapping": destination_field_mapping,
              "destination_field_type": destination_field_type,
              "default_value": default_value
          }
        self.db["destination"].append(destination)

    def add_transform(self,id,transform_mask):
      transform={
          "id": id,
          "transform_mask": transform_mask
      }
      self.db["transform"].append(transform)

    def add_mapping(self,id,mapping_source,mapping_destination,mapping_transform=None):
      if mapping_transform is not None:
        mapping={
          "id": id,
          "mapping_source": mapping_source,
          "mapping_destination": mapping_destination,
          "mapping_transform": mapping_transform
        }
      else:
        mapping={
          "id": id,
          "mapping_source": mapping_source,
          "mapping_destination": mapping_destination
          }
      self.db["mapping"].append(mapping)

    @property 
    def get_data_source_target_mapping(self):
        return self.db

## **Source class**
Inherited from Interface for the common methods and from Database for common variables

In [23]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

## **Target class**
Inherited from Interface for the common methods and from Database for common variables

In [24]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

## **Transform Class**
Inherited from Interface for the common methods and from Database for common variables

In [25]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

## **Mapping class**
Inherited from Interface for the common methods and from Database for common variables

In [26]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file 

In [27]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [28]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))
               # print(mappings.get("mapping_destination"))
                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [29]:
transformed_data = []
for item in data:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 21.72}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 43.44}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 1390.08}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 21.72}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 144.8}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 86.88}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 86.88}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 43.44}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 45.96}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 91.92}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 22.98}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 24.52}
{'TransactionNo': '580680', 'CustomerNo': '13001', 'TotalPrice': 24.5

In [30]:
pd.DataFrame(transformed_data)

Unnamed: 0,TransactionNo,CustomerNo,TotalPrice
0,581482,17490,257.64
1,581475,13069,383.40
2,581475,13069,138.36
3,581475,13069,127.80
4,581475,13069,71.64
...,...,...,...
19994,580160,12700,49.52
19995,580160,12700,49.52
19996,580160,12700,12.38
19997,580160,12700,123.80


In [31]:
pd.DataFrame(transformed_data).groupby("CustomerNo").mean()

  pd.DataFrame(transformed_data).groupby("CustomerNo").mean()


Unnamed: 0_level_0,TotalPrice
CustomerNo,Unnamed: 1_level_1
12256,106.348548
12347,110.198182
12358,148.560000
12362,46.976552
12364,122.766923
...,...
18237,65.543750
18272,68.359130
18273,120.800000
18283,17.570600


In [33]:
mean_data = pd.DataFrame(transformed_data).groupby("CustomerNo").mean()

# Sort Total price mean values in ascending order
sorted_data = mean_data.sort_values(by="TotalPrice", ascending=False)

print(sorted_data)

               TotalPrice
CustomerNo               
16446       250679.525000
15195         8690.760000
17949         8154.000000
16000         3514.544444
17857         2411.005000
...                   ...
17426          -74.280000
12967          -99.239231
16019         -103.992000
14298         -664.393333
na           -1671.300000

[549 rows x 1 columns]


  mean_data = pd.DataFrame(transformed_data).groupby("CustomerNo").mean()


We get values with (-) since those are cancelled transactions...

\
