In [0]:
!pip install pystardog
!pip install rdflib

In [0]:
import json
import rdflib
from rdflib import Literal, Namespace
from rdflib.namespace import RDF,RDFS,XSD
import stardog
from stardog.content import Raw
from stardog.exceptions import StardogException
from time import time

In [0]:
# Start the wall-clock timer reported by the final cell, then load the Stardog
# connection settings, the Spark catalog/schema names and the ontology/data
# namespace IRIs from the local credentials file.
start_time = time()
# JSON is UTF-8 by specification; don't rely on the platform's default encoding.
with open("Stardog_Credentials.json", encoding="utf-8") as f:
    config = json.load(f)

endpoint = config['endpoint']
username = config['username']
password = config['password']
db = config['Graph_database']
ont_ns = config['Ontology_namespace']
data_ns = config['Data_namespace']

In [0]:
# rdflib Namespace helpers: ont[...] / ont.X build ontology terms (classes and
# properties) and data[...] builds instance IRIs; both append '#' to the base IRI.
ont, data = (Namespace(base_iri + '#') for base_iri in (ont_ns, data_ns))

# Keyword arguments passed to every stardog.Connection opened in this notebook.
connection_details = dict(endpoint=endpoint, username=username, password=password)

In [0]:
def insert_data(graph):
    """Serialize an rdflib graph to Turtle and load it into Stardog in one transaction.

    Connects to database ``db`` with the notebook-level ``connection_details``,
    adds the Turtle payload to the named graph ``data_ns`` and commits.

    If the add or the commit fails, the transaction opened by ``begin()`` is
    rolled back (best effort) and the original exception is re-raised. The
    server is therefore not left holding an orphaned transaction.

    Args:
        graph: the rdflib.Graph whose triples should be inserted.

    Raises:
        Whatever ``conn.add``/``conn.commit`` raised (e.g. StardogException),
        propagated unchanged after the rollback attempt.
    """
    with stardog.Connection(db, **connection_details) as conn:
        print("Connection Successful.")
        ttl_data = graph.serialize(format='turtle')
        conn.begin()
        try:
            conn.add(Raw(ttl_data, 'text/turtle'), graph_uri=data_ns)
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except StardogException:
                # Rollback is cleanup only; surface the original failure, not this one.
                pass
            raise
        print("Data inserted successfully.")

### Category tree

In [0]:
# Load every category-tree row and map it to an ont:CategoryTree individual keyed by its id.
data_catagory = spark.table("{c}.{s}.silver_category_tree".format(c=config['Spark_catalog'], s=config['Spark_Schema'])).collect()
# Show one sample row; guarded so an empty table does not raise IndexError.
print(data_catagory[0] if data_catagory else "silver_category_tree is empty")

g = rdflib.Graph()

for row in data_catagory:
    data_uri = data[f"{row.id}"]
    g.add((data_uri, RDF.type, ont.CategoryTree))
    # rdflib stringifies None, so a NULL name would be stored as the text "None"; skip it.
    if row.name is not None:
        name_literal = Literal(row.name, datatype=XSD.string)
        g.add((data_uri, ont.categorytree_name, name_literal))
        g.add((data_uri, RDFS.label, name_literal))

insert_data(g)

### Category

In [0]:
# Load every category row and map it to an ont:Category individual keyed by category_id.
data_catagory = spark.table("{c}.{s}.silver_category".format(c=config['Spark_catalog'], s=config['Spark_Schema'])).collect()
# Show one sample row; guarded so an empty table does not raise IndexError.
print(data_catagory[0] if data_catagory else "silver_category is empty")

g = rdflib.Graph()
for row in data_catagory:
    data_uri = data[f"{row.category_id}"]
    g.add((data_uri, RDF.type, ont.Category))

    # Categories with a parent_id point at that parent category; categories without
    # one are linked to their category tree instead. A missing id yields no link
    # rather than a bogus data:None IRI.
    if row.parent_id is not None:
        g.add((data_uri, ont.hasparentcategory, data[f"{row.parent_id}"]))
    elif row.tree_id is not None:
        g.add((data_uri, ont.hasparentcategoryintree, data[f"{row.tree_id}"]))

    # rdflib stringifies None, so NULL columns would be stored as the text "None"; skip them.
    for predicate, value in (
        (ont.category_name, row.name),
        (ont.description_category, row.description),
        (ont.search_keywords_category, row.search_keywords),
        (RDFS.label, row.name),
    ):
        if value is not None:
            g.add((data_uri, predicate, Literal(value, datatype=XSD.string)))

insert_data(g)

### Brand

In [0]:
# Load every brand row and map it to an ont:Brand individual keyed by brand_id.
data_brand = spark.table("{c}.{s}.silver_brand".format(c=config['Spark_catalog'], s=config['Spark_Schema'])).collect()
# Show one sample row; guarded so an empty table does not raise IndexError.
print(data_brand[0] if data_brand else "silver_brand is empty")

g = rdflib.Graph()
for row in data_brand:
    data_uri = data[f"{row.brand_id}"]
    g.add((data_uri, RDF.type, ont.Brand))
    # rdflib stringifies None, so NULL columns would be stored as the text "None"; skip them.
    for predicate, value in (
        (ont.brand_name, row.name),
        (ont.description_brand, row.description),
        (RDFS.label, row.name),
    ):
        if value is not None:
            g.add((data_uri, predicate, Literal(value, datatype=XSD.string)))

insert_data(g)

### Product

In [0]:
# Load every product row and map it to an ont:Product individual keyed by product_id.
data_prod = spark.table("{c}.{s}.silver_product".format(c=config['Spark_catalog'], s=config['Spark_Schema'])).collect()
# Show one sample row; guarded so an empty table does not raise IndexError.
print(data_prod[0] if data_prod else "silver_product is empty")

# (predicate, source column, XSD datatype) for every scalar column copied onto a product.
product_literal_columns = (
    (ont.product_name, "name", XSD.string),
    (ont.description_product, "description", XSD.string),
    (ont.type, "type", XSD.string),
    (ont.sku, "sku", XSD.string),
    (ont.fixed_cost_shipping_price, "fixed_cost_shipping_price", XSD.decimal),
    (ont.is_free_shipping_product, "is_free_shipping", XSD.boolean),
    (ont.upc_product, "upc", XSD.string),
    (ont.search_keywords_product, "search_keywords", XSD.string),
    (ont.availability, "availability", XSD.string),
    (ont.sort_order, "sort_order", XSD.integer),
    (ont.order_quantity_minimum, "order_quantity_minimum", XSD.integer),
    (ont.order_quantity_maximum, "order_quantity_maximum", XSD.integer),
    (ont.gtin_product, "gtin", XSD.string),
    (ont.mpn_product, "mpn", XSD.string),
    (ont.reviews_rating_sum, "reviews_rating_sum", XSD.integer),
    (ont.reviews_count, "reviews_count", XSD.integer),
    (ont.total_sold, "total_sold", XSD.integer),
    # Dimensions
    (ont.weight_product, "weight", XSD.decimal),
    (ont.width_product, "width", XSD.decimal),
    (ont.depth_product, "depth", XSD.decimal),
    (ont.height_product, "height", XSD.decimal),
    # Price-related literals
    (ont.price_product, "price", XSD.decimal),
    (ont.cost_price_product, "cost_price", XSD.decimal),
    (ont.retail_price_product, "retail_price", XSD.decimal),
    (ont.sale_price_product, "sale_price", XSD.decimal),
    (RDFS.label, "name", XSD.string),
)
# Timestamp columns, serialized through .isoformat() as xsd:dateTime.
product_datetime_columns = (
    (ont.date_created, "date_created"),
    (ont.date_modified, "date_modified"),
)
# Foreign-key columns turned into object links to category / brand individuals.
product_link_columns = (
    (ont.belongsto, "category_id"),
    (ont.producedby, "brand_id"),
)

g = rdflib.Graph()
for row in data_prod:
    data_uri = data[f"{row.product_id}"]
    g.add((data_uri, RDF.type, ont.Product))

    # NULLs are skipped: rdflib would otherwise emit ill-typed literals such as
    # "None"^^xsd:decimal, and None.isoformat() would raise AttributeError.
    for predicate, column, datatype in product_literal_columns:
        value = getattr(row, column)
        if value is not None:
            g.add((data_uri, predicate, Literal(value, datatype=datatype)))

    for predicate, column in product_datetime_columns:
        value = getattr(row, column)
        if value is not None:
            g.add((data_uri, predicate, Literal(value.isoformat(), datatype=XSD.dateTime)))

    # Relationships to category and brand; a NULL id yields no link instead of data:None.
    for predicate, column in product_link_columns:
        target_id = getattr(row, column)
        if target_id is not None:
            g.add((data_uri, predicate, data[f"{target_id}"]))

insert_data(g)

### Product Variant

In [0]:
# Load every product-variant row and map it to an ont:ProductVariant individual keyed by prod_var_id.
data_prod_var = spark.table("{c}.{s}.silver_product_variant".format(c=config['Spark_catalog'], s=config['Spark_Schema'])).collect()
# Show one sample row; guarded so an empty table does not raise IndexError.
print(data_prod_var[0] if data_prod_var else "silver_product_variant is empty")

# (predicate, source column, XSD datatype) for every scalar column copied onto a variant.
variant_literal_columns = (
    (ont.sku_variant, "sku", XSD.string),
    (ont.sku_id, "sku_id", XSD.integer),
    (ont.cost_price_variant, "cost_price", XSD.decimal),
    (ont.price_variant, "price", XSD.decimal),
    (ont.sale_price_variant, "sale_price", XSD.decimal),
    (ont.retail_price_variant, "retail_price", XSD.decimal),
    (ont.weight_variant, "weight", XSD.decimal),
    (ont.width_variant, "width", XSD.decimal),
    (ont.height_variant, "height", XSD.decimal),
    (ont.depth_variant, "depth", XSD.decimal),
    (ont.is_free_shipping_variant, "is_free_shipping", XSD.boolean),
    (ont.upc_variant, "upc", XSD.string),
    (ont.mpn_variant, "mpn", XSD.string),
    (ont.gtin_variant, "gtin", XSD.string),
    (RDFS.label, "sku", XSD.string),
)

g = rdflib.Graph()
for pv in data_prod_var:
    variant_uri = data[f"{pv.prod_var_id}"]
    g.add((variant_uri, RDF.type, ont.ProductVariant))

    # NULLs are skipped: rdflib would otherwise emit ill-typed literals such as "None"^^xsd:decimal.
    for predicate, column, datatype in variant_literal_columns:
        value = getattr(pv, column)
        if value is not None:
            g.add((variant_uri, predicate, Literal(value, datatype=datatype)))

    # Link the variant to its product; a NULL product_id yields no link instead of data:None.
    if pv.product_id is not None:
        g.add((variant_uri, ont.variantof, data[f"{pv.product_id}"]))

insert_data(g)

In [0]:
# Report total wall-clock seconds since start_time was recorded in the config cell.
elapsed_seconds = time() - start_time
print("Time taken : ", elapsed_seconds)