# Distributed Genex
This notebook implements distributed Genex for faster and more reliable processing.

## Get the input list
Use the generate_source call from genex.parse to get our input list. Each entry is a key-value pair: the key holds the
features of the time series and the value is the time series itself, i.e. the raw data.

For now, we only take the first 50 time series to speed things up.

In [1]:
from genex.parse import generate_source
fn = 'SART2018_HbO.csv'

input_list = generate_source(fn, feature_num=5)
input_list = input_list[:50]

# print the keys of the input_list
for time_series in input_list: print(time_series[0])

('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '126468', '167986')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '274131', '315653')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '403678', '445179')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '600024', '641559')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '624914', '666433')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '631505', '673041')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '667507', '709026')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '777318', '818835')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '789029', '830533')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '794000', '835502')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '836659', '878161')
('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '854762', '896297')
('101-SART-June2018-AS', 'target correct
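
To make the key-value layout concrete, here is a minimal sketch of a single entry. The readings are made up for illustration and do not come from the CSV, and the names given to the five key fields (subject, event, channel, start, end) are assumptions read off the printed keys above, not names defined by genex.

```python
# Hypothetical stand-in for one entry of input_list: a (key, values) pair.
key = ('101-SART-June2018-AS', 'target correct', 'Channel-1 HbO', '126468', '167986')
values = [0.12, 0.15, 0.11, 0.18]  # made-up readings, not taken from the CSV

input_list_sketch = [(key, values)]

for features, series in input_list_sketch:
    # Field names below are guesses based on the printed keys.
    subject, event, channel, start, end = features
    print(subject, channel, len(series))
```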

## Normalization
Globally min-max normalize the input list.

In [2]:
from genex.preprocess import min_max_normalize
normalized_input_list, global_max, global_min = min_max_normalize(input_list)
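
As an illustration of what "global" means here, the sketch below scales every value with a single minimum and maximum taken over all series together, rather than per series. The function name `global_min_max_normalize` is hypothetical and this is not genex's implementation; it only mirrors the return order used in the cell above (normalized list, max, min).

```python
def global_min_max_normalize(pairs):
    """Scale every value using one min and one max taken over all series."""
    all_values = [v for _, series in pairs for v in series]
    g_min, g_max = min(all_values), max(all_values)
    span = g_max - g_min
    if span == 0:
        # Constant data: map everything to 0.0 to avoid dividing by zero.
        normalized = [(key, [0.0 for _ in series]) for key, series in pairs]
    else:
        normalized = [(key, [(v - g_min) / span for v in series])
                      for key, series in pairs]
    return normalized, g_max, g_min


# Toy data, not from the dataset.
pairs = [("a", [2.0, 4.0]), ("b", [6.0, 10.0])]
normalized, g_max, g_min = global_min_max_normalize(pairs)
```

Because the bounds are shared, series "a" ends up in the lower part of the [0, 1] range and series "b" in the upper part, so relative amplitudes between series are preserved.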

## Configure Spark
Create the Spark context with which the input_list will be processed, setting the number of CPU cores, the driver memory
and the max result size. These settings depend on the specs of the cluster.

In [3]:
from pyspark import SparkContext, SparkConf

num_cores = 32

conf = SparkConf(). \
    setMaster("local[" + str(num_cores) + "]"). \
    setAppName("Genex").set('spark.driver.memory', '31G'). \
    set('spark.driver.maxResultSize', '31G')
sc = SparkContext(conf=conf)
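
Since the core count depends on the machine, one option is to derive the local master URL from what the machine reports instead of hard-coding it. The helper `local_master_url` below is hypothetical (it is not part of Spark or genex) and only builds the string that would be passed to setMaster; capping at the reported core count is a design choice, not a Spark requirement.

```python
import os


def local_master_url(requested_cores):
    """Build a Spark local-mode master URL, capped at the cores this machine reports."""
    available = os.cpu_count() or 1  # cpu_count() may return None
    return "local[{}]".format(max(1, min(requested_cores, available)))


master = local_master_url(32)
print(master)
```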

## Distribute the normalized input list
Distribute the input list onto the workers. With numSlices set to num_cores, the list is split into one partition per core.

In [5]:
input_rdd = sc.parallelize(normalized_input_list, numSlices=num_cores)
partition_input = input_rdd.glom().collect()
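
To get a feel for what partition_input might look like, the following pure-Python sketch splits a list into a fixed number of contiguous slices of near-equal size. The function `slice_evenly` is hypothetical and only approximates the idea; the actual placement chosen by Spark may differ.

```python
def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous chunks of near-equal size."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


# 50 stand-in items over 32 slices, matching the sizes used in this notebook.
partitions = slice_evenly(list(range(50)), 32)
sizes = [len(p) for p in partitions]
print(sizes)
```

With 50 items and 32 slices, every slice holds either one or two items, so the work per core is uneven by at most one time series.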

## Grouping the data
We group the time series on each worker. The loi passed in the code below is [120], so
sequences with a length less than 120 are ignored.

In [6]:
from genex.preprocess import all_sublists_with_id_length
group_rdd = input_rdd.flatMap(
    lambda x: all_sublists_with_id_length(x, [120]))
partition_group = group_rdd.glom().collect()
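
The grouping step breaks each time series into subsequences. A minimal sketch of that idea, assuming a plain sliding window of one fixed length, is shown below. The function `sublists_of_length` and the shape of its output are hypothetical and may not match what all_sublists_with_id_length returns.

```python
def sublists_of_length(key, series, length):
    """Return every contiguous window of `length` points, tagged with key and offsets."""
    return [((key, start, start + length - 1), series[start:start + length])
            for start in range(len(series) - length + 1)]


# Toy series of 6 points with windows of 4 points (the notebook uses 120).
windows = sublists_of_length("ts-0", list(range(6)), 4)
for window_id, points in windows:
    print(window_id, points)
```

A series shorter than the requested length yields no windows at all, which is how short sequences drop out in this sketch.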

## Cluster the data with Gcluster
Now the time series on each core have been broken down into subsequences. We will proceed
with node-wise clustering with the Genex algorithm.

Note that this implementation uses a different clustering than the previous implementation.

At the end of the operation, we cache the result so that, once computed, the cluster_rdd is retained in the pre-set


In [None]:
from genex.cluster import filter_cluster

cluster_rdd = group_rdd.mapPartitions(
    lambda x: filter_cluster(groups=x, st=0.05, log_level=1),
    preservesPartitioning=False).cache()
cluster_partition = cluster_rdd.glom().collect()
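
To illustrate what a similarity threshold such as st=0.05 can mean, here is a simplified sketch of threshold-based clustering within one partition. It is an assumption-laden stand-in, not the filter_cluster implementation: the greedy assignment, the function names, and the mean absolute difference used as the distance are all chosen here for illustration only.

```python
def mean_abs_distance(a, b):
    """Stand-in distance between two equal-length sequences (not genex's measure)."""
    return sum(abs(x - y) for x, y in zip(a, b)) / len(a)


def greedy_threshold_cluster(sequences, st):
    """Assign each sequence to the first representative within st, else start a new cluster."""
    clusters = []  # list of (representative, members)
    for seq in sequences:
        for representative, members in clusters:
            if mean_abs_distance(seq, representative) <= st:
                members.append(seq)
                break
        else:
            # No representative was close enough: seq becomes a new representative.
            clusters.append((seq, [seq]))
    return clusters


# Toy normalized subsequences, not from the dataset.
sequences = [[0.10, 0.20], [0.12, 0.21], [0.80, 0.90]]
clusters = greedy_threshold_cluster(sequences, st=0.05)
for representative, members in clusters:
    print(representative, len(members))
```

In this sketch a smaller st produces more, tighter clusters, while a larger st merges more subsequences under each representative.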