In [75]:
import pyspark
from bs4 import BeautifulSoup
import requests
import json
import csv
import re
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
import pandas as pd
import datetime
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType, IntegerType
import pandas as pd
import pymongo
from pyspark.sql.functions import current_date, date_format, cast

In [2]:
spark = SparkSession.builder.getOrCreate()

In [64]:
def TIKI_CRAWLING(product_category, url):
    product_url = "https://tiki.vn/api/v2/products/{}"
    headers = {"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36"}
    flatten_field = [ "badges", "inventory", "categories", "rating_summary",
                      "brand", "seller_specifications", "current_seller", "other_sellers",
                      "configurable_options",  "configurable_products", "specifications", "product_links",
                      "services_and_promotions", "promotions", "stock_item", "installment_info" ]

    def crawl_product_id():
        product_list = []
        i = 1
        while (True):
            response = requests.get(url.format(i), headers=headers)
            if (response.status_code != 200):
                break
            products = json.loads(response.text)["data"]
            if (len(products) == 0):
                break
            for product in products:
                product_id = str(product["id"])
                product_list.append(product_id)
            i += 1
        return product_list, i

    def crawl_product(product_list=[]):
        product_detail_list = []
        for product_id in product_list:
            response = requests.get(product_url.format(product_id), headers=headers)
            if (response.status_code == 200):
                product_detail_list.append(response.text)
        return product_detail_list

    def adjust_product(product):
        try:
            e = json.loads(product)
            if not e.get("id", False):
                return None

            for field in flatten_field:
                if field in e:
                    e[field] = json.dumps(e[field], ensure_ascii=False).replace('\n','')

            return e
        except json.JSONDecodeError:
            return None
        
    product_list, page = crawl_product_id()
    product_list = crawl_product(product_list)
    product_json_list = list(filter(lambda p: p is not None, list(map(lambda p: adjust_product(p), product_list))))
    json_string = json.dumps(product_json_list)

    #initiate spark session
    #spark = SparkSession.builder.getOrCreate()

    #loading data into spark df
    data = spark.read.json(spark.sparkContext.parallelize([json_string]))

    #restructure columns
    data = spark.read.json(spark.sparkContext.parallelize([json_string]))
    data = data.withColumn("brand", F.from_json(data["brand"], "struct<id string, name string, slug string>"))
    data = data.withColumn("current_seller", F.from_json(data["current_seller"], "struct<name string, id string, product_id string, store_id string>"))
    data = data.withColumn("categories", F.from_json(data["categories"], "struct<id string, name string, product_id string>"))
    data = data.withColumn("inventory", F.from_json(data["inventory"], "struct<fulfillment_type string>"))

    data = data.select('name', 'type', 'price', 'list_price', 'discount', \
                           'discount_rate', 'rating_average', 'review_count', 'day_ago_created', 'all_time_quantity_sold', \
                           col('brand.name').alias("brand_name"), col('current_seller.name').alias('seller_name'),  col('current_seller.id').alias('seller_id'), \
                           col('current_seller.product_id').alias('product_id'))

    data = data.withColumn('product_category', lit(product_category))
    data = data.fillna('Unknown')

    return data


In [65]:
product_dict = {"may chay bo":"https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=24130&page={}&urlKey=may-chay-bo", \
        "xe dap tap":"https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=24132&page={}&urlKey=xe-dap-tap", \
        "gian ta da nang":"https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=24136&page={}&urlKey=gian-ta-da-nang", \
        "xa don":"https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=24138&page={}&urlKey=xa-don", \
        "xa kep":"https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=24140&page={}&urlKey=xa-kep", \
        "ta tap": "https://tiki.vn/api/personalish/v1/blocks/listings?limit=40&include=advertisement&aggregations=2&version=home-persionalized&trackity_id=fc020932-9f61-2c32-9b72-3068c435d2ff&category=3323&page={}&urlKey=ta-tay-ta-mieng"}

schema = StructType([
    StructField("name", StringType(), False),
    StructField("type", StringType(), False),
    StructField("price", LongType(), True),
    StructField("list_price", LongType(), True),
    StructField("discount", LongType(), True),
    StructField("discount_rate", LongType(), True),
    StructField("rating_average", DoubleType(), True),
    StructField("review_count", LongType(), True),
    StructField("day_ago_created", LongType(), True),
    StructField("all_time_quantity_sold", LongType(), True),
    StructField("brand_name", StringType(), False),
    StructField("seller_name", StringType(), False),
    StructField("seller_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("product_category", StringType(), False)
])

combined_df = spark.createDataFrame([], schema)

In [66]:
for product_category, url in dict.items():
    data = TIKI_CRAWLING(product_category, url)
    combined_df = combined_df.unionAll(data)

In [127]:
combined_df.show()

+--------------------+------------+--------+----------+--------+-------------+--------------+------------+---------------+----------------------+----------+--------------------+---------+----------+----------------+
|                name|        type|   price|list_price|discount|discount_rate|rating_average|review_count|day_ago_created|all_time_quantity_sold|brand_name|         seller_name|seller_id|product_id|product_category|
+--------------------+------------+--------+----------+--------+-------------+--------------+------------+---------------+----------------------+----------+--------------------+---------+----------+----------------+
|[TIKI TRỢ GIÁ-QUÀ...|      simple| 9800000|  13000000| 3200000|           25|           4.7|           3|            794|                    11| KingSport|KING SPORT OFFICI...|     3775| 125284614|     may chay bo|
|[TIKI TRỢ GIÁ, TẶ...|      simple| 8589000|  12000000| 3411000|           28|           0.0|           0|            137|              

IMPORT TO MONGODB

In [128]:
rdd = combined_df.rdd.toDF()
data = rdd.collect()

In [129]:
data[1]

Row(name='[TIKI TRỢ GIÁ, TẶNG QUÀ 830K] Máy Chạy Bộ Kingsport BK-8000 Đơn Năng - Sự Lựa Chọn Xứng Đáng Cho Sức Khỏe', type='simple', price=8589000, list_price=12000000, discount=3411000, discount_rate=28, rating_average=0.0, review_count=0, day_ago_created=137, all_time_quantity_sold=None, brand_name='KingSport', seller_name='KING SPORT OFFICIAL STORE', seller_id='3775', product_id='270127241', product_category='may chay bo')

In [134]:
current_date = datetime.datetime.now()

# Format the current date to 'yyyyMMdd'
today_yyyyMMdd = current_date.strftime('%Y%m%d')

# Convert the formatted date to a float
today_yyyyMMdd_float = int(today_yyyyMMdd)

print(today_yyyyMMdd_float)

20231125


In [135]:
dict = []
for row in data:
    dict.append({
        "name" : row.name,
        "type" : row.type,
        "price" : row.price,
        "list_price" : row.list_price,
        "discount" : row.discount,
        "discount_rate" : row.discount_rate,
        "rating_average" : row.rating_average,
        "review_count" : row.review_count,
        "day_ago_created" : row.day_ago_created,
        "all_time_quantity_sold" : row.all_time_quantity_sold,
        "brand_name" : row.brand_name,
        "seller_name" : row.seller_name,
        "seller_id" : row.seller_id,
        "product_id" : row.product_id,
        "product_category" : row.product_category,
        "DATA_DATE": today_yyyyMMdd_float
    })

In [98]:
client = pymongo.MongoClient('localhost', 27018)

# Select the database you want to connect to
db = client['DElongth']

# Get a collection from the database
collection = db['TIKI_gym_gear']

In [143]:
for document in dict:
    collection.insert_one(document)

IMPORT DATA TO POWER BI SERVICE

In [142]:
pbi_url = 'https://api.powerbi.com/beta/399232fb-17d1-45ca-bda6-5b540441bd62/datasets/335ba2c8-ab2e-45e9-8012-f01897910b97/rows?experience=power-bi&key=lrt40YpLV%2BVPNDQ7dhVnzvfB%2FGCoqchkj93F35N3dZZofqdg2bSJWo5zJH2BWKYmX3yi2sxRVgD53H5hMp82Cw%3D%3D'
headers = {'Content-Type':'application/json'}

# Convert Pandas DataFrame to a list of dictionaries
data_json = json.dumps(dict)

# Uploading data
response = requests.post(pbi_url, headers=headers, data=data_json)
response.close()

In [141]:
data_json

'[{"name": "[TIKI TR\\u1ee2 GI\\u00c1-QU\\u00c0 T\\u1eb6NG 830K ] M\\u00e1y ch\\u1ea1y b\\u1ed9 t\\u1ea1i nh\\u00e0 KingSport BK-8000 \\u0110a N\\u0103ng c\\u00f3 th\\u1ea3m ch\\u1ea1y r\\u1ed9ng ch\\u1ed1ng tr\\u01a1n tr\\u01b0\\u1ee3t, ch\\u1ed1ng \\u1ed3n, k\\u00e8m theo \\u0111ai massage v\\u00e0 thanh g\\u1eadp b\\u1ee5ng", "type": "simple", "price": 9800000, "list_price": 13000000, "discount": 3200000, "discount_rate": 25, "rating_average": 4.7, "review_count": 3, "day_ago_created": 794, "all_time_quantity_sold": 11, "brand_name": "KingSport", "seller_name": "KING SPORT OFFICIAL STORE", "seller_id": "3775", "product_id": "125284614", "product_category": "may chay bo", "DATA_DATE": 20231125}, {"name": "[TIKI TR\\u1ee2 GI\\u00c1, T\\u1eb6NG QU\\u00c0 830K] M\\u00e1y Ch\\u1ea1y B\\u1ed9 Kingsport BK-8000 \\u0110\\u01a1n N\\u0103ng - S\\u1ef1 L\\u1ef1a Ch\\u1ecdn X\\u1ee9ng \\u0110\\u00e1ng Cho S\\u1ee9c Kh\\u1ecfe", "type": "simple", "price": 8589000, "list_price": 12000000, "discou