In [63]:
import gc
import heapq
import json
import math
import random
import time

import numpy as np
from tqdm import tqdm
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, explode, monotonically_increasing_id, posexplode
from pyspark.sql.functions import udf,struct, collect_list
from pyspark.sql.functions import sum as fsum
from pyspark.sql.functions import row_number, sort_array
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

In [64]:
#
# cose da implementare:
# nota: numerics_domains dovrebbe essere un dataframe con colonne "idx"(indice)
# "nome_numerics1"... e le colonne dei numerics devono essere ordinate 
#  dataframe con valori unici dei numerici indicizzati (v)
#  funzione per calcolare la qualità dei pattern (v)
    # modificare il dataset in modo da inserire gli indici al posto dei valori singoli (v)
    # modificare funzione di campionamento valori (v)
    # aggiungere conteggi globali delle classi (v)
#  adattare il tutto sul distribuito
    # gestire la memoria meglio
# funzione per codificare il dataset
#  funzioni per il training + grid search
#  funzioni per l'esplorazione dei dati

In [65]:
def filter_data(data, target_class, target_col):
    """Keep only the rows of ``data`` whose ``target_col`` equals ``target_class``.

    Args:
        data: object exposing a ``filter`` method (a Spark DataFrame in this notebook).
        target_class: value the column is compared against with ``==``.
        target_col: name of the column holding the class label.

    Returns:
        Whatever ``data.filter`` returns for the equality condition.
    """
    belongs_to_class = col(target_col) == target_class
    return data.filter(belongs_to_class)

def seq_scout(data, data_plus,target_class, numerics_max, top_k, iterations, theta, alpha): #t
    """Run the bandit-style pattern search and return the best patterns found.

    Every sequence of ``data_plus`` is an arm. At each iteration the arm with
    the best UCB score is popped, generalised by ``play_arm``, the resulting
    pattern is stored in the result queue and the arm is pushed back with its
    updated mean quality and UCB score.

    Args:
        data: DataFrame with all the sequences (scanned by every ``play_arm``
            call; callers may want to cache it beforehand).
        data_plus: DataFrame with only the sequences of ``target_class``.
        target_class: class label the patterns should discriminate.
        numerics_max: dict passed through to ``play_arm``.
        top_k: maximum number of patterns kept in the result queue.
        iterations: number of arm plays (budget).
        theta: passed to the result ``PriorityQueue``.
        alpha: passed through to ``play_arm``.

    Returns:
        ``pi.get_top_k()``: the stored ``(-quality, pattern)`` tuples, best
        first. Also returned immediately (empty) when ``data_plus`` holds no
        sequence, instead of failing on the first pop.
    """
    data_support = data.count()
    class_support = data_plus.count()
    # create priority queue for patterns to be stored
    pi = PriorityQueue(k=top_k, theta=theta, cap_length=True)

    # create priority queue for storing each class sequence and its UCB score
    scores = PriorityQueue(data_plus)
    if not scores.heap:
        # No arm to play: the target class has no sequences in data_plus.
        return pi.get_top_k()

    for N in tqdm(range(1, iterations + 1)):
        # pop the sequence to be generalized
        _, Ni, mean_quality, sequence = scores.pop_first()
        # generalize the sequence and add it to the patterns
        gen_seq, new_qual = play_arm(sequence, data, target_class, numerics_max, alpha, data_support, class_support)
        # qualities are negated because the queue is a min-heap
        pi.add((-new_qual, to_imm_pattern(gen_seq)))
        # update the quality (incremental mean) and put the sequence back in the priority queue
        updated_quality = (Ni * mean_quality + new_qual) / (Ni + 1)
        ucb_score = compute_ucb(updated_quality, Ni + 1, N)
        scores.add((-ucb_score, Ni + 1, updated_quality, sequence))

    return pi.get_top_k() # priority queue filters automatically if theta <1

def play_arm(sequence, data, target_class, numerics_max, alpha, data_support, class_support): 
    """Randomly generalise ``sequence`` and score the resulting pattern.

    A random number of inputs (strictly fewer than the total) is removed from
    randomly chosen states; a state that loses all its inputs is dropped.
    Then every numeric value is either unconstrained (probability ``alpha``)
    or widened to a random interval containing it.

    Args:
        sequence: immutable sequence of ``(itemset, numeric_items)`` states.
        data: DataFrame passed to ``compute_WRAcc``.
        target_class: class label passed to ``compute_WRAcc``.
        numerics_max: dict mapping each numeric name to the size of its
            index domain; indices are drawn up to ``numerics_max[kind] - 1``.
        alpha: probability of replacing a numeric constraint by ``[-inf, inf]``.
        data_support: passed to ``compute_WRAcc``.
        class_support: passed to ``compute_WRAcc``.

    Returns:
        ``(pattern, quality)`` where ``pattern`` is a list of
        ``[set, dict]`` states and ``quality`` the value from ``compute_WRAcc``.
    """
    sequence_m = mutable_seq_copy(sequence)
    # get the number of buttons pressed in the sequence
    tot_num_inputs = sum(len(state[0]) for state in sequence)
    # get a random number of inputs to be removed; a sequence without any
    # input has nothing to remove (randint(0, -1) would raise)
    input_to_remove = random.randint(0, tot_num_inputs - 1) if tot_num_inputs > 0 else 0

    for _ in range(input_to_remove):
        # only states that still hold inputs can lose one: a state that was
        # empty from the start would make random.choice fail
        candidates = [idx for idx, state in enumerate(sequence_m) if state[0]]
        if not candidates:
            break
        selected_state_idx = candidates[random.randint(0, len(candidates) - 1)]
        selected_state = sequence_m[selected_state_idx][0] # we take the input itemset

        selected_state.remove(random.choice(list(selected_state))) # remove an element

        if len(selected_state) == 0: # if the state loses all the inputs, then it is removed
            sequence_m.pop(selected_state_idx)

    for _, numerics in sequence_m:
        for kind, value in numerics.items():
            # first we decide whether to remove the constraint or not
            if random.random() < alpha:
                numerics[kind] = [-float('inf'), float('inf')]
            else:
                # never let the upper bound fall below the value itself,
                # otherwise randint would receive an empty range
                upper = max(value, numerics_max[kind] - 1)
                left_value = random.randint(0, value)
                right_value = random.randint(value, upper)
                numerics[kind] = [left_value, right_value]

    # now we compute the quality measure
    quality = compute_WRAcc(data, sequence_m, target_class, data_support, class_support)

    return sequence_m, quality

def compute_ucb(score, Ni, N):
    """Return the upper-confidence-bound score of an arm.

    Args:
        score: mean quality of the arm.
        Ni: number of times the arm has been played (must be non-zero).
        N: current iteration number (must be >= 1 for the logarithm).

    Returns:
        ``(score + 0.25) * 2`` plus the exploration term
        ``0.5 * sqrt(2 * ln(N) / Ni)``.
    """
    # (score + 0.25) * 2 maps [-0.25, 0.25] onto [0, 1]
    # (presumably the range of the quality measure - confirm)
    rescaled_quality = (score + 0.25) * 2
    # we choose C = 0.5
    exploration_bonus = 0.5 * math.sqrt(2 * math.log(N) / Ni)
    return rescaled_quality + exploration_bonus


def compute_WRAcc(data, subsequence, target_class, data_support, class_support, verbose=False):
    """Compute the WRAcc quality of ``subsequence`` for ``target_class``.

    A UDF wrapping ``is_subsequence`` is applied to every row of ``data`` and
    the two resulting 0/1 flags are summed: how many rows contain the pattern
    and how many of those also belong to ``target_class``.

    Args:
        data: DataFrame with ``input_sequence``, ``enc_num_sequence`` and
            ``class`` columns.
        subsequence: pattern to evaluate (list of ``[itemset, bounds]`` states).
        target_class: class label. Must not be ``None``: in that case
            ``is_subsequence`` returns a bare int, which does not match the
            struct schema declared for the UDF.
        data_support: number of rows in ``data`` (passed in because it is
            useless to recompute it every time).
        class_support: number of rows of ``target_class``.
        verbose: when true, print the intermediate counts for patterns with
            support greater than 1. Off by default so that the search loop
            does not flood the notebook output.

    Returns:
        ``support / data_support * (class_pattern_ratio - class_data_ratio)``,
        or ``-0.25`` when no row contains the pattern (including when the
        aggregation returns nulls because ``data`` is empty).
    """
    schema = StructType([
        StructField("sub_support", IntegerType(),False),
        StructField("sub_sup_c", IntegerType(), False)
    ])
    udf_subsequence = udf(lambda x,y,z: is_subsequence(subsequence,target_class, x, y, z), schema)
    flags = data.select(udf_subsequence(data.input_sequence,
                                        data.enc_num_sequence,
                                        col("class")).alias("tmp"))
    sums = flags.select(fsum("tmp.sub_support").alias("sub_support"),
                        fsum("tmp.sub_sup_c").alias("sub_sup_c")).head()
    # sum() over zero rows yields null -> treat it as a zero count
    support = sums["sub_support"] or 0
    class_pattern_count = sums["sub_sup_c"] or 0

    if support == 0:
        # pattern never occurs: return the same penalty as before
        return -0.25

    class_pattern_ratio = class_pattern_count / support
    class_data_ratio = class_support / data_support
    if verbose and support>1:
        print(f"class_pattern_count {class_pattern_count}")
        print(f"support {support}")
        print(f"class_pattern_ratio {class_pattern_ratio}")
        print(f"class_data_ratio {class_data_ratio}")
    wracc = support / data_support * (class_pattern_ratio - class_data_ratio)
    return wracc

def is_subsequence(subsequence,classsub, sequence_input, sequence_num, classsuper):
    """Check whether the pattern ``subsequence`` occurs in one data sequence.

    The pattern states are matched in order, each against the earliest
    remaining position whose inputs contain the state's itemset and whose
    numeric values all lie inside the state's ``[low, high]`` bounds.

    Args:
        subsequence: list of states; ``state[0]`` is a set of inputs and
            ``state[1]`` maps a numeric name to ``[low, high]``.
        classsub: class the pattern is evaluated for, or ``None``.
        sequence_input: list of lists of strings.
        sequence_num: list of rows exposing ``asDict()``.
        classsuper: class label of the data sequence.

    Returns:
        When ``classsub`` is ``None``: 1 if the pattern matches, else 0.
        Otherwise a tuple ``(matched, matched_and_same_class)`` of 0/1 flags.
    """
    def within_bounds(state, row):
        # all checks are materialised in a list before all() is applied
        outcomes = [
            value >= state[1][numeric][0] and value <= state[1][numeric][1]
            for numeric, value in row.asDict().items()
        ]
        return all(outcomes)

    matched = 0
    position = 0
    while matched < len(subsequence) and position < len(sequence_input):
        state = subsequence[matched]
        if state[0].issubset(sequence_input[position]) and within_bounds(state, sequence_num[position]):
            matched += 1
        position += 1

    found = 1 if matched == len(subsequence) else 0

    if classsub is None:
        return found
    same_class = 1 if (found == 1 and classsub == classsuper) else 0
    return (found, same_class)
    
#1: function SEQSCOUT(budget)
#2: 	π ← PriorityQueue()
#3: 	scores ← PriorityQueue() # ! sfruttare i dataframe distribuiti di spark

#8: 	|while budget do 
#9: 	|	seq, qual, Ni ← scores.bestUCB()
#10: |	seqp, qualp ← PlayArm(seq) #calcolo qualità parallelizzabile
#11: |	π.add(seqp, qualp)
#12: |	scores.update(seq, (Ni*qual+qualp)/(Ni+1), Ni + 1)
#13: |end while # while eseguito per ogni top esempio - non parallelizzabile?
#14:  
#15: return π.topKNonRedundant() # filtering (remove similar starting from the beginning)
#16: end function

#- il filtering dei dati penso possa essere fatto automaticamente con una bella filter
#- controlla come funziona la max del DataFrame
#- possibile parallelizzazione 1 per ogni classe (a livello di container -> 7 esecutori max)
#- possibile parallelizzazione sul calcolo della metrica come map + reduce
#- priority queue con i dataframe distribuiti non ha senso, ma pi può essere implementata easy come una lista
#	che filtra automaticamente i migliori k

In [27]:
def read_dataset(path):
    """Parse the whitespace-separated skillshot file into a list of sequences.

    The first line holds the column names. A line with a single token starts
    a new sequence and carries its class label; every other non-blank line is
    one state of the current sequence. Columns listed in ``DISCRETE_INPUTS``
    are buttons (kept when their value is ``'1'``), the remaining columns are
    parsed as floats.

    Args:
        path: path of the data file.

    Returns:
        A list of dicts with keys ``"input_sequence"`` (list of button-name
        lists), ``"num_sequence"`` (list of ``{column: float}`` dicts) and
        ``"class"`` (the label as a string). Empty when the file contains no
        sequence.

    Raises:
        ValueError: if a data row does not have as many values as the header,
            or appears before any class label.
    """
    DISCRETE_INPUTS = {'up', 'accelerate', 'slow', 'goal', 'left', 'boost', 'camera', 'down', 'right', 'slide', 'jump'}
    data = []
    with open(path, "r") as file:
        header_line = next(file, None)
        if header_line is None:
            # completely empty file: nothing to parse
            return data
        dict_headers = header_line.split()
        new_line = None
        for line in file:
            values = line.split()
            if not values:
                # blank lines used to be read as a sequence with class ""
                continue
            if len(values) == 1:
                # class label: close the previous sequence and open a new one
                if new_line is not None:
                    data.append(new_line)
                new_line = {"input_sequence": [] ,"num_sequence":[],"class": values[0]}
                continue

            if len(dict_headers) != len(values):
                raise ValueError('Number of data and variables do not match')
            if new_line is None:
                raise ValueError('Data row found before any class label')

            numerics = {}
            buttons = []
            for header, value in zip(dict_headers, values):
                if header in DISCRETE_INPUTS:
                    if value == '1':
                        buttons.append(header)
                else:
                    numerics[header] = float(value)

            new_line["input_sequence"].append(buttons)
            new_line["num_sequence"].append(numerics)
        if new_line is not None:
            data.append(new_line)
    return data


In [28]:
def get_numerics(df):
    """Build, for every numeric field, the table of its distinct values with a dense index.

    Args:
        df: DataFrame whose ``num_sequence`` column is an array of structs.

    Returns:
        ``(numerics_domains, numerics_max)`` where ``numerics_domains[c]`` is
        a DataFrame with the distinct values of field ``c`` and an ``idx``
        column (0, 1, 2, ... following the ascending order of the values),
        and ``numerics_max["idx" + c]`` is the number of distinct values.
    """
    subfields = df.schema["num_sequence"].dataType.elementType.fieldNames()
    numerics_domains = {}
    numerics_max = {}
    for c in subfields:
        field = "num_sequence." + c
        distinct_values = df.select(explode(field).alias(c)).distinct()
        # monotonically_increasing_id() is unique but NOT consecutive across
        # partitions, so it could exceed the count stored in numerics_max.
        # row_number() over a global ordering yields a dense 0..n-1 index;
        # the single-partition window is applied to distinct values only.
        dense_index = row_number().over(Window.orderBy(c)) - 1
        numerics_domains[c] = distinct_values.withColumn("idx", dense_index.cast("long"))
        numerics_max["idx"+c] = numerics_domains[c].count()
    return numerics_domains, numerics_max

def convert_numerics(df, numerics_domains):
    """Replace every numeric value of ``num_sequence`` by its index in its domain.

    Args:
        df: DataFrame with an ``id`` column and a ``num_sequence`` array of structs.
        numerics_domains: dict mapping a field name to a DataFrame holding
            that field's distinct values and their ``idx``.

    Returns:
        DataFrame with columns ``_id`` and ``enc_num_sequence``; the latter is
        an array of structs with one ``idx<field>`` member per numeric field,
        ordered by the position of the state in the original sequence.
    """
    # one row per (sequence id, position) with the numeric fields as columns
    workdf = df.select(col("id").alias("_id"),posexplode("num_sequence").alias("pos","exp")).select("_id", "pos", "exp.*")
    needed_columns = list(numerics_domains.keys()) + ["pos", "_id"]
    for kind, unique_df in numerics_domains.items():
        print("processing " + kind + "...")
        lookup = unique_df.selectExpr(kind + " as _" + kind, "idx as idx" + kind)
        workdf = workdf.join(lookup, col(kind)==col("_"+kind))
        # swap the raw value column for its index column
        needed_columns.remove(kind)
        needed_columns.append("idx"+kind)
        workdf = workdf.select(needed_columns)
    encoded_columns = [name for name in needed_columns if name not in ("pos", "_id")]
    encoded_state = struct([col(name) for name in encoded_columns]).alias("state")
    # collect_list gives no ordering guarantee after a shuffle (an orderBy
    # before groupBy does not help), so carry the position along and sort
    # the collected array on it before dropping it.
    grouped = workdf.groupBy("_id").agg(
        sort_array(collect_list(struct(col("pos"), encoded_state))).alias("ordered_states"))
    return grouped.select("_id", col("ordered_states.state").alias("enc_num_sequence"))

In [29]:
def import_imm_sequence(seq):
    """Turn a ``(input_sequence, numeric_rows)`` pair into a hashable sequence.

    Args:
        seq: indexable pair; ``seq[0]`` is a list of input lists and
            ``seq[1]`` a list of rows exposing ``asDict()``.

    Returns:
        A tuple with one ``(frozenset_of_inputs, sorted_numeric_items)`` tuple
        per position of ``seq[0]``.
    """
    states = []
    for position in range(len(seq[0])):
        inputs = frozenset(seq[0][position])
        numerics = tuple(sorted(seq[1][position].asDict().items()))
        states.append((inputs, numerics))
    return tuple(states)
def mutable_seq_copy(seq):
    """Return a mutable copy of an immutable sequence.

    Each state ``(inputs, numeric_items)`` becomes a two-element list holding
    a ``set`` of the inputs and a ``dict`` built from the numeric pairs.
    """
    return [
        [set(state[0]), {pair[0]: pair[1] for pair in state[1]}]
        for state in seq
    ]
        
def to_imm_pattern(pattern):
    """Freeze a mutable pattern into a hashable tuple.

    Each state ``[inputs, bounds]`` becomes ``(frozenset(inputs), items)``
    where ``items`` is the sorted tuple of ``(name, tuple(bound))`` pairs.
    """
    frozen_states = []
    for state in pattern:
        inputs = frozenset(state[0])
        bounds = [(key, tuple(value)) for key, value in state[1].items()]
        bounds.sort()
        frozen_states.append((inputs, tuple(bounds)))
    return tuple(frozen_states)

In [49]:
class PriorityQueue(object):
    """Min-heap of tuples whose last item is a hashable sequence or pattern.

    Entries are ordered by plain tuple comparison, so callers store negated
    scores to get the best element first. ``seq_set`` mirrors the last item
    of every heap entry and is used to reject duplicates.
    """

    def __init__(self, data=None, k=1,theta=1, cap_length=False):
        """Create the queue, optionally seeding it from a DataFrame.

        Args:
            data: optional DataFrame with ``input_sequence`` and
                ``enc_num_sequence`` columns; each distinct sequence is
                inserted as ``(-inf, 0, 0, sequence)``.
            k: number of elements returned by ``get_top_k`` and, when
                ``cap_length`` is set, maximum size of the queue.
            theta: only the value 1 is handled by ``get_top_k``.
            cap_length: whether the queue is trimmed to ``k`` entries
                (ignored when ``k`` is ``None``).
        """
        self.k = k
        self.theta=theta
        self.cap_length=cap_length if k is not None else False
        self.heap = []
        self.seq_set = set()
        if data is not None:
            for x in data.collect():
                sequence = import_imm_sequence((x["input_sequence"], x["enc_num_sequence"]))
                # identical sequences are a single entry, as in add(); this
                # keeps heap and seq_set consistent
                if sequence in self.seq_set:
                    continue
                self.seq_set.add(sequence)
                self.heap.append((-float('inf'), 0, 0, sequence))
            heapq.heapify(self.heap)
            self._enforce_cap()

    def _enforce_cap(self):
        """Trim the queue to its ``k`` smallest entries when capping is enabled."""
        if self.cap_length and len(self.heap)>self.k:
            # nsmallest returns an ascending list, which is a valid min-heap
            self.heap = heapq.nsmallest(self.k, self.heap)
            self.seq_set = set(i[-1] for i in self.heap)

    def add(self, elem):
        """Insert ``elem`` unless its last item is already stored."""
        if elem[-1] in self.seq_set:
            return
        heapq.heappush(self.heap, elem)
        self.seq_set.add(elem[-1])
        self._enforce_cap()
        #TODO add filtering if necessary

    def pop_first(self):
        """Remove and return the smallest entry (IndexError if the queue is empty)."""
        head = heapq.heappop(self.heap)
        self.seq_set.discard(head[-1])
        return head

    def get_top_k(self):
        """Return the ``k`` smallest entries in ascending order when ``theta == 1``.

        For any other ``theta`` the filtering is not implemented yet and the
        placeholder value 0 is returned.
        """
        if self.theta == 1:
            return heapq.nsmallest(self.k, self.heap)
        #TODO add filtering
        return 0
    
    
    
        

In [31]:
# Parse the raw skillshot file into a list of per-sequence dicts (see read_dataset).
# NOTE(review): the path is absolute and machine-specific; adjust it when
# running outside the original environment.
data = read_dataset("/vagrant/rocket_league_skillshots.data")
# in case of bigger datasets, single splits could be generated on different nodes
# and after joined as single json file

In [9]:

# SparkSession is provided by the notebook's import cell
# (`from pyspark.sql import SparkSession`).
spark = SparkSession.builder.appName("RocketLeagueFE").getOrCreate()
# Dump the parsed sequences as JSON Lines (one object per line), the layout
# Spark's JSON reader expects by default. It also avoids building the whole
# dataset as one giant string in memory.
with open("source.json", "w") as s:
    for record in data:
        s.write(json.dumps(record))
        s.write("\n")

23/02/07 21:35:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
# Load the JSON dump written by the previous cell; Spark infers the schema.
df = spark.read.format("json").load("source.json")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [11]:
# Inspect the inferred schema and a sample of the loaded rows.
df.printSchema()
df.show()

root
 |-- class: string (nullable = true)
 |-- input_sequence: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- num_sequence: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- BallAcceleration: double (nullable = true)
 |    |    |-- BallSpeed: double (nullable = true)
 |    |    |-- DistanceBall: double (nullable = true)
 |    |    |-- DistanceCeil: double (nullable = true)
 |    |    |-- DistanceWall: double (nullable = true)
 |    |    |-- PlayerSpeed: double (nullable = true)
 |    |    |-- Time: double (nullable = true)

+-----+--------------------+--------------------+
|class|      input_sequence|        num_sequence|
+-----+--------------------+--------------------+
|    6|[[right, jump], [...|[{1636.7987723122...|
|   -1|[[boost, right, j...|[{0.0, 33685.8395...|
|   -1|[[right, jump], [...|[{124246.29375405...|
|   -1|[[right, slide, j...|[{-8210.634011562...|
|

In [12]:
# Give every sequence a unique id so the encoded numerics can be joined back.
df = df.withColumn("id", monotonically_increasing_id())
# Distinct values (with their index) of every numeric field, and the domain sizes.
numerics_domains, numerics_max = get_numerics(df)
# Per-sequence arrays in which each numeric value is replaced by its index.
encoded_numerics = convert_numerics(df, numerics_domains)
# Final working frame: inputs, encoded numerics and class label per sequence.
dfj = df.join(encoded_numerics, col("id")==col("_id")).select("id","input_sequence" ,"enc_num_sequence", "class")
dfj.printSchema()

processing BallAcceleration...
processing BallSpeed...
processing DistanceBall...
processing DistanceCeil...
processing DistanceWall...
processing PlayerSpeed...
processing Time...
root
 |-- id: long (nullable = false)
 |-- input_sequence: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- enc_num_sequence: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- idxBallAcceleration: long (nullable = false)
 |    |    |-- idxBallSpeed: long (nullable = false)
 |    |    |-- idxDistanceBall: long (nullable = false)
 |    |    |-- idxDistanceCeil: long (nullable = false)
 |    |    |-- idxDistanceWall: long (nullable = false)
 |    |    |-- idxPlayerSpeed: long (nullable = false)
 |    |    |-- idxTime: long (nullable = false)
 |-- class: string (nullable = true)



In [13]:
# Schema of the encoded numerics before they are joined back onto df.
encoded_numerics.printSchema()

root
 |-- _id: long (nullable = false)
 |-- enc_num_sequence: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- idxBallAcceleration: long (nullable = false)
 |    |    |-- idxBallSpeed: long (nullable = false)
 |    |    |-- idxDistanceBall: long (nullable = false)
 |    |    |-- idxDistanceCeil: long (nullable = false)
 |    |    |-- idxDistanceWall: long (nullable = false)
 |    |    |-- idxPlayerSpeed: long (nullable = false)
 |    |    |-- idxTime: long (nullable = false)



In [36]:
# Number of distinct values found for each numeric field (keys are "idx" + field name).
print(numerics_max)

{'idxBallAcceleration': 5747, 'idxBallSpeed': 5958, 'idxDistanceBall': 6762, 'idxDistanceCeil': 3631, 'idxDistanceWall': 5721, 'idxPlayerSpeed': 5942, 'idxTime': 5903}


5400

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)


In [66]:
# Search the 30 best patterns for class "1" with a budget of 1000 iterations.
seq_scout(
    data=dfj,
    data_plus=filter_data(data=dfj, target_class="1", target_col="class"),
    target_class="1",
    numerics_max=numerics_max,
    top_k=30,
    iterations=1000,
    theta=1,
    alpha=0.5,
)

  6%|█████▏                                                                           | 64/1000 [02:12<32:48,  2.10s/it]ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
  6%|█████▏                                                                           | 64/1000 [02:13<32:34,  2.09s/it]
                                                                                

KeyboardInterrupt: 