In [1]:
!pip install jsonpath_ng

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [1]:
import os
import json
import numpy as np 
import pandas as pd 
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline 

# Source to Target Mapping - OOP Approach for Single Table

### Data

In [2]:
with open("example_data.json", "r") as file:
    data = json.loads(json.load(file))
    for item in data:
        item["Salary"] = str(item["Salary"])
data[:10]

[{'jobTitle': '   SOFTWARE ENGINEER   ',
  'Salary': '22616',
  'Company': '   AMAZON'},
 {'jobTitle': 'BI DEVELOPER  ', 'Salary': '22978', 'Company': 'Amazon'},
 {'jobTitle': ' BI developer', 'Salary': '13295', 'Company': ' 1trn LLC  '},
 {'jobTitle': 'BI DEVELOPER  ', 'Salary': '20000', 'Company': 'VISION'},
 {'jobTitle': ' BI developer', 'Salary': '11442', 'Company': ' 1trn LLC  '},
 {'jobTitle': ' BI developer', 'Salary': '18047', 'Company': 'VISION'},
 {'jobTitle': ' BI developer', 'Salary': '23295', 'Company': '   AMAZON'},
 {'jobTitle': 'SOFTWARE ENGINEER         ',
  'Salary': '10182',
  'Company': 'VISION'},
 {'jobTitle': 'DATA analyst', 'Salary': '19540', 'Company': 'GOOGLE'},
 {'jobTitle': ' Data ANALYST', 'Salary': '14831', 'Company': ' 1trn LLC  '}]

###  Abstract Base Calss (ABC) for mutual methods

In [3]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [4]:
class TransformMask(Enum):
    # add here any masks you want 
    CLEAN_STRING = ".strip().lower()" 
    CAPITAL_LETTER = ".strip().lower().title()"
    

### Database Class - Define Common Properties for Source, Target, Mapping

In [5]:
class Database:
    def __init__(self):
        pass

    # built-in function that creates and returns a property object
    # get data by: get_data_source_target_mapping.get(dict_key)
    @property 
    def get_data_source_target_mapping(self):
        return {
            "source": [
                {
                    "id": 1,
                    "source_field_name": "jobTitle",
                    "source_field_mapping": "$.jobTitle",
                    "source_field_type": "str", # use python types
                    "is_required": True,

                },
                {
                    "id": 2,
                    "source_field_name": "Salary",
                    "source_field_mapping": "$.Salary",
                    "source_field_type": "str",
                    "is_required": True,

                }
            ],
            "destination": [
                {
                    "id": 1,
                    "destination_field_name": "job_title",
                    "destination_field_mapping": "job_title",
                    "destination_field_type": "str",
                    "default_value": "n/a",
                },
                {
                    "id": 2,
                    "destination_field_name": "salary",
                    "destination_field_mapping": "salary",
                    "destination_field_type": "float",
                    "default_value": "0"
                }

            ],
            "transform": [ # using the Enums 
                {
                    "id": 1,
                    "transform_mask": 'CAPITAL_LETTER'
                },
                {
                    "id": 54,
                    "transform_mask": 'CLEAN_STRING'
                }
            ],
            "mapping": [
                {
                    "id": 1,
                    "mapping_source": 1,
                    "mapping_destination": 1,
                    "mapping_transform": 1
                },
                {
                    "id": 2,
                    "mapping_source": 2,
                    "mapping_destination": 2,
                }
            ]
        } # How it would be better to manage it? 

### Source class

Inherited from Interface for the common methods and from Database for common variables

In [6]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

### Target class

Inherited from Interface for the common methods and from Database for common variables

In [7]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Transform Class

Inherited from Interface for the common methods and from Database for common variables

In [8]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Mapping class

Inherited from Interface for the common methods and from Database for common variables

In [9]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file 

In [10]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [11]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [12]:
transformed_data = []
for item in data:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

{'job_title': 'Software Engineer', 'salary': 22616.0}
{'job_title': 'Bi Developer', 'salary': 22978.0}
{'job_title': 'Bi Developer', 'salary': 13295.0}
{'job_title': 'Bi Developer', 'salary': 20000.0}
{'job_title': 'Bi Developer', 'salary': 11442.0}
{'job_title': 'Bi Developer', 'salary': 18047.0}
{'job_title': 'Bi Developer', 'salary': 23295.0}
{'job_title': 'Software Engineer', 'salary': 10182.0}
{'job_title': 'Data Analyst', 'salary': 19540.0}
{'job_title': 'Data Analyst', 'salary': 14831.0}
{'job_title': 'Bi Developer', 'salary': 23320.0}
{'job_title': 'Bi Developer', 'salary': 11631.0}
{'job_title': 'Bi Developer', 'salary': 24984.0}
{'job_title': 'Bi Developer', 'salary': 22239.0}
{'job_title': 'Data Analyst', 'salary': 12108.0}
{'job_title': 'Software Engineer', 'salary': 10242.0}
{'job_title': 'Bi Developer', 'salary': 20210.0}
{'job_title': 'Software Engineer', 'salary': 18814.0}
{'job_title': 'Data Analyst', 'salary': 24139.0}
{'job_title': 'Bi Developer', 'salary': 15423.0}


In [13]:
pd.DataFrame(transformed_data)

Unnamed: 0,job_title,salary
0,Software Engineer,22616.0
1,Bi Developer,22978.0
2,Bi Developer,13295.0
3,Bi Developer,20000.0
4,Bi Developer,11442.0
5,Bi Developer,18047.0
6,Bi Developer,23295.0
7,Software Engineer,10182.0
8,Data Analyst,19540.0
9,Data Analyst,14831.0


In [14]:
pd.DataFrame(transformed_data).groupby("job_title").mean()

Unnamed: 0_level_0,salary
job_title,Unnamed: 1_level_1
Bi Developer,18440.166667
Data Analyst,20239.153846
Software Engineer,16860.210526
