In [12]:
import ray
from collections import Counter
import time


print("The top 10 customers are ...")

ds = ray.data.read_csv("../min-sales-data.csv")


"""
Function, welche die Purchases per Customer zählt
"""
@ray.remote
def count_orders_per_customers(s):
    counter = Counter()
    customer_order = None
    for record in s.iter_rows():
        if customer_order == None:
            customer_order = (record["CustomerId"], record["OrderId"])
        elif customer_order[1] != record["OrderId"]:
            counter.update({customer_order[0]: 1})
            customer_order = (record["CustomerId"], record["OrderId"])
        
    counter.update({customer_order[0]: 1})
    return counter


"""Function, welche Counter addiert"""
@ray.remote
def add_counters(fst, snd):
    counter1 = Counter(fst)
    counter2 = Counter(snd)
    return counter1 + counter2


"""
Function, welche die Top 10 Customer findet
"""
@ray.remote
def find_top_10_customers(customers):
    counter = Counter(customers)
    return counter.most_common(10)

start = time.time()

"""
Split the input dataset into 12 parts to be processed in parallel
"""
def splitInput(data):
    split_indices = []
    n = 12
    for i in range(1,n):
        split_indices.append(int(i*data.count()/n))
    return ds.split_at_indices(split_indices)


worker_counts = []
for split in splitInput(ds):
    x = count_orders_per_customers.remote(split)
    worker_counts.append(x)
    
for cnt in worker_counts:
    

while len(worker_counts) > 1:
    worker_counts = worker_counts[2:] + [add_counters.remote(worker_counts[0], worker_counts[1])]

#print(ray.get(customers))
ys = find_top_10_customers.remote(worker_counts[0])
print("Getting")
ys = ray.get(ys)

print(ys)


print(ds.count())

print('duration: ',time.time() - start)

IndentationError: expected an indented block (<ipython-input-12-acfc20f5d632>, line 66)

In [16]:
import ray
from collections import Counter
import time


print("Testrun")

ds = ray.data.read_csv("../sales-data.csv")
print("Input read: ", ds)

"""
Function, welche die Purchases per Customer zählt
"""
@ray.remote
def count_customers(s):
    counter = Counter()
    customer_order = None
    for record in s.iter_rows():
        if customer_order == None:
            customer_order = (record["CustomerId"], record["OrderId"])
        elif customer_order[1] != record["OrderId"]:
            counter.update({customer_order[0]: 1})
            customer_order = (record["CustomerId"], record["OrderId"])
        
    counter.update({customer_order[0]: 1})
    """
    dictionary = {}

    for record in s.iter_rows():
        if record["CustomerId"] in dictionary:
            dictionary[record["CustomerId"]] = dictionary.get(record["CustomerId"]) + 1
        else:
            dictionary[record["CustomerId"]] = 1
    
    counter = Counter(dictionary)
    """
    return counter

"""Function, welche Counter addiert"""
@ray.remote
def add_counters(fst, snd):
    counter1 = Counter(fst)
    counter2 = Counter(snd)
    return counter1 + counter2


"""
Function, welche die Top 10 Customer findet
"""
@ray.remote
def find_top_10_customers(customers):
    counter = Counter(customers)
    return counter.most_common(10)

start = time.time()
customer_lists = []

print("Splitting")
split_indices = []
n = 12
for i in range(1,n):
    split_indices.append(int(i*ds.count()/n))
    
def shift_split(ds, split_index):
    i = 0
    prev_record = None
    for record in ds.iter_rows():
        if i + 1 == split_index:
            prev_record = (record["CustomerId"], record["OrderId"])
        elif i == split_index:
            current_record = (record["CustomerId"], record["OrderId"])
            if prev_record == current_record:
                split_index += 1
                prev_record = current_record
            else:
                return split_index
        i += 1

@ray.remote
def shift_all_splits(ds, split_indices):
    shifted = []
    for split in split_indices:
        shifted.append(shift_split(ds, split))
    return shifted
    
print(split_indices)
shifted_split_indices = ray.get(shift_all_splits.remote(ds, split_indices))
print(shifted_split_indices)
    
splits = ds.split_at_indices(shifted_split_indices)
print("Functioning")

for split in splits:
    counter = count_customers.remote(split)
    customer_lists.append(counter)
    
@ray.remote
def print_parts(cl):
    cl = ray.get(cl)
    for part in cl:
        print(part)
        #print(Counter(part).most_common(100))

print_parts.remote(customer_lists)

while len(customer_lists) > 1:
    customer_lists = customer_lists[2:] + [add_counters.remote(customer_lists[0], customer_lists[1])]

#print(ray.get(customers))
ys = find_top_10_customers.remote(customer_lists[0])
print("Getting")
res = ray.get(ys)

print(res)


print(ds.count())

print('duration: ',time.time() - start)

Testrun
Input read:  Dataset(num_blocks=1, num_rows=None, schema={CustomerId: int64, OrderId: int64, ProductId: int64, ProductGroupId: int64, Quantity: double, OrderDate: date32[day]})
Splitting
[440351, 880703, 1321055, 1761407, 2201759, 2642111, 3082463, 3522815, 3963167, 4403519, 4843871]


KeyboardInterrupt: 