<a href="https://colab.research.google.com/github/FedorTaggenbrock/data_intensive_systems/blob/main/notebooks/main_tests_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Handle importing/installing, both local and on Colab**

In [None]:
import sys
ON_COLAB = 'google.colab' in sys.modules
if ON_COLAB:
    # Do stuff that only needs to happen on colab
    !pip install pyspark  # noqa
    !pip install ijson
    !pip install ipython-autotime
    %load_ext autotime
    pass
else:
    # Do stuff that only needs to happen on local computer
    pass

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
time: 487 µs (started: 2023-06-13 15:58:20 +00:00)


Rerun the code cell below to use the latest version of the python files!

Test all functions you want inside the run_all_tests() during development, for small sample sizes.


Code below is for actual result generation later, so that we can easily reuse intermediate values.

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
import sys
sys.path.append('/content/drive/MyDrive/colab_notebooks/data_intensive_systems/src')

from pyspark.sql import SparkSession

Mounted at /content/drive
time: 3.74 s (started: 2023-06-13 15:58:20 +00:00)


In [None]:
spark = SparkSession.builder.appName("Clustering").getOrCreate()

time: 8.73 s (started: 2023-06-13 15:58:24 +00:00)


In [None]:
import json
import ijson
import pandas as pd
from pyspark.sql.functions import collect_list, udf
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, IntegerType, StringType
import warnings
import math
from pyspark.sql.types import StructType, StructField, MapType, StringType, IntegerType, ArrayType


def parse_json_data(json_path, debug_flag=False):
    """Parse the data from the json file to a pandas df."""
    warnings.filterwarnings("ignore", category=FutureWarning)
    num_routes = 0
    from_tos = set()
    products = set()
    with open(json_path, 'rb') as f:
        for row in ijson.items(f, "item"):
            num_routes += 1
            for trip in row['route']:
              from_to = trip['from']+"-"+trip['to']
              from_tos.add(from_to) #does not add duplicate from_to's
              for prod in trip["merchandise"]:
                products.add(prod)
    from_tos = list(from_tos)
    products = list(products)
    #Integers relate to specific products.
    #For bookkeeping we store a dictionary which store the index of a specific product.
    product_mapping = {}
    for i,v in enumerate(products):
      product_mapping[v] = i

    print("product_mapping: ", product_mapping)

    df_rows = []
    with open(json_path, 'rb') as f:
      for row in ijson.items(f, "item"):
        new_row = {"id-sr": str(row["id"])+" "+ str(row["sr"])}
        for trip in row['route']:
            from_to = trip['from']+"-"+trip['to']
            merch_dict = dict(map(lambda x: (product_mapping[x[0]], x[1]), trip["merchandise"].items()))
            trip_merch = {from_to: merch_dict}
            new_row.update(trip_merch)
        df_rows.append(new_row)
    df = pd.DataFrame(df_rows, columns=["id-sr"] +from_tos)
    df = df.applymap(lambda x: {} if pd.isna(x) else x)
    return df, from_tos, products, num_routes

def get_data(spark, path, clustering_settings):
  df, from_tos, products, num_routes = parse_json_data(path, clustering_settings["debug_flag"])
  schema = StructType([StructField("id-sr",StringType())] + [StructField(from_to, MapType(IntegerType(), IntegerType())) for from_to in from_tos]  )
  spark_df = spark.createDataFrame(df, schema = schema)
  if clustering_settings["debug_flag"]:
    spark_df.show()
  clustering_settings["Products"] = products
  clustering_settings["num_routes"] = num_routes
  return spark_df.rdd





time: 392 ms (started: 2023-06-13 15:59:06 +00:00)


In [34]:
from copy import copy
from statistics import mode
from pyspark import RDD
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from functools import reduce

import numpy as np
import math
from collections import Counter
import random


def run_clustering(clustering_settings: dict, data: RDD) -> list[tuple]:
    '''Define variables to store results.'''
    # E.g. for kmodes: [(predicted_centroids, (k, init_mode)), ...]
    results = []

    # Check which clustering algortihm to run
    if clustering_settings['clustering_algorithm'] == 'kmodes':
        for current_k in clustering_settings['k_values']:
            # TODO in the future add other parameters here.
            # Run clustering with current parameters
            print("Performing clustering with k= ", current_k)
            predicted_centroids = kModes(
                data=data,
                k=current_k,
                clustering_settings=clustering_settings
            )

            # Store the settings, model, and metrics
            results.append((predicted_centroids, {'k': current_k}))
            if clustering_settings["debug_flag"]:
                print("The centroids for  k = ", current_k, " are given by: ", [c[0] for c in predicted_centroids] )
    else:
        print("Clustering algorithm setting not recognized in run_and_tune().")
    if clustering_settings["debug_flag"]:
        print("The output results for multiple k is given by:", results)
    return results


def kModes(data: RDD, k: int, clustering_settings):
    # Painfull code duplication which is the only way I managed to make all the spark dependencies work
    def dictionary_distance(dict1, dict2):
        # This function computes the normalized euclidean distance (in 0-1) for dict representations of (sparse) vectors.
        norm_dict1 = math.sqrt(np.sum(
            [int(float(v)) ** 2 for k, v in dict1.items()]))
        norm_dict2 = math.sqrt(np.sum(
            [int(float(v)) ** 2 for k, v in dict2.items()]))
        return math.sqrt(np.sum(
            [(int(float(dict1.get(product, 0))) - int(float(dict2.get(product, 0)))) ** 2 for product in
             set(dict1) | set(dict2)])) / (norm_dict1 + norm_dict2)

    def route_distance(route1, route2):
      columns = route1.__fields__[1:]
      intersection = 0
      union = 0
      intersecting_dist = 0
      # Preferably vectorize this
      for column in columns:
          trip1 = route1[column]
          trip2 = route2[column]
          if trip1 or trip2:
              union += 1
              if trip1 and trip2:
                  intersection += (1 - dictionary_distance(trip1, trip2))
      if union != 0:
          dist = 1 - intersection / union
      else:
          dist = 1
      return dist


    def assign_row_to_centroid_key(row, centroids):
      random_centroid = random.choice(centroids)
      min_centroid = min(centroids, key=lambda centroid: route_distance(row, centroid))
      if route_distance(row, random_centroid) == route_distance(row, min_centroid):
          return (random_centroid["id-sr"], row)
      else:
          return (min_centroid["id-sr"], row)

    def create_centroid(set_of_rows):
        size_of_set = len(set_of_rows)
        trips_to_keep = []
        first_row = True
        for row in set_of_rows:
            if first_row:
                trips_to_keep = np.zeros(len(row))
                first_row = False
            for it, trip in enumerate(row):
                if trip:
                    trips_to_keep[it] += 1
        trips_to_keep = trips_to_keep >= size_of_set // 2
        row_scores = []
        for row in set_of_rows:
            row_score = 0
            for it, trip in enumerate(row):
                if it != 0 and trip and trips_to_keep[it]:
                    row_score += 1
            row_scores.append(row_score)
        max_score = -1
        for it, row in enumerate(set_of_rows):
            if row_scores[it] > max_score:
                best_row = row
                max_score = row_scores[it]
            if row_scores[it] == max_score:
              if random.random() <= 0.2:
                best_row = row

        # if clustering_settings["debug_flag"]:
        #   print("trips_to_keep", [int(bl) for bl in trips_to_keep])
        #   print("row_scores", row_scores)

        return best_row

    centroids = data.takeSample(withReplacement=False, num=k)
    if clustering_settings["debug_flag"]:
        print("centroids = ",  [c[0] for c in centroids])

    # Iterate until convergence or until the maximum number of iterations is reached
    for i in range(clustering_settings["max_iterations"]):
        if clustering_settings["debug_flag"]:
          print("iteration ", i, ": ")
        # Assign each point to the closest centroid
        clusters = data.map(lambda row: assign_row_to_centroid_key(row, centroids)).groupByKey()

        if clustering_settings["debug_flag"]:
          print("Mapped rows to existing centroids")
          ls_set_of_rows = list(clusters.take(k))
          for i in range(len(ls_set_of_rows)):
            print("number of rows in the", i, "-th cluster per st route:" , Counter([row_[0].split()[1] for row_ in ls_set_of_rows[i][1]]) )
          print("Computing the new centroid for the first cluster:")
          print("new_c= ", create_centroid(ls_set_of_rows[0][1]))

        centroids = clusters.map(lambda key_rows: create_centroid(key_rows[1])).collect()

        if clustering_settings["debug_flag"]:
            print("centroids = ",  [c[0] for c in centroids])

    return [list(x) for x in centroids]

time: 4.05 ms (started: 2023-06-13 16:59:30 +00:00)


In [None]:
clustering_settings = {
    'clustering_algorithm': 'kmodes',
    'k_values': [10],
    'max_iterations': 20,
    'debug_flag': True
}
clustering_settings_big = {
    'clustering_algorithm': 'kmodes',
    'k_values': [10],
    'max_iterations': 20,
    'debug_flag': True
}

time: 866 µs (started: 2023-06-13 15:58:32 +00:00)


In [None]:
spark_rdd = get_data(spark,"/content/drive/MyDrive/colab_notebooks/data_intensive_systems/data/1000_0.25_actual_routes.json", clustering_settings)

product_mapping:  {'product_425': 0, 'product_116': 1, 'product_443': 2, 'product_286': 3, 'product_611': 4, 'product_83': 5, 'product_204': 6, 'product_967': 7, 'product_584': 8, 'product_276': 9, 'product_782': 10, 'product_597': 11, 'product_918': 12, 'product_809': 13, 'product_422': 14, 'product_726': 15, 'product_609': 16, 'product_406': 17, 'product_416': 18, 'product_386': 19, 'product_179': 20, 'product_123': 21, 'product_258': 22, 'product_380': 23, 'product_64': 24, 'product_412': 25, 'product_745': 26, 'product_418': 27, 'product_6': 28, 'product_108': 29, 'product_218': 30, 'product_901': 31, 'product_155': 32, 'product_626': 33, 'product_483': 34, 'product_496': 35, 'product_453': 36, 'product_639': 37, 'product_230': 38, 'product_768': 39, 'product_292': 40, 'product_171': 41, 'product_570': 42, 'product_737': 43, 'product_62': 44, 'product_485': 45, 'product_544': 46, 'product_212': 47, 'product_269': 48, 'product_822': 49, 'product_734': 50, 'product_938': 51, 'product

In [None]:
results = run_clustering(
    data=spark_rdd,
    clustering_settings=clustering_settings
    )

Performing clustering with k=  10
centroids =  ['27 4', '71 4', '42 7', '28 1', '73 3', '94 8', '50 2', '58 6', '56 0', '65 7']
iteration  0 : 
Mapped rows to existing centroids
number of rows in the 0 -th cluster per st route: Counter({'1': 13, '3': 13, '4': 12, '6': 12, '9': 12, '8': 12, '0': 11, '2': 11, '5': 11, '7': 11})
number of rows in the 1 -th cluster per st route: Counter({'8': 14, '5': 13, '2': 12, '4': 12, '9': 12, '1': 10, '7': 10, '0': 9, '3': 9, '6': 5})
number of rows in the 2 -th cluster per st route: Counter({'0': 14, '8': 13, '6': 12, '9': 12, '3': 11, '2': 10, '5': 10, '7': 10, '4': 8, '1': 7})
number of rows in the 3 -th cluster per st route: Counter({'1': 11, '5': 11, '9': 11, '6': 10, '3': 9, '4': 9, '2': 8, '0': 6, '7': 4, '8': 3})
number of rows in the 4 -th cluster per st route: Counter({'0': 15, '2': 12, '6': 11, '7': 10, '9': 10, '3': 9, '8': 9, '1': 8, '4': 8, '5': 8})
number of rows in the 5 -th cluster per st route: Counter({'6': 13, '7': 12, '0': 11, '8

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

time: 6.33 s (started: 2023-06-13 16:15:48 +00:00)


In [None]:
spark_rdd_big = get_data(spark,"/content/drive/MyDrive/colab_notebooks/data_intensive_systems/data/10000_0.25_actual_routes.json", clustering_settings_big)

product_mapping:  {'product_425': 0, 'product_116': 1, 'product_443': 2, 'product_286': 3, 'product_611': 4, 'product_83': 5, 'product_584': 6, 'product_967': 7, 'product_204': 8, 'product_276': 9, 'product_782': 10, 'product_597': 11, 'product_918': 12, 'product_809': 13, 'product_422': 14, 'product_726': 15, 'product_609': 16, 'product_406': 17, 'product_416': 18, 'product_386': 19, 'product_179': 20, 'product_123': 21, 'product_258': 22, 'product_380': 23, 'product_64': 24, 'product_412': 25, 'product_218': 26, 'product_901': 27, 'product_6': 28, 'product_108': 29, 'product_483': 30, 'product_745': 31, 'product_155': 32, 'product_626': 33, 'product_418': 34, 'product_678': 35, 'product_453': 36, 'product_496': 37, 'product_289': 38, 'product_292': 39, 'product_768': 40, 'product_737': 41, 'product_171': 42, 'product_570': 43, 'product_639': 44, 'product_230': 45, 'product_62': 46, 'product_485': 47, 'product_544': 48, 'product_212': 49, 'product_269': 50, 'product_822': 51, 'product

In [None]:
results_big = run_clustering(
    data=spark_rdd_big,
    clustering_settings=clustering_settings_big
    )

Performing clustering with k=  10
centroids =  ['155 2', '288 7', '649 6', '90 0', '872 3', '326 7', '767 3', '958 6', '905 8', '803 0']
iteration  0 : 
Mapped rows to existing centroids
number of rows in the 0 -th cluster per st route: Counter({'0': 233, '1': 113, '5': 101, '9': 100, '4': 84})
number of rows in the 1 -th cluster per st route: Counter({'8': 1000, '5': 110, '4': 99, '1': 94, '9': 91})
number of rows in the 2 -th cluster per st route: Counter({'3': 670, '4': 117, '9': 110, '5': 104, '1': 90})
number of rows in the 3 -th cluster per st route: Counter({'6': 261, '4': 124, '1': 106, '9': 104, '5': 101})
number of rows in the 4 -th cluster per st route: Counter({'3': 330, '1': 111, '9': 106, '5': 98, '4': 98})
number of rows in the 5 -th cluster per st route: Counter({'0': 767, '4': 114, '9': 98, '1': 94, '5': 78})
number of rows in the 6 -th cluster per st route: Counter({'6': 739, '9': 102, '5': 101, '1': 98, '4': 84})
number of rows in the 7 -th cluster per st route: Coun