## Init: configure executors based on our cluster size

docs: 
- https://learn.microsoft.com/en-us/azure/databricks/clusters/cluster-config-best-practices
- https://spark.apache.org/docs/latest/configuration.html

In [0]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import pyspark.sql.functions as F

conf = SparkConf()
conf.setMaster("local").setAppName("My app")
conf.set("spark.executor.instances", "256")
conf.set("spark.executor.cores", "2")
conf.set("spark.dynamicAllocation.initialExecutors",16)


conf.getAll()

Out[1]: [('spark.master', 'local'),
 ('spark.executor.memory', '18409m'),
 ('spark.executor.extraClassPath',
  '/databricks/spark/dbconf/log4j/executor:/databricks/spark/dbconf/jets3t/:/databricks/spark/dbconf/hadoop:/databricks/hive/conf:/databricks/jars/*'),
 ('spark.executor.instances', '256'),
 ('spark.executor.extraJavaOptions',
  '-Djava.io.tmpdir=/local_disk0/tmp -XX:ReservedCodeCacheSize=512m -XX:+UseCodeCacheFlushing -XX:PerMethodRecompilationCutoff=-1 -XX:PerBytecodeRecompilationCutoff=-1 -Djava.security.properties=/databricks/spark/dbconf/java/extra.security -XX:-UseContainerSupport -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -XX:+PrintGCDetails -verbose:gc -Xss4m -Djava.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni -Djavax.xml.datatype.DatatypeFactory=com.sun.org.apache.xerces.internal.jaxp.datatype.DatatypeFactoryImpl -Djavax.xml.parsers.DocumentBuilderFac

source files: 
- raw patient sequences (one record --> one  patient)
- crossjoin files (one record --> one pair of patients)
- computed distances (dlcs)

In [0]:
raw_sampled_100    = '/FileStore/tables/patients-sequences/patients_df_sampled_100.csv'
raw_sampled_1000   = '/FileStore/tables/patients-sequences/patients_df_sampled_1000.csv'
raw_sampled_10000  = '/FileStore/tables/patients-sequences/patients_df_sampled_10000.csv'
# raw_sampled_100000 = '/FileStore/tables/patients-sequences/patients_df_sampled_100000.csv'

crossjoin_partitioned_100    = '/FileStore/tables/patients-sequences/crossjoin_partitioned_100.parquet'
crossjoin_partitioned_1000   = '/FileStore/tables/patients-sequences/crossjoin_partitioned_1000.parquet'
crossjoin_partitioned_10000  = '/FileStore/tables/patients-sequences/crossjoin_partitioned_10000.parquet'
# crossjoin_partitioned_100000 = '/FileStore/tables/patients-sequences/crossjoin_partitioned_100000.parquet'

dlcs_results_100    = '/FileStore/tables/patients-sequences/dlcs_partitioned_100.parquet'
dlcs_results_1000   = '/FileStore/tables/patients-sequences/dlcs_partitioned_1000.parquet'
dlcs_results_10000  = '/FileStore/tables/patients-sequences/dlcs_partitioned_10000.parquet'
# dlcs_results_100000 = '/FileStore/tables/patients-sequences/dlcs_partitioned_100000.parquet'

underpinning dLCS function. in terms of analytics, this is a "heavyweight" UDF that is applied to each record in any of the datasets above

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, BooleanType, IntegerType
 

In [0]:
## simple example to show UDF registration mechanims

def testF (x,y):
    return len(x) > len(y)

testF_udf = F.udf(testF, BooleanType())

In [0]:
## this is the distance function 
## approx matching of two sequences of tokens
## input tokelist{1,2} are two lists of tokens. 
## return int distance between the two lists

def dlcs(tokenList1, tokenList2):
    # find the length of the lists
    m = len(tokenList1)
    n = len(tokenList2)

    # declaring the array for storing the dp values
    L = [[None]*(n + 1) for i in range(m + 1)]
 
    """Following steps build L[m + 1][n + 1] in bottom up fashion
    Note: L[i][j] contains length of LCS of X[0..i-1]
    and Y[0..j-1]"""
    for i in range(m + 1):
        for j in range(n + 1):
            if i == 0 or j == 0 :
                L[i][j] = 0
            elif tokenList1[i-1] == tokenList2[j-1]:
                L[i][j] = L[i-1][j-1]+1
            else:
                L[i][j] = max(L[i-1][j], L[i][j-1])
 
    # L[m][n] contains the length of LCS of X[0..n-1] & Y[0..m-1]
    return m+n- 2 * L[m][n]


dlcs_udf = F.udf(dlcs, IntegerType())

#### set the desired scale for the next experiment

In [0]:
raw_input_dataset = raw_sampled_1000
crossjoin_dataset = crossjoin_partitioned_1000
results_dataset   = dlcs_results_1000

## Spark `DataFrames` approach

so we manage relational tables throughout

the key is to partition the data so that each executor can operate on one partition, concurrently and independently from the others. 
note there are no "reduce" steps here, ideally the set of all distances can be computed in an "embarassingly parallel" fashion

in theory, it should be possible to scale up the resources with the size of the input dataframe. As we will see below, in practice there are limitations

### Data partitioning

firstly, we need to prepare the raw dataset (dataframe). 

This involves two steps:
- computing the crossjoin (cartesian product) of the dataset with itself, producing records of pairs

- partitioning the crossjoin dataset , so that each executor can operate on one partition independently of the others.

We save the partitioned version of the raw dataset to DBFS on the assumption that it will be used multiple times. 

Partitoning is an expensive operation but we only need to perform it once for each input dataset (100, 1000, etc).

#### Steps:
1. create DataFrame from raw csv file
2. lazy compute cartesian product (N^2 records)
3. repartition the product dataset in memory
4. save crossjoin file to dbfs --> **we use the more efficient binary parquet format**

#### 1. create DataFrame from raw csv file

In [0]:
sequences_df = spark.read.csv(raw_input_dataset, header=True, inferSchema=True)
sequences_df.schema

Out[7]: StructType([StructField('_c0', IntegerType(), True), StructField('patient_id', IntegerType(), True), StructField('read_2', StringType(), True)])

In [0]:
## "clean up" the dataframe schema

sequences_1 = sequences_df.drop('_c0')
sequences_2 = sequences_1.withColumnRenamed('patient_id', 'patient_id_1').withColumnRenamed('read_2', 'read_2_1')

#### 2. lazy compute cartesian product (N^2 records)

crossjoin on the dataframe with itself.

note that this is a "lazy" operation:

In [0]:
sequence_pairs = sequences_1.crossJoin(sequences_2)
sequence_pairs.count() 

Out[9]: 1000000

#### 3. repartition the product dataset in memory

data partitioning on the product dataset

see https://sparkbyexamples.com/spark/spark-partitioning-understanding

note that repartition is also lazy

several strategies are available to define partitons. A common way is to specify one or more partition columns. This works well if the downstream aggregstions use those columns for "groupby".

In our case it makes sense to use the combination of patient IDs, as we are going to group to group by patient id later, however this will generate N= num of patients partitions, which are too many. 

Thus we limit to 200 random partitions for this exercise.

feel free to experiment with different partitioning strategies, and see what performs better!

In [0]:
partitioned_sequence_pairs = sequence_pairs.repartition(200)

#### 4. save crossjoin file to dbfs

In [0]:
## following command is a Spark _action_: it triggers the actual repartitioning 

## for large datasets, this is an expensive operation, and we only execute it once when preparing the data for repeat analysis

partitioned_sequence_pairs.rdd.getNumPartitions()

Out[11]: 200

In [0]:
partitioned_sequence_pairs.write.mode("overwrite").parquet(crossjoin_dataset)

 ## format("delta").save(sequences_table)

### entry point if partitioned crossjoin datasets are already on FS:

if the crossjoin datasets have been previously saved to DBFS, we can start here by loading them rather than repeat the prep steps above

In [0]:
partitioned_sequence_pairs = spark.read.parquet(crossjoin_dataset)

## Computing distances using UDFs on DataFrames: 

1. compute distances on each partition. this adds a `dlcs` column to each record, holding the distance
- in principle, each computation is independenr from all others
- in Spark this is again lazily evaluated
2. write the resulting dataframe back to disk

In [0]:
partitioned_sequence_pairs_dlcs = partitioned_sequence_pairs.withColumn('dlcs', dlcs_udf('read_2','read_2_1'))

### example aggregations:

no computation has happened yet at this point, but we can illustrate how workers can deliver valuable analytics without the need to centralise the result, i.e., moving all data back to the driver node

Aggregations are optimised by Spark to operate on the partitions, i.e., by having the driver request data from the workers, as needed. 

Thus as long as we never call `collect()` on the computed dataset, we can still be relatively efficient.

#### show K random records from the result dataset

In [0]:
## top-k is relatively inexpensive:

partitioned_sequence_pairs_dlcs.show(10)

+----------+--------------------+------------+--------------------+----+
|patient_id|              read_2|patient_id_1|            read_2_1|dlcs|
+----------+--------------------+------------+--------------------+----+
|   2148782|667..,F25..,J671....|     2124644|62T1.,Eu32z,G20.....| 142|
|   2372235|G83..,F20..,666A....|     5193213|               BB3G.|  24|
|   3964995|H33..,M153.,H120z...|     3836017|N210.,G2...,J520z...|  66|
|   5163684|J521.,9N0V.,F583....|     3250863|N123.,J521.,19C2....|  54|
|   3250007|   N2132,4K2..,22K5.|     5624097|N217.,N2132,E2273...|  42|
|   1951649|1465.,M12z0,M101....|     4574256|   A11..,N11E.,1C2Z.|  34|
|   5022252|N2165,N2160,H01.....|     4915787|K5920,J521.,J5770...|  36|
|   2958453|J15..,J155.,N2151...|     3872619|N211.,N220.,J521....|  52|
|   4177952|H10..,H132.,G20.....|     2756617|Eu32z,Eu320,G33.....|  58|
|   3305721|J521.,1465.,J520....|     2606834|               N2131|  46|
+----------+--------------------+------------+-----

#### compute average, min and max distances across the entire dataset

In [0]:
grouped = partitioned_sequence_pairs_dlcs.select('dlcs').groupBy()

In [0]:
grouped.avg().show()

+---------+
|avg(dlcs)|
+---------+
|59.758692|
+---------+



In [0]:
grouped.max().show()

+---------+
|max(dlcs)|
+---------+
|      282|
+---------+



#### avg distance of each patient from all others

In [0]:
## group by first patient, take avg distance

avg_distance_by_patient = partitioned_sequence_pairs_dlcs.groupby('patient_id').agg(F.avg('dlcs').alias('avg_dist')).orderBy('avg_dist', ascending= False)
avg_distance_by_patient.columns

Out[83]: ['patient_id', 'avg_dist']

In [0]:
display(avg_distance_by_patient)

patient_id,avg_dist
5603439,253.21
2392371,226.404
2396783,209.054
3242528,199.052
4877743,181.502
2862043,176.952
5947646,172.198
4738768,171.72
1390285,171.45
2018408,170.288


In [0]:
display(avg_distance_by_patient.summary())

summary,patient_id,avg_dist
count,1000.0,1000.0
mean,3482146.559,59.75869200000004
stddev,1472713.0617025692,24.80044264000698
min,1007103.0,40.432
25%,2219412.0,43.978
50%,3445337.0,50.748
75%,4823554.0,65.422
max,6023870.0,253.21


select patients who have a small average distance from all others

In [0]:
dense_patients = avg_distance_by_patient.filter(avg_distance_by_patient.avg_dist < 65 - 35/2)

dense_patients.count()

Out[86]: 412

In [0]:
display(dense_patients)

patient_id,avg_dist
3829649,47.474
2368616,47.43
4861984,47.426
5478530,47.378
4867274,47.374
2418385,47.362
5092465,47.358
1333757,47.35
2768655,47.334
2471880,47.334


### illustrate a SQL interface to querying dataframes

In [0]:
partitioned_sequence_pairs_dlcs.createOrReplaceTempView("patient_distances")

In [0]:
spark.sql("SELECT avg(dlcs) FROM patient_distances").show()

+---------+
|avg(dlcs)|
+---------+
|59.758692|
+---------+



In [0]:
spark.sql("SELECT avg(dlcs) AS avg_dist FROM patient_distances GROUP BY patient_id ORDER BY avg_dist DESC. LIMIT 20").show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mParseException[0m                            Traceback (most recent call last)
[0;32m<command-2469345370738056>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mspark[0m[0;34m.[0m[0msql[0m[0;34m([0m[0;34m"SELECT avg(dlcs) AS avg_dist FROM patient_distances GROUP BY patient_id ORDER BY avg_dist DESC. LIMIT 20"[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,

## write result dataset back to disk

writing a partitioned dataset to disk does not require to move its contents back to the driver. 

 -- if the file is naturally partitioned, data may not need to be collected back to the driver

In [0]:
partitioned_sequence_pairs_dlcs.write.parquet(results_dataset, mode='overwrite')

## Appendix: Using rdd.map()

the `rdd.map()` style takes essentially the same time, as under the hood it is actually what is executed on the dataframe

however in this case we need to manually "carry over" the key (patient_id) as the `map()` operation only returns the actual value of the `dlcs()` function

In [0]:
partitioned_sequence_pairs_dlcs  = partitioned_sequence_pairs.rdd.map(lambda row: dlcs(row['read_2'], row['read_2_1'])).collect()

managing RDDs is less intuitive and effectively the Dataframes / SQL API is much more friendly. 

Thus we do not repeat the analytics above here

**WARNING** the `collect()` action forces a complete computation and also transfers all data back to driver. 

This is the most expensive operation, and should be avoided if possible.

In [0]:
partitioned_sequence_pairs_dlcs.collect()

Out[40]: [Row(patient_id=2140102, read_2='C34z.', patient_id_1=4096522, read_2_1='N2124,13H4.,E204.,E2273', dlcs=22),
 Row(patient_id=3103078, read_2='7411G,N110.,N2200,B34..,H1201,J12..,N11zz,E2B..,1465.,1BT..,9H91.,9H92.,9HA0.,Eu320,Eu32.,N2132,19C2.,1V64.,2482.,N05z6', patient_id_1=3045516, read_2_1='M101.,Myu22,E2900,J11..,J15z.', dlcs=108),
 Row(patient_id=2425526, read_2='M12z0,M100.,M101.,M1535,BB3G.', patient_id_1=3318615, read_2_1='M2401,N210.,G5732', dlcs=24),
 Row(patient_id=3984546, read_2='K584.,M153.', patient_id_1=5279063, read_2_1='E200z,E112.,E2B..,Eu32z,N05..,N2133,N05zJ,H120.,N2131,9H92.', dlcs=56),
 Row(patient_id=1864086, read_2='14F1.,M2521,M244.,M111.,G20..', patient_id_1=1480226, read_2_1='N21z2,C34z.,669..,F59..', dlcs=32),
 Row(patient_id=2938581, read_2='N110.,N2132,N21z2,C34..,N100.,N2174,K16y4,G20..,N2113', patient_id_1=2235701, read_2_1='1593.,E2B..,BBe5.,G20..,1BT..,F26..,J510.,H01..,662d.', dlcs=64),
 Row(patient_id=2392875, read_2='BBEF.,N05..', patient

writing the result back to file also forces a complete computation

In [0]:
parquet_file_dlcs = '/FileStore/tables/patients-sequences/patients_df_sampled_1000_dlcs.parquet'
partitioned_sequence_pairs_dlcs.write.parquet(parquet_file_dlcs, mode='overwrite')

## Appendix: using Databricks native Delta Tables

here instead of writing the crossjoin file to parquet (partitioned), we write it to Delta Tables. 

The goal is to check whether this results in faster reads

In [0]:
partitioned_sequences_table = '/FileStore/tables/patients-sequences/patients_df_sampled_10000.delta'

partitioned_sequence_pairs.write.format("delta").mode("overwrite").save(sequences_table)

In [0]:
sequence_pairs_delta = spark.read.format("delta").load(partitioned_sequences_table)

In [0]:
sequence_pairs_delta.count()

Out[135]: 100000000

In [0]:
partitioned_sequence_pairs_dlcs = partitioned_sequence_pairs.withColumn('dlcs', dlcs_udf('read_2','read_2_1'))

In [0]:
partitioned_sequence_pairs_dlcs.show(10000)

+----------+--------------------+------------+--------------------+--------+----+
|patient_id|              read_2|patient_id_1|            read_2_1|part_key|dlcs|
+----------+--------------------+------------+--------------------+--------+----+
|   1009265|E2500,E23..,M12z0...|     3873490|K514.,J521.,G20z....|   105.0| 134|
|   1009265|E2500,E23..,M12z0...|     1343564|   J40..,N11E.,Eu32.|   105.0| 138|
|   1009265|E2500,E23..,M12z0...|     1203423|               9k1A.|   105.0| 150|
|   1009265|E2500,E23..,M12z0...|     1303252|               9k1A.|   105.0| 150|
|   1009265|E2500,E23..,M12z0...|     1358649|C380.,N2174,N05.....|   105.0| 144|
|   1009265|E2500,E23..,M12z0...|     4051346|14B4.,M12z1,N301....|   105.0| 148|
|   1009265|E2500,E23..,M12z0...|     2626390|C380.,N2174,N11D2...|   105.0| 132|
|   1009265|E2500,E23..,M12z0...|     3801063|         1C12.,8D2..|   105.0| 142|
|   1009265|E2500,E23..,M12z0...|     4263529|H33..,J521.,H333....|   105.0| 132|
|   1009265|E250

In [0]:
dlcs_delta_file = '/FileStore/tables/patients-sequences/patients_df_sampled_dlcs_partitioned_10000.delta'
partitioned_sequence_pairs_dlcs.write.format("delta").mode("overwrite").save(dlcs_delta_file)