<a href="https://colab.research.google.com/github/lucasvx273/Treinos_Gerais_Python_LPS/blob/main/pyspark_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Notebook de estudo para a biblioteca Spark

In [None]:
# Instalando o java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# baixar versão mais recente do spark
!pip install pyspark -q

[K     |████████████████████████████████| 281.4 MB 49 kB/s 
[K     |████████████████████████████████| 199 kB 69.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# baixando versao spark
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz


In [None]:
# Deszipando spark
!tar xf /content/spark-3.3.1-bin-hadoop3.tgz

In [None]:
# Criando variáveis do ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"


In [None]:
# Instalar a lib findspark que ajuda a localizar o Spark no sistema e importar:
!pip install -q findspark

In [None]:
# Importando findspark
import findspark
findspark.init()

In [None]:
# criar Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master('local')\
        .appName('sparkcolab')\
        .getOrCreate()

In [None]:
# importar um dataset
from google.colab import files
arquivo = files.upload()

Saving output.csv to output.csv


O arquivo que eu selecionei é CSV, as possibilidades de arquivo são diversas, alguns são:

- CSV
- TXT
- PARQUET
- AVRO
- Banco de dados

Agora na tratativa dos dados temos duas formas de trabalhar:

Considerando que estamos em um local aonde temos um Data Lake, como Azure ou AWS... 

- Streaming
> ETL (Spark) Lê os arquivos presentes do Data Lake e ocorre um processamento em tempo real.


- Batch
> Schedule Lê os arquivos presentes do Data Lake que foram programadas

In [None]:
# criar DataFrame

df = spark.read.csv('/content/output.csv', header=False, inferSchema=True, sep=',')

In [None]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [None]:
df.show()

+-----+--------------------+
|  _c0|                 _c1|
+-----+--------------------+
|  1OK|[10101472647, 55,...|
|  2OK|[10101472705, 55,...|
|2GALE|['GALE:1010147522...|
|  4OK|[10101483019, 55,...|
|4GALE|['GALE:1010148560...|
|  5OK|[10101490619, 55,...|
|5GALE|['GALE:1010149279...|
|  3OK|[10101480451, 55,...|
|3GALE|['GALE:1010149287...|
|  6OK|[10101495456, 55,...|
|6GALE|['GALE:1010149822...|
+-----+--------------------+



In [None]:
from pyspark.sql.functions import *
import pyspark

split_cols = pyspark.sql.functions.split(df['_c1'], ',')
  
# applying split() using select()
df2 = df.select('_c0',
                split_cols.getItem(0).alias('ID'),
                split_cols.getItem(1).alias('Value'),
                split_cols.getItem(2).alias('Coin'),
                split_cols.getItem(3).alias('Move'),
                split_cols.getItem(4).alias('Candle'),
                split_cols.getItem(5).alias('Result'))
  
# show df3
df2.show()

+-----+--------------------+-----+-------------+-------+------+-------+
|  _c0|                  ID|Value|         Coin|   Move|Candle| Result|
+-----+--------------------+-----+-------------+-------+------+-------+
|  1OK|        [10101472647|   55| 'EURUSD-OTC'| 'call'|     1|  'WIN'|
|  2OK|        [10101472705|   55| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|2GALE|['GALE:1010147522...| 82.5| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|  4OK|        [10101483019|   55| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|4GALE|['GALE:1010148560...| 82.5| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|  5OK|        [10101490619|   55| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|5GALE|['GALE:1010149279...| 82.5| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|  3OK|        [10101480451|   55| 'EURUSD-OTC'|  'put'|     5| 'LOSS'|
|3GALE|['GALE:1010149287...| 82.5| 'EURUSD-OTC'|  'put'|     5|  'WIN'|
|  6OK|        [10101495456|   55| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|6GALE|['GALE:1010149822...| 82.5| 'EURUSD-OTC'| 'call'|     1| 

In [None]:
# Mudar tipo da coluna Candle para INTEIRO (INT)
df2 = df2.withColumn('Candle', df2['Candle'].cast('int'))

# Renomeando primeira coluna
df2 = df2.withColumnRenamed('_c0','Numb')

df2.show()

+-----+--------------------+-----+-------------+-------+------+-------+
| Numb|                  ID|Value|         Coin|   Move|Candle| Result|
+-----+--------------------+-----+-------------+-------+------+-------+
|  1OK|        [10101472647|   55| 'EURUSD-OTC'| 'call'|     1|  'WIN'|
|  2OK|        [10101472705|   55| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|2GALE|['GALE:1010147522...| 82.5| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|  4OK|        [10101483019|   55| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|4GALE|['GALE:1010148560...| 82.5| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|  5OK|        [10101490619|   55| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|5GALE|['GALE:1010149279...| 82.5| 'EURUSD-OTC'|  'put'|     1| 'LOSS'|
|  3OK|        [10101480451|   55| 'EURUSD-OTC'|  'put'|     5| 'LOSS'|
|3GALE|['GALE:1010149287...| 82.5| 'EURUSD-OTC'|  'put'|     5|  'WIN'|
|  6OK|        [10101495456|   55| 'EURUSD-OTC'| 'call'|     1| 'LOSS'|
|6GALE|['GALE:1010149822...| 82.5| 'EURUSD-OTC'| 'call'|     1| 

In [None]:
# filtro

df2.filter(df2.Result.contains('WIN')).show()

+-----+--------------------+-----+-------------+-------+------+------+
| Numb|                  ID|Value|         Coin|   Move|Candle|Result|
+-----+--------------------+-----+-------------+-------+------+------+
|  1OK|        [10101472647|   55| 'EURUSD-OTC'| 'call'|     1| 'WIN'|
|3GALE|['GALE:1010149287...| 82.5| 'EURUSD-OTC'|  'put'|     5| 'WIN'|
|6GALE|['GALE:1010149822...| 82.5| 'EURUSD-OTC'| 'call'|     1| 'WIN'|
+-----+--------------------+-----+-------------+-------+------+------+



In [None]:
# Valor máximo do Value
df2_max = df2.agg(max('Value').alias('Max_Values'))
df2_max.show()

# Valor mínimo do Value
df2_min = df2.agg(min('Value').alias('Max_Values'))
df2_min.show()

+----------+
|Max_Values|
+----------+
|      82.5|
+----------+

+----------+
|Max_Values|
+----------+
|        55|
+----------+



In [None]:
# Contar quantidades de apostas realizadas por Value

df2_n = df2.groupBy('Value').count() 
df2_n.show()

+-----+-----+
|Value|count|
+-----+-----+
| 82.5|    5|
|   55|    6|
+-----+-----+



In [None]:
# Reordenando por Ascendente e Descendente

df2_n.orderBy(col('count').desc()).show()

df2_n.orderBy(col('count').asc()).show()

+-----+-----+
|Value|count|
+-----+-----+
|   55|    6|
| 82.5|    5|
+-----+-----+

+-----+-----+
|Value|count|
+-----+-----+
| 82.5|    5|
|   55|    6|
+-----+-----+

