# Overview of Spark data support in the *Ambrosia* ``Splitter`` class

This example shows the functionality of the ``Splitter`` class on Spark DataFrames, using synthetic data on MTS KION user metrics.

The functionality of the ``Splitter`` class on Spark data is currently limited compared to the pandas format. \
See the main ``Splitter`` tutorial on pandas data to learn the full functionality and the details of splitting experimental objects into groups.

**Note:** *Ambrosia* currently supports only batch splitting. Real-time splitting tools are under development.

In [2]:
import os

import pandas as pd
import pyspark

from ambrosia.splitter import Splitter



Build a local Spark session

In [3]:
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
spark = pyspark.sql.SparkSession.builder.master("local[1]").getOrCreate()
spark.sparkContext.setLogLevel('ERROR')



Load the data with pandas and convert it to a Spark DataFrame

In [4]:
kion_dataset = pd.read_csv("./../tests/test_data/kion_data.csv", sep=';')
sdf = spark.createDataFrame(kion_dataset)

In [5]:
kion_dataset.shape

(300000, 5)

In [6]:
sdf.printSchema()

root
 |-- profile_id: long (nullable = true)
 |-- sum_dur: long (nullable = true)
 |-- vod_cnt: long (nullable = true)
 |-- ln_vod_cnt: double (nullable = true)
 |-- bin_col: long (nullable = true)



### Spark hash group split

Unlike for pandas data, only the ``'hash'`` method is implemented for Spark. \
This method creates groups deterministically: for the same data, the same ``salt`` value reproduces the same split.
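To see why a salt makes the split reproducible, here is a minimal plain-Python sketch of salted hash assignment. It is a hypothetical illustration, not Ambrosia's implementation: the helpers `salted_hash` and `assign_group`, the choice of SHA-256 over the concatenated id and salt, and the modulo bucketing are all assumptions, so the labels it produces will not match the notebook output.

```python
import hashlib


def salted_hash(object_id, salt):
    """Hypothetical helper: map an object id and a salt to a large non-negative integer."""
    payload = f"{object_id}{salt}".encode("utf-8")
    return int(hashlib.sha256(payload).hexdigest(), 16)


def assign_group(object_id, salt, groups_number=2):
    """Hypothetical helper: put an object into one of `groups_number` groups (A, B, ...)."""
    labels = [chr(ord("A") + i) for i in range(groups_number)]
    return labels[salted_hash(object_id, salt) % groups_number]


# A few profile ids taken from the table of results in this notebook.
ids = [559783878399, 807427182946, 845784297949, 41350284663, 5082903657]

# The same salt always reproduces the same assignment...
first_run = [assign_group(i, "spark322") for i in ids]
second_run = [assign_group(i, "spark322") for i in ids]
assert first_run == second_run

# ...while a different salt generally reshuffles the objects between groups.
other_salt = [assign_group(i, "another-salt") for i in ids]
print(first_run)
print(other_salt)
```

Because the assignment depends only on the id and the salt, no random state has to be stored to repeat the split later.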

Pass the data and the name of the column with unique object ids

In [7]:
splitter = Splitter(dataframe=sdf, id_column='profile_id')

Make a hash split into 2 groups with a specified salt value

In [8]:
sdf_hash_split = splitter.run(groups_size=1000, method='hash', salt='spark322')



In [10]:
sdf_hash_split.toPandas()


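|      | profile_id   | sum_dur  | vod_cnt | ln_vod_cnt | bin_col | group |
|------|--------------|----------|---------|------------|---------|-------|
| 0    | 559783878399 | 16243096 | 26      | 3.451662   | 1       | A     |
| 1    | 807427182946 | 55078    | 3       | 0.909034   | 0       | A     |
| 2    | 845784297949 | 31545    | 1       | 0.000000   | 0       | A     |
| 3    | 41350284663  | 1878050  | 10      | 2.894374   | 0       | A     |
| 4    | 5082903657   | 584191   | 1       | 0.475820   | 0       | A     |
| ...  | ...          | ...      | ...     | ...        | ...     | ...   |
| 1995 | 449871171656 | 5890763  | 29      | 3.699892   | 1       | B     |
| 1996 | 25374705733  | 3964937  | 51      | 4.053246   | 1       | B     |
| 1997 | 368955636652 | 27693    | 1       | 0.000000   | 0       | B     |
| 1998 | 674408525538 | 7284     | 1       | 0.000000   | 0       | B     |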


Now make 5 different groups of 1000 objects each

In [14]:
sdf_hash_split_multi = splitter.run(groups_size=1000,
                                    groups_number=5,
                                    method='hash',
                                    salt='spark322')

In [15]:
hash_split_multi = sdf_hash_split_multi.toPandas()

Five groups of 1000 objects each are created

In [20]:
hash_split_multi.group.value_counts()

A    1000
B    1000
C    1000
D    1000
E    1000
Name: group, dtype: int64
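The notebook does not describe how the hash method picks exactly ``groups_size`` objects per group out of the full table. As a hypothetical illustration in plain Python (not Ambrosia's code), one way to get disjoint, fixed-size, reproducible groups is to rank all ids by their salted hash and deal consecutive blocks of ``groups_size`` ids into groups A, B, C and so on. The function `hash_split_fixed_size`, the SHA-256 hash and the synthetic ids are assumptions.

```python
import hashlib


def salted_hash(object_id, salt):
    """Hypothetical helper: map an object id and a salt to a large non-negative integer."""
    return int(hashlib.sha256(f"{object_id}{salt}".encode("utf-8")).hexdigest(), 16)


def hash_split_fixed_size(ids, groups_size, groups_number, salt):
    """Hypothetical fixed-size hash split.

    Ids are ranked by their salted hash; the first `groups_size` ids go to
    group A, the next `groups_size` to group B, and so on.
    """
    needed = groups_size * groups_number
    if needed > len(ids):
        raise ValueError("Not enough objects for the requested groups")
    ranked = sorted(ids, key=lambda object_id: salted_hash(object_id, salt))
    groups = {}
    for index, object_id in enumerate(ranked[:needed]):
        label = chr(ord("A") + index // groups_size)
        groups.setdefault(label, []).append(object_id)
    return groups


# Synthetic ids stand in for the profile_id column.
groups = hash_split_fixed_size(range(20_000), groups_size=1000, groups_number=5, salt="spark322")
print({label: len(members) for label, members in groups.items()})
# {'A': 1000, 'B': 1000, 'C': 1000, 'D': 1000, 'E': 1000}
```

Ranking by hash rather than taking the hash modulo the number of groups guarantees exact group sizes, at the cost of sorting all ids.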

Check the distribution of the binary variable ``bin_col`` in each group

In [16]:
hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000

| group | bin_col | share |
|-------|---------|-------|
| A     | 0       | 0.615 |
| A     | 1       | 0.385 |
| B     | 0       | 0.593 |
| B     | 1       | 0.407 |
| C     | 0       | 0.598 |
| C     | 1       | 0.402 |
| D     | 0       | 0.611 |
| D     | 1       | 0.389 |
| E     | 0       | 0.611 |
| E     | 1       | 0.389 |
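Without stratification, the share of ``bin_col == 1`` in each group fluctuates by chance. A quick back-of-the-envelope check, assuming the hash assignment behaves like simple random sampling and a population share of about 0.39 (an assumed value, roughly what the groups show), uses the standard error of a sample proportion, `sqrt(p * (1 - p) / n)`:

```python
import math

# Assumed share of bin_col == 1 in the population.
p = 0.39
# Number of objects in each group.
n = 1000

# Standard error of a sample proportion for a simple random sample of size n.
standard_error = math.sqrt(p * (1 - p) / n)
print(round(standard_error, 4))  # 0.0154
```

A typical deviation of about 1.5 percentage points per group means the differences in the table above (0.385 to 0.407) are of the size expected from chance alone.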


Finally, make a split with the same parameters, but stratified by ``bin_col``

In [19]:
sdf_strat_hash_split_multi = splitter.run(groups_size=1000,
                                          strat_columns=['bin_col'],
                                          groups_number=5,
                                          method='hash',
                                          salt='spark322')

In [21]:
strat_hash_split_multi = sdf_strat_hash_split_multi.toPandas()


Due to stratification, ``bin_col`` has the same distribution in every group, matching its distribution in the source table

In [22]:
strat_hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000

| group | bin_col | share |
|-------|---------|-------|
| A     | 0       | 0.609 |
| A     | 1       | 0.391 |
| B     | 0       | 0.609 |
| B     | 1       | 0.391 |
| C     | 0       | 0.609 |
| C     | 1       | 0.391 |
| D     | 0       | 0.609 |
| D     | 1       | 0.391 |
| E     | 0       | 0.609 |
| E     | 1       | 0.391 |
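Stratification can be pictured as running a hash split separately inside every stratum. The sketch below is a hypothetical plain-Python illustration, not Ambrosia's implementation: it assumes each stratum contributes to every group in proportion to its share of the data (with the last stratum absorbing rounding), and it uses synthetic ids with 40% ones instead of the KION data. The function `stratified_hash_split` is an illustrative name.

```python
import hashlib
from collections import Counter, defaultdict


def salted_hash(object_id, salt):
    """Hypothetical helper: map an object id and a salt to a large non-negative integer."""
    return int(hashlib.sha256(f"{object_id}{salt}".encode("utf-8")).hexdigest(), 16)


def stratified_hash_split(rows, groups_size, groups_number, salt):
    """Hypothetical stratified hash split over (object_id, stratum) pairs.

    Each stratum contributes to every group in proportion to its share in `rows`;
    inside a stratum, ids are ranked by salted hash and dealt out in blocks.
    """
    strata = defaultdict(list)
    for object_id, stratum in rows:
        strata[stratum].append(object_id)

    # Per-group quota for each stratum; the last stratum absorbs rounding so
    # that every group ends up with exactly `groups_size` objects.
    ordered = sorted(strata)
    quotas = {s: round(groups_size * len(strata[s]) / len(rows)) for s in ordered[:-1]}
    quotas[ordered[-1]] = groups_size - sum(quotas.values())

    labels = [chr(ord("A") + i) for i in range(groups_number)]
    groups = {label: [] for label in labels}
    for stratum in ordered:
        quota = quotas[stratum]
        if quota * groups_number > len(strata[stratum]):
            raise ValueError(f"Not enough objects in stratum {stratum!r}")
        ranked = sorted(strata[stratum], key=lambda object_id: salted_hash(object_id, salt))
        for position, label in enumerate(labels):
            block = ranked[position * quota:(position + 1) * quota]
            groups[label].extend((object_id, stratum) for object_id in block)
    return groups


# Synthetic data: 40% of objects have bin_col == 1.
rows = [(object_id, 1 if object_id % 5 < 2 else 0) for object_id in range(20_000)]
groups = stratified_hash_split(rows, groups_size=1000, groups_number=5, salt="spark322")
for label, members in groups.items():
    print(label, sorted(Counter(stratum for _, stratum in members).items()))
# A [(0, 600), (1, 400)]  (and the same for groups B to E)
```

Every group receives 600 objects with ``bin_col == 0`` and 400 with ``bin_col == 1``, so the shares are identical across groups, just as in the stratified table above.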


In [23]:
spark.stop()