In [4]:
!pip install jsonpath_ng

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [50]:
import os
import json
import numpy as np 
import pandas as pd 
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline 


In [67]:
with open("nett.json", "r") as file:
    data = json.load(file)
    for item in data:
        item["Title"] = str(item["Title"])
        item["IMDbScore"] = str(item["IMDbScore"])
        item["IMDbVotes"] = str(item["IMDbVotes"])
        item["AwardsReceived"] = str(item["AwardsReceived"])
        item["NetflixReleaseDate"] = str(item["NetflixReleaseDate"])
        item["ProductionHouse"] = str(item["ProductionHouse"])

        
data[:5]

[{'Title': 'Lets Fight Ghost',
  'Genre': 'Crime, Drama, Fantasy, Horror, Romance',
  'Languages': 'Swedish, Spanish',
  'SeriesOrMovie': 'Series',
  'HiddenGemScore': 4.3,
  'Runtime': '< 30 minutes',
  'Director': 'Tomas Alfredson',
  'Writer': 'John Ajvide Lindqvist',
  'Actors': 'Lina Leandersson, Kåre Hedebrant, Per Ragnar, Henrik Dahl',
  'ViewRating': 'R',
  'IMDbScore': '7.9',
  'RottenTomatoesScore': 98,
  'MetacriticScore': 82,
  'AwardsReceived': '74',
  'AwardsNominatedFor': 57,
  'Boxoffice': 2122065,
  'ReleaseDate': '##########',
  'NetflixReleaseDate': '2021-03-04',
  'ProductionHouse': 'Canal+, Sandrew Metronome',
  'NetflixLink': 'https://www.netflix.com/watch/81415947',
  'IMDbLink': 'https://www.imdb.com/title/tt1139797',
  'IMDbVotes': '205926',
  'Image': 'https://occ-0-4708-64.1.nflxso.net/dnm/api/v6/evlCitJPPCVCry0BZlEFb5-QjKc/AAAABcmgLCxN8dNahdY2kgd1hhcL2a6XrE92x24Bx5h6JFUvH5zMrv6lFWl_aWMt33b6DHvkgsUeDx_8Q1rmopwT3fuF8Rq3S1hrkvFf3uzVv2sb3zrtU-LM1Zy1FfrAKD3nKNyA_

In [68]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

In [69]:
class TransformMask(Enum):
    CLEAN_STRING = ".strip().lower()" 
    CAPITAL_LETTER = ".strip().lower().title()"
    


In [70]:
keysList = ['Title', 'IMDbScore', 'IMDbVotes', 'AwardsReceived', 'NetflixReleaseDate','ProductionHouse'] 

class Database:
    def __init__(self):
        self.db = {}

    def add_source(self):
        self.db["source"] = []
        for i in range(1, len(keysList) + 1):
            self.db["source"].append({
                "id": i,
                "source_field_name": keysList[i - 1],
                "source_field_mapping": keysList[i - 1],
                "source_field_type": "str",  
                "is_required": True,
            })

    def add_destination(self):
        self.db["destination"] = []
        for i in range(1, len(keysList) + 1):
            self.db["destination"].append({
                "id": i,
                "destination_field_name": keysList[i - 1],
                "destination_field_mapping": keysList[i - 1],
                "destination_field_type": "str",
                "default_value": "n/a",
            })

    def add_transform(self):
        self.db["transform"] = [
            {
                "id": 1,
                "transform_mask": 'CAPITAL_LETTER'
            },
            {
                "id": 2,
                "transform_mask": 'CLEAN_STRING'
            }
        ]

    def add_mapping(self):
        self.db["mapping"] = []
        for i in range(1, len(keysList) + 1):
            self.db["mapping"].append(
                {
                    "id": i,
                    "mapping_source": i,
                    "mapping_destination": i,
                    "mapping_transform": 1
                }
            )

    def data_source_target_mapping(self):
        self.add_source()
        self.add_destination()
        self.add_transform()
        self.add_mapping()

    @property
    def get_data_source_target_mapping(self):
        self.data_source_target_mapping()
        return self.db
      
    

In [71]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)
    
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

In [72]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)
    
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None


    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")
    

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

In [73]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)
    
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

In [74]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)
    
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None
    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

In [75]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

In [76]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [77]:
transformed_data = []
for item in data:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
{'Title': 'Happy Anniversary', 'IMDbScore': '5.7', 'IMDbVotes': '3838', 'AwardsReceived': 'None', 'NetflixReleaseDate': '2018-03-30', 'ProductionHouse': 'Industry Entertainment'}
{'Title': 'Sofía Niño De Rivera: Selección Natural', 'IMDbScore': '6.8', 'IMDbVotes': '205', 'AwardsReceived': 'None', 'NetflixReleaseDate': '2018-03-30', 'ProductionHouse': 'None'}
{'Title': 'First Match', 'IMDbScore': '6.4', 'IMDbVotes': '1980', 'AwardsReceived': '2', 'NetflixReleaseDate': '2018-03-30', 'ProductionHouse': 'None'}
{'Title': 'The China Hustle', 'IMDbScore': '7.2', 'IMDbVotes': '2881', 'AwardsReceived': 'None', 'NetflixReleaseDate': '2018-03-30', 'ProductionHouse': 'Jigsaw Productions, S.J. Gibson Films, Kennedy/Marshall'}
{'Title': 'Kill Me Heal Me', 'IMDbScore': '8.3', 'IMDbVotes': '3807', 'AwardsReceived': '8', 'NetflixReleaseDate': '2018-03-30', 'ProductionHouse': 'None'}
{'Title': 'Historietas Assombradas: O Filme', 'IMDbScor

In [78]:
pd.DataFrame(transformed_data)

Unnamed: 0,Title,IMDbScore,IMDbVotes,AwardsReceived,NetflixReleaseDate,ProductionHouse
0,Lets Fight Ghost,7.9,205926,74,2021-03-04,"Canal+, Sandrew Metronome"
1,How To Build A Girl,5.8,2838,1,2021-03-04,"Film 4, Monumental Pictures, Lionsgate"
2,The Con-Heartist,7.4,131,,2021-03-03,
3,Gleboka Woda,7.5,47,2,2021-03-03,
4,Only A Mother,6.7,88,2,2021-03-03,
...,...,...,...,...,...,...
9420,13 Going On 30,6.2,167842,,2015-04-14,"Revolution Studios, Thirteen Productions Llc"
9421,Life 2.0,6.2,878,1,2015-04-14,
9422,Brand New Day,7.3,14,,2015-04-14,
9423,Daniel Arends: Blessuretijd,7.8,174,,2015-04-14,
