In [2]:
import os
import io
import pyspark
import numpy as np
import matplotlib.pyplot as plt
import pyspark.sql.functions as F

from pyspark.sql import SparkSession, Window
from pyspark.conf import SparkConf
# from pyspark.context import SparkContext
from pyspark.sql.types import StringType, ArrayType, StructField, StructType, FloatType, DoubleType, IntegerType

from concurrent.futures import ThreadPoolExecutor

%load_ext autoreload
%autoreload 2

In [2]:
# `sparksession is none: typeerror: 'javapackage' object is not 
# callable` can be raised if the pyspark version being used is 4.0.0
# which is not compatible to a python 3.11.8 version

# if we have 24 gb of installed ram and 23 gb usable and have
# 8 cores in our CPU we can utilize this memory to partition 
# it across all 8 cores of our cpu for concurrent processing 
# in spark. We need to also take note of background processes 
# in our task manager taking up our memory so if need be we 
# have to end these background processes that take up too much 
# memory in order to free up space for our spark driver memory 
# and spark executor memory

# lets say we have 8 cores per node/CPU and currently 23gb usable ram
# we can partition this 23gb ram across all 8 cores of the CPU
# since there are other background processes we can reserve 1 core
# for this as well as 1gb of ram, and so we will have only 7 cores 
# available and 22gb of ram

# executors utilize cores the ff. are different kinds of executor
# sizes
# executor 1: [<core 1>, <core 2>, ..., <core 7>] where 22gb of ram is
# spread out across the executors. Since there is only a single executor 
# here executor will only have 22gb of memory and then this memory will 
# be divided into its individual cores which currently have 7 so 22 / 7 
# is 3gb of memory per core

# the main idea is we can have any number of executors so long as there
# are any number of cores but we cannot have any number of cores for
# any number of executors. If there are 7 cores we can have at most
# 7 executors and divide the ram across these executors and their cores
# themselves. 

# executor 1: [<core 1>]
# executor 2: [<core 2>]
# executor 3: [<core 3>]
# executor 4: [<core 4>]
# executor 5: [<core 5>]
# executor 6: [<core 6>]
# executor 1: [<core 7>]
# where the 22fb of ram we have is spread out across these executors
# if we have 7 executors we will have 22 / 7 or 3gb. Therefore 3gb will
# be the memory of each executor with one actually 4gb as 3gb + 3gb + 
# 3gb + 3gb + 3gb + 3gb + 4gb = 22gb. And 

# again our starting memory and cores is 24gb and 8 cores
# we will minus 1gb and 1core for yarn/hadoop processes
# making it 23gb and 7 cores. Yarn application master
# can take either 1gb of ram or 1 core therefore it may make
# our total memory and cores 22gb and 7 cores or 23gb and 6 cores;
# say we picked the former. Now we choose our number of executors
# which can be the mid range of our number of cores. Say we want
# 3 executors then each executor will have 22gb / 3 or 7gb, 7gb, 
# 8gb respectively for all 3 executors.
# executor 1 (7gb): []
# executor 2 (7gb): []
# executor 3 (8gb): []
# we also take into consideration memory overhead for each
# executor which is `memory per executor` - `max(384mb, 10 % of spark.executor.memory)`
# after calculation our executor memories will now have the ff.
# 7000mb - max(384mb, 10% of 7gb is 0.7gb or 700mb) = 6300mb or 6.3gb
# 7000mb - max(384mb, 10% of 7gb is 0.7gb or 700mb) = 6300mb or 6.3gb
# 8000mb - max(384mb, 10% of 8gb is 0.8gb or 800mb) = 7200mb or 7.2gb
# executor 1 (6.3gb): []
# executor 2 (6.3gb): []
# executor 3 (7.2gb): []
# since we havee 7 cores we can divide these cores across all these
# executors. if distributed evenly each executor will have 2, 2, and 3
# cores respectively


# driver memory default is 1g
# executor memory default is 1g
# executor cores default is 1
# sql execution arrow maxRecordsPerBatch default 10000 
# maximum number of records that can be written to a single ArrowRecordBatch in memory

# spark = SparkSession.builder.appName("app")\
    # .config("spark.driver.memory", "16g")\
    # .config("spark.executor.memory", "4g")\
    # .config("spark.executor.cores", "2")\
    # .config("spark.executor.instances", "3")\
    # .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100")\
    # .getOrCreate()

spark = SparkSession.builder.appName("app")\
    .getOrCreate()

In [None]:
# # cloud
# BRONZE_FOLDER_NAME = "sgppipelinesa-bronze"
# URL = "abfss://{FOLDER_NAME}@sgppipelinesa.dfs.core.windows.net"
# BRONZE_DATA_DIR = os.path.join(URL, "").replace("\\", "/")
# BRONZE_DATA_DIR

# local
BRONZE_FOLDER_NAME = "bronze"
DATA_DIR = "../../include/data"
BRONZE_DATA_DIR = os.path.join("{DATA_DIR}", "{FOLDER_NAME}").replace("\\", "/")
BRONZE_DATA_DIR

'{DATA_DIR}/{FOLDER_NAME}'

In [5]:
# sample_folder = folder_infos[-1].path
# sample_folder

In [6]:
# sample_folder.strip('/').split('/')[-1]

In [7]:
# type(folder_infos[-1])

In [8]:
# dbutils.fs.ls(folder_infos[-1].path)

In [16]:
# # cloud
# file_infos = [file_info.path for file_info in dbutils.fs.ls(BRONZE_DATA_PATH)]
# file_infos

# local
file_infos = [
    os.path.join(BRONZE_DATA_DIR.format(DATA_DIR=DATA_DIR, FOLDER_NAME=BRONZE_FOLDER_NAME), file_info).replace("\\", "/") 
    for file_info in 
    os.listdir(BRONZE_DATA_DIR.format(DATA_DIR=DATA_DIR, FOLDER_NAME=BRONZE_FOLDER_NAME))
]
file_infos

[]

In [10]:
# labels_df = spark.read.format('text')\
#     .option("lineSep", "\n")\
#     .load(os.path.join(BRONZE_DATA_PATH, "1337ad-20170321-ajg", "etc", "README"))
# labels_df.show()

In [11]:
label_paths = [
    os.path.join(file_info, "etc", "README").replace("\\", "/") 
    for file_info in file_infos 
    if os.path.exists(os.path.join(file_info, "etc", "README").replace("\\", "/"))
]

In [12]:
len(label_paths)

0

In [13]:
# cloud
labels_df = spark.read.format("text")\
    .option("lineSep", "\n")\
    .load(label_paths)\
    .select("*", "_metadata.file_path")

# local
# labels_df = spark.read.format("text")\
#     .option("lineSep", "\n")\
#     .load([os.path.join(BRONZE_DATA_PATH, file_info, "etc", "README") for file_info in file_infos])

In [14]:
labels_df.cache()

DataFrame[value: string, file_path: string]

In [15]:
labels_df.show()

+-----+---------+
|value|file_path|
+-----+---------+
+-----+---------+



In [16]:
# labels_df.count()

In [17]:
# labels_df.withColumn("filePath", F.input_file_name()).show()

In [18]:
# labels_df.withColumn("filePath", F.input_file_name()).where(
#     F.lower(F.col("value")).contains("gender")
# ).collect()

In [19]:
# local
# labels_df = labels_df.withColumn("filePath", F.input_file_name())

In [5]:
labels_df = labels_df.where(F.lower(F.col("value")).contains("gender"))

+-----+---------+---------+-----+---------+
|value|file_path|subjectId|rowId|partition|
+-----+---------+---------+-----+---------+
+-----+---------+---------+-----+---------+



# Clean value columns

In [53]:
labels_df = labels_df.withColumn(
    "value", 
    # extract only the gender of the subject in meta data
    F.regexp_replace(
        F.lower(F.col("value")), 
        r"(gender)|[:;\[\]\t\n\s]+", 
        ""
    )
)
# labels_df.show()

In [54]:
labels_df = labels_df.withColumn(
    "value",
    # sometimes the gender may be in a different language
    # e.g. the 'male' in german may have the string start
    # with 'mä' so we should return male if this is the case
    # and vice versa for females translated to a different
    # language 
    F.when(
        F.col("value").startswith("ma") | F.col("value").startswith("mä"),
        "male"
    ).when(
        F.col("value").startswith("fem") | F.col("value").startswith("wei"),
        "female"
    ).otherwise(
        "unknown"
    )
)
# labels_df.show()

# clean filePath column

In [55]:
# labels_df.withColumn(
#     "subjectId",
#     F.element_at(
#         # splits the filepath from 'file:///c:/Users/LARRY/Documents/Scripts/.../bronze/1337ad-20170321-ajg/etc/README
#         # to array of the directory tree of the files path e.g. 
#         # ['file:', ..., 'Scripts', ..., 'bronze', '<subject id>, 'etc', 'readme']
#         # so in order to extract subject id or the file name we have to 
#         # get the 3rd to the last element
#         F.split(
#             F.col("file_path"),
#             r"\/"
#         ),
#         -3
#     )
# ).collect()

In [56]:
labels_df = labels_df.withColumn(
    "subjectId",
    F.element_at(
        # splits the filepath from 'file:///c:/Users/LARRY/Documents/Scripts/.../bronze/1337ad-20170321-ajg/etc/README
        # to array of the directory tree of the files path e.g. 
        # ['file:', ..., 'Scripts', ..., 'bronze', '<subject id>, 'etc', 'readme']
        # so in order to extract subject id or the file name we have to 
        # get the 3rd to the last element
        F.split(
            F.col("file_path"),
            r"\/"
        ),
        -3
    )
)

In [57]:
labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string, rowId: int, partition: int]

+-----+---------+---------+-----+---------+
|value|file_path|subjectId|rowId|partition|
+-----+---------+---------+-----+---------+
+-----+---------+---------+-----+---------+



In [23]:
male_labels_df = labels_df.where(F.col("value") == "male")
male_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [24]:
female_labels_df = labels_df.where(F.col("value") == "female")
female_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [25]:
train_male_labels_df, val_male_labels_df, test_male_labels_df = male_labels_df.randomSplit(weights=[0.7, 0.15, 0.15], seed=0)

In [26]:
train_female_labels_df, val_female_labels_df, test_female_labels_df = female_labels_df.randomSplit(weights=[0.7, 0.15, 0.15], seed=0)

In [27]:
train_male_labels_df.cache()
val_male_labels_df.cache()
test_male_labels_df.cache()
train_female_labels_df.cache()
val_female_labels_df.cache()
test_female_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [28]:
train_labels_df = train_male_labels_df.unionByName(train_female_labels_df)
train_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [29]:
val_labels_df = val_male_labels_df.unionByName(val_female_labels_df)
val_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [30]:
test_labels_df = test_male_labels_df.unionByName(test_female_labels_df)
test_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string]

In [31]:
# create an ID column so that when this is saved
# randomly we can order the dataframe again in the 
# next second stage transformation
id_window = Window.orderBy(F.col("subjectId"))
train_labels_df = train_labels_df.withColumn("rowId", F.row_number().over(id_window) - 1)
val_labels_df = val_labels_df.withColumn("rowId", F.row_number().over(id_window) - 1)
test_labels_df = test_labels_df.withColumn("rowId", F.row_number().over(id_window) - 1)
train_labels_df.cache()
val_labels_df.cache()
test_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string, rowId: int]

In [32]:
n_partitions = 10
train_labels_df = train_labels_df.withColumn("partition", F.col("rowId") % n_partitions)
val_labels_df = val_labels_df.withColumn("partition", F.col("rowId") % n_partitions)
test_labels_df = test_labels_df.withColumn("partition", F.col("rowId") % n_partitions)
train_labels_df.cache()
val_labels_df.cache()
test_labels_df.cache()

DataFrame[value: string, file_path: string, subjectId: string, rowId: int, partition: int]

In [33]:
train_labels_df.show()

+------+--------------------+--------------------+-----+---------+
| value|           file_path|           subjectId|rowId|partition|
+------+--------------------+--------------------+-----+---------+
|female|file:/c:/Users/LA...| 1337ad-20170321-ajg|    0|        0|
|female|file:/c:/Users/LA...| 1337ad-20170321-tkg|    1|        1|
|female|file:/c:/Users/LA...| 1337ad-20170321-ynk|    2|        2|
|  male|file:/c:/Users/LA...|23yipikaye-201008...|    3|        3|
|  male|file:/c:/Users/LA...|2old2play-2011060...|    4|        4|
|  male|file:/c:/Users/LA...|2old2play-2011060...|    5|        5|
|  male|file:/c:/Users/LA...|314piwm-20130617-xuo|    6|        6|
|  male|file:/c:/Users/LA...|     AT-20130718-lws|    7|        7|
|  male|file:/c:/Users/LA...|  Aaron-20080318-kdl|    8|        8|
|  male|file:/c:/Users/LA...|  Aaron-20080318-lbb|    9|        9|
|  male|file:/c:/Users/LA...|  Aaron-20080318-lbk|   10|        0|
|  male|file:/c:/Users/LA...|  Aaron-20080318-liy|   11|      

In [34]:
# # cloud
# SILVER_FOLDER_NAME = "sgppipelinesa-silver"
# SUB_FOLDER_NAME = "stage-01"
# SILVER_DATA_DIR = os.path.join(URL.format(FOLDER_NAME=SILVER_FOLDER_NAME), SUB_FOLDER_NAME)
# SILVER_DATA_DIR

# local
SILVER_FOLDER_NAME = "silver"
SUB_FOLDER_NAME = "stage-01"
SILVER_DATA_DIR = os.path.join(DATA_DIR, os.path.join(SILVER_FOLDER_NAME, SUB_FOLDER_NAME))
SILVER_DATA_DIR

'../include/data/silver\\stage-01'

In [None]:
train_labels_df.write\
.option("compression", "snappy")\
.mode("overwrite")\
.partitionBy("partition")\
.parquet(os.path.join(SILVER_DATA_DIR, "train", "labels.parquet"))

In [37]:
val_labels_df.write\
.option("compression", "snappy")\
.mode("overwrite")\
.partitionBy("partition")\
.parquet(os.path.join(SILVER_DATA_DIR, "val", "labels.parquet"))

In [38]:
test_labels_df.write\
.option("compression", "snappy")\
.mode("overwrite")\
.partitionBy("partition")\
.parquet(os.path.join(SILVER_DATA_DIR, "test", "labels.parquet"))

In [39]:
train_labels_df.unpersist()
val_labels_df.unpersist()
test_labels_df.unpersist()

DataFrame[value: string, file_path: string, subjectId: string, rowId: int, partition: int]