### ML PySpark

First, create a `.env` file in the root of the project directory and add the following two variables to it: `ACCESS_KEY` and `ACCESS_SECRET`.
For a more detailed explanation, see the [dotenv](https://pypi.org/project/python-dotenv/) documentation, which describes how the `.env` file should look. Once `load_dotenv()` has run, the variables are added to `os.environ` and can be accessed like a regular dict.
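A minimal sketch of the expected `.env` layout, plus a guard that fails early when a credential is missing. The helper name `require_env` is hypothetical and the values in the comment are placeholders; in the notebook, `load_dotenv()` would run first so that `os.environ` is populated.

```python
import os

# Expected .env layout (placeholder values, never commit real keys):
#   ACCESS_KEY=your-access-key-id
#   ACCESS_SECRET=your-secret-access-key

REQUIRED_VARS = ("ACCESS_KEY", "ACCESS_SECRET")


def require_env(names=REQUIRED_VARS, environ=None):
    """Return {name: value} for each name, raising if any is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}
```

Calling `require_env()` right after `load_dotenv()` would surface a typo in the `.env` file before any Spark session is created.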

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, when, input_file_name
from functools import reduce
import sys
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
import os
from pyspark.ml.classification import RandomForestClassifier
from dotenv import load_dotenv
from sparkmeasure import StageMetrics

In [32]:
import random
## load .env
load_dotenv()


True

## Console Login

The following classes handle the Spark session and data access on AWS.

In [33]:
from src.s3handler import Sparker, PreProcessing, FeatureEngineering

In [34]:
## Initialize the class
spark = Sparker(os.environ['ACCESS_KEY'],os.environ['ACCESS_SECRET'])

## local session
session = spark._create_local_session()
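`src.s3handler` is not shown in this notebook, so the following is only a guess at the kind of settings a wrapper like `Sparker` might pass to Spark for S3 access. The function name `s3a_conf` is hypothetical; the `fs.s3a.*` keys are the standard Hadoop S3A options, prefixed with `spark.hadoop.` so that Spark forwards them to Hadoop.

```python
def s3a_conf(access_key, secret_key):
    """Build Spark config entries for reading s3a:// paths with static credentials."""
    return {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }


# Placeholder credentials, for illustration only
conf = s3a_conf("my-key", "my-secret")
# Each pair could then be applied with SparkSession.builder.config(key, value).
```

The `hadoop-aws` package also needs to be on the Spark classpath for `s3a://` paths to resolve.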

## Read parquet

In [35]:
parquet_cols = ["xyz","Intensity","Classification","Red","Green","Blue","Infrared","ReturnNumber","NumberOfReturns"]

## Read the parquet file and store it as a DataFrame
df = spark.read_parquet("ubs-datasets",
                    "FRACTAL/data/train/TRAIN-0436_6399-002955400.parquet",
                    read_all=False) \
                    .select(*parquet_cols)

# # Read the list of parquet files
# list_s3 = ["FRACTAL/data/train/TRAIN-1200_6136-008972557.parquet", "FRACTAL/data/train/TRAIN-0436_6399-002955400.parquet"]
# df = spark.read_parquet("ubs-datasets",
#                     list_s3,
#                     read_all=False) \
#                     .select(*parquet_cols)

Reading from: ['s3a://ubs-datasets/FRACTAL/data/train/TRAIN-0436_6399-002955400.parquet']
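The log line above suggests that `read_parquet` turns a bucket name and one or more object keys into `s3a://` URIs. A small sketch of how that step could look; `build_s3a_paths` is a hypothetical helper, not the actual implementation in `src.s3handler`.

```python
def build_s3a_paths(bucket, keys):
    """Accept one object key or a list of keys and return s3a:// URIs."""
    if isinstance(keys, str):
        keys = [keys]
    return [f"s3a://{bucket}/{key.lstrip('/')}" for key in keys]


paths = build_s3a_paths(
    "ubs-datasets", "FRACTAL/data/train/TRAIN-0436_6399-002955400.parquet"
)
print(f"Reading from: {paths}")
```

Since PySpark's `DataFrameReader.parquet` accepts several paths at once, the resulting list could be passed on as `session.read.parquet(*paths)`.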


In [36]:
df.printSchema()
print(f"Number of rows: {df.count()}")

root
 |-- xyz: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Intensity: integer (nullable = true)
 |-- Classification: short (nullable = true)
 |-- Red: integer (nullable = true)
 |-- Green: integer (nullable = true)
 |-- Blue: integer (nullable = true)
 |-- Infrared: integer (nullable = true)
 |-- ReturnNumber: short (nullable = true)
 |-- NumberOfReturns: short (nullable = true)

Number of rows: 90090



## Preprocessing & Feature Engineering

In [37]:
df.columns

['xyz',
 'Intensity',
 'Classification',
 'Red',
 'Green',
 'Blue',
 'Infrared',
 'ReturnNumber',
 'NumberOfReturns']

In [38]:
preprocessing = PreProcessing(df)
df = preprocessing.split_xyz()

# ## feature engineering
engfeature = FeatureEngineering(df)
df = engfeature.apply_all()

In [39]:
df.columns

['Intensity',
 'Classification',
 'Red',
 'Green',
 'Blue',
 'Infrared',
 'ReturnNumber',
 'NumberOfReturns',
 'x',
 'y',
 'z',
 'height_above_ground',
 'local_density',
 'local_z_std',
 'local_z_range',
 'roughness',
 'return_ratio',
 'is_single_return',
 'is_last_return',
 'ndvi',
 'green_red_ratio',
 'ndwi']
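The `FeatureEngineering` source is not included here, so the exact formulas behind `ndvi`, `ndwi`, and `green_red_ratio` are unknown. The sketch below uses the common definitions, NDVI as (NIR - Red) / (NIR + Red) and NDWI as (Green - NIR) / (Green + NIR), in plain Python for a single point. Falling back to 0.0 when a denominator is zero is an assumption.

```python
def safe_ratio(numerator, denominator, default=0.0):
    """Divide, returning `default` when the denominator is zero."""
    return numerator / denominator if denominator != 0 else default


def spectral_indices(red, green, infrared):
    """Compute common vegetation and water indices for one point."""
    return {
        "ndvi": safe_ratio(infrared - red, infrared + red),
        "ndwi": safe_ratio(green - infrared, green + infrared),
        "green_red_ratio": safe_ratio(green, red),
    }


# Made-up band values, for illustration only
indices = spectral_indices(red=100, green=150, infrared=300)
```

In Spark the same guard would typically be written with `when(...).otherwise(...)` so that no nulls or infinities reach the `VectorAssembler`.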

In [40]:
from pyspark.sql.functions import col, sum as _sum
feature_cols = df.columns
null_counts = df.select([
    _sum(col(c).isNull().cast("int")).alias(c) 
    for c in feature_cols
])
null_counts.show()

+---------+--------------+---+-----+----+--------+------------+---------------+---+---+---+-------------------+-------------+-----------+-------------+---------+------------+----------------+--------------+----+---------------+----+
|Intensity|Classification|Red|Green|Blue|Infrared|ReturnNumber|NumberOfReturns|  x|  y|  z|height_above_ground|local_density|local_z_std|local_z_range|roughness|return_ratio|is_single_return|is_last_return|ndvi|green_red_ratio|ndwi|
+---------+--------------+---+-----+----+--------+------------+---------------+---+---+---+-------------------+-------------+-----------+-------------+---------+------------+----------------+--------------+----+---------------+----+
|        0|             0|  0|    0|   0|       0|           0|              0|  0|  0|  0|                  0|            0|          0|            0|        0|           0|               0|             0|   0|              0|   0|
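+---------+--------------+---+-----+----+--------+------------+---------------+---+---+---+-------------------+-------------+-----------+-------------+---------+------------+----------------+--------------+----+---------------+----+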

### Load | Models
Prepare the variables for the models.

In [41]:
## Start StageMetrics (sparkmeasure) to record stage-level metrics while the model runs
stagemetrics = StageMetrics(spark.spark)
stagemetrics.begin() 

In [42]:
feature_cols = df.drop("Classification").columns  
assembler = VectorAssembler(inputCols=feature_cols,
                            outputCol="features",
                           # handleInvalid="skip" 
                           ) 

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
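For intuition: with its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation without centering it. Below is a plain-Python sketch of that operation on a single column. The function name `scale_to_unit_std` is illustrative and not part of PySpark.

```python
from statistics import stdev


def scale_to_unit_std(values):
    """Divide each value by the column's sample standard deviation (no centering)."""
    sigma = stdev(values)  # sample (n - 1) standard deviation
    if sigma == 0:
        return [0.0 for _ in values]  # constant column maps to zeros
    return [v / sigma for v in values]


scaled = scale_to_unit_std([2.0, 4.0, 6.0])
```

Tree ensembles such as random forests are generally insensitive to this kind of per-feature rescaling, so the scaler stage is harmless here but may not change the result.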


In [43]:
# 3. Define model
rf = RandomForestClassifier(featuresCol="scaled_features", 
                            labelCol="Classification",
                            bootstrap=True, 
                            numTrees=60,
                            maxDepth=10)

### Pipeline

In [44]:
pipeline = Pipeline(stages=[assembler, scaler, rf])

### Train 

In [45]:
model = pipeline.fit(df)

In [46]:
model.transform(df).show(2)

+---------+--------------+-----+-----+-----+--------+------------+---------------+----------+-----------+------+--------------------+-------------+-------------------+-----------------+-------------------+------------+----------------+--------------+-------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Intensity|Classification|  Red|Green| Blue|Infrared|ReturnNumber|NumberOfReturns|         x|          y|     z| height_above_ground|local_density|        local_z_std|    local_z_range|          roughness|return_ratio|is_single_return|is_last_return|               ndvi|   green_red_ratio|              ndwi|            features|     scaled_features|       rawPrediction|         probability|prediction|
+---------+--------------+-----+-----+-----+--------+------------+---------------+----------+-----------+------+--------------------+-------------+-------------------+-----------------+-------

                                                                                

## Hyperparameter tuning

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
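The cell above only imports the tuning classes. For reference, `ParamGridBuilder` expands lists of candidate values into every combination, and `CrossValidator` then scores each combination with the evaluator. A plain-Python sketch of that expansion follows; the candidate values for `numTrees` and `maxDepth` are hypothetical, not settings used in this notebook.

```python
from itertools import product


def expand_grid(grid):
    """Return one dict per combination of the candidate values in `grid`."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


param_maps = expand_grid({"numTrees": [30, 60], "maxDepth": [5, 10]})
for params in param_maps:
    print(params)
```

Every combination is fitted once per fold, so a grid of this size with three folds already means at least twelve pipeline fits. That cost is worth weighing before widening the grid.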

## Inference

In [47]:
df_test = spark.read_parquet("ubs-datasets",
                    "FRACTAL/data/train/TRAIN-1200_6136-008972557.parquet",
                    read_all=False) \
                    .select(*parquet_cols)

preprocessing = PreProcessing(df_test)
df_test = preprocessing.split_xyz()
eng_feature = FeatureEngineering(df_test)
df_test = eng_feature.apply_all()

Reading from: ['s3a://ubs-datasets/FRACTAL/data/train/TRAIN-1200_6136-008972557.parquet']


In [48]:
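predictions = model.transform(df_test)
# Reuse the DataFrame above and show only the columns of interest
# (Spark resolves column names case-insensitively by default)
predictions.select("Classification", "Prediction", "Probability").show(2)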



+--------------+----------+--------------------+
|Classification|Prediction|         Probability|
+--------------+----------+--------------------+
|             4|       5.0|[0.0,5.6102743770...|
|             4|       5.0|[0.0,5.4542493826...|
+--------------+----------+--------------------+
only showing top 2 rows


                                                                                

In [49]:
model.transform(df_test).show(2)



+---------+--------------+-----+-----+-----+--------+------------+---------------+----------+----------+------------------+-------------------+-------------+-----------------+------------------+-------------------+------------+----------------+--------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Intensity|Classification|  Red|Green| Blue|Infrared|ReturnNumber|NumberOfReturns|         x|         y|                 z|height_above_ground|local_density|      local_z_std|     local_z_range|          roughness|return_ratio|is_single_return|is_last_return|               ndvi|   green_red_ratio|                ndwi|            features|     scaled_features|       rawPrediction|         probability|prediction|
+---------+--------------+-----+-----+-----+--------+------------+---------------+----------+----------+------------------+-------------------+-------------+-------------

                                                                                

## Evaluation

In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [51]:
evaluator = MulticlassClassificationEvaluator(
    labelCol='Classification',
    predictionCol='prediction',
    metricName='accuracy'
)

accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.3f}")

Test Accuracy: 0.518
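The `accuracy` metric is the fraction of rows whose predicted label equals the true label. A plain-Python sketch of the same calculation, using made-up labels rather than values from the dataset:

```python
def accuracy_score(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy of an empty sequence")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Spark emits predictions as doubles, so 5.0 == 5 still counts as a match.
acc = accuracy_score([4, 5, 5, 2], [5.0, 5.0, 5.0, 2.0])
```

Accuracy alone can hide poor performance on rare classes, so a per-class metric such as `f1` (also supported through `metricName`) may be worth checking alongside it.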


                                                                                

In [52]:
stagemetrics.end()
# stagemetrics.print_memory_report()
# stagemetrics.print_report()

In [53]:
# print_memory_report() prints the report itself, so no extra print() is needed
stagemetrics.print_memory_report()


Additional stage-level executor metrics (memory usage info updated at each heartbeat):

Stage 14 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 14 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 16 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 16 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 19 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 19 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 23 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 23 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 24 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 24 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 26 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 26 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 27 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 27 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 29 JVMHeapMemory maxVal bytes => 446825352 (426.1 MB)
Stage 29 OnHeapExe

In [54]:
spark.close()


Spark session stopped.


In [56]:
round(accuracy, 2)

0.52

## Read txt 

The one that Bea does not like.

In [67]:
path_train = "/mnt/d/desktop/COPERNICUS/Classes/3-semester/bigdata/list_files/train_files.txt"
path_test = "/mnt/d/desktop/COPERNICUS/Classes/3-semester/bigdata/list_files/test_files.txt"
path_val = "/mnt/d/desktop/COPERNICUS/Classes/3-semester/bigdata/list_files/val_files.txt"
with open(path_train, 'r')  as file:
    lines = file.readlines()
    
with open(path_test, 'r')  as file:
    lines_test = file.readlines()

with open(path_val, 'r')  as file:
    lines_val = file.readlines()    

In [64]:
def retrieve_file_names(lines, percentage=None):
    """
    Collect the .parquet file names found in `lines`.

    Args:
        lines: List of strings; the file name is taken as the last
            whitespace-separated token of each line.
        percentage: Optional float in [0, 1). If given, return a random
            sample holding that fraction of the matched files.
    """
    import re
    from random import sample

    final_list = []
    for line in lines:
        tokens = line.split()
        if not tokens:
            continue  # skip blank lines instead of reusing a stale filename
        filename = tokens[-1]
        # Escape the dot so that only a literal ".parquet" matches
        if re.search(r'[A-Z0-9_-]+\.parquet', filename):
            final_list.append(filename)

    if percentage is None:
        return final_list
    if not 0.0 <= percentage < 1.0:
        raise ValueError("Percentage should be a float value in [0, 1).")
    # int() truncates, so very small percentages may yield an empty sample
    return sample(final_list, k=int(percentage * len(final_list)))
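Because `sample` draws from the global random state, each run of the notebook selects a different subset of files. If a repeatable subset is wanted, one option is a dedicated `random.Random` instance with a fixed seed. The helper name `sample_fraction`, the seed value, and the file names below are illustrative.

```python
import random


def sample_fraction(items, fraction, seed=42):
    """Return a reproducible random subset holding int(fraction * len(items)) items."""
    if not 0.0 <= fraction < 1.0:
        raise ValueError("fraction should be a float in [0, 1)")
    rng = random.Random(seed)  # private generator: global state is untouched
    return rng.sample(list(items), k=int(fraction * len(items)))


# Made-up file names, for illustration only
files = [f"TRAIN-{i:04d}.parquet" for i in range(100)]
subset = sample_fraction(files, 0.1)
```

Calling the helper twice with the same seed returns the same files, which keeps the train, test, and validation subsets stable across notebook runs.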


In [78]:
list_train = retrieve_file_names(lines, percentage=0.0001)
list_test = retrieve_file_names(lines_test, percentage=0.0001)
list_val = retrieve_file_names(lines_val, percentage=0.0001)

In [79]:
print(f"Train: {len(list_train)} \n Test : {len(list_test)} \n Val {len(list_val)}")

Train: 8 
 Test : 1 
 Val 1


In [80]:
list_train

['TRAIN-0935_6309-007770476.parquet',
 'TRAIN-1185_6099-009163042.parquet',
 'TRAIN-0736_6275-006984865.parquet',
 'TRAIN-0903_6366-000044086.parquet',
 'TRAIN-0747_6275-006666552.parquet',
 'TRAIN-0970_6345-001579788.parquet',
 'TRAIN-0460_6425-002963500.parquet',
 'TRAIN-1193_6107-008949591.parquet']

In [85]:
[f"train/{file}" for file in list_train]

['train/TRAIN-0935_6309-007770476.parquet',
 'train/TRAIN-1185_6099-009163042.parquet',
 'train/TRAIN-0736_6275-006984865.parquet',
 'train/TRAIN-0903_6366-000044086.parquet',
 'train/TRAIN-0747_6275-006666552.parquet',
 'train/TRAIN-0970_6345-001579788.parquet',
 'train/TRAIN-0460_6425-002963500.parquet',
 'train/TRAIN-1193_6107-008949591.parquet']

In [84]:
f"train/{list_train[0]}"

'train/TRAIN-0935_6309-007770476.parquet'

In [86]:
from random import sample

In [87]:
?sample

Signature: sample(population, k, *, counts=None)
Docstring:
Chooses k unique random elements from a population sequence.

Returns a new list containing elements from the population while
leaving the original population unchanged.  The resulting list is
in selection order so that all sub-slices will also be valid random
samples.  This allows raffle winners (the sample) to be partitioned
into grand prize and second place winners (the subslices).

Members of the population need not be hashable or unique.  If the
population contains repeats, then each occurrence is a possible
selection in the sample.

Repeated elements can be specified one at a time or with the optional
counts parameter.  For example:

    sample(['red', 'blue'], counts=[4, 2], k=5)

is equivalent to:

    sample(['red', 'red', 'red', 'red', 'blue', 'blue'], k=5)

To choose a sample from a range of integers, use range() for the
population argument.  This is especially fast and space