In [1]:
!pip install jsonpath_ng

Collecting jsonpath_ng
  Downloading jsonpath_ng-1.5.3-py3-none-any.whl (29 kB)
Collecting ply (from jsonpath_ng)
  Downloading ply-3.11-py2.py3-none-any.whl (49 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/49.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ply, jsonpath_ng
Successfully installed jsonpath_ng-1.5.3 ply-3.11


In [2]:
import os
import json
import numpy as np
import pandas as pd
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline

### **Starting ETL Process**

### Data

In [3]:
with open("Amazon_database.json", "r") as file:
    data = json.loads(json.load(file))
    for item in data:
      item["product_parent"]=str(item["product_parent"])
      item["star_rating"]=str(item["star_rating"])
      item["helpful_votes"]=str(item["helpful_votes"])
      item["total_votes"]=str(item["total_votes"])
      item["review_date"]=str(item["review_date"])
      item["customer_id"]=str(item["customer_id"])

data[:10]

[{'customer_id': '1797882',
  'review_id': 'R3I2DHQBR577SS',
  'product_id': 'B001ANOOOE',
  'product_parent': '2102612',
  'product_title': 'The Naked Bee Vitmin C Moisturizing Sunscreen SPF 30, 5.5 oz (163 ml.)',
  'star_rating': '5',
  'helpful_votes': '0',
  'total_votes': '0',
  'vine': 'N',
  'verified_purchase': 'Y',
  'review_headline': 'Five Stars',
  'review_body': 'Love this, excellent sun block!!',
  'review_date': '1440979200000'},
 {'customer_id': '18381298',
  'review_id': 'R1QNE9NQFJC2Y4',
  'product_id': 'B0016J22EQ',
  'product_parent': '106393691',
  'product_title': 'Alba Botanica Sunless Tanning Lotion, 4 Ounce',
  'star_rating': '5',
  'helpful_votes': '0',
  'total_votes': '0',
  'vine': 'N',
  'verified_purchase': 'Y',
  'review_headline': 'Thank you Alba Bontanica!',
  'review_body': "The great thing about this cream is that it doesn't smell weird like all those chemical laden ones.  I get a nice healthy un-fake looking tan that isn't orange and it makes my ski

###  Abstract Base Calss (ABC) for mutual methods

In [4]:
class Interface(ABC):
    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [5]:
class TransformMask(Enum):
    CAPITAL_LETTER = ".strip().upper()"


### Database Class - Define Common Properties for Source, Target, Mapping

In [6]:
columns = data[0].keys()
fact = ['review_id','product_id','review_date','customer_id','vine',
        'star_rating','helpful_votes','total_votes','verified_purchase']
dimProduct= ['product_id', 'product_parent','product_title']
dimReviewInfo= ['review_id', 'review_headline','review_body']

fact_numeric = ['star_rating','helpful_votes','total_votes']
fact_categorial = ['review_id','product_id','review_date','customer_id','vine','verified_purchase']
dimProduct_numeric= ['product_parent']
dimProduct_categorial= ['product_id','product_title']
dimReviewInfo_categorial= ['review_id','review_headline','review_body']
capital = ['vine','verified_purchase','product_id']


In [7]:
class Database:
    def __init__(self):
        self.db = {
             "source" : [],
             "destination": [],
             "transform" : [],
             "mapping" : []
        }
        self.add_fact_sources()
        self.add_fact_destination()
        self.add_fact_transform()
        self.add_fact_mapping()


    def add_fact_sources(self):
      i = 1
      for column in fact:
          self.add_source(i, column, "str", True)
          i += 1
      for column in dimProduct:
          self.add_source(i, column, "str", True)
          i += 1

      for column in dimReviewInfo:
          self.add_source(i, column, "str", True)
          i += 1

    def add_fact_destination(self):
      i=1
      for column in fact_numeric:
          self.add_destination(i, column, "int", 0, "fact")
          i += 1

      for column in fact_categorial:
          self.add_destination(i, column, "str", "n/a", "fact")
          i += 1

      for column in dimProduct_numeric:
          self.add_destination(i, column, "int", 0, "dimProduct")
          i += 1

      for column in dimProduct_categorial:
          self.add_destination(i, column, "str", "n/a", "dimProduct")
          i += 1

      for column in dimReviewInfo_categorial:
          self.add_destination(i, column, "str", "n/a", "dimReviewInfo")
          i += 1

    def add_fact_transform(self):
      i=1
      for column in capital:
          self.add_transform(i,'CAPITAL_LETTER')
          i += 1

    def add_fact_mapping(self):
      self.add_mapping(1,1,4, "fact")
      self.add_mapping(2,2,5, "fact")
      self.add_mapping(3,3,6, "fact")

      self.add_mapping(4,4,7, "fact")
      self.add_mapping(5,5,8, "fact",1)
      self.add_mapping(6,6,1, "fact")
      self.add_mapping(7,7,2, "fact")
      self.add_mapping(8,8,3, "fact")
      self.add_mapping(9,9,9, "fact",2)
      self.add_mapping(10,2,11, "dimProduct",3)
      self.add_mapping(11,11,10, "dimProduct")
      self.add_mapping(12,12,12, "dimProduct")
      self.add_mapping(13,1,13, "dimReviewInfo")
      self.add_mapping(14,14,14, "dimReviewInfo")
      self.add_mapping(15,15,15, "dimReviewInfo")

    def add_source(self, id, field, ty,is_required):
      new_source = { "id": id,
                    "source_field_name": field,
                   "source_field_mapping": f"$.{field}",
                    "source_field_type": ty, # use python types
                    "is_required":is_required }
      self.db["source"].append(new_source)


    def add_destination(self, id, field, ty, default_value, table):
      new_destination = { "id": id,
                    "destination_field_name": field,
                    "destination_field_mapping": f"$.{field}",
                    "destination_field_type": ty, # use python types
                    "default_value": default_value,
                    "destination_table": table}
      self.db["destination"].append(new_destination)


    def add_transform(self, id, transform_mask):
      new_transform = { "id": id,
                    "transform_mask": transform_mask,}
      self.db["transform"].append(new_transform)

    def add_mapping(self, id, mapping_source, mapping_destination, table, mapping_transform = None):
      new_mapping = {"id": id,
                    "mapping_source": mapping_source,
                    "mapping_destination": mapping_destination,
                    "destination_table": table,
                    "mapping_transform":mapping_transform
                     }
      self.db["mapping"].append(new_mapping)

    @property
    def get_data_source_target_mapping(self):
      return self.db

### Source class

Inherited from Interface for the common methods and from Database for common variables

In [8]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

### Target class

Inherited from Interface for the common methods and from Database for common variables

In [9]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Transform Class

Inherited from Interface for the common methods and from Database for common variables

In [10]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Mapping class

Inherited from Interface for the common methods and from Database for common variables

In [11]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file

In [12]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [13]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}
        self.to_table= {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")
            mapping_destination_table = mappings.get("destination_table")


            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                self.to_table[destination_field_name]= mapping_destination_table

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed, self.to_table

In [14]:
transformed_data = []

for item in data[:10000]:
    helper = STTM(input_json=item)
    response, mapping = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)
print(mapping)

{'review_id': 'R3I2DHQBR577SS', 'product_id': 'B001ANOOOE', 'review_date': '1440979200000', 'customer_id': '1797882', 'vine': 'N', 'star_rating': 5, 'helpful_votes': 0, 'total_votes': 0, 'verified_purchase': 'Y', 'product_parent': 2102612, 'product_title': 'The Naked Bee Vitmin C Moisturizing Sunscreen SPF 30, 5.5 oz (163 ml.)', 'review_headline': 'Five Stars', 'review_body': 'Love this, excellent sun block!!'}
{'review_id': 'R1QNE9NQFJC2Y4', 'product_id': 'B0016J22EQ', 'review_date': '1440979200000', 'customer_id': '18381298', 'vine': 'N', 'star_rating': 5, 'helpful_votes': 0, 'total_votes': 0, 'verified_purchase': 'Y', 'product_parent': 106393691, 'product_title': 'Alba Botanica Sunless Tanning Lotion, 4 Ounce', 'review_headline': 'Thank you Alba Bontanica!', 'review_body': "The great thing about this cream is that it doesn't smell weird like all those chemical laden ones.  I get a nice healthy un-fake looking tan that isn't orange and it makes my skin soft too."}
{'review_id': 'R3LI

KeyboardInterrupt: ignored

In [15]:
mapping

{'review_id': 'dimReviewInfo',
 'product_id': 'dimProduct',
 'review_date': 'fact',
 'customer_id': 'fact',
 'vine': 'fact',
 'star_rating': 'fact',
 'helpful_votes': 'fact',
 'total_votes': 'fact',
 'verified_purchase': 'fact',
 'product_parent': 'dimProduct',
 'product_title': 'dimProduct',
 'review_headline': 'dimReviewInfo',
 'review_body': 'dimReviewInfo'}

### **Creating 3 seperate tables and printing Fact table**

In [16]:
# Create separate lists to hold data for each table
dimReviewInfo_data = []
dimProduct_data = []
fact_data = []

# Iterate over each transformed data dictionary
for data in transformed_data:
    # Extract the relevant columns based on the table specified in the mapping
    dimReviewInfo_row = [data[column] for column, table in mapping.items() if table == 'dimReviewInfo']
    dimProduct_row = [data[column] for column, table in mapping.items() if table == 'dimProduct']
    fact_row = [data[column] for column, table in mapping.items() if table == 'fact']

    # Append the rows to the respective tables' data lists
    dimReviewInfo_data.append(dimReviewInfo_row)
    dimProduct_data.append(dimProduct_row)
    fact_data.append(fact_row)

# Create DataFrames for each table
dimReviewInfo_df = pd.DataFrame(dimReviewInfo_data, columns=[column for column, table in mapping.items() if table == 'dimReviewInfo'])
dimProduct_df = pd.DataFrame(dimProduct_data, columns=[column for column, table in mapping.items() if table == 'dimProduct'])
fact_df = pd.DataFrame(fact_data, columns=[column for column, table in mapping.items() if table == 'fact'])

# Add 'review_id' and 'product_id' columns to the fact_data DataFrame
#fact_df['review_id'] = dimReviewInfo_df['review_id']
#fact_df['product_id'] = dimProduct_df['product_id']
fact_df.insert(0, 'review_id', dimReviewInfo_df['review_id'])
fact_df.insert(1, 'product_id', dimProduct_df['product_id'])
# Print the FACT table
fact_df


Unnamed: 0,review_id,product_id,review_date,customer_id,vine,star_rating,helpful_votes,total_votes,verified_purchase
0,R3I2DHQBR577SS,B001ANOOOE,1440979200000,1797882,N,5,0,0,Y
1,R1QNE9NQFJC2Y4,B0016J22EQ,1440979200000,18381298,N,5,0,0,Y
2,R3LIDG2Q4LJBAO,B00HU6UQAG,1440979200000,19242472,N,5,0,0,Y
3,R3KSZHPAEVPEAL,B002HWS7RM,1440979200000,19551372,N,5,0,0,Y
4,RAI2OIG50KZ43,B00SM99KWU,1440979200000,14802407,N,5,0,0,Y
5,R1R30FA4RB5P54,B000NYL1Z6,1440979200000,2909389,N,4,0,0,Y
6,R30IJKCGJBGPJH,B001SYWTFG,1440979200000,19397215,N,5,0,0,Y
7,R18GLJJPVQ1OVH,B005F2EVMQ,1440979200000,3195210,N,5,0,0,Y
8,R8TVYIJXLYJT0,B00M1SUW7K,1440979200000,52216383,N,5,0,0,Y
9,R1CJGF6M3PVHEZ,B001KYQA1S,1440979200000,10278216,N,1,0,2,Y


### **Printing product table**

In [17]:
print("\ndimProduct_df:")
dimProduct_df




dimProduct_df:


Unnamed: 0,product_id,product_parent,product_title
0,B001ANOOOE,2102612,The Naked Bee Vitmin C Moisturizing Sunscreen ...
1,B0016J22EQ,106393691,"Alba Botanica Sunless Tanning Lotion, 4 Ounce"
2,B00HU6UQAG,375449471,"Elysee Infusion Skin Therapy Elixir, 2oz."
3,B002HWS7RM,255651889,"Diane D722 Color, Perm And Conditioner Process..."
4,B00SM99KWU,116158747,Biore UV Aqua Rich Watery Essence SPF50+/PA+++...
5,B000NYL1Z6,166146615,Murad Clarifying Cleanser
6,B001SYWTFG,111742328,CoverGirl Queen Collection Perfect Point Plus ...
7,B005F2EVMQ,255803087,"Bifesta Mandom Eye Makeup Remover, 145ml"
8,B00M1SUW7K,246816549,Can You Handlebar All-Natural Secondary Mousta...
9,B001KYQA1S,9612905,"Maybelline Great Lash Washable Mascara, Clear ..."


### **Printing Review table**

In [18]:
print("\ndimReviewInfo_df:")
dimReviewInfo_df


dimReviewInfo_df:


Unnamed: 0,review_id,review_headline,review_body
0,R3I2DHQBR577SS,Five Stars,"Love this, excellent sun block!!"
1,R1QNE9NQFJC2Y4,Thank you Alba Bontanica!,The great thing about this cream is that it do...
2,R3LIDG2Q4LJBAO,Five Stars,"Great Product, I'm 65 years old and this is al..."
3,R3KSZHPAEVPEAL,GOOD DEAL!,I use them as shower caps & conditioning caps....
4,RAI2OIG50KZ43,this soaks in quick and provides a nice base f...,This is my go-to daily sunblock. It leaves no ...
5,R1R30FA4RB5P54,Four Stars,Good
6,R30IJKCGJBGPJH,Good buy,"Great eyeliner, does run just a little bit at ..."
7,R18GLJJPVQ1OVH,Five Stars,Best makeup remover！
8,R8TVYIJXLYJT0,Tame the wild mustache,This is a great product. I'm on my latest stab...
9,R1CJGF6M3PVHEZ,but it's like having nothing on them at all,I thought it would darken the lashes even thou...


### **Export tables to csv file**

In [19]:
dimReviewInfo_df.to_csv('dimReviewInfo.csv', index=False)
dimProduct_df.to_csv('dimProduct.csv', index=False)
fact_df.to_csv('fact.csv', index=False)


### **SQL Queries**

**1**
SELECT star_rating, COUNT(*) AS verified_purchase_count
FROM fact
WHERE verified_purchase = 'Y'
GROUP BY star_rating
ORDER BY star_rating;

**2**
SELECT DISTINCT verified_purchase, AVG(star_rating)
over (partition by (verified_purchase)) AS AvgRating
FROM fact;

**3**
SELECT DISTINCT verified_purchase, COUNT(*)
over (partition by verified_purchase) as Counter
FROM fact
WHERE helpful_votes>2;

**4**
SELECT DISTINCT verified_purchase, AVG(star_rating)
over (partition by (verified_purchase)) AS AvgRating
FROM fact AS f JOIN dimReviewInfo as dRI
on f.review_id = dRI.review_id
WHERE LENGTH(dRI.review_body) > (
  SELECT AVG(LENGTH(review_body))
  FROM dimReviewInfo
);

**5**
SELECT DISTINCT verified_purchase, AVG(star_rating)
over (partition by (verified_purchase)) AS AvgRating
FROM fact
WHERE helpful_votes>2;

**6**
SELECT p.product_id, p.product_title, AVG(f.star_rating) AS average_rating
FROM dimProduct AS p
INNER JOIN fact AS f ON p.product_id = f.product_id
WHERE f.verified_purchase = 'Y'
GROUP BY p.product_id, p.product_title
HAVING COUNT(*) >= 10  -- Limit to products with at least 10 comments
ORDER BY average_rating ASC
LIMIT 10;

**7**
SELECT DISTINCT verified_purchase, COUNT(*)
over (partition by verified_purchase) as Counter
FROM dimReviewInfo AS ri JOIN fact AS f ON ri.review_id = f.review_id
WHERE LOWER(ri.review_body) LIKE '%love%';

**8**
SELECT p.product_title,
       COUNT(*) AS total_comments,
       ROUND((COUNT(*) FILTER (WHERE f.star_rating > 4) * 100.0 / COUNT(*)), 2) AS percentage_positive
FROM dimProduct AS p
JOIN fact AS f ON p.product_id = f.product_id
GROUP BY p.product_title
ORDER BY COUNT(*) DESC
LIMIT 10;

**9**
SELECT DISTINCT fact.product_id, COUNT(*) over (PARTITION BY fact.product_id) AS Occurrences
FROM fact
JOIN dimReviewInfo ON fact.review_id = dimReviewInfo.review_id
WHERE dimReviewInfo.review_headline LIKE 'great%' and fact.verified_purchase = 'N'
ORDER BY Occurrences DESC;

**10**
SELECT f1.customer_id,
       AVG(f1.star_rating) OVER (PARTITION BY f1.customer_id) AS average_rating,
       COUNT(f2.customer_id) AS purchase_count
FROM fact f1
JOIN fact f2 ON f1.customer_id = f2.customer_id
WHERE f1.verified_purchase = 'Y'
GROUP BY f1.customer_id
order by purchase_count DESC;