# Load data into DeltaLake

## Includes

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql import Row
from delta import *
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.utils
import datetime as D

import automate.schema as ts
import include.stdhll as H
from math import floor
import sys


# Base path of the parquet source data; replace with your own path.
# It is passed to run_aggregation_5m() as the source path in the main section.
parquet_src_path = "s3a://example-2024/type=histogram/"


## Function: Load data in parquet files from S3

In [3]:
# Single-character tags for bits 0..62 of the status bit field, in bit order
# (bit 0 -> 'A', ..., bit 62 -> '+'). Bit 63 is reported separately as '/'.
_TAG_CHARS = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789+"
)

# Base names of the counters: input column "<name>_count" is carried through
# the map step as "count_<name>" and summed back into "<name>_count".
_COUNTERS = (
    "a", "aaaa", "mx", "ns", "other_type",
    "non_in", "ok", "nx", "fail", "other_rcode",
)

# (counter column, alias) pairs for the per-client ratios, in output order.
_PER_CLIENT = (
    ("count_ok", "ok_pc"),
    ("count_nx", "nx_pc"),
    ("count_fail", "fail_pc"),
    ("count_other_rcode", "o_rcode_pc"),
    ("count_other_type", "o_type_pc"),
    ("count_non_in", "non_in_pc"),
)

# (struct field, output alias) pairs extracted from the "deltas" array.
_DELTA_FIELDS = (
    ("ts_delta", "deltas"),
    ("ok_pc", "ok_per_client"),
    ("nx_pc", "nx_per_client"),
    ("fail_pc", "fail_per_client"),
    ("o_rcode_pc", "other_rcode_per_client"),
    ("o_type_pc", "other_type_per_client"),
    ("non_in_pc", "non_in_per_client"),
    ("v4_cc", "v4_clients"),
    ("v6_cc", "v6_clients"),
)


def _dotted_labels(indices):
    """Join the columns ``label<i>`` (each followed by '.') in the given order."""
    return F.concat_ws(
        '',
        *[F.concat(F.col(f"label{i}"), F.lit('.')) for i in indices]
    )


def _tagstring_column():
    """Build the "tagstring" column from the "tags" and "bit63" columns."""
    flags = [
        F.when(F.col("tags").bitwiseAND(1 << bit) != 0, char)
        for bit, char in enumerate(_TAG_CHARS)
    ]
    flags.append(F.when(F.col("bit63") == True, '/'))
    return F.concat_ws('', *flags).alias("tagstring")


def _existing_minute_paths(bpath, first_minute, minutes):
    """Return the per-minute glob paths under *bpath* that match at least one file.

    Uses the module-level SparkContext ``sc`` to reach the Hadoop FileSystem API.
    """
    hadoop_conf = sc._jsc.hadoopConfiguration()
    found = []
    for offset in range(minutes):
        dt = first_minute + D.timedelta(minutes=offset)
        p = (f"{bpath}year={dt.year:4}/month={dt.month:02}/day={dt.day:02}"
             f"/hour={dt.hour:02}/minute={dt.minute:02}/second=*/creator=*/id=*")
        hpath = sc._jvm.org.apache.hadoop.fs.Path(p)
        fs = hpath.getFileSystem(hadoop_conf)
        matches = fs.globStatus(hpath)
        # globStatus may return null instead of an empty array; guard so that
        # len() is never called on None.
        if matches is not None and len(matches) > 0:
            found.append(p)
    return found


def load_data_5m(bpath, starttime):
    """Load and aggregate one 5-minute chunk of parquet data.

    Reads the per-minute partitions from 10 minutes before *starttime* to
    14 minutes after it (25 minutes in total), keeps the rows whose
    ``start_time`` falls into ``[starttime, starttime + 300s)``, and
    aggregates them per (fqdn, creator, tagstring).

    Relies on the module-level ``spark`` (SparkSession) and ``sc``
    (SparkContext) objects.

    Args:
        bpath: Base path of the partitioned parquet data (used as ``basePath``).
        starttime: ``datetime`` of the chunk start; its minute must be a
            multiple of 5.

    Returns:
        The aggregated DataFrame, or ``False`` when *starttime* is not valid,
        no input files exist, or no rows fall into the time window.
    """
    periodicity = 300
    maxint64 = 9223372036854775807
    minint64 = -9223372036854775808

    if starttime.minute % 5 != 0:
        print(f"The start time {repr(starttime)} isn't valid")
        return False

    # NOTE(review): the two bounds use different date/time separators ('T' vs
    # ' '); kept as in the original - confirm both parse identically for the
    # type of the start_time column.
    time_from = starttime.isoformat('T', 'milliseconds')
    time_to = (starttime + D.timedelta(seconds=periodicity)).isoformat(' ', 'milliseconds')

    files = _existing_minute_paths(bpath, starttime - D.timedelta(minutes=10), 25)
    if not files:
        return False

    df0 = spark.read\
               .format("parquet")\
               .option("basePath", bpath)\
               .option("mergeSchema", "true")\
               .option("ignoreCorruptFiles", "true")\
               .option("ignoreMissingFiles", "true")\
               .load(files)

    if df0.isEmpty():
        return False

    # MAP
    # Collect all data for the relevant timeframe.
    # Several expressions below reference aliases defined earlier in the same
    # select list (e.g. "v4clients", "clients", "tags"), so keep the order.
    in_window = (
        (F.col("start_time") >= F.lit(f"{time_from}"))
        & (F.col("start_time") < F.lit(f"{time_to}"))
    )

    map_cols = [
        # FQDN from labels (label9 first, label0 last)
        _dotted_labels(range(9, -1, -1)).alias("fqdn"),

        # Include Creator from path
        F.col("creator").cast(T.StringType()).alias("creator"),
    ]

    # Labels
    map_cols += [
        F.col(f"label{i}").cast(T.StringType()).alias(f"l{i}")
        for i in range(10)
    ]

    # Counts
    map_cols += [
        F.col(f"{name}_count").cast(T.IntegerType()).alias(f"count_{name}")
        for name in _COUNTERS
    ]

    map_cols += [
        F.col("v4client_count"),
        F.col("v6client_count"),

        # Approximate v4 client count
        H.hll_cardinality(
            F.col("v4client_count")
        ).cast(T.IntegerType()).alias("v4clients"),

        # Approximate v6 client count
        H.hll_cardinality(
            F.col("v6client_count")
        ).cast(T.IntegerType()).alias("v6clients"),

        # Sum of client counts
        # Fix for off-by-one
        F.try_add(
            F.try_add(F.col("v4clients"), F.col("v6clients")),
            F.lit(-1)
        ).cast(T.IntegerType()).alias("clients"),
    ]

    # Counters by clients: OK, NXDOMAIN, SERVFAIL, other RCODEs,
    # other query types, other query classes
    map_cols += [
        F.try_divide(
            F.col(counter), F.col("clients")
        ).cast(T.IntegerType()).alias(alias)
        for counter, alias in _PER_CLIENT
    ]

    map_cols += [
        F.col("v4clients").cast(T.IntegerType()).alias("v4_cc"),
        F.col("v6clients").cast(T.IntegerType()).alias("v6_cc"),

        # Offset of the row's start_time from the chunk start
        F.try_subtract(
            F.col("start_time"),
            F.try_to_timestamp(F.lit(f"{time_from}"))
        ).cast(T.IntegerType()).alias("ts_delta"),

        F.struct(
            F.col("ts_delta"),
            F.col("ok_pc"),
            F.col("nx_pc"),
            F.col("fail_pc"),
            F.col("o_rcode_pc"),
            F.col("o_type_pc"),
            F.col("non_in_pc"),
            F.col("v4_cc"),
            F.col("v6_cc")
        ).alias("timedelta"),

        # Generate string with single character TAGS
        # Fix for uint64 tag field
        # Check for bit 63
        F.when(
            F.col("edm_status_bits") > maxint64,
            True
        ).otherwise(False).alias("bit63"),

        # Ignore negative numbers and fit to 63 bits
        F.when(
            F.col("edm_status_bits") < 0,
            F.lit(0)
        ).when(
            F.col("edm_status_bits") > maxint64,
            F.try_add(
                F.col("edm_status_bits"),
                F.lit(minint64))
        ).otherwise(F.col("edm_status_bits")).cast(T.LongType()).alias("tags"),

        # Generate string representation of tag field
        _tagstring_column(),
    ]

    df1 = df0.where(in_window)\
             .select(*map_cols)\
             .drop("tags", "clients", "ok_pc", "nx_pc",
                   "fail_pc", "v4_cc", "v6_cc")

    if df1.isEmpty():
        return False

    yy = starttime.year
    mm = starttime.month
    dd = starttime.day
    hh = starttime.hour
    minute = starttime.minute

    # REDUCE
    # Aggregate to 5-minute chunks
    agg_cols = [
        # Create date and time
        F.make_date(
            F.lit(f"{yy:4}"),
            F.lit(f"{mm:02}"),
            F.lit(f"{dd:02}")
        ).cast(T.DateType()).alias("date"),
        F.lit(hh).cast(T.ByteType()).alias("hour"),
        F.lit(minute).cast(T.ByteType()).alias("minute"),
    ]

    # Labels
    # These *should* all be the same, take last
    agg_cols += [
        F.last(F.col(f"l{i}")).alias(f"label{i}")
        for i in range(10)
    ]

    # Counts
    agg_cols += [
        F.sum(F.col(f"count_{name}")).alias(f"{name}_count")
        for name in _COUNTERS
    ]

    agg_cols += [
        # HLL sketches
        H.hll_merge(F.col("v4client_count")).alias("v4clients_hll"),
        H.hll_merge(F.col("v6client_count")).alias("v6clients_hll"),

        # Approximations computed from the merged sketches (these reference
        # the aliases defined just above, so keep the order)
        H.hll_cardinality(
            F.col("v4clients_hll")
        ).cast(T.IntegerType()).alias("v4clients_count"),
        H.hll_cardinality(
            F.col("v6clients_hll")
        ).cast(T.IntegerType()).alias("v6clients_count"),

        # Return delta array
        F.array_sort(
            F.collect_list(F.col("timedelta"))
        ).alias("deltas"),
    ]

    df2 = df1.groupBy(
                F.col("fqdn"),
                F.col("creator"),
                F.col("tagstring")
            ).agg(*agg_cols)

    # No emptiness checks from here on: df1 is known to be non-empty, so the
    # grouped result and the projection below contain at least one row, and
    # each isEmpty() call would re-run the whole aggregation.

    out_cols = [F.col("date"), F.col("creator")]
    out_cols += [F.col(f"label{i}") for i in range(10)]
    out_cols += [
        F.col("hour"),
        F.col("minute"),
        F.col("tagstring"),
        F.col("fqdn"),

        # R_FQDN from labels (label0 first, label9 last)
        _dotted_labels(range(10)).alias("r_fqdn"),

        # IDN FQDN
        # Placeholder for UDF
        F.lit(None).cast(T.StringType()).alias("idn_fqdn"),
    ]

    # Counters
    out_cols += [F.col(f"{name}_count") for name in _COUNTERS]

    # Per-field arrays extracted from the sorted "deltas" structs
    out_cols += [
        F.col(f"deltas.{field}").alias(alias)
        for field, alias in _DELTA_FIELDS
    ]

    out_cols += [
        # HLL sketches
        F.col("v4clients_hll"),
        F.col("v6clients_hll"),

        # Client count approximations
        F.col("v4clients_count"),
        F.col("v6clients_count"),
    ]

    return df2.select(*out_cols)


## Function: Run aggregation

In [4]:
def run_aggregation_5m(time_from, src_path, overwrite=False):
    """Load one aggregated 5-minute chunk and print its size, rows and schema.

    Args:
        time_from: Start of the 5-minute chunk, handed to load_data_5m().
        src_path: Base path of the parquet source data.
        overwrite: Currently unused; kept so existing callers keep working.

    Returns:
        None. Results are only printed.
    """
    df = load_data_5m(src_path, time_from)

    # load_data_5m() returns the sentinel False when nothing could be loaded;
    # test for it explicitly rather than relying on DataFrame truthiness.
    if df is False or df is None:
        print("No data loaded")
        return

    try:
        count = df.count()
        print(f"count: {count}")

        df.show()
        df.printSchema()
    except Exception as e:
        # Best effort: keep going, but report what actually failed instead of
        # hiding every error behind a bare "No data".
        print(f"No data: {e!r}")
                

## Main

In [None]:
if __name__ == '__main__':

    # Initiate spark.
    # `spark` and `sc` are deliberately module-level names: load_data_5m()
    # reads them as globals.
    spark = SparkSession.builder \
            .appName('foo') \
            .getOrCreate()

    H.register_hll_functions(spark)
    spark.conf.set("spark.databricks.delta.optimize.preserveInsertionOrder", "false")
    sc = spark.sparkContext

    # Get current time, round down to the previous 5 min chunk and go back a
    # further 10 minutes.
    # NOTE(review): datetime.now() is naive local time - confirm this matches
    # the timezone used for the partition paths and start_time values.
    t5m = D.timedelta(minutes=5)
    start_time = (D.datetime.min + ((D.datetime.now() - D.datetime.min) // t5m) * t5m) - D.timedelta(minutes=10)

    try:
        run_aggregation_5m(
                start_time,
                parquet_src_path,
           )

    except Exception as e:
        # Top-level boundary: report the failure together with its cause.
        print(f"fail: Load failed: {start_time} ({e!r})")
     