In [0]:
# '''
# Sync Command:
# aws s3 sync s3://datastream-prd-client-alphaedison s3://ae-disqo-data-storage-oregan/raw_data/Disqo_Dataset_Synced --source-region us-east-1 --region us-west-2
# '''

In [0]:
from collections import defaultdict
import re
from textdistance import smith_waterman, jaccard
from math import ceil
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import to_timestamp, current_date, lit, year, month, col, count, to_date, floor as floor_, datediff, row_number, when, avg
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, LongType
import zipcodes
from datetime import date, timedelta, datetime
from pytz import timezone
import pytz

In [0]:
# dbutils.fs.ls("/mnt/delta/Disqo_Dataset_Synced/active-users/")

In [0]:
# dbutils.fs.ls("/mnt/delta/general_data/")

# Demo Data Processing

In [0]:
def convertState(x):
  """Return the state recorded for a 5-digit ZIP code string, or None.

  Args:
    x: raw ZIP code value (string) or None.

  Returns:
    The 'state' field of the first zipcodes match, or None when x is None,
    is not exactly five ASCII digits after stripping, or has no match.
  """
  if x is None:
    return None
  x = x.strip()
  # Only well-formed 5-digit values reach the lookup, so malformed ZIPs cannot
  # make the library call fail inside the Spark UDF.
  if len(x) != 5 or not (x.isascii() and x.isdigit()):
    return None
  matches = zipcodes.matching(x)  # single lookup instead of two
  return matches[0]['state'] if len(matches) > 0 else None
def convertCity(x):
  """Return the city recorded for a 5-digit ZIP code string, or None.

  Args:
    x: raw ZIP code value (string) or None.

  Returns:
    The 'city' field of the first zipcodes match, or None when x is None,
    is not exactly five ASCII digits after stripping, or has no match.
  """
  if x is None:
    return None
  x = x.strip()
  # Only well-formed 5-digit values reach the lookup, so malformed ZIPs cannot
  # make the library call fail inside the Spark UDF.
  if len(x) != 5 or not (x.isascii() and x.isdigit()):
    return None
  matches = zipcodes.matching(x)  # single lookup instead of two
  return matches[0]['city'] if len(matches) > 0 else None

def convertUrbanization(x, urban_rural_mapping):
  """Look up the urban/rural label for a ZIP code string.

  Args:
    x: raw ZIP code value (string) or None.
    urban_rural_mapping: mapping from integer ZIP/ZCTA code to its label.

  Returns:
    'rural' when x is None or the parsed code is not in the mapping,
    None when x cannot be parsed as an integer, otherwise the mapped label.
  """
  if x is None:
    return 'rural'
  try:
    zcta = int(x.strip())
  except (AttributeError, TypeError, ValueError):
    # Non-string or non-numeric value; only parse failures are swallowed,
    # unlike the previous bare `except:`.
    return None
  return urban_rural_mapping[zcta] if zcta in urban_rural_mapping else 'rural'

def household_income_process(x):
  """Pass through income labels that contain a '$'; map everything else (including None) to '#N_A'."""
  has_dollar_amount = x is not None and '$' in x
  return x if has_dollar_amount else '#N_A'
  
def region_process(x):
  """Map a two-letter state code to its region label.

  Returns one of 'WEST', 'SOUTHWEST', 'MIDWEST', 'SOUTHEAST', 'NORTHEAST',
  or None when the code (including None) belongs to no listed region.
  """
  states_by_region = (
    ('WEST', {'WA', 'OR', 'AK', 'HI', 'MT', 'WY', 'ID', 'CA', 'NV', 'UT', 'CO'}),
    ('SOUTHWEST', {'AZ', 'NM', 'TX', 'OK'}),
    ('MIDWEST', {'ND', 'SD', 'NE', 'KS', 'MN', 'IA', 'MO', 'WI', 'IL', 'IN', 'MI', 'OH'}),
    ('SOUTHEAST', {'AR', 'LA', 'MS', 'AL', 'TN', 'KY', 'WV', 'DC', 'VA', 'NC', 'SC', 'GA', 'FL', 'DE', 'MD'}),
    ('NORTHEAST', {'PA', 'NY', 'NJ', 'CT', 'VT', 'ME', 'NH', 'MA', 'RI'}),
  )
  for region_name, member_states in states_by_region:
    if x in member_states:
      return region_name
  return None
  
import pandas as pd
# Build the ZCTA5 -> urban/rural label lookup used by convertUrbanization.
# Zipping the two columns avoids the row-by-row iterrows() loop; as before,
# a later row wins when a ZCTA5 value repeats.
df = pd.read_csv("/dbfs/FileStore/tables/disqo_dataset/rural_urban.txt")
urban_rural_mapping = dict(zip(df['ZCTA5'], df['urban_rural']))

# Spark UDF wrappers around the ZIP/income/region helpers; every one returns a string column.
stateUDF = udf(convertState, StringType())
cityUDF = udf(convertCity, StringType())
# The ZIP -> urban/rural lookup is bound through the lambda's closure.
urbanizationUDF = udf(lambda zip_code: convertUrbanization(zip_code, urban_rural_mapping), StringType())
household_incomeUDF = udf(household_income_process, StringType())
regionUDF = udf(region_process, StringType())

# Load the raw demo (active-users) CSV export and drop exact duplicate rows.
demo_data_path = "/mnt/delta/raw_data/Disqo_Dataset_Synced/v2/active-users/*/*.csv.gz"
demo_data = spark.read.option("header", "true").csv(demo_data_path).dropDuplicates()

# Derive age as (current year - year of birth_year).
# NOTE(review): no schema is supplied to the CSV reader, so birth_year is presumably a string
# column - confirm that year() parses it the way this subtraction expects.
demo_data = demo_data.withColumn("age", year(current_date()) - year(demo_data.birth_year))
# Geography columns derived from zip_postal_code through the UDFs registered above.
demo_data = demo_data.withColumn("state", stateUDF(col("zip_postal_code")))
demo_data = demo_data.withColumn("city", cityUDF(col("zip_postal_code")))
demo_data = demo_data.withColumn("urbanization", urbanizationUDF(col("zip_postal_code")))
# region depends on the state column added two lines earlier.
demo_data = demo_data.withColumn("region", regionUDF(col("state")))

# Normalise household_income, then keep a single row per user_id: rows are ranked by the
# (string) household_income in descending order and only the first one is retained.
demo_data = demo_data.withColumn("household_income", household_incomeUDF(col('household_income')))
demo_data = demo_data.select(col('*'), row_number().over(Window.partitionBy("user_id").orderBy(col("household_income").desc())).alias('rowNumber'))
demo_data = demo_data.filter(col('rowNumber')==1).drop('rowNumber')

# Save the demo data in Delta format, replacing both the data and the schema at this path.
main_demo_data_delta_file_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/disqo_demo_data"
demo_data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(main_demo_data_delta_file_path)

In [0]:
%sql
-- Heads-up: the existing table is dropped before it is re-created over the Delta location.
-- IF EXISTS keeps this cell runnable when the table has not been created yet.
drop table if exists disqo_demo_data;
create TABLE disqo_demo_data 
USING delta
LOCATION "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/disqo_demo_data"

In [0]:
# Raw household-income label -> coarse dashboard bucket.
# NOTE(review): "$5,000 - $9,999" and "$95,000 - $99,999" were absent from the original
# if/elif chain (while two other brackets were listed twice); they are added here on the
# assumption that the source data uses the same 5k-step labels - confirm against the feed.
_INCOME_BUCKETS = {
  "Under $5,000": '0-40k',
  "Under $10,000": '0-40k',
  "$5,000 - $9,999": '0-40k',
  "$10,000 - $14,999": '0-40k',
  "$15,000 - $19,999": '0-40k',
  "$20,000 - $24,999": '0-40k',
  "$25,000 - $29,999": '0-40k',
  "$30,000 - $34,999": '0-40k',
  "$35,000 - $39,999": '0-40k',
  "$40,000 - $44,999": '40k-100k',
  "$45,000 - $49,999": '40k-100k',
  "$50,000 - $54,999": '40k-100k',
  "$55,000 - $59,999": '40k-100k',
  "$60,000 - $64,999": '40k-100k',
  "$65,000 - $69,999": '40k-100k',
  "$70,000 - $74,999": '40k-100k',
  "$75,000 - $79,999": '40k-100k',
  "$80,000 - $84,999": '40k-100k',
  "$85,000 - $89,999": '40k-100k',
  "$90,000 - $94,999": '40k-100k',
  "$95,000 - $99,999": '40k-100k',
  "$100,000 - $124,999": '100k-200k',
  "$125,000 - $149,999": '100k-200k',
  "$150,000 - $174,999": '100k-200k',
  "$175,000 - $199,999": '100k-200k',
  "$200,000 - $249,999": '200k+',
  "More than $250,000": '200k+',
}

def preprocess_income(income):
  """Collapse a raw household-income label into a dashboard bucket.

  Args:
    income: raw income label (string) or None.

  Returns:
    One of '0-40k', '40k-100k', '100k-200k', '200k+'. Any label that is not
    recognised (including None and '#N_A') falls back to '40k-100k', as before.
  """
  return _INCOME_BUCKETS.get(income, '40k-100k')

def process_presence_of_children(children):
  """Bucket the spelled-out number of children ('ZERO' .. 'EIGHT') into a household label."""
  if children == 'ONE':
    return "1_child"
  if children in ('TWO', 'THREE'):
    return '2-3_children'
  if children in ('FOUR', 'FIVE', 'SIX', 'SEVEN', 'EIGHT'):
    return '>3_children'
  # 'ZERO' and every unrecognised value (None included) share the same label.
  return 'no_children'

def process_education_level(level):
  """Collapse a raw education-level code into one of four tiers, or 'UnkownEducationLevel'."""
  tiers = (
    # Less than a high school degree
    ('BelowHighSchool', ('THIRD_GRADE_OR_LESS', 'MIDDLE_SCHOOL_GRADES_FOUR_TO_EIGHT', 'COMPLETED_SOME_HIGH_SCHOOL')),
    # High school degree
    ('HighSchool', ('HIGH_SCHOOL_GRADUATE', 'VOCATIONAL_TRAINING_OR_TRADE_SCHOOL', 'SOME_COLLEGE_OR_UNIVERSITY')),
    # Associate or Bachelor's degree
    ('AssociateOrBachelor', ('ASSOCIATE_TWO_YEAR_DEGREE', 'BACHELOR_DEGREE', 'SOME_POSTGRADUATE_STUDY')),
    # Master's or Doctorate degree
    ('MasterOrDoctor', ('MASTER_DEGREE', 'DOCTORATE_OR_PHD')),
  )
  for tier_label, raw_codes in tiers:
    if level in raw_codes:
      return tier_label
  return 'UnkownEducationLevel'
  
def process_employment_status(level):
  """Collapse a raw employment-status code into a coarse category.

  Args:
    level: raw employment-status code (string) or None.

  Returns:
    'STUDENT', 'Out_of_WORK_FORCE', 'EMPLOYED', 'UNEMPLOYED', or
    'UNKNOWN_OR_OTHER' for anything not listed (including None).
  """
  if level in ('STUDENT_PART_TIME', 'STUDENT_FULL_TIME'):
    return 'STUDENT'
  # The original condition ended with a bare string literal (always truthy), so every
  # non-student value landed here; the comparison against `level` is now explicit.
  if level in ('ACTIVE_MILITARY', 'INACTIVE_MILITARY_OR_VETERAN', 'HOMEMAKER_OR_STAY_AT_HOME_PARENT',
               'RETIRED', 'PERMANENTLY_UNEMPLOYED_OR_DISABLED'):
    return 'Out_of_WORK_FORCE'
  if level in ('SELF_EMPLOYED_PART_TIME', 'SELF_EMPLOYED_FULL_TIME', 'EMPLOYED_PART_TIME', 'EMPLOYED_FULL_TIME'):
    return 'EMPLOYED'
  if level == 'UNEMPLOYED':
    return 'UNEMPLOYED'
  return 'UNKNOWN_OR_OTHER'

def process_marital_status(level):
  """Reduce a raw marital-status code to 'LiveAlone', 'LiveWithPartners' or 'UnkownMartitalStatus'."""
  living_alone = ('SEPARATED', 'SINGLE', 'DIVORCED', 'WIDOWED')
  living_with_partner = ('DOMESTIC_PARTNERSHIP', 'MARRIED')
  if level in living_alone:
    return 'LiveAlone'
  if level in living_with_partner:
    return 'LiveWithPartners'
  return 'UnkownMartitalStatus'

def process_living_status(level):
  """Reduce a raw living-status code to 'RENTER', 'OWNER', 'WITH_FAMILY' or 'UNKOWN_OR_FARM'."""
  if level in ('RENT_APARTMENT_OR_CONDO', 'RENT_HOME', 'UNIVERSITY_RESIDENCE', 'RENT'):
    return 'RENTER'
  if level in ('OWN_HOME', 'OWN_APARTMENT_OR_CONDO'):
    return 'OWNER'
  if level == 'LIVE_WITH_PARENTS_OR_RELATIVES':
    return 'WITH_FAMILY'
  return 'UNKOWN_OR_FARM'

def process_ethnicity(level):
  """Collapse a raw ethnicity code into a coarse group; unlisted codes become 'UNKNOWN'."""
  asian_codes = (
    'ASIAN_VIETNAMESE', 'ASIAN_KOREAN', 'ASIAN_JAPANESE', 'ASIAN_INDIAN',
    'ASIAN_FILIPINO', 'ASIAN_CHINESE', 'ASIAN_OTHER', 'ASIAN',
  )
  mixed_or_other_codes = (
    'PACIFIC_ISLANDER_SAMOAN', 'PACIFIC_ISLANDER_NATIVE_HAWAIIAN', 'PACIFIC_ISLANDER_GUAMANIAN',
    'PACIFIC_ISLANDER_OTHER', 'MIDDLE_EASTERN', 'AMERICAN_INDIAN_OR_ALASKAN_NATIVE', 'MIXED_OR_OTHER_RACE',
  )
  if level in asian_codes:
    return 'ASIAN'
  # These two codes are passed through under their own name.
  if level == 'WHITE_OR_CAUCASIAN':
    return 'WHITE_OR_CAUCASIAN'
  if level == 'BLACK_OR_AFRICAN_AMERICAN':
    return 'BLACK_OR_AFRICAN_AMERICAN'
  if level in mixed_or_other_codes:
    return 'MIXED_OR_OTHER_RACE'
  return 'UNKNOWN'
  
def process_age(level):
  """Bucket a numeric age into '<28', '28-40', '41-58' or '>58'.

  A value for which every comparison is false (e.g. NaN) falls back to '28-40'.
  """
  for upper_bound, label in ((27, '<28'), (40, '28-40'), (58, '41-58')):
    if level <= upper_bound:
      return label
  return '>58' if level > 58 else '28-40'

def process_age_group_for_weighting(level):
    """Bucket a numeric age into the age groups used for weighting.

    Args:
        level: numeric age.

    Returns:
        The label of the first bracket whose upper bound is >= level. Ages from
        81 up to 200 all receive '81-90'. Anything that matches no bracket
        (above 200, or NaN) gets a randomly chosen label among
        '18-20', '21-30', '31-40', '41-50'.
    """
    # Local import: the notebook's import cell never imports `random`, so the
    # fallback below used to raise NameError.
    import random

    brackets = (
        (10, '0-10'), (17, '11-17'), (20, '18-20'), (30, '21-30'), (40, '31-40'),
        (50, '41-50'), (60, '51-60'), (70, '61-70'), (80, '71-80'), (200, '81-90'),
    )
    for upper_bound, label in brackets:
        if level <= upper_bound:
            return label
    # Implausible age: impute one of the adult brackets at random (non-deterministic).
    return random.choice(['18-20', '21-30', '31-40', '41-50'])

In [0]:
# Spark UDF wrappers for the demographic bucketing helpers; each one yields a string label.
preprocess_income_UDF = udf(preprocess_income, StringType())
process_education_level_UDF = udf(process_education_level, StringType())
process_employment_status_UDF = udf(process_employment_status, StringType())
process_living_status_UDF = udf(process_living_status, StringType())
process_ethnicity_UDF = udf(process_ethnicity, StringType())
process_presence_of_children_UDF = udf(process_presence_of_children, StringType())
process_age_UDF = udf(process_age, StringType())
process_age_group_for_weighting_UDF = udf(process_age_group_for_weighting, StringType())

In [0]:
# Build the dashboard version of the demo data: every demographic column is replaced by a coarse label.
processed_demo_data_df = demo_data.withColumn('household_income', preprocess_income_UDF(col('household_income')))
# Fill missing ages with the mean age; the mean is computed by a Spark job and collected to the driver.
processed_demo_data_df = processed_demo_data_df.na.fill({'age': processed_demo_data_df.select(avg('age')).collect()[0]["avg(age)"]})
# age_group must be derived before the next line overwrites the numeric age with its bucket label.
processed_demo_data_df = processed_demo_data_df.withColumn('age_group', process_age_group_for_weighting_UDF(col('age')))
processed_demo_data_df = processed_demo_data_df.withColumn('age', process_age_UDF(col('age')))
processed_demo_data_df = processed_demo_data_df.withColumn('education_level', process_education_level_UDF(col('education_level')))
processed_demo_data_df = processed_demo_data_df.withColumn('employment_status', process_employment_status_UDF(col('employment_status')))
processed_demo_data_df = processed_demo_data_df.withColumn('living_status', process_living_status_UDF(col('living_status')))
processed_demo_data_df = processed_demo_data_df.withColumn('ethnicity', process_ethnicity_UDF(col('ethnicity')))
# presence_of_children is a new column; the source number_of_children column is kept as is.
processed_demo_data_df = processed_demo_data_df.withColumn('presence_of_children', process_presence_of_children_UDF(col('number_of_children')))

# Persist as Delta, replacing both the data and the schema at this path.
dashboard_demo_data_delta_file_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/dashboard_demo_data"
processed_demo_data_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(dashboard_demo_data_delta_file_path)

In [0]:
%sql
-- Re-register the dashboard demo table over the Delta files written by the previous cell.
DROP TABLE IF EXISTS disqo_dashboard_db.dashboard_demo_data;
CREATE TABLE disqo_dashboard_db.dashboard_demo_data
USING DELTA
LOCATION "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/dashboard_demo_data"

# Amazon Events Data Processing

In [0]:
def get_asin_hierarchy_mapping(asin_mapping):
  """Normalise a raw category-hierarchy string into a ' -> ' separated path.

  Steps: strip the surrounding quote/bracket characters, turn ';' into ',',
  blank out embedded best-seller-rank fragments, drop everything between the
  first '(' and the last ')', then convert both '","' and ' > ' separators
  into ' -> '.

  Args:
    asin_mapping: raw hierarchy string, or None.

  Returns:
    The cleaned hierarchy string; an empty string when asin_mapping is None.
  """
  if asin_mapping is None:
    return ""
  hierarchy = asin_mapping.lstrip('"[""').rstrip('""]"').replace(";", ",")
  # Raw strings keep the patterns unchanged while avoiding invalid-escape warnings.
  # The first pattern matches a literal backslash + 'n', then '#<rank> in', a
  # non-breaking space and the department name that follows.
  hierarchy = re.sub(r'\\n#[0-9]+ in\xa0[a-zA-Z-&,\s]+', ' ', hierarchy)
  hierarchy = re.sub(r'\((.*)\)', "", hierarchy)
  return hierarchy.strip().replace('","', ' -> ').replace(" > ", ' -> ')

def get_node_reference(bn_df_list):
  """Build a keyword -> [(breadcrumb, flattened breadcrumb), ...] lookup from browse-node rows.

  Each row is read through its "Name" and "Breadcrumb" fields, both lower-cased.
  The name is registered under every part obtained by splitting it on ', ' and
  ' & ', and once more under the full name (so a name without separators is
  registered twice).
  """
  lookup = {}
  for node in bn_df_list:
    name = node["Name"].lower()
    breadcrumb = node["Breadcrumb"].lower()
    # Flattened form used for similarity scoring: path separators become spaces.
    flattened = breadcrumb
    for separator in (' > ', ', ', " - "):
      flattened = flattened.replace(separator, ' ')
    entry = (breadcrumb, flattened)
    aliases = name.replace(", ", " & ").split(" & ") + [name]
    for alias in aliases:
      lookup.setdefault(alias, []).append(entry)
  return lookup

def jaccard(categoery_line, refer_s):
  """Jaccard similarity of the space-separated token sets of two strings, rounded to 2 decimals."""
  tokens_a = set(categoery_line.split(' '))
  tokens_b = set(refer_s.split(' '))
  shared = tokens_a & tokens_b
  combined = tokens_a | tokens_b
  return round(len(shared) * 1.0 / len(combined), 2)

def smith_waterman_similarity(categoery_line, refer_s, n):
  """Return smith_waterman.similarity of the two strings divided by n, rounded to 2 decimals."""
  raw_similarity = smith_waterman.similarity(categoery_line, refer_s)
  normalised = raw_similarity * 1.0 / n
  return round(normalised, 2)


def category_linking(asin, category, reference):
  """Pick the best-matching reference breadcrumb for a cleaned category path.

  Args:
    asin: product identifier, returned unchanged as the first tuple element.
    category: cleaned hierarchy string whose levels are separated by ' -> '.
    reference: dict mapping a keyword to a list of (path, revised_path) pairs,
      as built by get_node_reference.

  Returns:
    (asin, level_0, level_1, level_2): the first three ' > ' separated levels
    of the selected path. When nothing scores above 0, selected_path stays ''
    and level_0 is therefore the empty string (not None); levels the path does
    not reach are None.
  """
  max_score = 0
  selected_path = ''
  #
  # Case 1: the whole category string is itself a reference key -> score its
  # candidate paths with the token-set jaccard similarity.
  if category in reference:
      for path, revised_path in reference[category]:
        score = jaccard(category, revised_path)
        # strict '>' means a candidate with score 0 is never selected
        if score > max_score:
          max_score = score
          selected_path = path
  # Case 2: the whole category is not a reference key -> walk its levels from
  # the deepest (right-most) one to the top.
  else:
    len_category = len(category)
    for subcategory in category.split(" -> ")[::-1]:
      # check if the whole subcategory is in the reference, break if we find any one available
      if subcategory in reference:
        for path, revised_path in reference[subcategory]:
          score = jaccard(category, revised_path)
          if score > max_score:
            max_score = score
            selected_path = path
        # stop at the first level that is a reference key
        break
      else:
        split_subcategory = subcategory.split(" ")[::-1]

        combined_category = ''
        # from right to left, grow a candidate key word by word and check whether it is in the reference
        for word in split_subcategory:
          combined_category = word if combined_category == '' else word + ' ' + combined_category
          # check if the combined subcategory keyword is in the reference, break if we find any one available
          if (not combined_category.startswith("&")) and combined_category in reference:
            for path, revised_path in reference[combined_category]:
              # partial-keyword matches are scored with Smith-Waterman, normalised by the category length
              score = smith_waterman_similarity(category, revised_path, len_category)
              if score > max_score:
                max_score = score
                selected_path = path
            # this break only leaves the word loop; the outer loop still moves on to the next level up
            break

  # Split the selected path into at most 8 levels; only the first three are returned.
  levels = [None, None, None, None, None, None, None, None]
  # NOTE(review): selected_path is initialised to '' and never set to None, so this
  # condition is always true as written.
  if selected_path is not None:
    for i, level in enumerate(selected_path.split(' > ')[:8]):
      levels[i] = level

  return asin, levels[0], levels[1], levels[2]

def process_brand_name(name):
  """Extract the bare brand from a 'brand: X' or 'visit the X store' byline.

  Returns None for None, the captured brand when one of the two patterns is
  found (the 'brand:' form is tried first), otherwise the input unchanged.
  """
  if name is None:
    return None
  for pattern in (r"brand: ([\w\s]+)", r"visit the ([\w\s]+) store"):
    found = re.search(pattern, name)
    if found is not None:
      return found.group(1)
  return name

# Spark UDF wrapper around process_brand_name (string in, string out).
process_brand_name_udf = udf(process_brand_name, StringType())

### Import Amazon Hierarchy Category Data as Reference

In [0]:
# Amazon browse-node hierarchy (reference data)
file_location = "/FileStore/tables/amazon/amazon_browsenode.csv"
file_type = "csv"

# CSV reader settings
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# These options apply to CSV files; other file types ignore them.
bn_df = (
  spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)
)

# keyword -> breadcrumb path lookup, built on the driver from the collected rows of bn_df
node_reference = get_node_reference(bn_df.collect())

### Import raw events data

In [0]:
# Partition folder of yesterday's export, with "yesterday" evaluated in US/Pacific time.
yesterday_date = 'date=' + (datetime.now().astimezone(timezone('US/Pacific')) - timedelta(days=1)).strftime("%Y-%m-%d")

event_data_path = "/mnt/delta/raw_data/Disqo_Dataset_Synced/v2/daily-export-data/{}/*.snappy.parquet".format(yesterday_date)
event_data = spark.read.option("header", "true").parquet(event_data_path)
# Parse the timestamp, then derive the calendar date, the week index counted from 2019-01-01
# (first week = 1) and the cleaned brand name.
event_data_latest = (
  event_data
  .withColumn("timestamp", to_timestamp(col('timestamp'), "yyyy-MM-dd HH"))
  .withColumn("date", to_date(col('timestamp')))
  .withColumn("week_from_record", floor_(datediff(col('date'), lit('2019-01-01')) / 7) + 1)
  .withColumn("product_brand", process_brand_name_udf(col('product_brand')))
)
# # ********************************remedy part if daily update is broken sometime*********************************************
# event_data_path = "/mnt/delta/raw_data/Disqo_Dataset_Synced/v2/daily-export-data/*/*.snappy.parquet"
# event_data = spark.read.option("header", "true").parquet(event_data_path)

# # Manipulate timestamp
# event_data_latest = event_data.withColumn("timestamp", to_timestamp(col('timestamp'), "yyyy-MM-dd HH"))
# event_data_latest = event_data_latest.withColumn("date", to_date(col('timestamp')))
# event_data_latest = event_data_latest.withColumn("product_brand", process_brand_name_udf(col('product_brand')))
# event_data_latest  = event_data_latest.withColumn("week_from_record", floor_(datediff(col('date'),lit('2019-01-01'))/7)+1)
# #Only select data from the latest part for the further merging operation
# event_data_latest = event_data_latest.filter((col("timestamp") > lit("2023-01-04")))
# #***********remedy part if daily update is broken sometime*********************************************

In [0]:
disqo_us_file_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/disqo_us"
# Keep only events whose timezone is one of the listed US zones, then append them to the
# disqo_us Delta dataset.
disqo_us_df = event_data_latest.filter(
  col('timezone').isin(
    'America/New_York',
    'America/Chicago',
    'America/Los_Angeles',
    'America/Denver',
    'America/Indianapolis',
    'America/Anchorage',
    'America/Boise',
    'America/Indiana/Indianapolis',
  )
)

disqo_us_df.write.format("delta").mode('append').save(disqo_us_file_path)

In [0]:
%sql
-- Re-register the disqo_us table over the Delta files appended to by the previous cell.
DROP TABLE IF EXISTS disqo_us;
CREATE TABLE disqo_us
USING DELTA
LOCATION "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/disqo_us"

In [0]:
# %sql
# drop VIEW if exists disqo_dashboard_db.disqo_us_with_demo;
# CREATE VIEW disqo_dashboard_db.disqo_us_with_demo as (
#   select * from disqo_us left join disqo_demo_data using (user_id)
# )

In [0]:
# %sql
# drop table if exists disqo_dashboard_db.disqo_us_mau_weekly;
# create table disqo_dashboard_db.disqo_us_mau_weekly as(
# with week_table as
# (
# select floor(datediff(date(timestamp),'2019-01-01')/7)+1 as week_from_record, min(date(timestamp)) as first_date_of_week
# from disqo_us
# group by floor(datediff(date(timestamp),'2019-01-01')/7)+1
# ), daily_user_table as (
# select distinct date(timestamp) as date, user_id from disqo_us
# )
# select w.week_from_record, w.first_date_of_week, count(distinct user_id) as prior_mau
# from week_table w left join daily_user_table d 
# on datediff(w.first_date_of_week, d.date) > 1 and datediff(w.first_date_of_week, d.date) <= 30
# group by w.week_from_record, w.first_date_of_week
# order by w.week_from_record
# );
# update disqo_dashboard_db.disqo_us_mau_weekly
# SET prior_mau = 40000
# WHERE week_from_record = 1

In [0]:
# %sql
# drop table if exists disqo_dashboard_db.disqo_us_mau_monthly;
# create table disqo_dashboard_db.disqo_us_mau_monthly as(
# select year(timestamp) as year, month(timestamp) as month, min(date(timestamp)) as first_date_of_month, count(distinct user_id) as mau
# from disqo_us
# group by year(timestamp), month(timestamp)
# )

num_affected_rows,num_inserted_rows


### Load reference data

In [0]:
# Load the stored asin -> hierarchy mapping (Delta) accumulated by earlier runs.
main_asin_hierarchy_mapping_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/asin_hierarchy_mapping"
main_asin_hierarchy_mapping_df = (
  spark.read
  .option("header", "true")
  .format("delta")
  .load(main_asin_hierarchy_mapping_path)
)

In [0]:
# Distinct (asin, category_hierarchy) pairs from amazon.com events whose asin consists only of
# digits and upper-case letters and whose hierarchy is present.
expr1 = r'^[0-9A-Z]+$'
asin_hierarchy_list = (
  event_data_latest
  .filter(col("page_domain") == 'amazon.com')
  .filter(col("asin").rlike(expr1))
  .filter(col("category_hierarchy").isNotNull())
  .select("asin", "category_hierarchy")
  .drop_duplicates()
)

### Select only new data to be processed

In [0]:
# Work out which asins of this batch are not yet in the stored mapping, so only those get linked.
main_asin_keys = main_asin_hierarchy_mapping_df.select("asin")
asin_keys = asin_hierarchy_list.select("asin")
# asins present in this batch but absent from the stored mapping
new_asin = asin_keys.subtract(main_asin_keys)
# Each count() below runs a Spark job; the figures are only printed.
print("There are {} asin records in the database".format(main_asin_keys.distinct().count()))
print("There are {} asin records for this new batch".format(asin_keys.distinct().count()))
n_new_asin = new_asin.count()
print("There are {} new asin that are to be processed".format(n_new_asin))
# Restrict the (asin, category_hierarchy) pairs to the new asins and switch to the RDD API
# for the Python-side linking step.
asin_rdd  = asin_hierarchy_list.join(new_asin, 'asin', 'inner').rdd

### Processing of category linking

In [0]:
# Link only the newly seen asins: clean every raw hierarchy string, keep the longest cleaned
# candidate per asin, then match it against the browse-node reference.
asin_hierarchy = (
  asin_rdd
  .map(lambda record: (record["asin"], get_asin_hierarchy_mapping(record["category_hierarchy"])))
  .reduceByKey(lambda first, second: first if len(first) > len(second) else second)
  .map(lambda pair: category_linking(pair[0], pair[1], node_reference))
)
# Four nullable string columns: the asin followed by its first three hierarchy levels.
asinSchema = StructType([
  StructField(column_name, StringType(), True)
  for column_name in ('asin', 'level_0', 'level_1', 'level_2')
])
asin_hierarchy_mapping_df = spark.createDataFrame(asin_hierarchy, schema=asinSchema)

### Update asin - hierarchy mapping data on existing dataset

In [0]:
# event_data_delta_file_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/amazon_with_category"
# Extend the in-memory mapping with the newly linked asins so later joins in this run see them.
# NOTE(review): union() pairs columns by position - presumably the stored mapping has the same
# asin / level_0 / level_1 / level_2 column order as asinSchema; confirm.
main_asin_hierarchy_mapping_df = main_asin_hierarchy_mapping_df.union(asin_hierarchy_mapping_df)
# Persist only the new rows by appending them to the stored Delta mapping.
asin_hierarchy_mapping_df.write.format("delta").mode("append").save(main_asin_hierarchy_mapping_path)

### Append new batch of event data into the original event dataset

In [0]:
# Temporarily processing the whole batch of data.
# Keep the amazon.com events from the listed US timezones, then join them with the asin
# hierarchy mapping to get the complete new chunk of the dataset.
# Columns are referenced by name via col(), so the filter no longer reaches into the parent
# `event_data` frame while filtering `event_data_latest`.
amazon_df = (
  event_data_latest
  .filter(
    col('timezone').isin(
      'America/New_York',
      'America/Chicago',
      'America/Los_Angeles',
      'America/Denver',
      'America/Indianapolis',
      'America/Anchorage',
      'America/Boise',
      'America/Indiana/Indianapolis',
    )
    & (col('page_domain') == 'amazon.com')
  )
  .drop('category_categoryRanking')
  .dropDuplicates()
)
# Left join: events whose asin has no mapping keep null category levels.
amazon_with_category_levels = amazon_df.join(main_asin_hierarchy_mapping_df, ['asin'], "left")

In [0]:
# Append this batch of categorised amazon events to the accumulated Delta dataset.
# Because the mode is "append", running this cell again for the same batch adds the rows again.
event_data_delta_file_path = "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/amazon_with_category"
amazon_with_category_levels.write.format("delta").mode("append").save(event_data_delta_file_path)

In [0]:
%sql
-- Heads-up: the existing table is dropped before it is re-created over the Delta location.
DROP TABLE IF EXISTS amazon_with_hierarchy;
CREATE TABLE amazon_with_hierarchy
USING DELTA
LOCATION "/mnt/delta/general_data/Disqo_Dataset_Processed_Delta/amazon_with_category"

In [0]:
%sql
-- View joining every categorised amazon event with the demo data of its user (events without demo data are kept).
DROP VIEW IF EXISTS disqo_dashboard_db.amazon_with_hierarchy_with_demo;
CREATE VIEW disqo_dashboard_db.amazon_with_hierarchy_with_demo AS (
  SELECT * FROM amazon_with_hierarchy LEFT JOIN disqo_demo_data USING (user_id)
)