<a href="https://colab.research.google.com/github/TomCaspi/Bi-Project/blob/main/BI__FINAL_PROJECT_STTM_OOP_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [121]:
!pip install jsonpath_ng



In [122]:
import os
import json
import numpy as np
import pandas as pd
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline

# Source to Target Mapping - OOP Approach for Single Table

In [123]:
police = pd.read_excel('BIDataForColab.xlsx')

### Data

In [124]:
police.to_json('output_file.json', orient='records')
with open('output_file.json', 'r') as file:
    data = json.load(file)
    for item in data:
      item["driver_age_raw"]=str(item["driver_age_raw"])
data [:1]

[{'id': 'CA-2013-0000001',
  'state': 'CA',
  'stop_date': 1356998400000,
  'stop_time': None,
  'location_raw': 'San Diego',
  'county_name': 'San Diego County',
  'county_fips': 6073.0,
  'fine_grained_location': None,
  'police_department': None,
  'driver_gender': 'M',
  'driver_age_raw': '25-32',
  'driver_age': None,
  'driver_race_raw': 'Hispanic',
  'driver_race': 'Hispanic',
  'violation_raw': 'Mechanical or Nonmoving Violation (VC)',
  'violation': 'Equipment',
  'search_conducted': False,
  'search_type_raw': 'No Search',
  'search_type': None,
  'contraband_found': 0.0,
  'stop_outcome': 'CHP 215',
  'is_arrested': False,
  'ethnicity': 'H'}]

###  Abstract Base Calss (ABC) for mutual methods

In [125]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [126]:
class TransformMask(Enum):
    # add here any masks you want
    CLEAN_STRING = ".strip().lower()"
    CAPITAL_LETTER = ".strip().lower().title()"


### Database Class - Define Common Properties for Source, Target, Mapping

In [127]:
class Database:
    def __init__(self):
        self.db = {
            "source": [],
            "destination" : [],
            "transform" : [],
            "mapping" : []
        }

        self.add_source(1,'id',"str")
        self.add_source(2,'id',"str")
        self.add_destination(1,"id","str")

        self.add_transform(1,'CLEAN_STRING')
        self.add_transform(2,'CAPITAL_LETTER')

        #self.add_mapping(1,1,1,2)
        #self.add_mapping(1,1,1,3)


    def add_source(self,id, field,ty):
        if ty not in ["str"]:
          print("Error source.")
        self.db["source"].append( {"id": id,
        "source_field_name":field ,
          "source_field_mapping": "$."+field,
          "source_field_type": ty, # use python types
          "is_required": True})


    def add_destination(self,id, field,ty):
        if ty not in ["BOOL","str"]:
          print("Error destination.")
        self.db["destination"].append(
            {         "id": id,
                      "destination_field_name": field,
                      "destination_field_mapping": field,
                      "destination_field_type": ty,
                      "default_value": "n/a",
                  })


    def add_transform(self,id, transform_mask):
        self.db["transform"].append({
                      "id": id,
                      "transform_mask": transform_mask
                  })


    def add_mapping(self,id, mapping_source,mapping_destination,mapping_transform):
        self.db["mapping"].append({
                      "id": id,
                      "mapping_source": mapping_source,
                      "mapping_destination": mapping_destination,
                      "mapping_transform": mapping_transform
                  })

    @property
    def get_data_source_target_mapping(self):
        return self.db


### Source class

Inherited from Interface for the common methods and from Database for common variables

In [128]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

### Target class

Inherited from Interface for the common methods and from Database for common variables

In [129]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Transform Class

Inherited from Interface for the common methods and from Database for common variables

In [130]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Mapping class

Inherited from Interface for the common methods and from Database for common variables

In [131]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file

In [132]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [133]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}
        self.to_table = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")
            mapping_table = mappings.get("destination_table")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")
                self.to_table[destination_field_name] = mapping_table

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [134]:
transformed_data = []
for item in data:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

AttributeError: ignored

In [None]:
mapping

In [None]:
df = pd.DataFrame(transformed_data)
#df.to_excel('A.xlsx')

In [None]:
aaa = {
    "A": 2,
    "B": 5
}
aaa

In [None]:
aaa["C"] = 7
aaa

In [None]:
aaa["B"] = 10

In [None]:
aaa

In [None]:
aaa["B"] = [5,10]
aaa