In [20]:
import ipaddress
import json
import os
from datetime import datetime, timedelta

from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, lit, max as colmax, min as colmin, split, concat, date_format,
                                   to_timestamp, to_date, regexp_extract, when, udf, size)
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

In [3]:
# Session settings, applied to the builder in the order listed here.
spark_settings = {
    "spark.jars": "/jars/postgresql-42.5.0.jar,/jars/delta-core_2.12-1.0.0.jar",
    "spark.sql.warehouse.dir": "/mnt/warehouse",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

# Start from a builder bound to the cluster master, then layer the settings on top.
builder = SparkSession.builder.master("spark://spark-master:7077")
for setting_name, setting_value in spark_settings.items():
    builder = builder.config(setting_name, setting_value)

# configure_spark_with_delta_pip wraps the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Location of the sales Delta table and of the JSON file listing its columns.
table_path = "/mnt/g_layer/sales"
schema_path = "/mnt/schemas/sales.json"

# Parse the schema inside a context manager so the file handle is closed
# as soon as the JSON has been read (the handle used to be left open).
with open(schema_path, "r") as file:
    schema = json.load(file)

# Evaluated BEFORE the table is (possibly) created below, so the flag records
# whether the table already existed when this cell started.
table_exists = DeltaTable.isDeltaTable(spark, table_path)

if not table_exists:
    # Create the table partitioned by year/month, one column per entry of
    # schema["fields"] (each entry provides "name" and "type").
    builder = (
        DeltaTable
        .createOrReplace(spark)
        .partitionedBy("year_partition", "month_partition")
        .location(table_path)
    )
    for metadata in schema["fields"]:
        builder = builder.addColumn(metadata["name"], metadata["type"])

    builder.execute()

                                                                                

In [7]:
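# Debug override (disabled): uncomment to make the extract below use the
# unfiltered "1 = 1" predicate instead of the current-month window.
# table_exists = False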

In [5]:
# First day of the current month (YYYY-MM-01); lower bound of the incremental window.
date = datetime.today().strftime("%Y-%m") + "-01"
month = date[5:7]
year = date[0:4]

# Credentials can be supplied through the environment; the defaults keep the
# previous hard-coded values so existing setups behave the same.
credentials = {
    "user": os.environ.get("B2B_DB_USER", "docker"),
    "password": os.environ.get("B2B_DB_PASSWORD", "docker"),
}

# Full extract unless the target table already existed, in which case only
# orders dated from the first day of the current month are read.
orders_predicates = ["1 = 1"]
if table_exists:
    orders_predicates = [f"date(OrderDate) >= '{date}'"]

# Event oriented tables (constant changes over time):

df_orders = (
    spark
    .read
    .option("driver", "org.postgresql.Driver")
    .jdbc(
        url = "jdbc:postgresql://postgres-b2b/b2b-platform",
        table = "orders",
        predicates = orders_predicates,
        properties = credentials
    )
)

row = df_orders.select(colmin("OrderId"), colmax("OrderId")).collect()

# min/max are NULL when no order matched the predicate; fall back to a dummy
# [0, 1) window instead of failing on `None + 1`.
has_orders = row[0][0] is not None
minOrderId = row[0][0] if has_orders else 0
maxOrderId = row[0][1] + 1 if has_orders else 1

df_orderitems = (
    spark
    .read
    .option("driver", "org.postgresql.Driver")
    .jdbc(
        url = "jdbc:postgresql://postgres-b2b/b2b-platform",
        table = "orderitems",
        column = "orderId",
        numPartitions = 1,
        lowerBound = minOrderId,
        upperBound = maxOrderId,
        properties = credentials
    )
)

# lowerBound/upperBound only control how the JDBC read is partitioned; they do
# not filter rows. On an incremental run the items must be restricted
# explicitly, otherwise every historical item is returned without a matching
# order and ends up in the merge with a null order date.
if table_exists:
    if has_orders:
        df_orderitems = (
            df_orderitems
            # Simple range comparison that Spark can push down to the database.
            .filter(col("orderId").between(minOrderId, maxOrderId - 1))
            # Keep only items whose order is actually part of this extract
            # (order ids inside the range may still belong to other months).
            .join(df_orders.select("OrderId"), on = "OrderId", how = "left_semi")
        )
    else:
        # No order in the window: nothing to load for this run.
        df_orderitems = df_orderitems.limit(0)

                                                                                

In [6]:
# Tables with domain values (not constant changes over time):

# Whole "catalog" table, read over JDBC with the shared credentials.
catalog_reader = spark.read.option("driver", "org.postgresql.Driver")
df_catalog = catalog_reader.jdbc(
    url="jdbc:postgresql://postgres-b2b/b2b-platform",
    table="catalog",
    properties=credentials,
)

In [7]:
# Identifier columns are cast to integer; the two partition columns are the
# year ("yyyy") and month ("MM") of the order date.
integer_keys = [col(key_name).cast("integer") for key_name in ("orderItemId", "orderId")]
partition_cols = [
    date_format(col("orderDate"), pattern).alias(partition_name)
    for pattern, partition_name in (("yyyy", "year_partition"), ("MM", "month_partition"))
]
cols = [*integer_keys, col("price"), col("orderDate"), *partition_cols]

# Order items drive the result: both joins are left joins, so an item is kept
# even when no order / catalog row matches.
items_with_orders = df_orderitems.alias("orderitems").join(
    df_orders.alias("order"), on="OrderId", how="left"
)
items_with_catalog = items_with_orders.join(
    df_catalog.alias("catalog"), on="CatalogId", how="left"
)

df = items_with_catalog.select(*cols).orderBy(col("orderItemId").asc())

In [8]:
# Display the joined sales rows; toPandas() collects every row of `df` onto the driver.
df.toPandas()

                                                                                

Unnamed: 0,orderItemId,orderId,price,orderDate,year_partition,month_partition
0,1,1,300.5,2016-12-07 18:07:39,2016,12
1,2,1,990.2,2016-12-07 18:07:39,2016,12
2,3,1,908.2,2016-12-07 18:07:39,2016,12
3,4,1,584.9,2016-12-07 18:07:39,2016,12
4,5,1,212.5,2016-12-07 18:07:39,2016,12
...,...,...,...,...,...,...
29185,29186,5321,174.5,2022-09-24 22:22:47,2022,09
29186,29187,5321,902.2,2022-09-24 22:22:47,2022,09
29187,29188,5321,1018.5,2022-09-24 22:22:47,2022,09
29188,29189,5322,776.2,2022-09-12 00:51:56,2022,09


In [10]:
# Upsert the sales rows into the Delta table at `table_path`: rows whose
# OrderItemId already exists are updated, all others are inserted.
df_disc = DeltaTable.forPath(spark, table_path)

condition = """
    disc.OrderItemId = delta.OrderItemId
"""

sales_merge = df_disc.alias("disc").merge(df.alias("delta"), condition)
sales_merge = sales_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll()
sales_merge.execute()

                                                                                

In [11]:
# Location of the devices Delta table and of the JSON file listing its columns.
table_path_devices = "/mnt/g_layer/devices"
schema_path_devices = "/mnt/schemas/devices.json"

# Context manager closes the schema file once parsed (it used to stay open).
with open(schema_path_devices, "r") as file:
    schema_devices = json.load(file)

# Evaluated before the table is (possibly) created below. Note that this
# reassigns the shared `table_exists` flag.
table_exists = DeltaTable.isDeltaTable(spark, table_path_devices)

if not table_exists:
    # Unpartitioned table with one column per entry of schema_devices["fields"].
    builder = (
        DeltaTable
        .createOrReplace(spark)
        .location(table_path_devices)
    )
    for metadata in schema_devices["fields"]:
        builder = builder.addColumn(metadata["name"], metadata["type"])

    builder.execute()

                                                                                

In [12]:
# Location of the products Delta table and of the JSON file listing its columns.
table_path_products = "/mnt/g_layer/products"
schema_path_products = "/mnt/schemas/products.json"

# Context manager closes the schema file once parsed (it used to stay open).
with open(schema_path_products, "r") as file:
    schema_products = json.load(file)

# Evaluated before the table is (possibly) created below. Note that this
# reassigns the shared `table_exists` flag.
table_exists = DeltaTable.isDeltaTable(spark, table_path_products)

if not table_exists:
    # Unpartitioned table with one column per entry of schema_products["fields"].
    builder = (
        DeltaTable
        .createOrReplace(spark)
        .location(table_path_products)
    )
    for metadata in schema_products["fields"]:
        builder = builder.addColumn(metadata["name"], metadata["type"])

    builder.execute()

                                                                                

In [13]:
# Read the whole weblogs landing area by default; when `table_exists` is true
# only the current year/month folder is read.
# NOTE(review): `table_exists` holds whatever the most recently executed
# existence check assigned — confirm it reflects every table fed by these logs.
path_logs = "/mnt/landing/weblogs"
if table_exists:
    path_logs = f"/mnt/landing/weblogs/year={year}/month={month}"

schema_path_weblogs = "/mnt/schemas/weblogs-schema.json"

# Context manager closes the schema file once parsed (it used to stay open).
with open(schema_path_weblogs, "r") as file:
    dict_schema = json.load(file)

# Spark schema applied to the CSV read; this rebinds `schema` to a StructType.
schema = StructType.fromJson(dict_schema)

In [14]:
def _ip_to_int(address):
    """Return the integer value of the IP address given as a string."""
    return int(ipaddress.ip_address(address))


# Despite its name, this UDF returns the numeric value of an address.
# The return type is LongType because an IPv4 address can be as large as
# 4294967295, which does not fit the 32-bit IntegerType used previously
# (any host at or above 128.0.0.0 was affected).
ipLength = udf(_ip_to_int, LongType())

# IP range list from Postgres, enriched with:
#   ipNumbers  - "iprange" split on literal dots
#   ipKey      - first two pieces of that split, concatenated (join key)
#   ipboundary - "iprange" split on "-" (used as range start / end downstream)
df_ipkeyslist = (
    spark
    .read
    .option("driver", "org.postgresql.Driver")
    .jdbc(
        url = "jdbc:postgresql://postgres-b2b/b2b-platform",
        table = "ipsrangelist",
        properties = credentials
    )
    # Raw string: "\." in a normal literal is an invalid escape sequence.
    .withColumn("ipNumbers", split(col("iprange"), r"\."))
    .select(col("*"), concat(col("ipnumbers")[0], col("ipnumbers")[1]).alias("ipKey"), split(col("iprange"), "-").alias("ipboundary"))
    .distinct()
)

In [15]:
# Output projection of the enriched weblogs.
cols = [
    col("logs.host").alias("HostIp"),
    col("ips.country").alias("Country"),
    col("logs.username").alias("Username"),
    # Date and time columns are combined into a single timestamp.
    to_timestamp(concat(col("logs.date"), lit(" "), col("logs.time")), "yyyy-MM-dd HH:mm:ss").alias("VisitedAt"),
    # Only "products" and "orders" are mapped; any other page type yields null.
    when(col("logs.PageType") == "products", lit("product")).when(col("logs.PageType") == "orders", lit("order")).alias("PageType"),
    # Piece at index 4 of the referer split on "/".
    split(col("logs.referer"), "/").getItem(4).alias("Id"),
    # Last DeviceSpec entry when there is more than one, otherwise the first.
    when(
        col("logs.DeviceSpec").getItem(1).isNotNull(), col("logs.DeviceSpec").getItem(size("DeviceSpec") - 1)
    ).otherwise(col("logs.DeviceSpec").getItem(0)).alias("Device"),
    date_format(col("date"), "yyyy").alias("year_partition"),
    date_format(col("date"), "MM").alias("month_partition")
]

df_weblogs = (
    spark
    .read
    .csv(path_logs, sep = " ", schema = schema)
    # Piece at index 3 of the referer split on "/".
    .withColumn("pageType", split(col("referer"), "/").getItem(3))
    # Text inside the first parenthesised group of the user agent, split on ";".
    .withColumn("DeviceSpec", split(regexp_extract(col("useragent"), "\\((.*?)\\)", 1), ";"))
    # Raw string: "\." in a normal literal is an invalid escape sequence.
    .withColumn("ipNumbers", split(col("host"), r"\."))
    .select(col("*"), concat(col("ipnumbers")[0], col("ipnumbers")[1]).alias("ipKey")).alias("logs")
    # Candidate ranges share the same ipKey; the filter below keeps only the
    # ranges whose numeric boundaries actually contain the host address.
    .join(df_ipkeyslist.alias("ips"), on = "ipKey", how = "inner")
    .filter(ipLength(col("Host")).between(ipLength(col("IpBoundary")[0]), ipLength(col("IpBoundary")[1])))
    .select(*cols)
)

In [16]:
# Display the enriched weblogs; toPandas() collects every row of `df_weblogs` onto the driver.
df_weblogs.toPandas()

                                                                                

Unnamed: 0,HostIp,Country,Username,VisitedAt,PageType,Id,Device,year_partition,month_partition
0,8.24.242.146,brazil,unknown,2018-01-23 21:48:15,product,1,x64,2018,01
1,8.24.242.43,brazil,customer.j,2018-01-23 21:48:15,order,1013,rv:31.0,2018,01
2,8.24.242.43,brazil,customer.j,2018-01-23 21:18:47,product,6,rv:31.0,2018,01
3,8.24.242.43,brazil,customer.j,2018-01-23 21:45:33,product,5,rv:31.0,2018,01
4,8.24.242.43,brazil,customer.j,2018-01-23 21:32:52,product,24,rv:31.0,2018,01
...,...,...,...,...,...,...,...,...,...
38295,67.73.184.171,colombia,customer.q,2021-04-05 09:17:14,product,13,X11,2021,04
38296,67.73.184.171,colombia,customer.q,2021-04-05 09:32:12,product,8,X11,2021,04
38297,67.73.184.171,colombia,customer.q,2021-04-05 09:25:53,product,7,X11,2021,04
38298,67.73.184.171,colombia,customer.q,2021-04-05 09:13:49,product,3,X11,2021,04


In [17]:
# Projection for the devices table: the log "Id" is exposed as OrderId.
cols = [col("Username"), col("VisitedAt"), col("Id").alias("OrderId"), col("Device")]

# Only visits whose page type is "order" feed the devices table.
order_page_visits = df_weblogs.where(col("PageType") == "order")
df_logs_devices = order_page_visits.select(*cols)

In [19]:
# Upsert the order-page visits into the devices Delta table, matching on OrderId.
# NOTE(review): presumably each OrderId appears once in the source — confirm,
# since several visits for the same order would all match the same target row.
df_disc = DeltaTable.forPath(spark, table_path_devices)

condition = """
    disc.OrderId = delta.OrderId
"""

devices_merge = df_disc.alias("disc").merge(df_logs_devices.alias("delta"), condition)
devices_merge = devices_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll()
devices_merge.execute()

                                                                                

In [None]:
# Whole "products" table, read over JDBC with the shared credentials.
products_reader = spark.read.option("driver", "org.postgresql.Driver")
df_products = products_reader.jdbc(
    url="jdbc:postgresql://postgres-b2b/b2b-platform",
    table="products",
    properties=credentials,
)

# Projection for the products table: the log "Id" is exposed as ProductId.
cols = [
    col("HostIp"),
    col("Country"),
    col("VisitedAt"),
    col("Id").alias("ProductId"),
    col("ProductName"),
]

# Product-page visits joined to the product they reference, de-duplicated.
product_page_visits = df_weblogs.alias("logs").filter(col("PageType") == "product")
same_product = col("logs.Id") == col("prod.ProductId")
df_logs_products = (
    product_page_visits
    .join(df_products.alias("prod"), on=same_product)
    .select(*cols)
    .distinct()
)

In [None]:
# Upsert the product-page visits into the products Delta table; a row is
# identified by the combination of HostIp, VisitedAt and ProductId.
df_disc = DeltaTable.forPath(spark, table_path_products)

condition = """
    disc.HostIp = delta.HostIp
    and disc.VisitedAt = delta.VisitedAt
    and disc.ProductId = delta.ProductId
"""

products_merge = df_disc.alias("disc").merge(df_logs_products.alias("delta"), condition)
products_merge = products_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll()
products_merge.execute()