In [1]:
# Bee Hive data https://drive.google.com/file/d/142IBcs6OyQiJxO7owPfkEBFbkrudnh0g/view?usp=sharing

In [2]:
# Application name: passed to the Spark session builder as the app name and used as the
# top-level key when reading this application's settings from the project configuration.
APP = 'BeeHive'

In [3]:
# Install a pip package in the current Jupyter kernel
!{sys.executable} -m pip install -e '../../../Wielder/'
!{sys.executable} -m pip install -e '../'

zsh:1: parse error near `-m'
zsh:1: parse error near `-m'


In [4]:
from pyspark.sql import SparkSession,Row
from pyspark.sql.types import StructType, IntegerType, StringType
from pyspark.sql.functions import split, row_number, udf, col, min, when
from pyspark.sql.window import Window
from pyspark.shell import sqlContext

from pep_data.project import quick_conf
from pep_data.spark.util import field_to_struct

import random

22/11/14 12:57:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.3
      /_/

Using Python version 3.10.6 (main, Sep 29 2022 09:14:01)
SparkSession available as 'spark'.


In [5]:
# Obtain the Spark session for this notebook, named after the application
spark = (
    SparkSession.builder
    .appName(APP)
    .getOrCreate()
)

In [6]:
# Load the project configuration (per the original note, it comes from the project.conf file).
# Later cells read this application's settings through conf[APP][...].
conf = quick_conf()



In [7]:
# Column lists for this application, taken from the configuration
cols_name = conf[APP]['cols_name']
cols_double = conf[APP]['cols_double']
cols_integer = conf[APP]['cols_integer']

# One struct field per CSV header; the double/integer column lists are handed to
# field_to_struct (presumably so it can pick the field type — confirm in pep_data.spark.util)
fields = [
    field_to_struct(column_name, doubles=cols_double, integers=cols_integer)
    for column_name in cols_name
]

# Assemble the schema from the fields
schema = StructType(fields)

In [8]:
# Load the CSV found at the configured data path, applying the explicit schema
data_path = conf[APP]['data_path']
df = spark.read.csv(data_path, schema=schema)

#df.show()

In [9]:
# Drop every column whose name contains the word "remove"
cols_to_keep = [
    column_name
    for column_name in df.columns
    if 'remove' not in column_name
]
df = df.select(*cols_to_keep)

#df.show()

In [10]:
# 'Bee ID' has the form <cycle>_<id>: split it once and reuse both parts
bee_id_parts = split(col('Bee ID'), '_')

# 'Cycle' is the first part cast to integer; 'Cycle ID' is the second part, kept as a string
df_cleaned = (
    df
    .withColumn('Cycle', bee_id_parts.getItem(0).cast(IntegerType()))
    .withColumn('Cycle ID', bee_id_parts.getItem(1))
)
#df_cleaned.show()

In [11]:
# Number all rows 1..N in ascending Cycle order and store the number as 'Continuous ID'.
# The window has no partitioning, so Spark evaluates it on a single partition.
# NOTE(review): ordering by Cycle alone leaves the order of rows inside one cycle
# unspecified — confirm that is acceptable.
w = Window.orderBy('Cycle')
continuous_id = row_number().over(w)
df_cleaned = df_cleaned.withColumn('Continuous ID', continuous_id)

# df_cleaned.show()

In [12]:
# Map every cycle to the smallest 'Continuous ID' found in that cycle
min_id_rows = df_cleaned.groupBy('Cycle').min('Continuous ID').collect()
continuous_min_id_per_cycle = {row[0]: row[1] for row in min_id_rows}

# continuous_min_id_per_cycle

22/11/14 12:58:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [13]:
# Collect the distinct Cycle values to the driver and keep them in ascending order
distinct_cycle_rows = df_cleaned.select('Cycle').distinct().collect()
cycles = [row[0] for row in distinct_cycle_rows]
cycles.sort()

# cycles

                                                                                

In [14]:
# Pick a random parent (identified by its Continuous ID) for a bee of the given cycle
def assert_parent_bee_id(cycle, n=3, cycle_order=None, min_id_by_cycle=None):
    """Draw a random parent Continuous ID for a bee that belongs to `cycle`.

    The candidate range starts at the smallest Continuous ID of the cycle located
    `n` positions before `cycle` in `cycle_order` (clamped to the first cycle) and
    ends just before the smallest Continuous ID of `cycle` itself. This assumes the
    Continuous IDs were assigned in ascending cycle order.

    Args:
        cycle: Cycle value of the bee. Must be present in `cycle_order`, or None.
        n: Number of preceding cycles (by position in `cycle_order`) that may
            contain the parent. Must be at least 1. Defaults to 3.
        cycle_order: Sorted list of the distinct cycle values. When omitted, the
            notebook-level `cycles` list is used.
        min_id_by_cycle: Mapping of cycle value to the smallest Continuous ID of
            that cycle. When omitted, the notebook-level
            `continuous_min_id_per_cycle` dictionary is used.

    Returns:
        A Continuous ID (int) drawn uniformly from the candidate range, or None
        when `cycle` is None or is the first entry of `cycle_order` (such a bee
        has no earlier cycle to take a parent from).

    Raises:
        ValueError: If `n` is smaller than 1, or `cycle` is not in `cycle_order`.
    """
    if n < 1:
        raise ValueError('n must be at least 1')

    # A null cycle cannot be located in the list; treat it like a bee without a parent
    if cycle is None:
        return None

    # Fall back to the notebook-level lookup tables
    if cycle_order is None:
        cycle_order = cycles
    if min_id_by_cycle is None:
        min_id_by_cycle = continuous_min_id_per_cycle

    # Position of the cycle in the ordered list (not the cycle value itself)
    cycle_index = cycle_order.index(cycle)

    # The first cycle has no predecessors, hence no parent
    if cycle_index == 0:
        return None

    # Look back at most n cycles, without running past the first one
    oldest_parent_index = max(0, cycle_index - n)

    # Inclusive bounds of the candidate Continuous IDs
    min_rand_value = min_id_by_cycle[cycle_order[oldest_parent_index]]
    max_rand_value = min_id_by_cycle[cycle_order[cycle_index]] - 1

    return random.randint(min_rand_value, max_rand_value)

In [15]:
# TODO group by some columns to create partitions

# Wrap assert_parent_bee_id as a Spark UDF.
# The function draws random numbers, so the UDF is flagged as non-deterministic; this keeps
# the optimizer from assuming that repeated evaluations give the same value.
# No return type is declared on purpose: the column therefore uses the udf default (string),
# which the later string comparisons and the join on "Temp Continuous ID" rely on.
assert_parent_bee_id_udf = udf(assert_parent_bee_id).asNondeterministic()

# Add the "Parent Continuous ID" column, computed from the Cycle column.
# The result is cached because several later actions reuse this data frame and the
# randomly drawn parents should not be recomputed for each of them.
df_cleaned = df_cleaned.withColumn("Parent Continuous ID", assert_parent_bee_id_udf(col('Cycle')))\
                        .cache()

# df_cleaned.show()

22/11/14 12:58:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [16]:
# Lookup frame of potential parents: the bee's ID under the name "Parent Bee ID" and its
# continuous ID, cast to string, under the temporary name "Temp Continuous ID"
df_parent_bees = df_cleaned.select(
    col('Bee ID').alias("Parent Bee ID"),
    col('Continuous ID').cast(StringType()).alias("Temp Continuous ID"),
)

# df_parent_bees.show()

In [17]:
# Left-join every bee to its parent row so that "Parent Bee ID" is attached to it;
# the temporary join key is dropped afterwards
parent_match = df_cleaned['Parent Continuous ID'] == df_parent_bees['Temp Continuous ID']
df_beeId_parent_beeId = (
    df_cleaned
    .join(df_parent_bees, parent_match, 'left')
    .drop('Temp Continuous ID')
)
# df_beeId_parent_beeId.show()

In [18]:
# Derive "Parent Cycle": the part of "Parent Bee ID" before the underscore, cast to integer
parent_cycle = split(col('Parent Bee ID'), '_').getItem(0).cast(IntegerType())
df_beeId_parent_beeId = df_beeId_parent_beeId.withColumn('Parent Cycle', parent_cycle)
# df_beeId_parent_beeId.show()

In [19]:
# Add the 'Oldest Ancestor' column: null for cycle 0 bees, empty string for all others
oldest_ancestor_initial = when(col('Cycle') == 0, None).otherwise('')
df_beeId_parent_beeId = df_beeId_parent_beeId.withColumn('Oldest Ancestor', oldest_ancestor_initial)
# df_beeId_parent_beeId.toPandas()

In [20]:
# Add the 'Ancestors' column: null for cycle 0 bees, empty string for all others
ancestors_initial = when(col('Cycle') == 0, None).otherwise('')
df_beeId_parent_beeId = df_beeId_parent_beeId.withColumn('Ancestors', ancestors_initial)
# df_beeId_parent_beeId.toPandas()

In [21]:
# Fill the Ancestors and Oldest Ancestor columns for a bee whose parent is a cycle 0 bee
def assert_ancestors_and_oldestancestors_columns_children(row, list_cycle_0):
    """Return a copy of `row` with its ancestry columns filled from its cycle 0 parent.

    The parent is the first entry of `list_cycle_0` whose 'Continuous ID', converted
    to a string, equals the row's 'Parent Continuous ID'. When one is found, both
    'Oldest Ancestor' and 'Ancestors' are set to '<Bee ID>-<Continuous ID>' of that
    parent. When none is found, the row's values are returned unchanged.

    Args:
        row: Row of the bee to update.
        list_cycle_0: Collected rows of the cycle 0 bees (need 'Bee ID' and
            'Continuous ID').

    Returns:
        A new Row with the same fields as `row`.
    """
    fields = row.asDict()
    parent_id = fields['Parent Continuous ID']

    # First cycle 0 bee whose continuous ID matches the parent reference, if any
    root = next(
        (candidate for candidate in list_cycle_0
         if parent_id == str(candidate['Continuous ID'])),
        None,
    )

    if root is not None:
        lineage = root['Bee ID'] + '-' + str(root['Continuous ID'])
        fields['Oldest Ancestor'] = lineage
        fields['Ancestors'] = lineage

    return Row(**fields)

In [26]:
# Start the enriched data frame with the cycle 0 bees only
df_enriched = df_beeId_parent_beeId.filter(col('Cycle') == 0)
# Bees whose parent is a cycle 0 bee
df_parent_cycle = df_beeId_parent_beeId.filter(col('Parent Cycle') == 0)

# Bring the cycle 0 rows to the driver; the map closure below carries them to the workers
list_cycle_0 = df_enriched.collect()

# Fill the Oldest Ancestor and Ancestors columns row by row on the underlying RDD
df_parent_cycle_rdd = df_parent_cycle.rdd.map(
    lambda row: assert_ancestors_and_oldestancestors_columns_children(row, list_cycle_0)
).cache()

# Rebuild a data frame with the notebook's own SparkSession instead of the SQLContext
# borrowed from pyspark.shell (SQLContext is deprecated since Spark 3.0)
df_parent_cycle = spark.createDataFrame(df_parent_cycle_rdd, schema=df_enriched.schema)

# Append the children of cycle 0 to the enriched data frame and display the result
df_enriched = df_enriched.union(df_parent_cycle)
df_enriched.toPandas()

Unnamed: 0,Bee ID,DaughtersEfficiencyScore,Father SIZE,Father TYPE,X,Y,Z,Cycle,Cycle ID,Continuous ID,Parent Continuous ID,Parent Bee ID,Parent Cycle,Oldest Ancestor,Ancestors
0,0_0,-0.164749,5,107027,27.961115,25.540086,24.798587,0,0,1,,,,,
1,0_9,-0.091118,8,35473,3.616177,15.399937,14.111507,0,9,2,,,,,
2,0_16,-0.278374,9,72732,6.487132,27.961115,23.504056,0,16,3,,,,,
3,0_76,0.019883,6,49069,8.285177,21.881114,10.805612,0,76,4,,,,,
4,0_35,-0.075842,5,187,-1.000000,13.000000,17.000000,0,35,5,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2505,3_2219,-0.311176,15,1864,8.621328,29.868862,18.359813,3,2219,4403,36,0_23,0.0,0_23-36,0_23-36
2506,3_1536,-0.337628,14,174,4.594897,24.424269,25.795047,3,1536,4406,80,0_2363,0.0,0_2363-80,0_2363-80
2507,3_1073,-0.201212,10,21859,5.184537,17.983514,13.244170,3,1073,4440,232,0_2075,0.0,0_2075-232,0_2075-232
2508,3_2072,-1.019298,5,29237,4.947000,30.187871,20.174963,3,2072,4442,238,0_2139,0.0,0_2139-238,0_2139-238


In [27]:
# Fill the Ancestors and Oldest Ancestor columns for a bee whose parent is not a cycle 0 bee
def assert_ancestors_and_oldestancestors_columns_grandchildren(row, list_n_cycles):
    """Return a copy of `row` whose ancestry columns extend those of its parent.

    The parent is the first entry of `list_n_cycles` whose 'Continuous ID', converted
    to a string, equals the row's 'Parent Continuous ID'. When one is found:
    'Oldest Ancestor' is copied from the parent, and 'Ancestors' becomes the parent's
    'Ancestors' followed by '->' and '<Parent Bee ID>-<Parent Continuous ID>' of the
    row. When none is found, the row's values are returned unchanged.

    Args:
        row: Row of the bee to update.
        list_n_cycles: Collected rows of already enriched bees (need 'Continuous ID',
            'Oldest Ancestor' and 'Ancestors').

    Returns:
        A new Row with the same fields as `row`.
    """
    fields = row.asDict()
    parent_id = fields['Parent Continuous ID']

    # First already-enriched bee whose continuous ID matches the parent reference, if any
    parent = next(
        (candidate for candidate in list_n_cycles
         if parent_id == str(candidate['Continuous ID'])),
        None,
    )

    if parent is not None:
        fields['Oldest Ancestor'] = parent['Oldest Ancestor']
        parent_label = fields['Parent Bee ID'] + '-' + str(parent_id)
        fields['Ancestors'] = parent['Ancestors'] + '->' + parent_label

    return Row(**fields)

In [28]:
# For every cycle except the first (handled in the previous step) and the last:
# take the bees whose parent belongs to that cycle, look their parents up among the
# already enriched bees, fill their ancestry columns on the RDD and append the result
# to the enriched data frame.
# The loop has to visit every such cycle: stopping after the first one leaves all bees
# with a parent in a later cycle out of the enriched data frame.
n = 3
for cycle in cycles[1:-1]:
    # Bees whose parent belongs to the current cycle
    df_cycle = df_beeId_parent_beeId.filter(col('Parent Cycle') == cycle)

    # Candidate parents: enriched bees whose own parent lies in the n cycles before `cycle`.
    # The filter runs before the select because 'Parent Cycle' is not among the selected columns.
    # NOTE(review): keeping only rows with Cycle == cycle looks like a tighter candidate
    # set for the same lookup — confirm before switching.
    list_n_cycles = (
        df_enriched
        .filter((col('Parent Cycle') < cycle) & (col('Parent Cycle') >= cycle - n))
        .select('Cycle', 'Continuous ID', 'Oldest Ancestor', 'Ancestors')
        .collect()
    )

    # Fill Oldest Ancestor / Ancestors row by row. The candidate list is bound as a default
    # argument so that each iteration's closure keeps its own list.
    df_cycle_rdd = df_cycle.rdd.map(
        lambda row, parents=list_n_cycles:
            assert_ancestors_and_oldestancestors_columns_grandchildren(row, parents)
    ).cache()

    # Rebuild a data frame with the notebook's own SparkSession (SQLContext is deprecated)
    df_cycle = spark.createDataFrame(df_cycle_rdd, schema=df_enriched.schema)

    # Append this cycle's children to the enriched data frame
    df_enriched = df_enriched.union(df_cycle)

In [29]:
# Write the enriched data frame, with a header row, as CSV under the configured save location.
# coalesce(1) reduces the data frame to a single partition before writing.
file_name = '2.csv'
save_path = conf[APP]['data_save']
output_path = save_path + file_name

single_partition = df_enriched.coalesce(1)
single_partition.write.option('header', True).csv(output_path)

                                                                                