In [24]:
!pip install jsonpath_ng

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [25]:
import os
import json
import numpy as np 
import pandas as pd 
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline 

# Source to Target Mapping - OOP Approach for Single Table

### Data

In [26]:

import json
with open("Glassdoor Gender.json", "r") as file:
    data = json.load(file)
    for item in data:
        item["JobTitle"] = str(item["JobTitle"])
data[:10]

[{'JobTitle': 'Graphic Designer',
  'Gender': 'Female',
  'Age': 18,
  'PerfEval': 5,
  'Education': 'College',
  'Dept': 'Operations',
  'Seniority': 2,
  'Salary': 42363,
  'Bonus': 9938,
  'Require_level': 'Junior',
  'ID_employee': 6789},
 {'JobTitle': 'Warehouse Associate',
  'Gender': 'Female',
  'Age': 19,
  'PerfEval': 4,
  'Education': 'PhD',
  'Dept': 'Administration',
  'Seniority': 5,
  'Salary': 90208,
  'Bonus': 9268,
  'Require_level': 'medium',
  'ID_employee': 6790},
 {'JobTitle': 'IT',
  'Gender': 'Female',
  'Age': 20,
  'PerfEval': 5,
  'Education': 'PhD',
  'Dept': 'Operations',
  'Seniority': 4,
  'Salary': 70890,
  'Bonus': 10126,
  'Require_level': 'medium',
  'ID_employee': 6791},
 {'JobTitle': 'Graphic Designer',
  'Gender': 'Female',
  'Age': 20,
  'PerfEval': 5,
  'Education': 'College',
  'Dept': 'Sales',
  'Seniority': 4,
  'Salary': 67585,
  'Bonus': 10541,
  'Require_level': 'medium',
  'ID_employee': 6792},
 {'JobTitle': 'Graphic Designer',
  'Gender': 

###  Abstract Base Calss (ABC) for mutual methods

In [27]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [28]:
class TransformMask(Enum):
    # add here any masks you want
    CLEAN_STRING = ".strip().lower()" 
    CAPITAL_LETTER = ".strip().lower().title()"
    CLEAN_NUM = ""
  

### Database Class - Define Common Properties for Source, Target, Mapping

In [29]:
class Database:
    def __init__(self):
        self.db = {
            "source": [],
            "destination": [],
            "transform": [],
            "mapping": []
        }
        self.add_source(1, "JobTitle", "str")
        self.add_destination(1, "JobTitle", "str")
        self.add_transform(1, "CLEAN_STRING")
        self.add_mapping(1, 1, 1, 1)
        self.add_source(8, "Salary", "int")
        self.add_destination(8, "Salary", "float")
        self.add_transform(8, "CLEAN_NUM")
        self.add_mapping(8, 8, 8, 8)
        self.add_source(9, "Bonus", "int")
        self.add_destination(9, "Bonus", "float")
        self.add_transform(9, "CLEAN_NUM")
        self.add_mapping(9, 9, 9, 9)
        self.add_source(11, "ID_employee", "int")
        self.add_destination(11, "ID_employee", "float")
        self.add_transform(11, "CLEAN_NUM")
        self.add_mapping(11, 11, 11, 11)
        

    
    def add_source(self, id, field, typ):
            self.db["source"].append({
                "id": id,
                "source_field_name": field,
                "source_field_mapping": field,
                "source_field_type": typ,
                "is_required": True,
            })
    
    def add_destination(self, id, field, typ):
        self.db["destination"].append({
            "id": id,
            "destination_field_name": field,
            "destination_field_mapping": field,
            "destination_field_type": typ,
            "default_value": "n/a", 
        })

    def add_transform(self, id, mask):
        self.db["transform"].append({
            "id": id,
            "transform_mask": mask,
        })

    def add_mapping(self, id, source, dest, transform):
      self.db["mapping"].append({
          "id": id,
          "mapping_source": source,
          "mapping_destination": dest,
          "mapping_transform": transform,
        })


    @property
    def get_data_source_target_mapping(self):
      return self.db


### Source class

Inherited from Interface for the common methods and from Database for common variables

In [30]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

### Target class

Inherited from Interface for the common methods and from Database for common variables

In [31]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Transform Class

Inherited from Interface for the common methods and from Database for common variables

In [32]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Mapping class

Inherited from Interface for the common methods and from Database for common variables

In [33]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file 

In [34]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [35]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [36]:
transformed_data = []
for item in data:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

{'JobTitle': 'graphic designer', 'Salary': 42363.0, 'Bonus': 9938.0, 'ID_employee': 6789.0}
{'JobTitle': 'warehouse associate', 'Salary': 90208.0, 'Bonus': 9268.0, 'ID_employee': 6790.0}
{'JobTitle': 'it', 'Salary': 70890.0, 'Bonus': 10126.0, 'ID_employee': 6791.0}
{'JobTitle': 'graphic designer', 'Salary': 67585.0, 'Bonus': 10541.0, 'ID_employee': 6792.0}
{'JobTitle': 'graphic designer', 'Salary': 112976.0, 'Bonus': 9836.0, 'ID_employee': 6793.0}
{'JobTitle': 'sales associate', 'Salary': 106524.0, 'Bonus': 9941.0, 'ID_employee': 6794.0}
{'JobTitle': 'driver', 'Salary': 62759.0, 'Bonus': 10124.0, 'ID_employee': 6795.0}
{'JobTitle': 'financial analyst', 'Salary': 84007.0, 'Bonus': 8990.0, 'ID_employee': 6796.0}
{'JobTitle': 'warehouse associate', 'Salary': 86220.0, 'Bonus': 9583.0, 'ID_employee': 6797.0}
{'JobTitle': 'warehouse associate', 'Salary': 95584.0, 'Bonus': 9745.0, 'ID_employee': 6798.0}
{'JobTitle': 'marketing associate', 'Salary': 73357.0, 'Bonus': 10334.0, 'ID_employee': 67

In [None]:
pd.DataFrame(transformed_data)

Unnamed: 0,id,Bonus
0,6789,9938.0
1,6790,9268.0
2,6791,10126.0
3,6792,10541.0
4,6793,9836.0
...,...,...
94,6886,7567.0
95,6887,7169.0
96,6888,7392.0
97,6889,7285.0
