# Env

In [9]:
from pyspark.sql import SparkSession
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..', '..')))
from src.processing.process_data import (
    format_date, format_bdi, format_spec,
    to_numeric, remove_duplicates, remove_outliers,
    remove_nan, norm_numeric, norm_cat
)
from sklearn.preprocessing import StandardScaler
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_date, regexp_replace
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
)
from pyspark.ml import Pipeline

# Dataframe

In [7]:
# Build (or reuse, if one already exists in this kernel) a SparkSession that
# runs locally on all available cores under the application name 'load'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('load')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/09 15:27:14 WARN Utils: Your hostname, Giordano, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/09 15:27:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/09 15:27:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the Bovespa parquet dataset and keep only rows from July 1994 onwards
# (the serving export is named "plano_real", so the cut-off is presumably the
# start of the Plano Real -- confirm with the data owner).
#
# The month bound must only apply to the first year: a flat
# `year >= 1994 AND month >= 7` would also drop January-June of every
# later year.
df = (
    spark.read
    .parquet('../../datalake/Bovespa')
    .filter('year > 1994 OR (year = 1994 AND month >= 7)')
)

                                                                                

# Analysis

In [None]:
# Columns handed to the numeric conversion and outlier helpers below.
# NOTE(review): spellings such as 'Purshase' and 'Numbor' are kept verbatim;
# presumably they mirror the dataset's own column headers -- verify before
# "correcting" them.
numeric_columns = [
    'Opening Price',
    'Max. Price',
    'Min. Price',
    'Mean Price',
    'Last Trade Price',
    'Best Purshase Order Price',
    'Best Purshase Sale Price',
    'Numbor Of Trades',
    'Number Of Traded Stocks',
    'Volume Of Traded Stocks',
]

# Cleaning stages from src.processing.process_data, applied strictly in this
# order. Each entry is (helper, extra positional arguments after the frame).
cleaning_stages = [
    (remove_duplicates, ()),
    (to_numeric, (numeric_columns,)),
    (remove_outliers, (numeric_columns,)),
    (format_date, ()),
    (format_bdi, ()),
    (format_spec, ()),
]
for stage, extra_args in cleaning_stages:
    df = stage(df, *extra_args)

# Preview the first five cleaned rows.
df.show(5)

25/11/09 14:23:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/home/giordano/projects/tde-analytics-b3/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/giordano/projects/tde-analytics-b3/.venv/lib/python3.11/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/giordano/.pyenv/versions/3.11.13/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
[Stage 42:>                                 

KeyboardInterrupt: 

In [7]:
# Export the cleaned frame as ';'-separated CSV with a header row.
# coalesce(1) collapses the data to a single partition before writing; the
# target path is a directory (a later cell lists it with os.listdir), and any
# existing export there is overwritten.
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .option('sep', ';')
    .csv('../../datalake/serving/bovespa_plano_real.csv')
)

                                                                                

# Machine Learning

In [12]:
# --- Reload the serving export -------------------------------------------
# `folder` is the directory Spark wrote the CSV export into.
folder = '../../datalake/serving/bovespa_plano_real.csv'
# First '.csv' entry inside that directory (raises IndexError if none exists).
# It is not used further down in this cell.
file_name = [entry for entry in os.listdir(folder) if entry.endswith('.csv')][0]
df = spark.read.csv(folder, header=True, sep=';')

# Strip dots and turn spaces into underscores in every column name,
# e.g. 'Max. Price' -> 'Max_Price'.
sanitized_names = [
    name.replace(".", "").replace(" ", "_") for name in df.columns
]
df = df.toDF(*sanitized_names)

# --- Column groups -------------------------------------------------------
# NOTE(review): misspellings ('Purshase', 'Numbor') follow the column names
# of the export -- do not fix them here without fixing the source.
numeric_columns = [
    'Opening_Price',
    'Max_Price',
    'Min_Price',
    'Mean_Price',
    'Last_Trade_Price',
    'Best_Purshase_Order_Price',
    'Best_Purshase_Sale_Price',
    'Numbor_Of_Trades',
    'Number_Of_Traded_Stocks',
    'Volume_Of_Traded_Stocks',
]
cat_cols = [
    "Currency",
    "Market_Type",
    "Trade_Name",
    "Specification_Name",
    "BDI_Code_Name",
]

# The CSV is read without a schema, so numeric columns arrive as strings:
# swap any decimal comma for a dot, then cast to double.
for num_col in numeric_columns:
    as_double = regexp_replace(col(num_col), ",", ".").cast("double")
    df = df.withColumn(num_col, as_double)

# --- Pipeline stages -----------------------------------------------------
# One indexer + one-hot encoder per categorical column
# (<col> -> <col>_idx -> <col>_vec). handleInvalid="keep" gives invalid or
# unseen values an extra index instead of raising.
indexers = [
    StringIndexer(
        inputCol=cat,
        outputCol=f"{cat}_idx",
        handleInvalid="keep",
    )
    for cat in cat_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{cat}_idx", outputCol=f"{cat}_vec")
    for cat in cat_cols
]

# Numeric columns first, followed by the one-hot vectors.
feature_cols = numeric_columns + [f"{cat}_vec" for cat in cat_cols]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

# Scale to unit standard deviation without centering (withMean=False).
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=False,
)

# NOTE(review): the pipeline is fitted on the whole frame; if a train/test
# split is taken from `df_prep` afterwards, indexer/scaler statistics will
# include test rows -- confirm this is acceptable.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler])
model_prep = pipeline.fit(df)
df_prep = model_prep.transform(df)

df_prep.show()

                                                                                

+-------------+------------+--------+----------------+-----------+-----------+-------------+---------------------------+--------+-------------+---------+---------+----------+----------------+-------------------------+------------------------+----------------+-----------------------+-----------------------+-------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------+--------------------------+-------------------------------------------------------------------------+-------------------+-------------------+----+-----+-------------------+--------------------+--------------------+------------+---------------+--------------+----------------------+-----------------+-------------+---------------+------------------+----------------------+-----------------+--------------------+--------------------+
|Register_Type|Trading_Date|BDI_Code|Negociation_Code|Market_Type| Trade_Name|Specification|Forw

In [15]:
# 80/20 random split of the prepared frame; the fixed seed keeps the split
# reproducible for the same input.
train_df, test_df = df_prep.randomSplit([0.8, 0.2], seed=42)

# Persist both splits as parquet, replacing any previous export.
split_targets = (
    (train_df, "../../datalake/serving/train_bovespa_plano_real.parquet"),
    (test_df, "../../datalake/serving/test_bovespa_plano_real.parquet"),
)
for split_df, target_path in split_targets:
    split_df.write.mode("overwrite").parquet(target_path)

25/11/09 15:42:50 WARN DAGScheduler: Broadcasting large task binary with size 1058.7 KiB
25/11/09 15:43:05 WARN DAGScheduler: Broadcasting large task binary with size 1058.7 KiB
                                                                                