In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
!tar xf /content/spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
#variáveis do ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] =  "/content/spark-3.1.2-bin-hadoop2.7"

In [5]:
!pip install -q findspark

In [6]:
import findspark
findspark.init(os.environ["SPARK_HOME"])

In [7]:
#criando spark session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .master('local')
         .appName('pyspark_ETL_practise')
         .getOrCreate()
)

In [8]:
from google.colab import files
arquivo = files.upload()

Saving backtest_results.csv to backtest_results (1).csv
Saving variables_filtered.csv to variables_filtered (1).csv


In [9]:
#Criando dataframe
df = spark.read.csv('backtest_results.csv', header=True, inferSchema=True)

In [10]:
df.printSchema()
type(df)

root
 |-- _c0: integer (nullable = true)
 |-- Saldo Líquido: double (nullable = true)
 |-- Número de entradas: double (nullable = true)
 |-- Taxa de acerto: double (nullable = true)
 |-- Payoff: double (nullable = true)
 |-- Média de lucro por operação: double (nullable = true)
 |-- Desvio padrão: double (nullable = true)



pyspark.sql.dataframe.DataFrame

In [11]:
#Renomeando as colunas
df = (df.withColumnRenamed("_c0","index")
      .withColumnRenamed("Saldo Líquido","saldo_liquido")
      .withColumnRenamed("Número de entradas","numero_de_entradas")
      .withColumnRenamed("Taxa de acerto","taxa_de_acerto")
      .withColumnRenamed("Média de lucro por operação","media_lucro_operacao")
      .withColumnRenamed("Desvio padrão","desvio_padrao")
)
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- saldo_liquido: double (nullable = true)
 |-- numero_de_entradas: double (nullable = true)
 |-- taxa_de_acerto: double (nullable = true)
 |-- Payoff: double (nullable = true)
 |-- media_lucro_operacao: double (nullable = true)
 |-- desvio_padrao: double (nullable = true)



In [12]:
df.show()

+-----+-------------+------------------+--------------+------+--------------------+-------------+
|index|saldo_liquido|numero_de_entradas|taxa_de_acerto|Payoff|media_lucro_operacao|desvio_padrao|
+-----+-------------+------------------+--------------+------+--------------------+-------------+
|    0|       834.08|             162.0|          0.59|  1.32|                5.15|        37.56|
|    1|      1654.08|             162.0|          0.68|  1.76|               10.21|        40.82|
|    2|      1499.08|             162.0|          0.68|  1.65|                9.25|         44.6|
|    3|       739.08|             162.0|          0.48|  1.23|                4.56|        50.62|
|    4|      1469.08|             162.0|          0.56|  1.51|                9.07|        55.56|
|    5|      1319.08|             162.0|          0.56|  1.43|                8.14|        58.38|
|    6|       264.08|             162.0|          0.47|  1.08|                1.63|        48.74|
|    7|       819.08

In [13]:
#Exemplo do uso select
df_results = df.select('saldo_liquido', 'taxa_de_acerto', 'media_lucro_operacao', 'numero_de_entradas')
df_results.show(5)

+-------------+--------------+--------------------+------------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_de_entradas|
+-------------+--------------+--------------------+------------------+
|       834.08|          0.59|                5.15|             162.0|
|      1654.08|          0.68|               10.21|             162.0|
|      1499.08|          0.68|                9.25|             162.0|
|       739.08|          0.48|                4.56|             162.0|
|      1469.08|          0.56|                9.07|             162.0|
+-------------+--------------+--------------------+------------------+
only showing top 5 rows



In [14]:
#Mudando o tipo da coluna 'Número de entradas' de 'double' para 'int'
df_results = df_results.withColumn('numero_entradas', df_results['numero_de_entradas'].cast('int')).drop('numero_de_entradas')
df_results.show(5)


+-------------+--------------+--------------------+---------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_entradas|
+-------------+--------------+--------------------+---------------+
|       834.08|          0.59|                5.15|            162|
|      1654.08|          0.68|               10.21|            162|
|      1499.08|          0.68|                9.25|            162|
|       739.08|          0.48|                4.56|            162|
|      1469.08|          0.56|                9.07|            162|
+-------------+--------------+--------------------+---------------+
only showing top 5 rows



In [15]:
#filtrando número de entradas abaixo de 150 e saldo líquido superior a 1000
df_results.filter((df_results.numero_entradas < 150) & (df_results.saldo_liquido > 1000)).show()

+-------------+--------------+--------------------+---------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_entradas|
+-------------+--------------+--------------------+---------------+
|       1584.2|          0.73|               12.19|            130|
|       1454.2|          0.73|               11.19|            130|
|      1411.54|           0.7|               10.78|            131|
|      1291.54|           0.7|                9.86|            131|
|      1233.88|          0.68|                9.35|            132|
|      1113.88|          0.68|                8.44|            132|
|       1909.2|          0.78|               14.69|            130|
|       1864.2|          0.79|               14.34|            130|
|       1379.2|          0.55|               10.61|            130|
|       1434.2|          0.56|               11.03|            130|
|      1666.54|          0.76|               12.72|            131|
|      1686.54|          0.77|               12.

In [16]:
#Usando função de agregação para encontrar número mínimo de entradas e o valor máximo de saldo líquido
from pyspark.sql.functions import min, max, count, col, asc, desc

df_results.agg(min('numero_entradas').alias('min_numero_entradas')).show()

print("Itens com número de entradas igual a 12")
df_results.filter(df_results.numero_entradas == 12).show(5)

print("count por número de entradas")
df_results.groupBy('numero_entradas').count().show(5)

+-------------------+
|min_numero_entradas|
+-------------------+
|                 12|
+-------------------+

Itens com número de entradas igual a 12
+-------------+--------------+--------------------+---------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_entradas|
+-------------+--------------+--------------------+---------------+
|      -246.92|          0.25|              -20.58|             12|
|      -601.92|          0.33|              -50.16|             12|
|      -936.92|          0.33|              -78.08|             12|
|      -166.92|          0.25|              -13.91|             12|
|      -521.92|          0.33|              -43.49|             12|
+-------------+--------------+--------------------+---------------+
only showing top 5 rows

count por número de entradas
+---------------+-----+
|numero_entradas|count|
+---------------+-----+
|            137|   54|
|            126|   54|
|             28|   54|
|            192|  243|
|             1

In [17]:
#Usando função de agregação para encontrar valor máximo de saldo líquido
df_results.agg(max('saldo_liquido').alias('max_saldo_liquido')).show()

print("Maior saldo líquido:")
df_results.filter(df_results.saldo_liquido ==  2614.3).show()

print("Top 5 saldo líquido:")
df_results.orderBy(col('saldo_liquido').desc()).show(5)

+-----------------+
|max_saldo_liquido|
+-----------------+
|           2614.3|
+-----------------+

Maior saldo líquido:
+-------------+--------------+--------------------+---------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_entradas|
+-------------+--------------+--------------------+---------------+
|       2614.3|          0.62|               18.03|            145|
+-------------+--------------+--------------------+---------------+

Top 5 saldo líquido:
+-------------+--------------+--------------------+---------------+
|saldo_liquido|taxa_de_acerto|media_lucro_operacao|numero_entradas|
+-------------+--------------+--------------------+---------------+
|       2614.3|          0.62|               18.03|            145|
|      2495.58|          0.61|               18.22|            137|
|      2475.58|          0.61|               18.07|            137|
|      2466.96|          0.62|               17.13|            144|
|      2440.26|          0.61|          

In [18]:
df_results.createOrReplaceTempView('results')
spark.sql('SELECT saldo_liquido FROM results ORDER BY saldo_liquido desc').show(5)

+-------------+
|saldo_liquido|
+-------------+
|       2614.3|
|      2495.58|
|      2475.58|
|      2466.96|
|      2440.26|
+-------------+
only showing top 5 rows



In [19]:
df_variables_filtered = spark.read.csv('variables_filtered.csv', header=True, inferSchema=True)
df_variables_filtered.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- df_index: integer (nullable = true)
 |-- stop_gain_distance: integer (nullable = true)
 |-- stop_loss_distance: integer (nullable = true)
 |-- RSI_enable: integer (nullable = true)
 |-- RSI_period: integer (nullable = true)
 |-- RSI_value: integer (nullable = true)
 |-- operation_time: integer (nullable = true)
 |-- n_bars_validation: integer (nullable = true)
 |-- extra_range_entry: integer (nullable = true)



In [20]:
df_variables_filtered = df_variables_filtered.drop('_c0')
df_variables_filtered.show(5)

+--------+------------------+------------------+----------+----------+---------+--------------+-----------------+-----------------+
|df_index|stop_gain_distance|stop_loss_distance|RSI_enable|RSI_period|RSI_value|operation_time|n_bars_validation|extra_range_entry|
+--------+------------------+------------------+----------+----------+---------+--------------+-----------------+-----------------+
|       0|                 4|                 4|         1|         9|       30|            10|                5|                0|
|       1|                 4|                10|         1|         9|       30|            10|                5|                0|
|       2|                 4|                16|         1|         9|       30|            10|                5|                0|
|       3|                10|                 4|         1|         9|       30|            10|                5|                0|
|       4|                10|                10|         1|         9|      

In [21]:
#Inner Join para selecionar apenas os itens filtrados do segundo dataset
df_results_filtered = (df.join(df_variables_filtered, df.index == df_variables_filtered.df_index, 'inner')
                .select(df.saldo_liquido, df.Payoff, df_variables_filtered.RSI_enable))
df_results_filtered.show()

+-------------+------+----------+
|saldo_liquido|Payoff|RSI_enable|
+-------------+------+----------+
|       834.08|  1.32|         1|
|      1654.08|  1.76|         1|
|      1499.08|  1.65|         1|
|       739.08|  1.23|         1|
|      1469.08|  1.51|         1|
|      1319.08|  1.43|         1|
|       264.08|  1.08|         1|
|       819.08|  1.27|         1|
|       669.08|  1.21|         1|
|       754.08|  1.29|         0|
|      1509.08|  1.66|         0|
|      1494.08|  1.64|         0|
|       624.08|   1.2|         0|
|      1174.08|  1.39|         0|
|      1124.08|  1.36|         0|
|       114.08|  1.04|         0|
|       519.08|  1.17|         0|
|       469.08|  1.15|         0|
|       834.08|  1.32|         1|
|      1654.08|  1.76|         1|
+-------------+------+----------+
only showing top 20 rows



In [22]:
df_results_filtered.createOrReplaceTempView('results_filtered_table')
spark.sql('SELECT count(*) as filtered_rows_count FROM results_filtered_table').show()

+-------------------+
|filtered_rows_count|
+-------------------+
|               1620|
+-------------------+



In [23]:
spark.sql('SELECT * FROM results_filtered_table ORDER BY Payoff desc').show()

+-------------+------+----------+
|saldo_liquido|Payoff|RSI_enable|
+-------------+------+----------+
|       1909.2|  2.05|         1|
|       1954.2|  2.03|         1|
|       1939.2|  2.03|         1|
|      1771.44|  2.01|         1|
|       1864.2|  1.98|         1|
|      1753.78|  1.96|         1|
|      1821.44|  1.96|         1|
|      1813.78|  1.94|         1|
|      1801.44|  1.94|         1|
|      1793.78|  1.93|         1|
|       1584.2|  1.92|         1|
|      1846.54|   1.9|         1|
|      2154.08|  1.87|         1|
|      1981.96|  1.87|         1|
|      2154.08|  1.87|         1|
|      1768.88|  1.85|         1|
|      2169.08|  1.85|         1|
|      2169.08|  1.85|         0|
|      2169.08|  1.85|         1|
|      2169.08|  1.85|         1|
+-------------+------+----------+
only showing top 20 rows

